No production batch pipeline has a 100% success rate. Files are corrupted in transit, HEIC variants from older iPhones aren't fully supported, PDFs have nonstandard encoding, video containers claim one codec but contain another. In a 1,000-file batch, a 3% failure rate means 30 files need attention. The question isn't whether failures will happen — it's how your pipeline handles them so you can review and resolve the 30 without re-running the 970.

TL;DR — error taxonomy for conversion APIs

API errors fall into three categories:

Category                 HTTP Codes                Action
Retryable transient      429, 500, 502, 503, 504   Retry with backoff — these resolve on their own
Permanent client error   400, 413, 422             Quarantine — these won't resolve with retry
Auth/quota               401, 403                  Stop everything — check API key and plan limits

Timeouts and network errors (ConnectionError, ReadTimeout) are also retryable — they indicate the request didn't reach the server or the server dropped the connection mid-transfer.
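
If you're on requests, the retryable category can also be handled at the transport layer before you write any custom logic. Here's a minimal sketch using urllib3's built-in Retry; note that POST must be opted in because it isn't idempotent by default (this assumes urllib3 1.26+, where the parameter is named allowed_methods):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Retry only the transient statuses from the table above, with exponential
# backoff; Retry-After headers on 429s are honored automatically
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods={'POST'},  # POST is excluded from retries by default
)
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=retry))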

Expected failure rates in production batches: 0.5-2% permanent errors (bad source files), 0.1-1% transient errors (resolved on first retry). Total failure after retries: typically under 1%.

What a brittle pipeline looks like

# Brittle: stops on first failure, no error classification
import requests

for file in files:
    with open(file, 'rb') as fh:
        resp = requests.post(
            'https://changethisfile.com/v1/convert',
            files={'file': fh},
            data={'target': 'jpg'},
            headers={'Authorization': f'Bearer {API_KEY}'},
        )
    resp.raise_for_status()  # Raises on first 4xx/5xx — kills the whole batch
    save(file, resp.content)

This has three problems: it stops on the first error (one corrupted PDF kills 999 good conversions), it doesn't distinguish between retryable and permanent errors, and it provides no record of which files failed for later review.

Error classification and handling

from enum import Enum
from dataclasses import dataclass, field
import json
import time


class FailureKind(Enum):
    RETRYABLE = 'retryable'      # Retry with backoff
    PERMANENT = 'permanent'      # Quarantine — won't fix with retry
    AUTH = 'auth'                # Stop everything


@dataclass
class ConversionError:
    file_path: str
    status_code: int
    message: str
    kind: FailureKind
    attempt: int = 1
    ts: float = field(default_factory=time.time)


def classify_error(status_code: int, resp_body: str) -> FailureKind:
    """Map an HTTP error status (>= 400) to a handling strategy."""
    if status_code in (401, 403):
        return FailureKind.AUTH
    if status_code in (400, 413, 422):
        return FailureKind.PERMANENT
    if status_code in (429, 500, 502, 503, 504):
        return FailureKind.RETRYABLE
    # Unknown codes: treat as permanent to avoid infinite retry loops
    return FailureKind.PERMANENT


def describe_permanent_error(status_code: int, resp_body: str) -> str:
    """Human-readable reason for permanent failures."""
    if status_code == 400:
        return f'Bad request: {resp_body[:200]}'
    if status_code == 413:
        return 'File too large (max 100MB for sync, 500MB for async)'
    if status_code == 422:
        try:
            body = json.loads(resp_body)
        except ValueError:  # not JSON, fall through to the generic message
            body = {}
        code = body.get('error', {}).get('code', 'unknown')
        if code == 'encrypted':
            return 'File is password-protected'
        if code == 'corrupted':
            return 'File appears corrupted or truncated'
        if code == 'unsupported_variant':
            return 'File format variant not supported'
        return f'Unprocessable: {resp_body[:200]}'
    return f'HTTP {status_code}'
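
A few spot checks make the classifier's contract concrete:

assert classify_error(503, '') is FailureKind.RETRYABLE  # transient: retry
assert classify_error(422, '') is FailureKind.PERMANENT  # quarantine
assert classify_error(401, '') is FailureKind.AUTH       # stop the batch
assert classify_error(418, '') is FailureKind.PERMANENT  # unknown codes default to permanent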

Resilient pipeline with quarantine queue

#!/usr/bin/env python3
import asyncio
import json
import os
import time
from pathlib import Path
from typing import Optional

import httpx

# FailureKind, classify_error, and describe_permanent_error come from the
# classification section above; import them from wherever you keep that code.

API_KEY = os.environ['CTF_API_KEY']
API_URL = 'https://changethisfile.com/v1/convert'
CONCURRENCY = 10
MAX_RETRIES = 3

# Persistent state files
DONE_FILE = Path('.pipeline_done.json')
FAILED_FILE = Path('.pipeline_failed.jsonl')


def load_done() -> set:
    return set(json.loads(DONE_FILE.read_text())) if DONE_FILE.exists() else set()


def append_failure(kind: str, path: str, reason: str, attempt: int):
    with FAILED_FILE.open('a') as f:
        json.dump({'kind': kind, 'path': path, 'reason': reason,
                   'attempt': attempt, 'ts': time.time()}, f)
        f.write('\n')


async def convert_file(
    client: httpx.AsyncClient,
    path: Path,
    target: str,
    out_path: Path,
    sem: asyncio.Semaphore,
    done: set,
) -> Optional[bytes]:
    if str(out_path) in done or out_path.exists():
        return b''  # Already converted on a previous run — counts as ok

    async with sem:
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                resp = await client.post(
                    API_URL,
                    headers={'Authorization': f'Bearer {API_KEY}'},
                    content=path.read_bytes(),
                    params={'target': target},
                    timeout=120,
                )

                if resp.status_code < 400:
                    # Success: write the output and record completion
                    out_path.parent.mkdir(parents=True, exist_ok=True)
                    out_path.write_bytes(resp.content)
                    done.add(str(out_path))
                    return resp.content

                kind = classify_error(resp.status_code, resp.text)

                if kind == FailureKind.AUTH:
                    raise SystemExit(f'Auth failed ({resp.status_code}) — check API key')

                if kind == FailureKind.PERMANENT:
                    reason = describe_permanent_error(resp.status_code, resp.text)
                    append_failure('permanent', str(path), reason, attempt)
                    return None

                # Retryable: honor Retry-After on 429, exponential backoff otherwise
                if resp.status_code == 429:
                    retry_after = resp.headers.get('Retry-After', '60')
                    sleep = int(retry_after) if retry_after.isdigit() else 60
                else:
                    sleep = min(2 ** attempt, 64)
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(sleep)
                    continue
                append_failure('retryable_exhausted', str(path), f'HTTP {resp.status_code}', attempt)
                return None

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                # Network-level errors are retryable too (see the taxonomy above)
                if attempt < MAX_RETRIES:
                    await asyncio.sleep(min(2 ** attempt, 64))
                else:
                    append_failure('network', str(path), str(e), attempt)
                    return None

    return None


async def main(source_dir: Path, output_dir: Path, target: str):
    files = sorted(source_dir.rglob('*') if source_dir.is_dir() else [source_dir])
    files = [f for f in files if f.is_file()]
    done = load_done()

    sem = asyncio.Semaphore(CONCURRENCY)
    success = 0

    async with httpx.AsyncClient() as client:
        tasks = []
        for f in files:
            # A single source file maps to its own name; relative_to(source_dir)
            # only makes sense when the source is a directory
            rel = f.relative_to(source_dir) if source_dir.is_dir() else Path(f.name)
            tasks.append(convert_file(client, f, target, output_dir / rel.with_suffix(f'.{target}'), sem, done))
        for i, coro in enumerate(asyncio.as_completed(tasks), 1):
            result = await coro
            if result is not None:
                success += 1
            if i % 100 == 0:
                DONE_FILE.write_text(json.dumps(sorted(done)))
                print(f'[{i}/{len(files)}] {success} ok')

    DONE_FILE.write_text(json.dumps(sorted(done)))

    # Summarize the failure log (it persists across runs — delete
    # .pipeline_failed.jsonl before a fresh batch to avoid stale counts)
    permanent = retryable = 0
    if FAILED_FILE.exists():
        for line in FAILED_FILE.read_text().splitlines():
            entry = json.loads(line)
            if entry['kind'] == 'permanent':
                permanent += 1
            else:
                retryable += 1

    print(f'\nResults: {success} ok | {permanent} permanent failures | {retryable} retryable exhausted')
    if permanent > 0:
        print(f'Permanent failures logged to {FAILED_FILE}')
        print('Review and address individually — retrying will not help.')


if __name__ == '__main__':
    import sys

    asyncio.run(main(Path(sys.argv[1]), Path(sys.argv[2]), sys.argv[3]))
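
One caveat: the checkpoint writes above aren't atomic, so a crash mid-write can leave .pipeline_done.json truncated. If that matters at your batch sizes, the standard hardening is the write-then-rename pattern:

import json
import os
from pathlib import Path

def save_done_atomic(done: set, path: Path = Path('.pipeline_done.json')):
    # Write to a temp file beside the target, then swap it in; os.replace
    # is atomic when source and destination are on the same filesystem
    tmp = path.with_name(path.name + '.tmp')
    tmp.write_text(json.dumps(sorted(done)))
    os.replace(tmp, path)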

Reviewing and triaging the failure queue

After a batch, analyze the failures file:

# Count failures by kind
python3 -c "
import json
from collections import Counter
lines = open('.pipeline_failed.jsonl').readlines()
entries = [json.loads(l) for l in lines]
print('By kind:', Counter(e['kind'] for e in entries))
print('By reason:', Counter(e['reason'] for e in entries))
"

Common patterns and remedies:

  • "File appears corrupted": Re-download from source. If the source is also corrupted, the file is unrecoverable — log and skip.
  • "File is password-protected": Decrypt with the password and retry: qpdf --decrypt --password=pw input.pdf output.pdf
  • "File too large": Split (PDFs: pdftk burst; video: ffmpeg segment) and convert in chunks.
  • "File format variant not supported": Check if the file's actual format matches its extension. A JPEG renamed to .png will fail — detect the real format with file command or python-magic, rename, and retry.
# Re-run only the failures
python3 -c "
import json
failures = [json.loads(l)['path'] for l in open('.pipeline_failed.jsonl')]
print('\n'.join(failures))
" | xargs -I{} python3 retry_single.py {} jpg

Failure rate monitoring and alerting

For automated pipelines, alert when the failure rate exceeds a threshold:

FAILURE_THRESHOLD = 0.05  # Alert if >5% fail

def check_failure_rate(total: int, failed: int) -> bool:
    rate = failed / total if total > 0 else 0
    if rate > FAILURE_THRESHOLD:
        # Send alert (email, Slack, PagerDuty, etc.)
        print(f'ALERT: {rate:.1%} failure rate ({failed}/{total}) exceeds {FAILURE_THRESHOLD:.0%} threshold')
        return True
    return False

Distinguish between correlated failures (all files from the same source, same format, same time window) and random failures. Correlated failures indicate a systemic issue (bad source data, unsupported format variant, API incident). Random failures at 1-2% are normal and expected.
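
A quick way to spot correlation is to group the failure log along the dimensions where it would show up. The 50% dominance threshold below is an assumption; tune it to your batch sizes:

import json
from collections import Counter
from pathlib import Path

entries = [json.loads(l) for l in open('.pipeline_failed.jsonl')]

# One value dominating a dimension suggests a systemic cause, not random bad files
if entries:
    for label, key in [('reason', lambda e: e['reason']),
                       ('extension', lambda e: Path(e['path']).suffix.lower())]:
        top, count = Counter(key(e) for e in entries).most_common(1)[0]
        if count / len(entries) > 0.5:
            print(f'Correlated: {label} {top!r} accounts for {count}/{len(entries)} failures')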

A pipeline that finishes with 98% success and a clear failure log is far more useful than one that stops at 3% with an unhandled exception. Error classification, quarantine, and structured failure logging turn pipeline failures from incidents into routine maintenance. Grab a free API key to test your error handling before you point the pipeline at production loads.