try-rustfs/managetoken.py

169 lines
5.1 KiB
Python

#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["requests"]
# ///
import datetime as dt
import hashlib
import hmac
import json
import urllib.parse as urlparse
import requests
# ====== CONFIG ======
# Where the server listens and which credentials sign the request.
# NOTE(review): these look like local default credentials - confirm before
# pointing this script at anything other than a test instance.
ENDPOINT = "http://localhost:9000"
ACCESS_KEY = "rustfsadmin"
SECRET_KEY = "rustfsadmin"
# Sent as x-amz-security-token when set (STS credentials); otherwise keep None.
SESSION_TOKEN = None
# Region and service names that go into the SigV4 credential scope.
REGION = "us-east-1"
SERVICE = "s3"

# Admin request issued by main(): PUT <ENDPOINT><path>?name=<policy name>,
# with `payload` serialized as the JSON request body.
path = "/rustfs/admin/v3/add-canned-policy"
query_params = {"name": "testmepy"}
payload = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ObjectActions",
            "Effect": "Allow",
            "Action": ["s3:*"],
            "Resource": [
                "arn:aws:s3:::k8s-pages/dropper/*",
                "arn:aws:s3:::k8s-pages/dropper-dev/*",
                "arn:aws:s3:::dropper/*",
                "arn:aws:s3:::dropper-dev/*",
                "arn:aws:s3:::k8s-pages/dropper",
                "arn:aws:s3:::k8s-pages/dropper-dev",
                "arn:aws:s3:::dropper",
                "arn:aws:s3:::dropper-dev",
            ],
        }
    ],
}
# =====================
def amz_date_now() -> tuple[str, str]:
    """Return the current UTC time in the two formats SigV4 signing uses.

    Returns:
        A ``(amz_datetime, datestamp)`` tuple formatted as
        ``YYYYMMDDTHHMMSSZ`` and ``YYYYMMDD``. Both values come from the same
        instant, so they cannot disagree across a midnight boundary.
    """
    # datetime.utcnow() is deprecated since Python 3.12 (the minimum version
    # this script declares); an aware UTC "now" formats to the same strings.
    now = dt.datetime.now(dt.timezone.utc)
    return now.strftime("%Y%m%dT%H%M%SZ"), now.strftime("%Y%m%d")
def sha256_hex(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def hmac_sha256(key: bytes, msg: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

    ``msg`` is encoded as UTF-8 before it is fed to the MAC.
    """
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode("utf-8"))
    return mac.digest()
def get_signing_key(secret_key: str, datestamp: str, region: str, service: str) -> bytes:
    """Derive the signing key by chaining HMAC-SHA256 over the scope parts.

    The chain starts from ``"AWS4" + secret_key`` and is re-keyed, in order,
    with ``datestamp``, ``region``, ``service`` and the literal
    ``"aws4_request"``; the final digest is returned.
    """
    derived = ("AWS4" + secret_key).encode("utf-8")
    # Each step uses the previous digest as the key for the next scope part.
    for scope_part in (datestamp, region, service, "aws4_request"):
        derived = hmac_sha256(derived, scope_part)
    return derived
def canonical_query_string(qs: dict) -> str:
    """Build the SigV4 canonical query string for ``qs``.

    Every name and value is percent-encoded individually (only ``-_.~`` and
    alphanumerics are left as-is), then the encoded pairs are sorted by name
    and, for repeated names, by value.

    Args:
        qs: Mapping of parameter name to value. A list or tuple value is
            treated as a repeated parameter (one ``name=value`` pair per
            item); any other value is converted with ``str()``. ``None`` or
            an empty mapping yields an empty string.

    Returns:
        The ``name=value`` pairs joined with ``&``.
    """
    encoded_pairs = []
    for key, value in (qs or {}).items():
        encoded_key = urlparse.quote(str(key), safe="-_.~")
        # Expand multi-valued parameters instead of signing the repr of a list.
        items = value if isinstance(value, (list, tuple)) else (value,)
        encoded_pairs.extend(
            (encoded_key, urlparse.quote(str(item), safe="-_.~")) for item in items
        )
    # SigV4 sorts AFTER encoding: "%" sorts before every unreserved character,
    # so ordering the raw names can give a different (wrong) result.
    encoded_pairs.sort()
    return "&".join(f"{name}={val}" for name, val in encoded_pairs)
def sign_request(
    method: str,
    url: str,
    host_header: str,
    params: dict,
    body: bytes,
    access_key: str,
    secret_key: str,
    region: str,
    service: str,
    session_token: str | None,
):
    """Sign a request with AWS Signature V4 and return the headers to send.

    The payload is declared as ``UNSIGNED-PAYLOAD``, so ``body`` is accepted
    but never read. Only ``host``, ``x-amz-content-sha256``, ``x-amz-date``
    and (when ``session_token`` is truthy) ``x-amz-security-token`` are
    signed.

    Args:
        method: HTTP verb placed on the first line of the canonical request.
        url: Full request URL; only its path component is used.
        host_header: Value signed as ``host`` and returned as ``Host``.
        params: Query parameters; ``None`` or empty means no query string.
        body: Request body (unused, see above).
        access_key: Access key written into the ``Credential`` field.
        secret_key: Secret used to derive the signing key.
        region: Region part of the credential scope.
        service: Service part of the credential scope.
        session_token: Optional token; signed and returned when truthy.

    Returns:
        A dict with ``Authorization``, ``x-amz-date``,
        ``x-amz-content-sha256``, ``Host`` and ``Content-Type`` headers, plus
        ``x-amz-security-token`` when a session token was supplied.
    """
    amz_datetime, datestamp = amz_date_now()
    # The body is deliberately not hashed; this literal takes the digest's place.
    payload_hash = "UNSIGNED-PAYLOAD"
    token_header = {"x-amz-security-token": session_token} if session_token else {}

    # Insertion order is already alphabetical, which is the order the
    # canonical request lists the signed headers in.
    headers_to_sign = {
        "host": host_header,
        "x-amz-content-sha256": payload_hash,
        "x-amz-date": amz_datetime,
    } | token_header
    signed_headers = ";".join(headers_to_sign)
    canonical_headers = "".join(
        f"{name}:{value}\n" for name, value in headers_to_sign.items()
    )

    request_path = urlparse.urlparse(url).path or "/"
    canonical_request = "\n".join(
        (
            method,
            urlparse.quote(request_path, safe="/-_.~"),
            canonical_query_string(params or {}),
            canonical_headers,
            signed_headers,
            payload_hash,
        )
    )

    algorithm = "AWS4-HMAC-SHA256"
    credential_scope = f"{datestamp}/{region}/{service}/aws4_request"
    request_digest = sha256_hex(canonical_request.encode("utf-8"))
    string_to_sign = f"{algorithm}\n{amz_datetime}\n{credential_scope}\n{request_digest}"

    mac = hmac.new(
        get_signing_key(secret_key, datestamp, region, service),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    )
    signature = mac.hexdigest()
    authorization = (
        f"{algorithm} Credential={access_key}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, Signature={signature}"
    )

    # Headers actually sent on the wire; the token, if any, stays last.
    return {
        "Authorization": authorization,
        "x-amz-date": amz_datetime,
        "x-amz-content-sha256": payload_hash,
        "Host": host_header,
        "Content-Type": "application/json",
    } | token_header
def main(timeout: float = 30.0):
    """Sign and send the configured admin request, then print the outcome.

    Issues a PUT to ``ENDPOINT + path`` with ``query_params`` as the query
    string and ``payload`` as a compact JSON body, and prints the response
    status code and text.

    Args:
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error instead of blocking forever.
    """
    full_url = ENDPOINT.rstrip("/") + path
    host_header = urlparse.urlparse(ENDPOINT).netloc or "localhost:9000"
    body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    headers = sign_request(
        method="PUT",
        url=full_url,
        host_header=host_header,
        params=query_params,
        body=body_bytes,
        access_key=ACCESS_KEY,
        secret_key=SECRET_KEY,
        region=REGION,
        service=SERVICE,
        session_token=SESSION_TOKEN,
    )
    # requests applies no timeout by default, so an unresponsive endpoint
    # would hang the script indefinitely without this argument.
    r = requests.put(
        full_url,
        params=query_params,
        data=body_bytes,
        headers=headers,
        timeout=timeout,
    )
    print("Status:", r.status_code)
    print("Response:", r.text)


if __name__ == "__main__":
    main()