S3 is the canonical landing zone for files that need processing. PDFs uploaded by users, HEIC photos from a mobile app, DOCX exports from an internal tool — they all pile up in a bucket. The pattern is always the same: list → convert → put back. This guide shows how to wire that loop efficiently using the ChangeThisFile API, with a focus on throughput, idempotency, and not paying for conversions you've already done.
TL;DR — throughput numbers first
Before the code, the math:
- Free tier (1K/mo): Good for prototyping. At 200 files you've already used 20% of your monthly quota.
- Hobby ($29/mo, 10K): ~333 files/day sustained, or burst ~2K files in an hour.
- Startup ($99/mo, 50K): ~1,666 files/day. At 10 concurrent workers, an overnight batch of 10K files completes in ~50 minutes.
- Scale ($499/mo, 500K): The math no longer matters — run as many workers as your S3 bandwidth allows.
Latency per conversion (p50/p95): ~800ms / ~3s for images and documents under 5MB. FFmpeg video conversions run 5–30s depending on duration and codec. Size your concurrency to saturate the API without hitting rate limits (see the rate-limit guide).
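As a sanity check on those numbers, wall-clock time for a batch is roughly files × latency ÷ concurrency. A back-of-envelope sketch (it assumes workers stay saturated and ignores S3 transfer time):
def batch_minutes(n_files: int, latency_s: float, concurrency: int) -> float:
    # Lower bound: fully saturated workers, no S3 I/O overhead
    return n_files * latency_s / concurrency / 60

print(batch_minutes(10_000, 0.8, 10))  # ~13 min at the ~800ms p50
print(batch_minutes(10_000, 3.0, 10))  # ~50 min at the ~3s p95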
The naive approach and why it fails
The obvious first draft looks like this:
import boto3, requests

s3 = boto3.client('s3')
for obj in s3.list_objects_v2(Bucket='my-bucket')['Contents']:
    body = s3.get_object(Bucket='my-bucket', Key=obj['Key'])['Body'].read()
    resp = requests.post(
        'https://changethisfile.com/v1/convert',
        headers={'Authorization': 'Bearer ctf_sk_...'},
        files={'file': body},
        data={'target': 'jpg'},
    )
    s3.put_object(Bucket='my-bucket', Key=obj['Key'].replace('.png', '.jpg'), Body=resp.content)
This has four failure modes at scale:
- Serial bottleneck: One conversion at a time. At 800ms per file, 10K files take ~2.2 hours.
- Pagination ignored: list_objects_v2 returns at most 1,000 keys per call. In buckets with 10K+ objects, everything after the first page is silently skipped.
- No idempotency: A crash halfway through means you re-convert everything from the start, wasting conversions and money.
- No error handling: A single 429 or 5xx kills the entire batch with no retry.
Production pattern: asyncio worker pool
The right architecture uses three components: a paginator to stream the full bucket listing, a bounded asyncio semaphore to cap concurrency, and a progress file to checkpoint completed keys so you can resume after failure.
Key design decisions:
- Concurrency = 10: Matches the API's per-key rate limit comfortably. Increase to 20 on Startup/Scale plans.
- Output key is deterministic: converted/{source_dir}/{original_stem}.jpg — so re-running the job skips already-converted files.
- Download via presigned URL: You can pass a presigned URL directly as the file_url form field and avoid proxying bytes through your pipeline machine.
Full pipeline code
#!/usr/bin/env python3
"""S3 bulk conversion pipeline — list → convert → put back."""
import asyncio
import json
import os
from pathlib import Path

import boto3
import httpx

API_KEY = os.environ['CTF_API_KEY']
BUCKET = os.environ['S3_BUCKET']
SOURCE_PREFIX = os.environ.get('SOURCE_PREFIX', 'uploads/')
TARGET_FORMAT = os.environ.get('TARGET_FORMAT', 'jpg')
CONCURRENCY = int(os.environ.get('CONCURRENCY', '10'))
CHECKPOINT_FILE = Path('converted_keys.json')
API_URL = 'https://changethisfile.com/v1/convert'

s3 = boto3.client('s3')

def load_checkpoint() -> set:
    if CHECKPOINT_FILE.exists():
        return set(json.loads(CHECKPOINT_FILE.read_text()))
    return set()

def save_checkpoint(done: set):
    CHECKPOINT_FILE.write_text(json.dumps(sorted(done)))

def list_keys(bucket: str, prefix: str):
    # Paginator streams every page, so buckets beyond 1,000 keys are fully listed
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            yield obj['Key']

def output_key(source_key: str, target_fmt: str) -> str:
    stem = Path(source_key).stem
    parent = str(Path(source_key).parent)
    return f"converted/{parent}/{stem}.{target_fmt}".replace('//', '/')

async def convert_one(
    client: httpx.AsyncClient,
    key: str,
    sem: asyncio.Semaphore,
    done: set,
) -> bool:
    out_key = output_key(key, TARGET_FORMAT)
    if out_key in done:
        return True  # Already converted — skip
    async with sem:
        # Generate presigned URL so the API fetches directly from S3
        presigned = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': BUCKET, 'Key': key},
            ExpiresIn=300,
        )
        for attempt in range(3):
            try:
                resp = await client.post(
                    API_URL,
                    headers={'Authorization': f'Bearer {API_KEY}'},
                    data={'file_url': presigned, 'target': TARGET_FORMAT},
                    timeout=120,
                )
                if resp.status_code == 429:
                    retry_after = int(resp.headers.get('Retry-After', '60'))
                    await asyncio.sleep(retry_after + attempt * 10)
                    continue
                resp.raise_for_status()
                # 'jpg' is not a registered MIME subtype; use image/jpeg
                content_type = 'image/jpeg' if TARGET_FORMAT == 'jpg' else f'image/{TARGET_FORMAT}'
                s3.put_object(
                    Bucket=BUCKET,
                    Key=out_key,
                    Body=resp.content,
                    ContentType=content_type,
                )
                done.add(out_key)
                if len(done) % 100 == 0:
                    save_checkpoint(done)  # Periodic save so a hard crash loses little progress
                return True
            except httpx.HTTPError as e:  # Non-2xx responses, timeouts, connection errors
                if attempt == 2:
                    print(f'FAILED {key}: {e}')
                    return False
                await asyncio.sleep(2 ** attempt)
    return False

async def main():
    done = load_checkpoint()
    keys = list(list_keys(BUCKET, SOURCE_PREFIX))
    print(f'Found {len(keys)} objects, {len(done)} already converted')
    sem = asyncio.Semaphore(CONCURRENCY)
    async with httpx.AsyncClient() as client:
        tasks = [convert_one(client, k, sem, done) for k in keys]
        try:
            results = await asyncio.gather(*tasks)
        finally:
            save_checkpoint(done)  # Persist progress even if the run is interrupted
    succeeded = sum(results)
    print(f'Done: {succeeded}/{len(keys)} converted ({len(keys)-succeeded} failed)')

if __name__ == '__main__':
    asyncio.run(main())
Run it:
CTF_API_KEY=ctf_sk_... S3_BUCKET=my-bucket TARGET_FORMAT=jpg python3 s3_bulk_convert.py
After a crash, re-run the same command — the checkpoint file ensures already-converted keys are skipped automatically.
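The CONCURRENCY environment variable controls the worker pool size, so on a Startup or Scale plan (per the design notes above) the same run can be pushed harder:
CONCURRENCY=20 CTF_API_KEY=ctf_sk_... S3_BUCKET=my-bucket TARGET_FORMAT=jpg python3 s3_bulk_convert.py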
Idempotency: the Idempotency-Key header
Beyond the checkpoint file, the API supports the Idempotency-Key header. If you send the same key twice within 24 hours, the second request returns the cached result immediately without consuming a conversion from your quota.
import hashlib

def idempotency_key(s3_key: str, target_fmt: str) -> str:
    # Deterministic: same input + format always produces same key
    payload = f"{s3_key}|{target_fmt}"
    return hashlib.sha256(payload.encode()).hexdigest()[:32]

resp = await client.post(
    API_URL,
    headers={
        'Authorization': f'Bearer {API_KEY}',
        'Idempotency-Key': idempotency_key(key, TARGET_FORMAT),
    },
    data={'file_url': presigned, 'target': TARGET_FORMAT},
)
This means if your pipeline crashes after the API call but before the S3 put, re-running the job fetches the cached result from the API rather than re-converting. Combine this with the checkpoint file for double-safety.
Error recovery for failed files
Not all files will convert. Password-protected PDFs, corrupted ZIPs, truncated images — the API returns a 4xx with a JSON error body. Collect failures separately so you can review and retry:
import time

FAILURES_FILE = Path('conversion_failures.jsonl')

async def convert_one_with_log(client, key, sem, done):
    success = await convert_one(client, key, sem, done)
    if not success:
        # Append one JSON line per failed key for post-batch review
        with FAILURES_FILE.open('a') as f:
            json.dump({'key': key, 'ts': int(time.time())}, f)
            f.write('\n')
    return success
After the batch, review conversion_failures.jsonl. Common causes: a corrupt file (re-download it from the source), an unsupported format (check the 690 supported routes at changethisfile.com), or a file over the 100MB limit (split or compress it first).
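Once the root causes are fixed, the failed keys can be fed back through the same worker. A minimal retry sketch, assuming the pipeline above lives in s3_bulk_convert.py so its helpers are importable:
#!/usr/bin/env python3
"""Retry pass: re-run only the keys recorded in conversion_failures.jsonl."""
import asyncio
import json
from pathlib import Path

import httpx

from s3_bulk_convert import CONCURRENCY, convert_one, load_checkpoint, save_checkpoint

async def retry_failures():
    lines = Path('conversion_failures.jsonl').read_text().splitlines()
    keys = [json.loads(line)['key'] for line in lines if line.strip()]
    done = load_checkpoint()
    sem = asyncio.Semaphore(CONCURRENCY)
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(*(convert_one(client, k, sem, done) for k in keys))
    save_checkpoint(done)
    print(f'Retried {len(keys)} keys: {sum(results)} succeeded')

if __name__ == '__main__':
    asyncio.run(retry_failures())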
Monitoring and observability
For long-running batches, log a progress line every N files so you can estimate completion time:
import time

completed = 0
total = 0  # set to len(keys) in main() before dispatching tasks
start_time = time.monotonic()

async def convert_with_progress(client, key, sem, done):
    global completed
    result = await convert_one(client, key, sem, done)
    completed += 1
    if completed % 100 == 0:
        elapsed = time.monotonic() - start_time
        rate = completed / elapsed
        remaining = (total - completed) / rate if rate > 0 else 0
        print(f'{completed}/{total} ({rate:.1f}/s, ~{remaining/60:.0f}m remaining)')
    return result
Key metrics to track: conversions/second, failure rate, p95 latency. If failure rate exceeds 2%, stop and inspect — you likely have a batch of corrupted source files or you're hitting a file-size limit.
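One way to enforce that 2% rule automatically is a small circuit breaker wrapped around the worker coroutine, in the same style as the wrappers above. A sketch; the 200-attempt warm-up is an arbitrary choice:
failures = 0
attempts = 0

async def convert_with_breaker(client, key, sem, done):
    # Abort the batch once the observed failure rate exceeds 2% (after 200 attempts)
    global failures, attempts
    success = await convert_one(client, key, sem, done)
    attempts += 1
    if not success:
        failures += 1
    if attempts >= 200 and failures / attempts > 0.02:
        raise RuntimeError(f'Failure rate {failures / attempts:.1%}, stopping batch for inspection')
    return success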
For billing awareness, track your monthly conversion count against your plan. The X-CTF-Remaining response header tells you how many conversions you have left in the current billing period.
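To surface that header in the pipeline, read it off each response and warn when the remaining quota runs low. A sketch; the 1,000-conversion threshold is an arbitrary choice:
def check_quota(resp, warn_below: int = 1000) -> None:
    # Warn when the plan's remaining conversions drop below the threshold
    remaining = resp.headers.get('X-CTF-Remaining')
    if remaining is not None and int(remaining) < warn_below:
        print(f'WARNING: only {remaining} conversions left in this billing period')
Call it right after resp.raise_for_status() in convert_one.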
The S3 bulk conversion pattern reduces to three components: a paginator, a semaphore-bounded worker pool, and a checkpoint file. Once those are in place, crashes become recoverable events rather than catastrophes. Get a free API key and run your first batch — the free tier covers 1K conversions to validate the pipeline before committing to a paid plan.