#!/usr/bin/env -S uv run --quiet --script # /// script # requires-python = ">=3.12" # dependencies = ["requests"] # /// import datetime as dt import hashlib import hmac import json import urllib.parse as urlparse import requests # ====== CONFIG ====== ENDPOINT = "http://localhost:9000" ACCESS_KEY = "rustfsadmin" SECRET_KEY = "rustfsadmin" SESSION_TOKEN = None # set to your x-amz-security-token if using STS, else leave None REGION = "us-east-1" SERVICE = "s3" # The admin call you shared: path = "/rustfs/admin/v3/add-canned-policy" query_params = {"name": "testmepy"} payload = { "Version": "2012-10-17", "Statement": [ { "Sid": "ObjectActions", "Effect": "Allow", "Action": ["s3:*"], "Resource": [ "arn:aws:s3:::k8s-pages/dropper/*", "arn:aws:s3:::k8s-pages/dropper-dev/*", "arn:aws:s3:::dropper/*", "arn:aws:s3:::dropper-dev/*", "arn:aws:s3:::k8s-pages/dropper", "arn:aws:s3:::k8s-pages/dropper-dev", "arn:aws:s3:::dropper", "arn:aws:s3:::dropper-dev", ], } ], } # ===================== def amz_date_now(): now = dt.datetime.utcnow() return now.strftime("%Y%m%dT%H%M%SZ"), now.strftime("%Y%m%d") def sha256_hex(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def hmac_sha256(key: bytes, msg: str) -> bytes: return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() def get_signing_key(secret_key: str, datestamp: str, region: str, service: str) -> bytes: k_date = hmac_sha256(("AWS4" + secret_key).encode("utf-8"), datestamp) k_region = hmac_sha256(k_date, region) k_service = hmac_sha256(k_region, service) k_signing = hmac_sha256(k_service, "aws4_request") return k_signing def canonical_query_string(qs: dict) -> str: # AWS requires query params sorted by key, then value; values URL-encoded parts = [] for k in sorted(qs.keys()): v = qs[k] parts.append( urlparse.quote(str(k), safe="-_.~") + "=" + urlparse.quote(str(v), safe="-_.~") ) return "&".join(parts) def sign_request(method: str, url: str, host_header: str, params: dict, body: bytes, access_key: str, secret_key: str, region: str, service: str, session_token: str | None): amz_datetime, datestamp = amz_date_now() canonical_uri = urlparse.quote(urlparse.urlparse(url).path or "/", safe="/-_.~") canonical_qs = canonical_query_string(params or {}) # We intentionally use UNSIGNED-PAYLOAD to match your curl payload_hash = "UNSIGNED-PAYLOAD" # Required headers (and only these will be signed) headers = { "host": host_header, "x-amz-content-sha256": payload_hash, "x-amz-date": amz_datetime, } if session_token: headers["x-amz-security-token"] = session_token signed_headers_list = ["host", "x-amz-content-sha256", "x-amz-date"] if session_token: signed_headers_list.append("x-amz-security-token") signed_headers = ";".join(signed_headers_list) canonical_headers = "".join(f"{h}:{headers[h]}\n" for h in signed_headers_list) canonical_request = "\n".join( [ method, canonical_uri, canonical_qs, canonical_headers, signed_headers, payload_hash, ] ) algorithm = "AWS4-HMAC-SHA256" credential_scope = f"{datestamp}/{region}/{service}/aws4_request" string_to_sign = "\n".join( [ algorithm, amz_datetime, credential_scope, sha256_hex(canonical_request.encode("utf-8")), ] ) signing_key = get_signing_key(secret_key, datestamp, region, service) signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() authorization = ( f"{algorithm} Credential={access_key}/{credential_scope}, " f"SignedHeaders={signed_headers}, Signature={signature}" ) # Build final headers for the HTTP request final_headers = { "Authorization": authorization, "x-amz-date": amz_datetime, "x-amz-content-sha256": payload_hash, "Host": host_header, "Content-Type": "application/json", } if session_token: final_headers["x-amz-security-token"] = session_token return final_headers def main(): full_url = ENDPOINT.rstrip("/") + path host_header = urlparse.urlparse(ENDPOINT).netloc or "localhost:9000" body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8") headers = sign_request( method="PUT", url=full_url, host_header=host_header, params=query_params, body=body_bytes, access_key=ACCESS_KEY, secret_key=SECRET_KEY, region=REGION, service=SERVICE, session_token=SESSION_TOKEN, ) r = requests.put(full_url, params=query_params, data=body_bytes, headers=headers) print("Status:", r.status_code) print("Response:", r.text) if __name__ == "__main__": main()