Audit log sample
- Last Updated: June 30, 2026
- 5 minute read
- Automate MFT
- Documentation
The following Python sample demonstrates how to use the Audit Logs Reporting API to create, monitor, download, and delete an audit log report.
This script serves as a reference implementation for customers who want to automate audit log exports by using the Automate MFT REST API.
The complete sample script is provided below and can be copied and modified as required for your environment.
Usage examples
To get audit logs for 2026-06-26 in a US environment, run:

python audit_logs_export.py <api-key-pem> <api-key-id> <tenant-id> --base-url "https://api.us.mft.progress.com" --report-type csv --start "2026-06-26T00:00:00Z" --end "2026-06-26T23:00:00Z"

Full sample script
#!/usr/bin/env python3
"""Create, poll, download, and delete an Audit Logs report via Hermes REST API.
Example:
python examples/audit_logs_report_flow.py \
./private_key.pem \
<kid> \
<tenant_id> \
--base-url https://api.eu.mft.progress.com \
--report-type csv \
--lookback-hours 24 \
--output ./audit-logs-report.csv
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import pathlib
import sys
import time
import uuid
import jwt
import requests
# Accepted values for the repeatable --action, --outcome and --resource
# command-line filters; parse_args() passes these lists as argparse ``choices``.
AUDIT_LOG_ACTION_VALUES = [
    "Get",
    "Create",
    "Update",
    "Delete",
]
AUDIT_LOG_OUTCOME_VALUES = [
    "Success",
    "Error",
]
AUDIT_LOG_RESOURCE_VALUES = [
    "Task", "Endpoint", "AuthenticationMethod", "Schedule",
    "Agent", "Folder", "KeyCert", "Script",
    "Parameter", "Tag", "User", "Role",
    "Report", "IdentityProvider", "PublicKey", "Membership",
    "TenantSetting", "AuditLog", "System",
]
def _positive_int(text: str) -> int:
    """argparse ``type`` callable: parse *text* as an integer greater than zero."""
    try:
        number = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if number <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the audit logs report script.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        The populated namespace. The repeatable filters (``action``,
        ``resource``, ``outcome``) are lists and are empty when not given.
    """
    parser = argparse.ArgumentParser(description="Audit logs report automation")

    # Credentials used to sign the JWT client assertion.
    parser.add_argument("private_key_file", help="Path to ES256 private key (PEM)")
    parser.add_argument("kid", help="API key id (JWT header kid)")
    parser.add_argument("tenant_id", help="Tenant id used for iss/sub")

    # Target environment and report shape.
    parser.add_argument(
        "--base-url",
        default="https://api.eu.mft.progress.com",
        help="API base URL (default: https://api.eu.mft.progress.com)",
    )
    parser.add_argument(
        "--report-type",
        choices=["json", "csv"],
        default="json",
        help="Report format (default: json)",
    )

    # Time window: explicit --start/--end win over --lookback-hours.
    parser.add_argument(
        "--lookback-hours",
        type=_positive_int,
        default=24,
        help="How many hours back from now for report start time (default: 24)",
    )
    parser.add_argument(
        "--start",
        default=None,
        help="Optional UTC ISO timestamp, example: 2026-06-02T10:00:00Z",
    )
    parser.add_argument(
        "--end",
        default=None,
        help="Optional UTC ISO timestamp, example: 2026-06-02T11:00:00Z",
    )

    # Polling, output and HTTP behaviour.
    parser.add_argument(
        "--poll-interval",
        type=_positive_int,
        default=5,
        help="Polling interval in seconds (default: 5)",
    )
    parser.add_argument(
        "--max-wait-seconds",
        type=int,
        default=900,
        help="Max wait for report completion (default: 900)",
    )
    parser.add_argument(
        "--output",
        default=None,
        help="Output file path. If omitted, file name is inferred from report id/type.",
    )
    parser.add_argument(
        "--timeout",
        type=_positive_int,
        default=20,
        help="HTTP timeout in seconds (default: 20)",
    )

    # Optional report filters.
    parser.add_argument(
        "--actor",
        default=None,
        help="Optional actor (email substring), max 320 chars",
    )
    parser.add_argument(
        "--actor-type",
        choices=["All", "User", "ApiKey"],
        default="All",
        help="Optional actor type filter (default: All)",
    )
    parser.add_argument(
        "--action",
        action="append",
        default=[],
        choices=AUDIT_LOG_ACTION_VALUES,
        help="Repeatable. Optional action filter",
    )
    parser.add_argument(
        "--resource",
        action="append",
        default=[],
        choices=AUDIT_LOG_RESOURCE_VALUES,
        help="Repeatable. Optional resource group filter",
    )
    parser.add_argument(
        "--outcome",
        action="append",
        default=[],
        choices=AUDIT_LOG_OUTCOME_VALUES,
        help="Repeatable. Optional outcome filter",
    )
    parser.add_argument(
        "--source-ip",
        default=None,
        help="Optional source IP filter (IPv4 or IPv6)",
    )

    # Alternate mode: list existing reports instead of creating one.
    parser.add_argument(
        "--list",
        action="store_true",
        default=False,
        help="List all existing audit log reports and exit (no report creation)",
    )
    return parser.parse_args(argv)
def parse_iso_utc(value: str) -> dt.datetime:
    """Parse an ISO 8601 timestamp and return it as an aware UTC datetime.

    Surrounding whitespace is ignored and a trailing ``Z`` (or ``z``) UTC
    designator is accepted. A timestamp that carries no offset at all is
    taken to be UTC already; any other offset is converted to UTC.

    Raises:
        ValueError: if *value* is not a timestamp ``datetime.fromisoformat``
            can parse.
    """
    text = value.strip()
    # datetime.fromisoformat() only accepts the "Z" suffix from Python 3.11
    # on, so spell it as an explicit zero offset for older interpreters.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    parsed = dt.datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.astimezone(dt.timezone.utc)
def to_iso_utc(value: dt.datetime) -> str:
    """Format *value* as a UTC ISO 8601 string with a ``Z`` suffix.

    The datetime is converted to UTC and rendered at whole-second precision
    (sub-second digits are dropped), e.g. ``2026-06-02T10:00:00Z``.
    """
    as_utc = value.astimezone(dt.timezone.utc)
    return as_utc.isoformat(timespec="seconds").replace("+00:00", "Z")
def issue_access_token(base_url: str, private_key_file: str, kid: str, tenant_id: str, timeout: int) -> str:
    """Exchange a signed JWT client assertion for an API access token.

    Args:
        base_url: API base URL; the token endpoint is ``{base_url}/v1/oauth/token``.
        private_key_file: Path to the PEM private key used for ES256 signing.
        kid: API key id, sent in the JWT header.
        tenant_id: Tenant id, used as both ``iss`` and ``sub``.
        timeout: HTTP timeout in seconds.

    Returns:
        The ``access_token`` value from the token response.

    Raises:
        OSError: if the private key file cannot be read.
        requests.HTTPError: if the token endpoint answers with an error status.
        RuntimeError: if the response carries no ``access_token``.
    """
    with open(private_key_file, "r", encoding="utf-8") as f:
        private_key = f.read()

    # The same URL is both the request target and the assertion audience.
    token_url = f"{base_url}/v1/oauth/token"
    now = int(time.time())
    assertion = jwt.encode(
        {
            "iss": tenant_id,
            "sub": tenant_id,
            "aud": token_url,
            "jti": str(uuid.uuid4()),  # unique id per assertion
            "nbf": now,
            "exp": now + 20,  # the assertion is only valid for 20 seconds
        },
        private_key,
        algorithm="ES256",
        headers={"kid": kid},
    )
    # PyJWT 1.x returns bytes while 2.x returns str; bytes cannot be placed
    # in the JSON request body below.
    if isinstance(assertion, bytes):
        assertion = assertion.decode("ascii")

    response = requests.post(
        token_url,
        headers={"Content-Type": "application/json"},
        json={
            "grant_type": "client_credentials",
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": assertion,
        },
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()
    access_token = payload.get("access_token")
    if not access_token:
        raise RuntimeError(f"Token response did not contain access_token: {payload}")
    return access_token
def create_report(
    base_url: str,
    access_token: str,
    report_type: str,
    start_iso: str,
    end_iso: str,
    timeout: int,
    actor: str | None,
    actor_type: str,
    actions: list[str],
    resources: list[str],
    outcomes: list[str],
    source_ip: str | None,
) -> dict:
    """Request creation of an audit logs report and return the API response.

    Sends a POST to ``{base_url}/v1/reports/audit-logs`` with the time window
    and the optional filters. Duplicate entries in ``actions``, ``resources``
    and ``outcomes`` are removed while keeping first-seen order. ``actor`` and
    ``source_ip`` are sent as JSON ``null`` when they are ``None``.

    Returns:
        The decoded JSON body of the create response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    # The API enum accepts JSON=0, CSV=1; using numeric values avoids enum string casing issues.
    report_type_value = 1 if report_type.lower() == "csv" else 0
    body = {
        "type": report_type_value,
        "start": start_iso,
        "end": end_iso,
        "actor": actor,
        "actorType": actor_type,
        # dict.fromkeys() de-duplicates while preserving order.
        "actions": list(dict.fromkeys(actions)),
        "resources": list(dict.fromkeys(resources)),
        "outcomes": list(dict.fromkeys(outcomes)),
        "sourceIp": source_ip,
    }
    response = requests.post(
        f"{base_url}/v1/reports/audit-logs",
        headers={
            "Authorization": f"bearer {access_token}",
            "Content-Type": "application/json",
        },
        json=body,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def get_report(base_url: str, access_token: str, report_id: str, timeout: int) -> dict:
    """Fetch a single audit logs report by id and return the decoded JSON body.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    response = requests.get(
        f"{base_url}/v1/reports/audit-logs/{report_id}",
        headers={"Authorization": f"bearer {access_token}"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def list_reports(base_url: str, access_token: str, timeout: int) -> list[dict]:
    """Return the existing audit logs reports.

    Reads the ``items`` field of the response from
    ``GET {base_url}/v1/reports/audit-logs``. A missing or null ``items``
    yields an empty list so the return value is always a list.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    response = requests.get(
        f"{base_url}/v1/reports/audit-logs",
        headers={"Authorization": f"bearer {access_token}"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json().get("items") or []
def wait_until_completed(
    base_url: str,
    access_token: str,
    report_id: str,
    poll_interval: int,
    max_wait_seconds: int,
    timeout: int,
) -> dict:
    """Poll a report until it is completed and return its final representation.

    The report is fetched at least once. While it is still in progress the
    function sleeps between polls, but never past the deadline, so the total
    wait stays within ``max_wait_seconds`` (plus the duration of the HTTP
    calls themselves).

    Returns:
        The report as returned by the poll that observed completion.

    Raises:
        RuntimeError: if the report reports a status that is neither
            "completed" nor "in progress".
        TimeoutError: if the report is still in progress at the deadline.
        requests.HTTPError: if a poll request fails.
    """
    deadline = time.monotonic() + max_wait_seconds
    while True:
        report = get_report(base_url, access_token, report_id, timeout)
        status = report.get("status")
        # Some serializers return enum strings, others numeric values
        # (possibly themselves serialized as strings).
        status_text = str(status).strip().lower()
        if status_text in ("completed", "1") or status == 1:
            return report
        if not (status_text in ("inprogress", "0") or status == 0):
            raise RuntimeError(f"Unexpected report status: {status}. Full response: {report}")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Timed out waiting for report {report_id} to complete.")

        # Sleep no longer than the time left, then poll one last time.
        pause = max(0, min(poll_interval, remaining))
        print(f"Report {report_id} still in progress. Waiting {pause:g}s...")
        time.sleep(pause)
def download_report(report: dict, access_token: str, output_path: pathlib.Path, timeout: int) -> None:
    """Stream the report file referenced by ``report["url"]`` to *output_path*.

    The URL is first requested without credentials. If that is rejected with
    401 or 403, the request is repeated once with the bearer token. Missing
    parent directories of *output_path* are created.

    Raises:
        RuntimeError: if *report* has no ``url``.
        requests.HTTPError: if the download answers with an error status.
        OSError: if the output file cannot be written.
    """
    report_url = report.get("url")
    if not report_url:
        raise RuntimeError(f"Report response did not include URL: {report}")

    # Presigned URLs usually do not require Authorization, but keep a fallback.
    response = requests.get(report_url, timeout=timeout, stream=True)
    if response.status_code in (401, 403):
        # Release the rejected streamed response before retrying.
        response.close()
        # NOTE(review): this sends the API bearer token to whatever host the
        # report URL points at. If that can be a third-party storage host,
        # consider restricting the fallback to the API's own host.
        response = requests.get(
            report_url,
            headers={"Authorization": f"bearer {access_token}"},
            timeout=timeout,
            stream=True,
        )

    # With stream=True the connection stays checked out until the response is
    # closed, so close it even when an error is raised below.
    with response:
        response.raise_for_status()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 64):
                if chunk:
                    f.write(chunk)
def delete_report(base_url: str, access_token: str, report_id: str, timeout: int) -> None:
    """Delete the audit logs report with the given id from the server.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    response = requests.delete(
        f"{base_url}/v1/reports/audit-logs/{report_id}",
        headers={"Authorization": f"bearer {access_token}"},
        timeout=timeout,
    )
    response.raise_for_status()
def build_time_range(args: argparse.Namespace) -> tuple[str, str]:
    """Compute the report window as a ``(start, end)`` pair of UTC ISO strings.

    ``args.start`` and ``args.end`` are used when given. Otherwise the start
    defaults to ``args.lookback_hours`` hours before the current time and the
    end defaults to the current time.

    Raises:
        ValueError: if a timestamp cannot be parsed, or if the start is not
            strictly earlier than the end.
    """
    now = dt.datetime.now(dt.timezone.utc)
    if args.start:
        start_dt = parse_iso_utc(args.start)
    else:
        # The lookback is measured from now, not from --end.
        start_dt = now - dt.timedelta(hours=args.lookback_hours)
    end_dt = parse_iso_utc(args.end) if args.end else now
    if start_dt >= end_dt:
        raise ValueError("start must be earlier than end")
    return to_iso_utc(start_dt), to_iso_utc(end_dt)
def infer_output_path(args: argparse.Namespace, report: dict) -> pathlib.Path:
    """Choose the file the downloaded report is written to.

    ``args.output`` wins when it is set. Otherwise the name is derived from
    the report id plus an extension matching ``args.report_type`` (``csv`` or
    ``json``); the extension is not repeated when the id already ends with it.
    When the report has no usable id, a random ``audit-logs-report-<uuid>``
    name is used instead.
    """
    if args.output:
        return pathlib.Path(args.output)

    raw_id = report.get("id")
    # The id may be absent, null, or a non-string (e.g. numeric) value.
    if raw_id in (None, ""):
        report_id = f"audit-logs-report-{uuid.uuid4()}"
    else:
        report_id = str(raw_id)
    # The id comes from the server response; neutralize path separators so
    # the inferred file always lands in the current directory.
    report_id = report_id.replace("/", "_").replace("\\", "_")

    ext = "csv" if str(args.report_type).lower() == "csv" else "json"
    if report_id.lower().endswith(f".{ext}"):
        return pathlib.Path(report_id)
    return pathlib.Path(f"{report_id}.{ext}")
def main() -> int:
    """Run the CLI and return the process exit code.

    With ``--list`` the existing reports are printed and nothing is created.
    Otherwise a report is created, polled until completed, downloaded, and
    then deleted from the server; a JSON summary is printed at the end.

    Returns:
        0 on success, 1 if any step raised (the error is printed to stderr).
    """
    args = parse_args()
    # A trailing slash would produce "//v1/..." request URLs and a JWT "aud"
    # claim containing a double slash.
    base_url = args.base_url.rstrip("/")
    try:
        print("Issuing access token...")
        access_token = issue_access_token(
            base_url=base_url,
            private_key_file=args.private_key_file,
            kid=args.kid,
            tenant_id=args.tenant_id,
            timeout=args.timeout,
        )

        if args.list:
            print("Listing all audit log reports...")
            reports = list_reports(
                base_url=base_url,
                access_token=access_token,
                timeout=args.timeout,
            )
            if not reports:
                print("No reports found.")
            else:
                for r in reports:
                    print(f"Report {r.get('id')} status: {r.get('status')}")
            return 0

        start_iso, end_iso = build_time_range(args)
        print(f"Creating audit logs report from {start_iso} to {end_iso}...")
        created = create_report(
            base_url=base_url,
            access_token=access_token,
            report_type=args.report_type,
            start_iso=start_iso,
            end_iso=end_iso,
            timeout=args.timeout,
            actor=args.actor,
            actor_type=args.actor_type,
            actions=args.action,
            resources=args.resource,
            outcomes=args.outcome,
            source_ip=args.source_ip,
        )
        report_id = created.get("id")
        if not report_id:
            raise RuntimeError(f"Create response did not include report id: {created}")
        print(f"Report created. id={report_id}, status={created.get('status')}")

        report = wait_until_completed(
            base_url=base_url,
            access_token=access_token,
            report_id=report_id,
            poll_interval=args.poll_interval,
            max_wait_seconds=args.max_wait_seconds,
            timeout=args.timeout,
        )

        output_path = infer_output_path(args, report)
        print(f"Downloading report to {output_path}...")
        download_report(report=report, access_token=access_token, output_path=output_path, timeout=args.timeout)

        # The server-side copy is removed only after a successful download.
        print(f"Deleting report {report_id} from server...")
        delete_report(base_url=base_url, access_token=access_token, report_id=report_id, timeout=args.timeout)

        print("Done")
        print(json.dumps({
            "reportId": report_id,
            "status": report.get("status"),
            "size": report.get("size"),
            "output": str(output_path),
        }, indent=2))
        return 0
    except Exception as exc:  # broad on purpose for CLI UX
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())