169 lines
5.1 KiB
Python
169 lines
5.1 KiB
Python
#!/usr/bin/env -S uv run --quiet --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = ["requests"]
|
|
# ///
|
|
|
|
import datetime as dt
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import urllib.parse as urlparse
|
|
|
|
import requests
|
|
|
|
# ====== CONFIG ======
|
|
ENDPOINT = "http://localhost:9000"
|
|
ACCESS_KEY = "rustfsadmin"
|
|
SECRET_KEY = "rustfsadmin"
|
|
SESSION_TOKEN = None # set to your x-amz-security-token if using STS, else leave None
|
|
REGION = "us-east-1"
|
|
SERVICE = "s3"
|
|
|
|
# The admin call you shared:
|
|
path = "/rustfs/admin/v3/add-canned-policy"
|
|
query_params = {"name": "testmepy"}
|
|
payload = {
|
|
"Version": "2012-10-17",
|
|
"Statement": [
|
|
{
|
|
"Sid": "ObjectActions",
|
|
"Effect": "Allow",
|
|
"Action": ["s3:*"],
|
|
"Resource": [
|
|
"arn:aws:s3:::k8s-pages/dropper/*",
|
|
"arn:aws:s3:::k8s-pages/dropper-dev/*",
|
|
"arn:aws:s3:::dropper/*",
|
|
"arn:aws:s3:::dropper-dev/*",
|
|
"arn:aws:s3:::k8s-pages/dropper",
|
|
"arn:aws:s3:::k8s-pages/dropper-dev",
|
|
"arn:aws:s3:::dropper",
|
|
"arn:aws:s3:::dropper-dev",
|
|
],
|
|
}
|
|
],
|
|
}
|
|
# =====================
|
|
|
|
def amz_date_now():
|
|
now = dt.datetime.utcnow()
|
|
return now.strftime("%Y%m%dT%H%M%SZ"), now.strftime("%Y%m%d")
|
|
|
|
def sha256_hex(data: bytes) -> str:
|
|
return hashlib.sha256(data).hexdigest()
|
|
|
|
def hmac_sha256(key: bytes, msg: str) -> bytes:
|
|
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
|
|
|
|
def get_signing_key(secret_key: str, datestamp: str, region: str, service: str) -> bytes:
|
|
k_date = hmac_sha256(("AWS4" + secret_key).encode("utf-8"), datestamp)
|
|
k_region = hmac_sha256(k_date, region)
|
|
k_service = hmac_sha256(k_region, service)
|
|
k_signing = hmac_sha256(k_service, "aws4_request")
|
|
return k_signing
|
|
|
|
def canonical_query_string(qs: dict) -> str:
|
|
# AWS requires query params sorted by key, then value; values URL-encoded
|
|
parts = []
|
|
for k in sorted(qs.keys()):
|
|
v = qs[k]
|
|
parts.append(
|
|
urlparse.quote(str(k), safe="-_.~") + "=" + urlparse.quote(str(v), safe="-_.~")
|
|
)
|
|
return "&".join(parts)
|
|
|
|
def sign_request(method: str, url: str, host_header: str, params: dict, body: bytes,
|
|
access_key: str, secret_key: str, region: str, service: str, session_token: str | None):
|
|
amz_datetime, datestamp = amz_date_now()
|
|
|
|
canonical_uri = urlparse.quote(urlparse.urlparse(url).path or "/", safe="/-_.~")
|
|
canonical_qs = canonical_query_string(params or {})
|
|
|
|
# We intentionally use UNSIGNED-PAYLOAD to match your curl
|
|
payload_hash = "UNSIGNED-PAYLOAD"
|
|
|
|
# Required headers (and only these will be signed)
|
|
headers = {
|
|
"host": host_header,
|
|
"x-amz-content-sha256": payload_hash,
|
|
"x-amz-date": amz_datetime,
|
|
}
|
|
if session_token:
|
|
headers["x-amz-security-token"] = session_token
|
|
|
|
signed_headers_list = ["host", "x-amz-content-sha256", "x-amz-date"]
|
|
if session_token:
|
|
signed_headers_list.append("x-amz-security-token")
|
|
signed_headers = ";".join(signed_headers_list)
|
|
|
|
canonical_headers = "".join(f"{h}:{headers[h]}\n" for h in signed_headers_list)
|
|
|
|
canonical_request = "\n".join(
|
|
[
|
|
method,
|
|
canonical_uri,
|
|
canonical_qs,
|
|
canonical_headers,
|
|
signed_headers,
|
|
payload_hash,
|
|
]
|
|
)
|
|
|
|
algorithm = "AWS4-HMAC-SHA256"
|
|
credential_scope = f"{datestamp}/{region}/{service}/aws4_request"
|
|
string_to_sign = "\n".join(
|
|
[
|
|
algorithm,
|
|
amz_datetime,
|
|
credential_scope,
|
|
sha256_hex(canonical_request.encode("utf-8")),
|
|
]
|
|
)
|
|
|
|
signing_key = get_signing_key(secret_key, datestamp, region, service)
|
|
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
|
|
|
|
authorization = (
|
|
f"{algorithm} Credential={access_key}/{credential_scope}, "
|
|
f"SignedHeaders={signed_headers}, Signature={signature}"
|
|
)
|
|
|
|
# Build final headers for the HTTP request
|
|
final_headers = {
|
|
"Authorization": authorization,
|
|
"x-amz-date": amz_datetime,
|
|
"x-amz-content-sha256": payload_hash,
|
|
"Host": host_header,
|
|
"Content-Type": "application/json",
|
|
}
|
|
if session_token:
|
|
final_headers["x-amz-security-token"] = session_token
|
|
|
|
return final_headers
|
|
|
|
def main():
|
|
full_url = ENDPOINT.rstrip("/") + path
|
|
host_header = urlparse.urlparse(ENDPOINT).netloc or "localhost:9000"
|
|
body_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
|
|
|
|
headers = sign_request(
|
|
method="PUT",
|
|
url=full_url,
|
|
host_header=host_header,
|
|
params=query_params,
|
|
body=body_bytes,
|
|
access_key=ACCESS_KEY,
|
|
secret_key=SECRET_KEY,
|
|
region=REGION,
|
|
service=SERVICE,
|
|
session_token=SESSION_TOKEN,
|
|
)
|
|
|
|
r = requests.put(full_url, params=query_params, data=body_bytes, headers=headers)
|
|
print("Status:", r.status_code)
|
|
print("Response:", r.text)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|