No production batch pipeline has a 100% success rate. Files are corrupted in transit, HEIC variants from older iPhones aren't fully supported, PDFs have nonstandard encoding, video containers claim one codec but contain another. In a 1,000-file batch, a 3% failure rate means 30 files need attention. The question isn't whether failures will happen — it's how your pipeline handles them so you can review and resolve the 30 without re-running the 970.
## TL;DR — error taxonomy for conversion APIs
API errors fall into three categories:
| Category | HTTP Codes | Action |
|---|---|---|
| Retryable transient | 429, 500, 502, 503, 504 | Retry with backoff — these resolve on their own |
| Permanent client error | 400, 413, 422 | Quarantine — these won't resolve with retry |
| Auth/quota | 401, 403 | Stop everything — check API key and plan limits |
Timeouts and network errors (ConnectionError, ReadTimeout) are also retryable — they indicate the request didn't reach the server or the server dropped the connection mid-transfer.
Expected failure rates in production batches: 0.5-2% permanent errors (bad source files), 0.1-1% transient errors (resolved on first retry). Total failure after retries: typically under 1%.
## What a brittle pipeline looks like
```python
# Brittle: stops on first failure, no error classification
import requests

for file in files:
    resp = requests.post(
        'https://changethisfile.com/v1/convert',
        files={'file': open(file, 'rb')},
        data={'target': 'jpg'},
        headers={'Authorization': f'Bearer {API_KEY}'},
    )
    resp.raise_for_status()  # Raises on first 4xx/5xx — kills the whole batch
    save(file, resp.content)
```
This has three problems: it stops on the first error (one corrupted PDF kills 999 good conversions), it doesn't distinguish between retryable and permanent errors, and it provides no record of which files failed for later review.
## Error classification and handling
```python
from enum import Enum
from dataclasses import dataclass, field
import json
import time

class FailureKind(Enum):
    RETRYABLE = 'retryable'  # Retry with backoff
    PERMANENT = 'permanent'  # Quarantine — won't fix with retry
    AUTH = 'auth'            # Stop everything

@dataclass
class ConversionError:
    file_path: str
    status_code: int
    message: str
    kind: FailureKind
    attempt: int = 1
    ts: float = field(default_factory=time.time)

def classify_error(status_code: int, resp_body: str) -> FailureKind:
    if status_code in (401, 403):
        return FailureKind.AUTH
    if status_code in (400, 413, 422):
        return FailureKind.PERMANENT
    if status_code in (429, 500, 502, 503, 504):
        return FailureKind.RETRYABLE
    # Unknown codes: treat as permanent to avoid infinite retry loops
    return FailureKind.PERMANENT

def describe_permanent_error(status_code: int, resp_body: str) -> str:
    """Human-readable reason for permanent failures."""
    if status_code == 400:
        return f'Bad request: {resp_body[:200]}'
    if status_code == 413:
        return 'File too large (max 100MB for sync, 500MB for async)'
    if status_code == 422:
        body = json.loads(resp_body) if resp_body.startswith('{') else {}
        code = body.get('error', {}).get('code', 'unknown')
        if code == 'encrypted':
            return 'File is password-protected'
        if code == 'corrupted':
            return 'File appears corrupted or truncated'
        if code == 'unsupported_variant':
            return 'File format variant not supported'
        return f'Unprocessable: {resp_body[:200]}'
    return f'HTTP {status_code}'
```
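A quick spot-check of the classifier against the taxonomy table; the 422 body below is a made-up example in the error shape `describe_permanent_error` parses:

```python
# Spot-check the classifier against the TL;DR taxonomy
assert classify_error(429, '') is FailureKind.RETRYABLE
assert classify_error(422, '') is FailureKind.PERMANENT
assert classify_error(401, '') is FailureKind.AUTH
assert classify_error(418, '') is FailureKind.PERMANENT  # unknown code: quarantine

# Hypothetical 422 body matching the shape the parser above expects
print(describe_permanent_error(422, '{"error": {"code": "encrypted"}}'))
# File is password-protected
```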
## Resilient pipeline with quarantine queue
```python
#!/usr/bin/env python3
import asyncio
import json
import os
import time
from pathlib import Path
from typing import Optional

import httpx

# FailureKind, classify_error, and describe_permanent_error are the
# definitions from the classification section above.

API_KEY = os.environ['CTF_API_KEY']
API_URL = 'https://changethisfile.com/v1/convert'
CONCURRENCY = 10
MAX_RETRIES = 3

# Persistent state files
DONE_FILE = Path('.pipeline_done.json')
FAILED_FILE = Path('.pipeline_failed.jsonl')
RETRY_FILE = Path('.pipeline_retry.jsonl')

def load_done() -> set:
    return set(json.loads(DONE_FILE.read_text())) if DONE_FILE.exists() else set()

def append_failure(kind: str, path: str, reason: str, attempt: int):
    with FAILED_FILE.open('a') as f:
        json.dump({'kind': kind, 'path': path, 'reason': reason,
                   'attempt': attempt, 'ts': time.time()}, f)
        f.write('\n')

async def convert_file(
    client: httpx.AsyncClient,
    path: Path,
    target: str,
    out_path: Path,
    sem: asyncio.Semaphore,
    done: set,
) -> Optional[bytes]:
    if str(out_path) in done or out_path.exists():
        return b''  # Skip: already converted
    async with sem:
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                resp = await client.post(
                    API_URL,
                    headers={'Authorization': f'Bearer {API_KEY}'},
                    content=path.read_bytes(),
                    params={'target': target},
                    timeout=120,
                )
                if resp.status_code < 400:
                    # Success: write output and record progress
                    out_path.parent.mkdir(parents=True, exist_ok=True)
                    out_path.write_bytes(resp.content)
                    done.add(str(out_path))
                    return resp.content
                kind = classify_error(resp.status_code, resp.text)
                if kind == FailureKind.AUTH:
                    raise SystemExit(f'Auth failed ({resp.status_code}) — check API key')
                if kind == FailureKind.PERMANENT:
                    reason = describe_permanent_error(resp.status_code, resp.text)
                    append_failure('permanent', str(path), reason, attempt)
                    return None
                # Retryable: honor Retry-After on 429, else exponential backoff
                if resp.status_code == 429:
                    sleep = int(resp.headers.get('Retry-After', '60'))
                else:
                    sleep = min(2 ** attempt, 64)
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(sleep)
                else:
                    append_failure('retryable_exhausted', str(path),
                                   f'HTTP {resp.status_code}', attempt)
                    return None
            except httpx.TimeoutException as e:
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(min(2 ** attempt, 64))
                else:
                    append_failure('timeout', str(path), str(e), attempt)
                    return None
            except httpx.ConnectError as e:
                # Network errors are retryable too (see the taxonomy above)
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(min(2 ** attempt, 64))
                else:
                    append_failure('network', str(path), str(e), attempt)
                    return None
    return None
```
```python
async def main(source_dir: Path, output_dir: Path, target: str):
    files = sorted(source_dir.rglob('*') if source_dir.is_dir() else [source_dir])
    files = [f for f in files if f.is_file()]
    done = load_done()
    sem = asyncio.Semaphore(CONCURRENCY)
    success = 0
    # Mirror the source tree under output_dir; for a single-file source,
    # resolve relative to its parent directory
    base = source_dir if source_dir.is_dir() else source_dir.parent
    async with httpx.AsyncClient() as client:
        tasks = [
            convert_file(client, f, target,
                         output_dir / f.relative_to(base).with_suffix(f'.{target}'),
                         sem, done)
            for f in files
        ]
        for i, coro in enumerate(asyncio.as_completed(tasks), 1):
            result = await coro
            if result is not None:
                success += 1
            if i % 100 == 0:
                DONE_FILE.write_text(json.dumps(sorted(done)))
                print(f'[{i}/{len(files)}] {success} ok')
    DONE_FILE.write_text(json.dumps(sorted(done)))

    # Summarize failures by kind
    permanent = retryable = 0
    if FAILED_FILE.exists():
        for line in FAILED_FILE.read_text().splitlines():
            entry = json.loads(line)
            if entry['kind'] == 'permanent':
                permanent += 1
            else:
                retryable += 1
    print(f'\nResults: {success} ok | {permanent} permanent failures | {retryable} retryable exhausted')
    if permanent > 0:
        print(f'Permanent failures logged to {FAILED_FILE}')
        print('Review and address individually — retrying will not help.')

if __name__ == '__main__':
    import sys
    asyncio.run(main(Path(sys.argv[1]), Path(sys.argv[2]), sys.argv[3]))
```
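To run it (the script name here is a placeholder):

```bash
export CTF_API_KEY=your_key_here
python3 pipeline.py ./incoming ./converted jpg
# Safe to re-run after a crash: anything recorded in .pipeline_done.json
# (or already present in ./converted) is skipped.
```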
## Reviewing and triaging the failure queue
After a batch, analyze the failures file:
```bash
# Count failures by kind and by reason
python3 -c "
import json
from collections import Counter
entries = [json.loads(l) for l in open('.pipeline_failed.jsonl')]
print('By kind:', Counter(e['kind'] for e in entries))
print('By reason:', Counter(e['reason'] for e in entries))
"
```
Common patterns and remedies:
- "File appears corrupted": Re-download from source. If the source is also corrupted, the file is unrecoverable — log and skip.
- "File is password-protected": Decrypt with the password and retry:
qpdf --decrypt --password=pw input.pdf output.pdf - "File too large": Split (PDFs: pdftk burst; video: ffmpeg segment) and convert in chunks.
- "File format variant not supported": Check if the file's actual format matches its extension. A JPEG renamed to .png will fail — detect the real format with
filecommand or python-magic, rename, and retry.
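Here's a minimal detection pass over the failure log, assuming the `python-magic` package is installed; the MIME-to-extension map is illustrative, not exhaustive:

```python
# Find quarantined files whose extension doesn't match their real format.
# Assumes python-magic (libmagic bindings); extend MIME_TO_EXT as needed.
import json
from pathlib import Path

import magic  # pip install python-magic

MIME_TO_EXT = {'image/jpeg': '.jpg', 'image/png': '.png',
               'image/heic': '.heic', 'application/pdf': '.pdf'}

for line in Path('.pipeline_failed.jsonl').read_text().splitlines():
    entry = json.loads(line)
    if 'not supported' not in entry['reason']:
        continue
    path = Path(entry['path'])
    mime = magic.from_file(str(path), mime=True)
    real_ext = MIME_TO_EXT.get(mime)
    if real_ext and real_ext != path.suffix.lower():
        print(f'{path}: named {path.suffix} but detected {mime}; '
              f'rename to {path.stem}{real_ext} and retry')
```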
```bash
# Re-run only the failures
python3 -c "
import json
failures = [json.loads(l)['path'] for l in open('.pipeline_failed.jsonl')]
print('\n'.join(failures))
" | xargs -I{} python3 retry_single.py {} jpg
```
## Failure rate monitoring and alerting
For automated pipelines, alert when the failure rate exceeds a threshold:
```python
FAILURE_THRESHOLD = 0.05  # Alert if >5% fail

def check_failure_rate(total: int, failed: int) -> bool:
    rate = failed / total if total > 0 else 0
    if rate > FAILURE_THRESHOLD:
        # Send alert (email, Slack, PagerDuty, etc.)
        print(f'ALERT: {rate:.1%} failure rate ({failed}/{total}) '
              f'exceeds {FAILURE_THRESHOLD:.0%} threshold')
        return True
    return False
```
Distinguish between correlated failures (all files from the same source, same format, same time window) and random failures. Correlated failures indicate a systemic issue (bad source data, unsupported format variant, API incident). Random failures at 1-2% are normal and expected.
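One way to spot correlation, assuming the `.pipeline_failed.jsonl` format above: bucket failures by reason and by hour, then flag any bucket that dominates (the 50% cutoff is arbitrary):

```python
# Flag correlated failures: many failures sharing one reason or one
# time window point to a systemic issue rather than random bad files.
import json
from collections import Counter
from pathlib import Path

entries = [json.loads(l) for l in
           Path('.pipeline_failed.jsonl').read_text().splitlines()]

if entries:
    buckets = {
        'reason': Counter(e['reason'] for e in entries),
        'hour': Counter(int(e['ts'] // 3600) for e in entries),
    }
    for label, counter in buckets.items():
        top, count = counter.most_common(1)[0]
        if count > 0.5 * len(entries):  # >50% share: arbitrary cutoff
            print(f'Correlated: {count}/{len(entries)} failures share {label} {top!r}')
```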
A pipeline that finishes with 98% success and a clear failure log is far more useful than one that stops at 3% with an unhandled exception. Error classification, quarantine, and structured failure logging turn pipeline failures from incidents into routine maintenance. A free API key is enough to test your error handling before running production loads.