S3 is the canonical landing zone for files that need processing. PDFs uploaded by users, HEIC photos from a mobile app, DOCX exports from an internal tool — they all pile up in a bucket. The pattern is always the same: list → convert → put back. This guide shows how to wire that loop efficiently using the ChangeThisFile API, with a focus on throughput, idempotency, and not paying for conversions you've already done.

TL;DR — throughput numbers first

Before the code, the math:

  • Free tier (1K/mo): Good for prototyping. At 200 files you've already used 20% of your monthly budget.
  • Hobby ($29/mo, 10K): ~333 files/day sustained, or burst ~2K files in an hour.
  • Startup ($99/mo, 50K): ~1,666 files/day. At 10 concurrent workers, an overnight batch of 10K files completes in ~50 minutes.
  • Scale ($499/mo, 500K): The math no longer matters — run as many workers as your S3 bandwidth allows.

Latency per conversion (p50/p95): ~800ms / ~3s for images and documents under 5MB. FFmpeg video conversions run 5–30s depending on duration and codec. Size your concurrency to saturate the API without hitting rate limits (see the rate-limit guide).
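
To turn those figures into a plan, here is a quick back-of-the-envelope helper (a sketch only: it assumes the p50/p95 latencies above and ignores retries and rate-limit backoff):

def estimate_batch_minutes(num_files: int, workers: int = 10, secs_per_file: float = 0.8) -> float:
    """Rough wall-clock estimate: total conversion time spread evenly across workers."""
    return num_files * secs_per_file / workers / 60

# 10K files with 10 workers: ~13 minutes at the p50 latency, ~50 minutes at the p95
print(estimate_batch_minutes(10_000, 10, 0.8))   # ~13.3
print(estimate_batch_minutes(10_000, 10, 3.0))   # 50.0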

The naive approach and why it fails

The obvious first draft looks like this:

import boto3, requests

s3 = boto3.client('s3')
for obj in s3.list_objects_v2(Bucket='my-bucket')['Contents']:
    body = s3.get_object(Bucket='my-bucket', Key=obj['Key'])['Body'].read()
    resp = requests.post(
        'https://changethisfile.com/v1/convert',
        headers={'Authorization': 'Bearer ctf_sk_...'},
        files={'file': body},
        data={'target': 'jpg'},
    )
    s3.put_object(Bucket='my-bucket', Key=obj['Key'].replace('.png', '.jpg'), Body=resp.content)

This has four failure modes at scale:

  1. Serial bottleneck: One conversion at a time. At 800ms per file, 10K files take ~2.2 hours.
  2. Pagination ignored: list_objects_v2 returns at most 1,000 keys per call. Without a paginator, a bucket with 10K+ objects is silently truncated.
  3. No idempotency: A crash halfway through means you re-convert everything from the start, wasting conversions and money.
  4. No error handling: A single 429 or 5xx kills the entire batch with no retry.

Production pattern: asyncio worker pool

The right architecture uses three components: a paginator to stream the full bucket listing, a bounded asyncio semaphore to cap concurrency, and a progress file to checkpoint completed keys so you can resume after failure.

Key design decisions:

  • Concurrency = 10: Matches the API's per-key rate limit comfortably. Increase to 20 on Startup/Scale plans.
  • Output key is deterministic: converted/{source_parent}/{stem}.jpg, so re-running the job skips already-converted files.
  • Download via presigned URL: You can pass a presigned URL directly as the file_url form field and avoid proxying bytes through your pipeline machine.

Full pipeline code

#!/usr/bin/env python3
"""S3 bulk conversion pipeline — list → convert → put back."""
import asyncio
import json
import os
from pathlib import Path

import boto3
import httpx

API_KEY = os.environ['CTF_API_KEY']
BUCKET = os.environ['S3_BUCKET']
SOURCE_PREFIX = os.environ.get('SOURCE_PREFIX', 'uploads/')
TARGET_FORMAT = os.environ.get('TARGET_FORMAT', 'jpg')
CONCURRENCY = int(os.environ.get('CONCURRENCY', '10'))
CHECKPOINT_FILE = Path('converted_keys.json')
API_URL = 'https://changethisfile.com/v1/convert'

s3 = boto3.client('s3')


def load_checkpoint() -> set:
    if CHECKPOINT_FILE.exists():
        return set(json.loads(CHECKPOINT_FILE.read_text()))
    return set()


def save_checkpoint(done: set):
    CHECKPOINT_FILE.write_text(json.dumps(sorted(done)))


def list_keys(bucket: str, prefix: str):
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            if not obj['Key'].endswith('/'):  # skip zero-byte folder placeholder objects
                yield obj['Key']


def output_key(source_key: str, target_fmt: str) -> str:
    stem = Path(source_key).stem
    parent = str(Path(source_key).parent)
    if parent == '.':  # key sits at the bucket root: no directory component to preserve
        return f"converted/{stem}.{target_fmt}"
    return f"converted/{parent}/{stem}.{target_fmt}"


async def convert_one(
    client: httpx.AsyncClient,
    key: str,
    sem: asyncio.Semaphore,
    done: set,
) -> bool:
    out_key = output_key(key, TARGET_FORMAT)
    if out_key in done:
        return True  # Already converted — skip

    async with sem:
        # Generate presigned URL so the API fetches directly from S3
        presigned = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': BUCKET, 'Key': key},
            ExpiresIn=300,
        )

        for attempt in range(3):
            try:
                resp = await client.post(
                    API_URL,
                    headers={'Authorization': f'Bearer {API_KEY}'},
                    data={'file_url': presigned, 'target': TARGET_FORMAT},
                    timeout=120,
                )
                if resp.status_code == 429:
                    retry_after = int(resp.headers.get('Retry-After', '60'))
                    await asyncio.sleep(retry_after + attempt * 10)
                    continue
                resp.raise_for_status()
                # boto3 is synchronous; this briefly blocks the event loop, which is fine for small outputs
                s3.put_object(
                    Bucket=BUCKET,
                    Key=out_key,
                    Body=resp.content,
                    # Assumes an image target; swap in a proper MIME lookup for documents or video
                    ContentType=f'image/{TARGET_FORMAT}',
                )
                done.add(out_key)
                save_checkpoint(done)  # persist progress so a mid-batch crash is resumable
                return True
            except httpx.HTTPError as e:  # covers 4xx/5xx, timeouts, and transport errors
                if attempt == 2:
                    print(f'FAILED {key}: {e}')
                    return False
                await asyncio.sleep(2 ** attempt)

    return False


async def main():
    done = load_checkpoint()
    keys = list(list_keys(BUCKET, SOURCE_PREFIX))
    print(f'Found {len(keys)} objects, {len(done)} already converted')

    sem = asyncio.Semaphore(CONCURRENCY)
    async with httpx.AsyncClient() as client:
        tasks = [convert_one(client, k, sem, done) for k in keys]
        results = await asyncio.gather(*tasks)

    save_checkpoint(done)
    succeeded = sum(results)
    print(f'Done: {succeeded}/{len(keys)} converted ({len(keys)-succeeded} failed)')


if __name__ == '__main__':
    asyncio.run(main())

Run it:

CTF_API_KEY=ctf_sk_... S3_BUCKET=my-bucket TARGET_FORMAT=jpg python3 s3_bulk_convert.py

After a crash, re-run the same command — the checkpoint file ensures already-converted keys are skipped automatically.

Idempotency: the Idempotency-Key header

Beyond the checkpoint file, the API supports the Idempotency-Key header. If you send the same key twice within 24 hours, the second request returns the cached result immediately without consuming a conversion from your quota.

import hashlib

def idempotency_key(s3_key: str, target_fmt: str) -> str:
    # Deterministic: same input + format always produces same key
    payload = f"{s3_key}|{target_fmt}"
    return hashlib.sha256(payload.encode()).hexdigest()[:32]

# Inside convert_one, the earlier client.post call gains the extra header:
resp = await client.post(
    API_URL,
    headers={
        'Authorization': f'Bearer {API_KEY}',
        'Idempotency-Key': idempotency_key(key, TARGET_FORMAT),
    },
    data={'file_url': presigned, 'target': TARGET_FORMAT},
)

This means if your pipeline crashes after the API call but before the S3 put, re-running the job fetches the cached result from the API rather than re-converting. Combine this with the checkpoint file for double-safety.

Error recovery for failed files

Not all files will convert. Password-protected PDFs, corrupted ZIPs, truncated images — the API returns a 4xx with a JSON error body. Collect failures separately so you can review and retry:

import time

FAILURES_FILE = Path('conversion_failures.jsonl')

async def convert_one_with_log(client, key, sem, done):
    """Wrap convert_one and append any failed key to a JSONL log for later review."""
    success = await convert_one(client, key, sem, done)
    if not success:
        with FAILURES_FILE.open('a') as f:
            json.dump({'key': key, 'ts': int(time.time())}, f)
            f.write('\n')
    return success

After the batch, review conversion_failures.jsonl. Common causes: a corrupt file (re-download it from the source), an unsupported format (check the 690 supported conversion routes at changethisfile.com), or a file over the 100MB limit (split or compress it first).
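
Once the obviously broken files are dealt with, the remaining keys can be fed back through the same pipeline. A sketch of a retry pass, assuming the pipeline above is saved as s3_bulk_convert.py (the module name is an assumption; the same environment variables must be set):

#!/usr/bin/env python3
"""Retry keys logged in conversion_failures.jsonl using the existing pipeline."""
import asyncio
import json
from pathlib import Path

import httpx

# Assumes the full pipeline above lives in s3_bulk_convert.py
from s3_bulk_convert import CONCURRENCY, convert_one, load_checkpoint, save_checkpoint


async def retry_failures():
    lines = Path('conversion_failures.jsonl').read_text().splitlines()
    keys = [json.loads(line)['key'] for line in lines if line.strip()]
    done = load_checkpoint()
    sem = asyncio.Semaphore(CONCURRENCY)
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(*(convert_one(client, k, sem, done) for k in keys))
    save_checkpoint(done)
    print(f'Retried {len(keys)} keys, {sum(results)} succeeded')


if __name__ == '__main__':
    asyncio.run(retry_failures())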

Monitoring and observability

For long-running batches, log a progress line every N files so you can estimate completion time:

completed = 0
total = 0  # set to len(keys) in main() after listing the bucket
start_time = time.monotonic()

async def convert_with_progress(client, key, sem, done):
    global completed
    result = await convert_one(client, key, sem, done)
    completed += 1
    if completed % 100 == 0:
        elapsed = time.monotonic() - start_time
        rate = completed / elapsed
        remaining = (total - completed) / rate if rate > 0 else 0
        print(f'{completed}/{total} ({rate:.1f}/s, ~{remaining/60:.0f}m remaining)')
    return result

Key metrics to track: conversions/second, failure rate, p95 latency. If failure rate exceeds 2%, stop and inspect — you likely have a batch of corrupted source files or you're hitting a file-size limit.
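
One way to capture those metrics without extra infrastructure is to time each conversion in a thin wrapper and summarize at the end. A minimal sketch (note the measured latency includes time spent queued behind the semaphore and on retries, so it will read higher than the API's own p95):

import statistics
import time

latencies = []  # wall-clock seconds per conversion, including queueing and retries

async def convert_timed(client, key, sem, done):
    t0 = time.monotonic()
    ok = await convert_one(client, key, sem, done)
    latencies.append(time.monotonic() - t0)
    return ok

def report(results):
    failure_rate = 1 - sum(results) / len(results)
    p95 = statistics.quantiles(latencies, n=20)[-1]  # 95th percentile cut point
    print(f'failure rate {failure_rate:.1%}, p95 {p95:.1f}s')
    if failure_rate > 0.02:
        print('Failure rate above 2%: stop and inspect conversion_failures.jsonl')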

For billing awareness, track your monthly conversion count against your plan. The X-CTF-Remaining response header tells you how many conversions you have left in the current billing period.
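
A lightweight way to use it: read the header after each successful response and warn when the balance drops low. A sketch (the header name is as described above; the 500-conversion threshold is an arbitrary example):

# Inside convert_one, after resp.raise_for_status():
remaining = resp.headers.get('X-CTF-Remaining')
if remaining is not None and int(remaining) < 500:  # arbitrary warning threshold
    print(f'WARNING: {remaining} conversions left in this billing period')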

The S3 bulk conversion pattern reduces to three components: a paginator, a semaphore-bounded worker pool, and a checkpoint file. Once those are in place, crashes become recoverable events rather than catastrophes. Get a free API key and run your first batch — the free tier covers 1K conversions to validate the pipeline before committing to a paid plan.