Report sample
- Last Updated: June 30, 2026
- 5 minute read
- Automate MFT
- Documentation
The following Python sample demonstrates how to use the Reporting API to retrieve operational report data (file activity or task runs) and export it to a JSON or CSV file.
This script serves as a reference implementation for customers who want to automate report generation by using the Automate MFT REST API.
The complete sample script is provided below and can be copied and modified as required for your environment.
Usage examples
To get task runs for 2026-06-26 in a US environment, run:
python operational_reports_export.py <api-key-pem> <api-key-id> <tenant-id> --base-url "https://api.us.mft.progress.com" --report-kind task-runs --output-type csv --start "2026-06-26T00:00:00Z" --end "2026-06-27T00:00:00Z"
To get file activities for 2026-06-26 in a US environment, run:
python operational_reports_export.py <api-key-pem> <api-key-id> <tenant-id> --base-url "https://api.us.mft.progress.com" --report-kind file-activity --output-type csv --start "2026-06-26T00:00:00Z" --end "2026-06-27T00:00:00Z"
Full sample script
#!/usr/bin/env python3
"""Export file-activity or task-runs reports to JSON or CSV.
The REST API returns JSON. This script can keep JSON output or convert to CSV locally.
Examples:
python examples/operational_reports_export.py ./private_key.pem <kid> <tenant_id> \
--report-kind file-activity --output-type csv --lookback-hours 24 \
--output ./file-activity.csv
python examples/operational_reports_export.py ./private_key.pem <kid> <tenant_id> \
--report-kind task-runs --output-type json --start 2026-06-01T00:00:00Z --end 2026-06-02T00:00:00Z
Sortable fields:
file-activity: activityTime, taskName, sourceFilename, activity, size, sourcePath, destinationPath
task-runs: taskName, queuedTime, startTime, filesTransferred, totalTransferredBytes, startedBy
"""
from __future__ import annotations
import argparse
import csv
import datetime as dt
import json
import pathlib
import sys
import time
import uuid
from typing import Any, Dict, Iterable, List
import jwt
import requests
# Number of 429 (rate limited) responses request_with_retry() waits out
# before it makes one final attempt whose HTTP error is raised to the caller.
MAX_RATE_LIMIT_RETRIES = 10
def _retry_after_seconds(header_value: str | None, fallback: float) -> float:
    """Translate a ``Retry-After`` header value into a delay in seconds.

    Args:
        header_value: Raw header text, or ``None`` when the header is absent.
        fallback: Delay to use when the header is missing or unparseable.

    Returns:
        The non-negative delay requested by the server, or *fallback*.
    """
    from email.utils import parsedate_to_datetime

    if not header_value:
        return fallback
    text = header_value.strip()
    # Form 1: delta-seconds, e.g. "Retry-After: 120".
    if text.isdigit():
        return int(text)
    # Form 2: HTTP-date, e.g. "Retry-After: Wed, 21 Oct 2026 07:28:00 GMT".
    try:
        retry_at = parsedate_to_datetime(text)
    except (TypeError, ValueError):
        return fallback
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=dt.timezone.utc)
    remaining = (retry_at - dt.datetime.now(dt.timezone.utc)).total_seconds()
    # A date in the past means "retry now"; never pass a negative value to sleep().
    return max(round(remaining, 1), 0)


def request_with_retry(method: str, url: str, **kwargs) -> requests.Response:
    """Send an HTTP request; on 429 honour Retry-After or use exponential backoff.

    Up to ``MAX_RATE_LIMIT_RETRIES`` rate-limited responses are waited out.
    For each one the delay is taken from the response's ``Retry-After``
    header when present and parseable; otherwise it is ``2 ** attempt``
    seconds, capped at 60. After that a final request is made and any HTTP
    error it returns (including 429) is raised.

    Args:
        method: HTTP method name, passed to ``requests.request``.
        url: Target URL.
        **kwargs: Forwarded unchanged to ``requests.request``.

    Returns:
        The first response that is not a 429 and passes ``raise_for_status``.

    Raises:
        requests.HTTPError: For any non-429 error status, or for a 429 on the
            final attempt.
    """
    for attempt in range(MAX_RATE_LIMIT_RETRIES):
        response = requests.request(method, url, **kwargs)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        backoff = min(2 ** attempt, 60)
        # Prefer the server's own hint over our guess when it provides one.
        wait = _retry_after_seconds(response.headers.get("Retry-After"), backoff)
        print(f"Rate limited (429). waiting {wait}s (attempt {attempt + 1}/{MAX_RATE_LIMIT_RETRIES})...")
        time.sleep(wait)
    # Final attempt — let it raise normally if still 429
    response = requests.request(method, url, **kwargs)
    response.raise_for_status()
    return response
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments.

    Returns:
        The parsed namespace: three positional credentials, connection and
        output options, the time-range options, paging options, and the
        optional report filters (repeatable filters are collected as lists).
    """
    cli = argparse.ArgumentParser(description="Export file-activity/task-runs reports")
    add = cli.add_argument

    def repeatable(flag: str, help_text: str) -> None:
        # A fresh list per option so defaults are never shared between flags.
        add(flag, action="append", default=[], help=help_text)

    # Credentials used to sign the token request.
    add("private_key_file", help="Path to ES256 private key PEM")
    add("kid", help="API key id (JWT header kid)")
    add("tenant_id", help="Tenant id used for iss/sub")

    # Endpoint selection and output.
    add("--base-url", default="https://api.eu.mft.progress.com", help="API base URL")
    add(
        "--report-kind",
        required=True,
        choices=["file-activity", "task-runs"],
        help="Which report endpoint to query",
    )
    add(
        "--output-type",
        default="json",
        choices=["json", "csv"],
        help="Output format. API is JSON; CSV conversion is local",
    )
    add("--output", default=None, help="Output file path")

    # Time range.
    add(
        "--lookback-hours",
        type=int,
        default=24,
        help="Time range start defaults to now-lookback-hours",
    )
    add("--start", default=None, help="UTC ISO timestamp, e.g. 2026-06-02T10:00:00Z")
    add("--end", default=None, help="UTC ISO timestamp, e.g. 2026-06-02T11:00:00Z")

    # Paging and transport.
    add("--take", type=int, default=200, help="Page size, max 200")
    add("--skip", type=int, default=0, help="Initial offset (default: 0)")
    add("--timeout", type=int, default=20, help="HTTP timeout in seconds")

    # Optional shared filters
    add("--task-run-id", default=None)
    repeatable("--task-id", "Repeatable")
    repeatable("--keyword", "Repeatable")
    add("--sort-by", default=None)
    add("--sort-desc", action="store_true", default=False)
    repeatable("--folder-id-filter", "Repeatable. Optional folder scope filter")

    # file-activity-only optional filters
    repeatable("--file-activity-status", "Repeatable. file-activity only. Example: Completed")

    # task-runs-only optional filters
    repeatable("--task-runs-status", "Repeatable. task-runs only. Example: Completed")
    add("--started-by", default=None, help="task-runs only")
    repeatable(
        "--started-by-type",
        "Repeatable. task-runs only. One of: User,Schedule,PostTaskRun,ApiKey",
    )

    return cli.parse_args()
def parse_iso_utc(value: str) -> dt.datetime:
    """Parse an ISO 8601 timestamp and return it as an aware UTC datetime.

    A trailing ``Z`` is accepted as UTC, a timestamp without an offset is
    taken to be UTC, and the ISO 8601 end-of-day form ``24:00:00`` is
    accepted and mapped to midnight of the following day.

    Args:
        value: Timestamp text such as ``2026-06-02T10:00:00Z``.

    Returns:
        The parsed instant converted to UTC.

    Raises:
        ValueError: If the text is not a valid ISO timestamp, or if hour 24
            is combined with a non-zero fraction of a second.
    """
    text = value.strip()
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"

    # "YYYY-MM-DDT24:00:00" is valid ISO 8601 for end of day, but
    # datetime.fromisoformat() does not accept hour 24 on every Python
    # version, so parse it as 00:00:00 and roll the date forward afterwards.
    end_of_day = text[10:11] in ("T", "t", " ") and text[11:19] == "24:00:00"
    if end_of_day:
        text = f"{text[:11]}00:00:00{text[19:]}"

    parsed = dt.datetime.fromisoformat(text)
    if end_of_day:
        if parsed.time() != dt.time(0):
            # e.g. 24:00:00.5 — there is no such instant.
            raise ValueError(f"Invalid end-of-day timestamp: {value!r}")
        parsed += dt.timedelta(days=1)

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.astimezone(dt.timezone.utc)
def to_iso_utc(value: dt.datetime) -> str:
    """Convert *value* to UTC and render it with second precision and a ``Z`` suffix."""
    in_utc = value.astimezone(dt.timezone.utc)
    rendered = in_utc.isoformat(timespec="seconds")
    # isoformat() spells the UTC offset as "+00:00"; the API examples use "Z".
    return rendered.replace("+00:00", "Z")
def build_time_range(args: argparse.Namespace) -> tuple[str, str]:
    """Resolve the report window from the CLI options.

    ``--start`` / ``--end`` are used when given; otherwise the window ends
    now and begins ``--lookback-hours`` before now.

    Returns:
        ``(start, end)`` as UTC ISO strings.

    Raises:
        ValueError: If the resolved start is not strictly before the end.
    """
    current = dt.datetime.now(dt.timezone.utc)

    if args.start:
        window_start = parse_iso_utc(args.start)
    else:
        window_start = current - dt.timedelta(hours=args.lookback_hours)

    if args.end:
        window_end = parse_iso_utc(args.end)
    else:
        window_end = current

    if not window_start < window_end:
        raise ValueError("start must be earlier than end")

    return to_iso_utc(window_start), to_iso_utc(window_end)
def issue_access_token(base_url: str, private_key_file: str, kid: str, tenant_id: str, timeout: int) -> str:
    """Exchange a signed JWT client assertion for an access token.

    Reads the PEM private key, signs a short-lived ES256 assertion whose
    issuer and subject are the tenant id, and posts it to the token endpoint.

    Args:
        base_url: API base URL; the token endpoint is ``/v1/oauth/token`` under it.
        private_key_file: Path to the PEM private key file.
        kid: Key id placed in the JWT header.
        tenant_id: Value used for the ``iss`` and ``sub`` claims.
        timeout: HTTP timeout in seconds.

    Returns:
        The ``access_token`` value from the token response.

    Raises:
        RuntimeError: If the response body has no ``access_token``.
    """
    token_url = f"{base_url}/v1/oauth/token"
    signing_key = pathlib.Path(private_key_file).read_text(encoding="utf-8")

    issued_at = int(time.time())
    claims = {
        "iss": tenant_id,
        "sub": tenant_id,
        "aud": token_url,
        "jti": str(uuid.uuid4()),
        "nbf": issued_at,
        "exp": issued_at + 20,  # assertion is valid for 20 seconds
    }
    assertion = jwt.encode(claims, signing_key, algorithm="ES256", headers={"kid": kid})

    grant_request = {
        "grant_type": "client_credentials",
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": assertion,
    }
    response = request_with_retry(
        "POST",
        token_url,
        headers={"Content-Type": "application/json"},
        json=grant_request,
        timeout=timeout,
    )

    payload = response.json()
    token = payload.get("access_token")
    if token:
        return token
    raise RuntimeError(f"Token response missing access_token: {payload}")
def build_query_params(args: argparse.Namespace, skip: int, take: int, start_iso: str, end_iso: str) -> List[tuple[str, str]]:
    """Assemble the query-string pairs for one page request.

    Paging and time-range pairs come first, followed by the shared filters,
    then the filters specific to the selected report kind. Repeatable
    options contribute one pair per value.

    Returns:
        A list of ``(name, value)`` pairs in request order.
    """
    query: List[tuple[str, str]] = [
        ("skip", str(skip)),
        ("take", str(take)),
        ("start", start_iso),
        ("end", end_iso),
    ]

    # Shared single-value filters.
    if args.task_run_id:
        query.append(("taskRunId", args.task_run_id))
    if args.sort_by:
        query.append(("sortBy", args.sort_by))
    if args.sort_desc:
        query.append(("sortDesc", "true"))

    # Shared repeatable filters.
    query.extend(("taskIds", value) for value in args.task_id)
    query.extend(("keywords", value) for value in args.keyword)
    query.extend(("folderIdsFilter", value) for value in args.folder_id_filter)

    # Filters that only apply to one report kind.
    if args.report_kind == "file-activity":
        query.extend(("status", value) for value in args.file_activity_status)
    elif args.report_kind == "task-runs":
        query.extend(("status", value) for value in args.task_runs_status)
        if args.started_by:
            query.append(("startedBy", args.started_by))
        query.extend(("startedByTypes", value) for value in args.started_by_type)

    return query
def fetch_all_items(
    base_url: str,
    access_token: str,
    report_kind: str,
    args: argparse.Namespace,
    start_iso: str,
    end_iso: str,
) -> List[Dict[str, Any]]:
    """Page through a report endpoint and collect every item.

    The page size is ``args.take`` clamped to 1..200 and the first offset is
    ``args.skip`` (negative values are treated as 0). Paging stops when a
    page comes back shorter than the page size, or when the offset reaches
    the total reported by the response body (``total``) or, failing that,
    the ``x-hermes-total-items`` response header.

    Returns:
        All items from every fetched page, in response order.

    Raises:
        RuntimeError: If the response's ``items`` value is not a list.
    """
    url = f"{base_url}/v1/reports/{report_kind}"
    auth_headers = {"Authorization": f"bearer {access_token}"}
    page_size = min(max(args.take, 1), 200)
    offset = max(args.skip, 0)
    known_total: int | None = None
    collected: List[Dict[str, Any]] = []

    while True:
        response = request_with_retry(
            "GET",
            url,
            headers=auth_headers,
            params=build_query_params(
                args, skip=offset, take=page_size, start_iso=start_iso, end_iso=end_iso
            ),
            timeout=args.timeout,
        )
        data = response.json()

        page = data.get("items") or []
        if not isinstance(page, list):
            raise RuntimeError(f"Unexpected response shape: {data}")

        # Prefer the total from the body; fall back to the header when absent.
        if "total" in data and isinstance(data["total"], int):
            known_total = data["total"]
        elif response.headers.get("x-hermes-total-items", "").isdigit():
            known_total = int(response.headers["x-hermes-total-items"])

        collected.extend(page)

        # A short page means there is nothing further to fetch.
        if len(page) < page_size:
            return collected

        offset += len(page)
        if known_total is not None and offset >= known_total:
            return collected
def flatten_dict(record: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dictionaries into a single level of joined keys.

    Nested dict keys are joined to their parent's key with *sep*; list values
    are stored as their JSON text; every other value is copied unchanged.

    Args:
        record: The mapping to flatten.
        parent_key: Prefix applied to every key (used when recursing).
        sep: Separator placed between a parent key and a child key.

    Returns:
        A new flat dictionary, keys in first-encounter order.
    """
    flat: Dict[str, Any] = {}
    for name, item in record.items():
        path = name if not parent_key else f"{parent_key}{sep}{name}"
        if isinstance(item, dict):
            # Recurse with the joined key as the new prefix.
            for nested_path, nested_item in flatten_dict(item, parent_key=path, sep=sep).items():
                flat[nested_path] = nested_item
            continue
        flat[path] = json.dumps(item, ensure_ascii=False) if isinstance(item, list) else item
    return flat
def to_csv(items: Iterable[Dict[str, Any]], output_file: pathlib.Path) -> None:
    """Write *items* to *output_file* as CSV, one flattened row per item.

    Each item is flattened with ``flatten_dict``. The header row is the union
    of all flattened keys in first-seen order; a row that lacks a column gets
    an empty cell. Missing parent directories are created.

    Args:
        items: Report records to export.
        output_file: Destination path; overwritten if it exists.
    """
    rows = [flatten_dict(item) for item in items]

    # dict.fromkeys de-duplicates while keeping first-seen order, and avoids
    # the linear list scan per key that a list-membership check would cost.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))

    output_file.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with open(output_file, "w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)
def to_json(items: Iterable[Dict[str, Any]], output_file: pathlib.Path) -> None:
    """Write *items* to *output_file* as an indented JSON array.

    Missing parent directories are created, and non-ASCII characters are
    written as-is (UTF-8) rather than escaped.
    """
    records = list(items)
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with output_file.open("w", encoding="utf-8") as handle:
        json.dump(records, handle, indent=2, ensure_ascii=False)
def resolve_output_path(args: argparse.Namespace) -> pathlib.Path:
    """Return the output path: ``--output`` if given, else a generated name.

    The generated name is ``<report-kind>-<UTC timestamp>.<ext>`` in the
    current directory, where the extension is ``csv`` for CSV output and
    ``json`` otherwise.
    """
    if not args.output:
        stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        suffix = "csv" if args.output_type == "csv" else "json"
        return pathlib.Path(f"{args.report_kind}-{stamp}.{suffix}")
    return pathlib.Path(args.output)
def main() -> int:
    """Run the export end to end and return a process exit code.

    Parses the arguments, resolves the time range, obtains an access token,
    fetches every report item, writes the chosen output format, and prints a
    JSON summary.

    Returns:
        0 on success; 1 if any step raised, after printing the error to stderr.
    """
    args = parse_args()
    try:
        start_iso, end_iso = build_time_range(args)

        print("Issuing access token...")
        access_token = issue_access_token(
            base_url=args.base_url,
            private_key_file=args.private_key_file,
            kid=args.kid,
            tenant_id=args.tenant_id,
            timeout=args.timeout,
        )

        print(f"Fetching {args.report_kind} report items from {start_iso} to {end_iso}...")
        items = fetch_all_items(
            base_url=args.base_url,
            access_token=access_token,
            report_kind=args.report_kind,
            args=args,
            start_iso=start_iso,
            end_iso=end_iso,
        )

        output_path = resolve_output_path(args)
        write_report = to_csv if args.output_type == "csv" else to_json
        write_report(items, output_path)

        summary = {
            "reportKind": args.report_kind,
            "outputType": args.output_type,
            "items": len(items),
            "output": str(output_path),
        }
        print("Done")
        print(json.dumps(summary, indent=2))
    except Exception as exc:  # broad on purpose for CLI UX
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
    return 0
# Run the CLI only when this file is executed as a script; the integer
# returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())