Polling is the easy path but it wastes connections, burns Lambda invocations, and complicates your code with sleep loops. The webhook pattern is the right architecture for production: submit conversions and handle results asynchronously when they arrive. This guide covers the full receiver implementation — signature verification, idempotent result storage, retry handling, and failure modes — in Python (FastAPI) and JavaScript (Cloudflare Workers).
TL;DR — the webhook flow in 4 steps
- Submit: POST to `/v1/jobs` with a `webhook_url` param pointing at your receiver endpoint
- Store: Save the returned `job_id` → user/file mapping in your database
- Receive: The API POSTs a JSON payload to your webhook URL when conversion completes (or fails)
- Process: Verify the HMAC signature, download the file from `download_url`, update your database
The webhook fires within 1-5 seconds of job completion. Download URLs expire after 1 hour — your receiver must download promptly or re-poll the job for a fresh URL.
The API retries failed webhook deliveries 3 times with exponential backoff (30s, 5m, 30m). Return HTTP 2xx quickly (before any long work) to acknowledge receipt, then process in the background.
Submitting a job with a webhook URL
```python
import os
import asyncio
import json
from pathlib import Path

import httpx

API_KEY = os.environ['CTF_API_KEY']
WEBHOOK_URL = os.environ['WEBHOOK_URL']  # e.g. https://yourapp.com/webhooks/ctf
WEBHOOK_SECRET = os.environ['CTF_WEBHOOK_SECRET']  # From API key settings


async def submit_conversion_job(
    file_path: Path,
    target: str,
    metadata: dict,
) -> str:
    """Submit an async conversion job. Returns job_id."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            'https://changethisfile.com/v1/jobs',
            headers={'Authorization': f'Bearer {API_KEY}'},
            content=file_path.read_bytes(),
            params={
                'target': target,
                'filename': file_path.name,
                'webhook_url': WEBHOOK_URL,
                # Pass arbitrary metadata back to your webhook receiver
                'webhook_metadata': json.dumps(metadata),
            },
            timeout=60,
        )
        resp.raise_for_status()
        job_data = resp.json()
        return job_data['job_id']


# Example: submit a video conversion
job_id = asyncio.run(submit_conversion_job(
    Path('upload.avi'),
    'mp4',
    metadata={'user_id': 'u_123', 'upload_id': 'ul_456'},
))
print(f'Job submitted: {job_id}')
```
The `webhook_metadata` param passes a JSON string that the API echoes back verbatim in the webhook payload. Use this to carry the context your receiver needs (user ID, upload ID, S3 key) without querying your database in the receiver.
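For reference, the receiver below consumes a payload shaped roughly like this. The field names (`job_id`, `state`, `download_url`, `metadata`) are the ones used throughout this guide; the values are made up for illustration:

```python
# Illustrative webhook payload for a completed job (values are made up)
payload = {
    'job_id': 'job_789',
    'state': 'done',  # 'failed' jobs carry an 'error' object instead of download_url
    'download_url': 'https://changethisfile.com/v1/downloads/job_789?token=...',
    'metadata': {'user_id': 'u_123', 'upload_id': 'ul_456'},
}

# The metadata round-trips verbatim from webhook_metadata at submit time
assert payload['metadata']['upload_id'] == 'ul_456'
```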
Python receiver (FastAPI)
```python
# webhook_receiver.py
import hashlib
import hmac
import json
import os
from pathlib import Path

import httpx
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks

app = FastAPI()
WEBHOOK_SECRET = os.environ['CTF_WEBHOOK_SECRET'].encode()


def verify_signature(payload: bytes, signature_header: str) -> bool:
    """Verify HMAC-SHA256 webhook signature."""
    # Header format: sha256=<hex digest>
    if not signature_header.startswith('sha256='):
        return False
    expected_sig = signature_header[7:]
    computed_sig = hmac.new(WEBHOOK_SECRET, payload, hashlib.sha256).hexdigest()
    # Constant-time comparison to prevent timing attacks
    return hmac.compare_digest(computed_sig, expected_sig)


async def handle_conversion_result(payload: dict):
    """Background task: download result and update storage."""
    job_id = payload['job_id']
    state = payload['state']
    metadata = payload.get('metadata', {})
    user_id = metadata.get('user_id')
    upload_id = metadata.get('upload_id')

    if state == 'failed':
        error = payload.get('error', {})
        print(f'Job {job_id} failed: {error}')
        # Update DB: mark upload as failed, notify user
        await update_upload_status(upload_id, 'failed', error=error)
        return

    if state == 'done':
        download_url = payload['download_url']
        # Download within 1 hour — URL expires
        async with httpx.AsyncClient() as client:
            dl_resp = await client.get(download_url, timeout=300)
            dl_resp.raise_for_status()
            result_bytes = dl_resp.content
        # Save to your storage (S3, disk, etc.)
        output_path = Path(f'outputs/{upload_id}.mp4')
        output_path.write_bytes(result_bytes)
        await update_upload_status(upload_id, 'done', output_path=str(output_path))
        print(f'Job {job_id} done for user {user_id}, {len(result_bytes)} bytes')


async def update_upload_status(upload_id, status, **kwargs):
    """Stub — replace with your DB call."""
    print(f'Upload {upload_id}: {status} {kwargs}')


@app.post('/webhooks/ctf')
async def ctf_webhook(request: Request, background_tasks: BackgroundTasks):
    payload_bytes = await request.body()
    signature = request.headers.get('X-CTF-Signature', '')
    if not verify_signature(payload_bytes, signature):
        raise HTTPException(status_code=401, detail='Invalid signature')
    payload = json.loads(payload_bytes)
    # Ack immediately — process in background
    background_tasks.add_task(handle_conversion_result, payload)
    return {'status': 'ok'}
```
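You can exercise the receiver before wiring up the real API by forging a signed request yourself. A sketch — the body, error code, and secret here are made up; POST the body with the computed header to your local endpoint using any HTTP client:

```python
import hashlib
import hmac
import json

secret = b'test-secret'  # stand-in for your CTF_WEBHOOK_SECRET
body = json.dumps({
    'job_id': 'job_test',
    'state': 'failed',
    'error': {'code': 'example_error'},  # made-up error code
    'metadata': {'upload_id': 'ul_456'},
}).encode()

# Same scheme the API uses: HMAC-SHA256 of the raw body, hex-encoded
signature = 'sha256=' + hmac.new(secret, body, hashlib.sha256).hexdigest()
print(signature)
# Send it, e.g.:
# httpx.post('http://localhost:8000/webhooks/ctf', content=body,
#            headers={'X-CTF-Signature': signature})
```

Run your receiver with the same test secret in `CTF_WEBHOOK_SECRET` and the forged request should pass verification; flip one byte of the body and it should be rejected with 401.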
JavaScript receiver (Cloudflare Worker)
```javascript
// Cloudflare Worker webhook receiver
async function handleResult(payload, env) {
  const { job_id, state, download_url, metadata } = payload;
  if (state === 'done') {
    // Download the result (URL expires after 1 hour) and save to R2
    const dlResp = await fetch(download_url);
    const resultBytes = await dlResp.arrayBuffer();
    const objectKey = `conversions/${metadata.upload_id}.mp4`;
    await env.CONVERSION_BUCKET.put(objectKey, resultBytes);
    // Update D1 with status
    await env.DB.prepare('UPDATE uploads SET status=?, output_key=? WHERE id=?')
      .bind('done', objectKey, metadata.upload_id)
      .run();
  } else if (state === 'failed') {
    await env.DB.prepare('UPDATE uploads SET status=?, error=? WHERE id=?')
      .bind('failed', JSON.stringify(payload.error), metadata.upload_id)
      .run();
  }
}

export default {
  async fetch(request, env, ctx) {
    if (request.method !== 'POST' || new URL(request.url).pathname !== '/webhooks/ctf') {
      return new Response('Not found', { status: 404 });
    }
    const bodyBytes = await request.arrayBuffer();
    const signature = request.headers.get('X-CTF-Signature') || '';
    // Verify HMAC-SHA256. crypto.subtle.verify compares in constant time,
    // so no hand-rolled hex comparison is needed.
    const secret = new TextEncoder().encode(env.CTF_WEBHOOK_SECRET);
    const key = await crypto.subtle.importKey(
      'raw', secret, { name: 'HMAC', hash: 'SHA-256' }, false, ['verify'],
    );
    const receivedHex = signature.replace('sha256=', '');
    const sigBytes = new Uint8Array(
      (receivedHex.match(/../g) || []).map(h => parseInt(h, 16)),
    );
    const valid = await crypto.subtle.verify('HMAC', key, sigBytes, bodyBytes);
    if (!valid) {
      return new Response('Unauthorized', { status: 401 });
    }
    const payload = JSON.parse(new TextDecoder().decode(bodyBytes));
    // Ack immediately — waitUntil keeps the download running after the response
    ctx.waitUntil(handleResult(payload, env));
    return Response.json({ ok: true });
  },
};
```
Signature verification deep dive
The `X-CTF-Signature` header contains `sha256=<hex digest>`, where the digest is the HMAC-SHA256 of the raw request body using your webhook secret as the key. Three implementation notes:
- Use the raw body bytes — don't parse JSON first. Any whitespace normalization changes the signature.
- Use constant-time comparison (`hmac.compare_digest` in Python, or a constant-time check such as `crypto.subtle.verify` in JS) — timing attacks can recover secrets from variable-time string comparison.
- Rotate secrets periodically — webhook secrets should be rotated every 90 days. The API supports two active secrets during rotation (old + new) so you don't drop deliveries.
```python
# The signature verification is 3 lines
import hashlib, hmac

def verify(body: bytes, header: str, secret: str) -> bool:
    sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig, header.removeprefix('sha256='))
```
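During a rotation window two secrets are valid, so an incoming signature may have been produced with either one. A sketch that accepts whichever matches (the secret values here are placeholders — load yours from config):

```python
import hashlib
import hmac

ACTIVE_SECRETS = [b'new-secret', b'old-secret']  # placeholders; load from config

def verify_any(body: bytes, header: str) -> bool:
    """Accept a signature produced with any currently active secret."""
    received = header.removeprefix('sha256=')
    for secret in ACTIVE_SECRETS:
        computed = hmac.new(secret, body, hashlib.sha256).hexdigest()
        if hmac.compare_digest(computed, received):
            return True
    return False

# A signature made with the old secret still verifies during rotation
sig = 'sha256=' + hmac.new(b'old-secret', b'{}', hashlib.sha256).hexdigest()
assert verify_any(b'{}', sig)
```

Once every sender is on the new secret, drop the old one from the list.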
Handling webhook retries and deduplication
The API retries failed deliveries (non-2xx response, timeout) 3 times: after 30s, 5 minutes, and 30 minutes. Your receiver may process the same webhook payload multiple times. Make it idempotent:
```python
PROCESSED_JOBS = set()  # In production: use Redis or DB

@app.post('/webhooks/ctf')
async def ctf_webhook(request: Request, background_tasks: BackgroundTasks):
    payload_bytes = await request.body()
    # ... signature verification ...
    payload = json.loads(payload_bytes)
    job_id = payload['job_id']
    if job_id in PROCESSED_JOBS:
        return {'status': 'already_processed'}  # 200 to stop retries
    PROCESSED_JOBS.add(job_id)
    background_tasks.add_task(handle_conversion_result, payload)
    return {'status': 'ok'}
```
Return HTTP 200 as quickly as possible. If your receiver returns 5xx or times out, the API will retry — even if you've already partially processed the job. The deduplication check prevents double-processing on retries.
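The in-memory set above disappears on restart and isn't shared across workers. A more durable sketch uses a database unique constraint as the dedup check — shown here with SQLite for self-containment; the table name is illustrative, and you would swap in your real database:

```python
import sqlite3

conn = sqlite3.connect(':memory:')  # use your real database in production
conn.execute('CREATE TABLE IF NOT EXISTS processed_jobs (job_id TEXT PRIMARY KEY)')

def claim_job(job_id: str) -> bool:
    """Return True only the first time job_id is seen (atomic dedup check)."""
    with conn:
        cur = conn.execute(
            'INSERT OR IGNORE INTO processed_jobs (job_id) VALUES (?)', (job_id,)
        )
    return cur.rowcount == 1  # 0 rows inserted means a retry we already handled

assert claim_job('job_789') is True   # first delivery: process it
assert claim_job('job_789') is False  # retry: return 200, skip the work
```

Because the insert and the existence check are one atomic statement, this also stays correct when two retry deliveries race each other.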
The webhook pattern is the production-grade approach for any conversion pipeline that processes more than a handful of files. It scales to thousands of concurrent jobs without polling overhead, works cleanly in serverless environments, and handles failure (with retries) automatically. Get a free API key and test your webhook receiver locally with a tunneling tool like ngrok.