Polling is the easy path, but it wastes connections, burns Lambda invocations, and complicates your code with sleep loops. The webhook pattern is the better architecture for production: submit conversions and handle results asynchronously when they arrive. This guide covers the full receiver implementation (signature verification, idempotent result storage, retry handling, and failure modes) in Python (FastAPI) and JavaScript (Cloudflare Worker).

TL;DR — the webhook flow in 4 steps

  1. Submit: POST to /v1/jobs with webhook_url param pointing at your receiver endpoint
  2. Store: Save the returned job_id → user/file mapping in your database
  3. Receive: The API POSTs a JSON payload to your webhook URL when conversion completes (or fails)
  4. Process: Verify the HMAC signature, download the file from download_url, update your database

The webhook fires within 1-5 seconds of job completion. Download URLs expire after 1 hour — your receiver must download promptly or re-poll the job for a fresh URL.

The API retries failed webhook deliveries 3 times with exponential backoff (30s, 5m, 30m). Return HTTP 2xx quickly (before any long work) to acknowledge receipt, then process in the background.

Submitting a job with a webhook URL

import os
import asyncio
import json
from pathlib import Path
import httpx

API_KEY = os.environ['CTF_API_KEY']
WEBHOOK_URL = os.environ['WEBHOOK_URL']  # e.g. https://yourapp.com/webhooks/ctf

async def submit_conversion_job(
    file_path: Path,
    target: str,
    metadata: dict,
) -> str:
    """Submit an async conversion job. Returns job_id."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            'https://changethisfile.com/v1/jobs',
            headers={'Authorization': f'Bearer {API_KEY}'},
            content=file_path.read_bytes(),
            params={
                'target': target,
                'filename': file_path.name,
                'webhook_url': WEBHOOK_URL,
                # Pass arbitrary metadata back to your webhook receiver
                'webhook_metadata': json.dumps(metadata),
            },
            timeout=60,
        )
    resp.raise_for_status()
    job_data = resp.json()
    return job_data['job_id']


# Example: submit a video conversion
job_id = asyncio.run(submit_conversion_job(
    Path('upload.avi'),
    'mp4',
    metadata={'user_id': 'u_123', 'upload_id': 'ul_456'},
))
print(f'Job submitted: {job_id}')

The webhook_metadata param passes a JSON string that the API echoes back verbatim in the webhook payload. Use this to carry the context your receiver needs (user ID, upload ID, S3 key) without querying your database in the receiver.
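
The receiver side of that round trip can be sketched as follows. This is a minimal sketch under an assumption the text does not settle: whether the echoed value arrives as a JSON string or as an already-parsed object. The helper name parse_webhook_metadata is hypothetical, and it accepts either form.

```python
import json
from typing import Any


def parse_webhook_metadata(raw: Any) -> dict:
    """Return the echoed metadata as a dict, whatever form it arrives in.

    Assumption: the payload's metadata field may be a dict, a JSON string,
    or missing. Anything else is treated as empty rather than raising.
    """
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            return {}
        # A JSON string could decode to a list or number; only accept objects.
        return decoded if isinstance(decoded, dict) else {}
    return {}


# The same string that submit_conversion_job would send as webhook_metadata
sent = json.dumps({'user_id': 'u_123', 'upload_id': 'ul_456'})
print(parse_webhook_metadata(sent)['upload_id'])  # prints ul_456
```

Normalizing at the edge keeps the rest of the receiver free of type checks, whichever form the payload turns out to use.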

Python receiver (FastAPI)

# webhook_receiver.py
import hashlib
import hmac
import json
import os
import time
from pathlib import Path

import httpx
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks

app = FastAPI()
WEBHOOK_SECRET = os.environ['CTF_WEBHOOK_SECRET'].encode()


def verify_signature(payload: bytes, signature_header: str) -> bool:
    """Verify HMAC-SHA256 webhook signature."""
    # Header format: sha256=<hex digest>
    if not signature_header.startswith('sha256='):
        return False
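    expected_sig = signature_header[7:]
    computed_sig = hmac.new(WEBHOOK_SECRET, payload, hashlib.sha256).hexdigest()
    # Constant-time comparison to prevent timing attacks. Compare bytes:
    # compare_digest raises TypeError on str arguments with non-ASCII characters.
    return hmac.compare_digest(computed_sig.encode(), expected_sig.encode())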


async def handle_conversion_result(payload: dict):
    """Background task: download result and update storage."""
    job_id = payload['job_id']
    state = payload['state']
    metadata = payload.get('metadata', {})
    user_id = metadata.get('user_id')
    upload_id = metadata.get('upload_id')

    if state == 'failed':
        error = payload.get('error', {})
        print(f'Job {job_id} failed: {error}')
        # Update DB: mark upload as failed, notify user
        await update_upload_status(upload_id, 'failed', error=error)
        return

    if state == 'done':
        download_url = payload['download_url']
        # Download within 1 hour — URL expires
        async with httpx.AsyncClient() as client:
            dl_resp = await client.get(download_url, timeout=300)
            dl_resp.raise_for_status()
            result_bytes = dl_resp.content

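        # Save to your storage (S3, disk, etc.)
        output_path = Path(f'outputs/{upload_id}.mp4')
        # Create the output directory on first use; write_bytes does not
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(result_bytes)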

        await update_upload_status(upload_id, 'done', output_path=str(output_path))
        print(f'Job {job_id} done for user {user_id}, {len(result_bytes)} bytes')


async def update_upload_status(upload_id, status, **kwargs):
    """Stub — replace with your DB call."""
    print(f'Upload {upload_id}: {status} {kwargs}')


@app.post('/webhooks/ctf')
async def ctf_webhook(request: Request, background_tasks: BackgroundTasks):
    payload_bytes = await request.body()
    signature = request.headers.get('X-CTF-Signature', '')

    if not verify_signature(payload_bytes, signature):
        raise HTTPException(status_code=401, detail='Invalid signature')

    payload = json.loads(payload_bytes)

    # Ack immediately — process in background
    background_tasks.add_task(handle_conversion_result, payload)
    return {'status': 'ok'}
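
To exercise the receiver before a real job completes, a signed request can be built locally. This is a minimal sketch, assuming the signing scheme described in this guide (HMAC-SHA256 over the raw body, hex-encoded, prefixed with sha256=). The helper name build_signed_request and the sample values are hypothetical; only the field names come from the receiver above, and the shape of the error object is an assumption.

```python
import hashlib
import hmac
import json


def build_signed_request(payload: dict, secret: bytes) -> tuple[bytes, dict]:
    """Serialize a payload and return (body, headers) with a matching signature."""
    # Sign the exact bytes that will be sent; re-serializing later would
    # change the bytes and invalidate the signature.
    body = json.dumps(payload).encode()
    digest = hmac.new(secret, body, hashlib.sha256).hexdigest()
    headers = {
        'Content-Type': 'application/json',
        'X-CTF-Signature': f'sha256={digest}',
    }
    return body, headers


body, headers = build_signed_request(
    {
        'job_id': 'job_test_1',  # made-up value for local testing
        'state': 'failed',
        'error': {'message': 'simulated failure'},  # error shape is assumed
        'metadata': {'user_id': 'u_123', 'upload_id': 'ul_456'},
    },
    secret=b'local-test-secret',
)
print(headers['X-CTF-Signature'][:7])  # prints sha256=
```

Send body and headers to the local endpoint, for example with httpx.post(url, content=body, headers=headers), while the receiver runs with CTF_WEBHOOK_SECRET set to the same secret. A 'failed' payload is a convenient first test because it needs no download URL.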

JavaScript receiver (Cloudflare Worker)

// Cloudflare Worker webhook receiver
export default {
  // ctx is the third handler argument; it provides waitUntil
  async fetch(request, env, ctx) {
    if (request.method !== 'POST' || new URL(request.url).pathname !== '/webhooks/ctf') {
      return new Response('Not found', { status: 404 });
    }

    const bodyBytes = await request.arrayBuffer();
    const signature = request.headers.get('X-CTF-Signature') || '';

    // Header format: sha256=<hex digest>. Reject anything that is not 64 hex chars.
    const receivedHex = signature.startsWith('sha256=') ? signature.slice(7) : '';
    if (!/^[0-9a-f]{64}$/i.test(receivedHex)) {
      return new Response('Unauthorized', { status: 401 });
    }
    const receivedSig = Uint8Array.from(receivedHex.match(/../g), (h) => parseInt(h, 16));

    // Verify HMAC-SHA256. crypto.subtle.verify avoids a variable-time string comparison.
    const secret = new TextEncoder().encode(env.CTF_WEBHOOK_SECRET);
    const hmacKey = await crypto.subtle.importKey('raw', secret, { name: 'HMAC', hash: 'SHA-256' }, false, ['verify']);
    const valid = await crypto.subtle.verify('HMAC', hmacKey, receivedSig, bodyBytes);
    if (!valid) {
      return new Response('Unauthorized', { status: 401 });
    }

    const payload = JSON.parse(new TextDecoder().decode(bodyBytes));

    // Ack immediately; waitUntil keeps the Worker running after the response is sent
    ctx.waitUntil(processResult(payload, env));
    return Response.json({ ok: true });
  },
};

async function processResult(payload, env) {
  const { state, download_url, metadata } = payload;

  if (state === 'done') {
    // Download and save to R2
    const dlResp = await fetch(download_url);
    if (!dlResp.ok) throw new Error(`Download failed: ${dlResp.status}`);
    const resultBytes = await dlResp.arrayBuffer();
    const objectKey = `conversions/${metadata.upload_id}.mp4`;
    await env.CONVERSION_BUCKET.put(objectKey, resultBytes);
    // Update D1 with status
    await env.DB.prepare('UPDATE uploads SET status=?, output_key=? WHERE id=?')
      .bind('done', objectKey, metadata.upload_id)
      .run();
  } else if (state === 'failed') {
    await env.DB.prepare('UPDATE uploads SET status=?, error=? WHERE id=?')
      .bind('failed', JSON.stringify(payload.error), metadata.upload_id)
      .run();
  }
}

Signature verification deep dive

The X-CTF-Signature header contains sha256=<digest>, where the digest is the hex-encoded HMAC-SHA256 of the raw request body, using your webhook secret as the key. Three implementation notes:

  1. Use the raw body bytes — don't parse JSON first. Any whitespace normalization changes the signature.
  2. Use constant-time comparison (hmac.compare_digest in Python, crypto.subtle.verify in JS). A variable-time string comparison leaks how many leading characters matched, which can let an attacker forge a valid signature one character at a time.
  3. Rotate webhook secrets every 90 days. The API supports two active secrets during rotation (old + new) so you don't drop deliveries.

# The signature verification is 3 lines (str.removeprefix needs Python 3.9+)
import hashlib, hmac

def verify(body: bytes, header: str, secret: str) -> bool:
    sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig, header.removeprefix('sha256='))
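
During a rotation window, the receiver has to accept signatures made with either secret. This is a minimal sketch, assuming both secrets are available to the receiver; the function name verify_any and the way the secrets are supplied are illustrative, not part of the API.

```python
import hashlib
import hmac
from typing import Iterable


def verify_any(body: bytes, header: str, secrets: Iterable[bytes]) -> bool:
    """Return True if the header's signature matches any active secret."""
    prefix = 'sha256='
    if not header.startswith(prefix):
        return False
    # Compare as bytes so a header with non-ASCII characters cannot raise.
    received = header[len(prefix):].encode()
    matched = False
    for secret in secrets:
        computed = hmac.new(secret, body, hashlib.sha256).hexdigest().encode()
        # Check every secret instead of returning early, so the time taken
        # does not reveal which secret matched.
        if hmac.compare_digest(computed, received):
            matched = True
    return matched


old_secret, new_secret = b'old-secret', b'new-secret'
body = b'{"job_id": "job_1", "state": "done"}'
header = 'sha256=' + hmac.new(new_secret, body, hashlib.sha256).hexdigest()
print(verify_any(body, header, [old_secret, new_secret]))  # prints True
```

Once deliveries signed with the old secret have stopped arriving, dropping it from the list completes the rotation without a code change.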

Handling webhook retries and deduplication

The API retries failed deliveries (non-2xx response or timeout) 3 times: after 30s, 5 minutes, and 30 minutes. Your receiver may therefore receive the same payload more than once, for example when it handled a delivery but answered too slowly. Make it idempotent:

PROCESSED_JOBS = set()  # In production: use Redis or DB

@app.post('/webhooks/ctf')
async def ctf_webhook(request: Request, background_tasks: BackgroundTasks):
    payload_bytes = await request.body()
    # ... signature verification ...

    payload = json.loads(payload_bytes)
    job_id = payload['job_id']

    if job_id in PROCESSED_JOBS:
        return {'status': 'already_processed'}  # 200 to stop retries

    PROCESSED_JOBS.add(job_id)
    background_tasks.add_task(handle_conversion_result, payload)
    return {'status': 'ok'}

Return HTTP 200 as quickly as possible. If your receiver returns 5xx or times out, the API will retry — even if you've already partially processed the job. The deduplication check prevents double-processing on retries.
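
The in-memory set above is lost on restart and is not shared between worker processes. A durable alternative can be sketched with SQLite from the standard library, where the primary key turns the check and the insert into a single atomic statement. The table name, column names, and the claim_job helper are hypothetical; the same idea maps to a unique constraint in another database or to SET with NX in Redis.

```python
import sqlite3


def open_store(path: str = ':memory:') -> sqlite3.Connection:
    """Open the dedup store and create its table if needed."""
    conn = sqlite3.connect(path)
    conn.execute(
        'CREATE TABLE IF NOT EXISTS processed_jobs ('
        ' job_id TEXT PRIMARY KEY,'
        ' received_at TEXT DEFAULT CURRENT_TIMESTAMP)'
    )
    return conn


def claim_job(conn: sqlite3.Connection, job_id: str) -> bool:
    """Return True if this call is the first to claim job_id."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            'INSERT OR IGNORE INTO processed_jobs (job_id) VALUES (?)',
            (job_id,),
        )
    # rowcount is 1 when the row was inserted, 0 when it already existed
    return cur.rowcount == 1


conn = open_store()
print(claim_job(conn, 'job_1'))  # first delivery: prints True
print(claim_job(conn, 'job_1'))  # retried delivery: prints False
```

In the receiver, claim_job would take the place of the set lookup: schedule the background task only when it returns True, and return 200 either way.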

The webhook pattern is the production-grade approach for any conversion pipeline that processes more than a handful of files. It scales to thousands of concurrent jobs without polling overhead, works cleanly in serverless environments, and handles failure (with retries) automatically. Get a free API key and test your webhook receiver locally with a tunneling tool like ngrok.