init managetoken

This commit is contained in:
Waylon Walker 2025-11-03 08:35:09 -06:00
parent a75fc49d9f
commit 983a9594e9
3 changed files with 194 additions and 1 deletions

View file

@ -1,6 +1,15 @@
# try-rustfs # try-rustfs
Looking for a minio alternative and trying out rustfs Looking for a minio alternative and trying out rustfs. Currently all the
operations are documented in the justfile.
Startup time is insane... I also checked minio, and it is also crazy fast, I
did not realize how fast these things can start up.
``` bash
just start list-buckets list-dropper 0.81s user 0.29s system 73% cpu 1.494 total
just list-buckets list-dropper 0.67s user 0.22s system 74% cpu 1.203 total
```
## .env ## .env
@ -9,3 +18,9 @@ this would be a bad idea for anything connected to the internet. This only
ever only ever ran on my local system. ever only ever ran on my local system.
**never** commit real secrets to your repo. **never** commit real secrets to your repo.
## Token Management
I ran into issues using the `mcli` to manage tokens. I'm working on creating
something based on a request in the console. Gippity created managetoken.py
for me based on the request.

View file

@ -5,6 +5,9 @@ start:
mkdir -p logs mkdir -p logs
podman run -d -p 9000:9000 -v $(pwd)/data:/data -v $(pwd)/logs:/logs docker.io/rustfs/rustfs:alpha podman run -d -p 9000:9000 -v $(pwd)/data:/data -v $(pwd)/logs:/logs docker.io/rustfs/rustfs:alpha
start-minio:
podman run -d -p 9000:9000 -p 9001:9001 -v $(pwd)/data:/data -v $(pwd)/logs:/logs docker.io/minio/minio server /data
make-bucket: make-bucket:
#!/bin/bash #!/bin/bash
set +e set +e
@ -30,3 +33,9 @@ post-presigned-url:
echo $URL echo $URL
curl -X PUT -T hello.txt $URL curl -X PUT -T hello.txt $URL
ls:
aws s3 ls
aws s3 ls s3://dropper
createtoken:
./managetoken.py

169
managetoken.py Normal file
View file

@ -0,0 +1,169 @@
#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["requests"]
# ///
import datetime as dt
import hashlib
import hmac
import json
import urllib.parse as urlparse
import requests
# ====== CONFIG ======
ENDPOINT = "http://localhost:9000"
ACCESS_KEY = "rustfsadmin"
SECRET_KEY = "rustfsadmin"
SESSION_TOKEN = None # set to your x-amz-security-token if using STS, else leave None
REGION = "us-east-1"
SERVICE = "s3"
# The admin call you shared:
path = "/rustfs/admin/v3/add-canned-policy"
query_params = {"name": "testmepy"}
payload = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ObjectActions",
"Effect": "Allow",
"Action": ["s3:*"],
"Resource": [
"arn:aws:s3:::k8s-pages/dropper/*",
"arn:aws:s3:::k8s-pages/dropper-dev/*",
"arn:aws:s3:::dropper/*",
"arn:aws:s3:::dropper-dev/*",
"arn:aws:s3:::k8s-pages/dropper",
"arn:aws:s3:::k8s-pages/dropper-dev",
"arn:aws:s3:::dropper",
"arn:aws:s3:::dropper-dev",
],
}
],
}
# =====================
def amz_date_now():
now = dt.datetime.utcnow()
return now.strftime("%Y%m%dT%H%M%SZ"), now.strftime("%Y%m%d")
def sha256_hex(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def hmac_sha256(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def get_signing_key(secret_key: str, datestamp: str, region: str, service: str) -> bytes:
k_date = hmac_sha256(("AWS4" + secret_key).encode("utf-8"), datestamp)
k_region = hmac_sha256(k_date, region)
k_service = hmac_sha256(k_region, service)
k_signing = hmac_sha256(k_service, "aws4_request")
return k_signing
def canonical_query_string(qs: dict) -> str:
# AWS requires query params sorted by key, then value; values URL-encoded
parts = []
for k in sorted(qs.keys()):
v = qs[k]
parts.append(
urlparse.quote(str(k), safe="-_.~") + "=" + urlparse.quote(str(v), safe="-_.~")
)
return "&".join(parts)
def sign_request(method: str, url: str, host_header: str, params: dict, body: bytes,
access_key: str, secret_key: str, region: str, service: str, session_token: str | None):
amz_datetime, datestamp = amz_date_now()
canonical_uri = urlparse.quote(urlparse.urlparse(url).path or "/", safe="/-_.~")
canonical_qs = canonical_query_string(params or {})
# We intentionally use UNSIGNED-PAYLOAD to match your curl
payload_hash = "UNSIGNED-PAYLOAD"
# Required headers (and only these will be signed)
headers = {
"host": host_header,
"x-amz-content-sha256": payload_hash,
"x-amz-date": amz_datetime,
}
if session_token:
headers["x-amz-security-token"] = session_token
signed_headers_list = ["host", "x-amz-content-sha256", "x-amz-date"]
if session_token:
signed_headers_list.append("x-amz-security-token")
signed_headers = ";".join(signed_headers_list)
canonical_headers = "".join(f"{h}:{headers[h]}\n" for h in signed_headers_list)
canonical_request = "\n".join(
[
method,
canonical_uri,
canonical_qs,
canonical_headers,
signed_headers,
payload_hash,
]
)
algorithm = "AWS4-HMAC-SHA256"
credential_scope = f"{datestamp}/{region}/{service}/aws4_request"
string_to_sign = "\n".join(
[
algorithm,
amz_datetime,
credential_scope,
sha256_hex(canonical_request.encode("utf-8")),
]
)
signing_key = get_signing_key(secret_key, datestamp, region, service)
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
authorization = (
f"{algorithm} Credential={access_key}/{credential_scope}, "
f"SignedHeaders={signed_headers}, Signature={signature}"
)
# Build final headers for the HTTP request
final_headers = {
"Authorization": authorization,
"x-amz-date": amz_datetime,
"x-amz-content-sha256": payload_hash,
"Host": host_header,
"Content-Type": "application/json",
}
if session_token:
final_headers["x-amz-security-token"] = session_token
return final_headers
def main():
full_url = ENDPOINT.rstrip("/") + path
host_header = urlparse.urlparse(ENDPOINT).netloc or "localhost:9000"
body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
headers = sign_request(
method="PUT",
url=full_url,
host_header=host_header,
params=query_params,
body=body_bytes,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
region=REGION,
service=SERVICE,
session_token=SESSION_TOKEN,
)
r = requests.put(full_url, params=query_params, data=body_bytes, headers=headers)
print("Status:", r.status_code)
print("Response:", r.text)
if __name__ == "__main__":
main()