The most common SaaS file conversion problem: you need to accept HEIC from iPhone users, but your image processing pipeline expects JPG. Or users upload DOC files but you store everything as PDF. Or your mobile app uploads WAV voice memos but you deliver MP3 to listeners.
This guide covers three patterns: synchronous inline conversion (simplest), async background processing with job tracking (for large files), and completion notification via webhook/WebSocket/push/email once a background conversion finishes.
TL;DR
- Small files (<5MB): convert inline during the upload request (synchronous)
- Large files (5-25MB): accept upload, queue conversion job, return job ID, notify when done
- Many formats supported: the API detects the source format from the filename — but still validate extension and size on your side before calling it (see "Input validation before conversion" below)
- Always store original: keep the original upload alongside the converted version for re-processing
Pattern 1: Synchronous conversion (Node.js)
For image uploads where you want to normalize to WebP immediately:
// Express.js upload handler with inline conversion
const express = require('express');
const multer = require('multer');
const FormData = require('form-data'); // builds the multipart body for the conversion API call
const fetch = require('node-fetch');
const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');
const app = express();
// Buffer uploads in memory and reject anything over 25MB before any processing happens.
const upload = multer({ storage: multer.memoryStorage(), limits: { fileSize: 25 * 1024 * 1024 } });
const s3 = new S3Client({ region: 'us-east-1' });
const CTF_API_KEY = process.env.CTF_API_KEY; // conversion API credential (undefined if unset — fails at request time)
const BUCKET = process.env.S3_BUCKET; // destination bucket for processed files
// Formats we accept but need to normalize
const CONVERT_TO_WEBP = new Set(['.heic', '.heif', '.png', '.bmp', '.tiff', '.avif']);
const KEEP_AS_IS = new Set(['.jpg', '.jpeg', '.webp', '.gif']);
/**
 * Convert an image buffer to WebP via the ChangeThisFile API.
 *
 * @param {Buffer} buffer - Raw bytes of the source image.
 * @param {string} filename - Original filename (the API detects the source format from it).
 * @returns {Promise<Buffer>} Converted WebP bytes.
 * @throws {Error} On any non-2xx API response; message includes the HTTP status and body.
 */
async function convertToWebP(buffer, filename) {
  const form = new FormData();
  // octet-stream avoids a wrong client-side guess; the API sniffs the real format server-side
  form.append('file', buffer, { filename, contentType: 'application/octet-stream' });
  form.append('target', 'webp');
  const res = await fetch('https://changethisfile.com/v1/convert', {
    method: 'POST',
    headers: { 'Authorization': `Bearer ${CTF_API_KEY}`, ...form.getHeaders() },
    body: form,
  });
  if (!res.ok) {
    // Include the status code so logs distinguish 4xx (bad input) from 5xx (retryable).
    throw new Error(`Conversion failed (HTTP ${res.status}): ${await res.text()}`);
  }
  // arrayBuffer() works on node-fetch v2, v3, and built-in fetch;
  // res.buffer() is a deprecated node-fetch-v2-only extension.
  return Buffer.from(await res.arrayBuffer());
}
/**
 * POST /upload/avatar — accept an image, normalize exotic formats to WebP,
 * store the result in S3, and return the public URL.
 *
 * Responses:
 *   200 { url, size_kb } — stored successfully
 *   400 { error }        — no file attached to the request
 *   422 { error }        — extension not in either accept list
 *   500 { error }        — conversion or storage failure
 */
app.post('/upload/avatar', upload.single('photo'), async (req, res) => {
  try {
    // Guard: multer leaves req.file undefined when the field is missing;
    // the original destructure would throw and surface as a misleading 500.
    if (!req.file) {
      return res.status(400).json({ error: 'No file uploaded (expected field "photo")' });
    }
    const { originalname, buffer, size } = req.file;
    // lastIndexOf('.') is -1 for extension-less names; slice(-1) would then
    // return the last CHARACTER of the name, so treat "no dot" as no extension.
    const dot = originalname.lastIndexOf('.');
    const ext = dot === -1 ? '' : originalname.slice(dot).toLowerCase();
    const userId = req.user.id; // from your auth middleware
    let finalBuffer;
    let finalExt;
    if (CONVERT_TO_WEBP.has(ext)) {
      console.log(`Converting ${ext} → webp (${(size / 1024).toFixed(0)}KB)`);
      finalBuffer = await convertToWebP(buffer, originalname);
      finalExt = '.webp';
    } else if (KEEP_AS_IS.has(ext)) {
      finalBuffer = buffer;
      finalExt = ext;
    } else {
      return res.status(422).json({ error: `Unsupported format: ${ext}` });
    }
    // Map the stored extension to its real MIME type — the original
    // webp-or-jpeg ternary mislabeled .gif uploads as image/jpeg.
    const contentTypes = {
      '.webp': 'image/webp',
      '.jpg': 'image/jpeg',
      '.jpeg': 'image/jpeg',
      '.gif': 'image/gif',
    };
    const key = `avatars/${userId}${finalExt}`;
    await s3.send(new PutObjectCommand({
      Bucket: BUCKET,
      Key: key,
      Body: finalBuffer,
      ContentType: contentTypes[finalExt] ?? 'application/octet-stream',
    }));
    res.json({
      url: `https://${BUCKET}.s3.amazonaws.com/${key}`,
      size_kb: Math.round(finalBuffer.length / 1024),
    });
  } catch (err) {
    console.error('Upload error:', err);
    res.status(500).json({ error: 'Upload processing failed' });
  }
});
Pattern 2: Async background conversion (Python + Celery)
For larger files or high-traffic endpoints where conversion shouldn't block the HTTP response:
# app.py (FastAPI)
import io
import os
import re
import uuid
from pathlib import Path

import boto3
import requests
from fastapi import BackgroundTasks, FastAPI, HTTPException, UploadFile
app = FastAPI()
s3 = boto3.client('s3')  # region/credentials come from the default boto3 chain
BUCKET = os.environ['S3_BUCKET']  # fails fast at import time if unset
API_KEY = os.environ['CTF_API_KEY']  # ChangeThisFile API credential
def convert_and_store(job_id: str, file_bytes: bytes, filename: str, target: str):
    """Background task: call the conversion API, upload the result to S3,
    and record the outcome on the job row.

    Every exception is caught and persisted as a 'failed' status so the
    status endpoint can surface it to the user instead of leaving the job
    stuck in 'pending'.
    """
    db = get_db()  # your database connection
    try:
        response = requests.post(
            'https://changethisfile.com/v1/convert',
            headers={'Authorization': f'Bearer {API_KEY}'},
            files={'file': (filename, io.BytesIO(file_bytes))},
            data={'target': target},
            timeout=180,
        )
        response.raise_for_status()
        # Output keeps the original stem, swaps the extension for the target format.
        output_key = f'processed/{job_id}/{Path(filename).stem}.{target}'
        s3.put_object(Bucket=BUCKET, Key=output_key, Body=response.content)
        db.execute(
            'UPDATE jobs SET status=?, output_key=? WHERE id=?',
            ('done', output_key, job_id)
        )
    except Exception as exc:
        db.execute('UPDATE jobs SET status=?, error=? WHERE id=?', ('failed', str(exc), job_id))
@app.post('/documents/upload')
async def upload_document(file: UploadFile, background_tasks: BackgroundTasks):
    """Accept a document, store the original in S3, and queue conversion to PDF.

    Returns immediately with a job_id; clients poll /documents/{job_id}/status.
    """
    content = await file.read()
    job_id = str(uuid.uuid4())
    db = get_db()
    # Sanitize the user-supplied filename before using it in an S3 key:
    # it can contain path traversal ('../'), null bytes, or other unsafe
    # characters (see the filename-sanitization gotcha). Keep only the
    # basename and allowlist its characters.
    safe_name = re.sub(r'[^A-Za-z0-9._-]', '_', Path(file.filename or 'upload').name)
    # Determine target format by source extension
    ext = Path(safe_name).suffix.lower()
    target_map = {'.doc': 'pdf', '.docx': 'pdf', '.rtf': 'pdf', '.odt': 'pdf'}
    target = target_map.get(ext, 'pdf')
    # Store original immediately (under the sanitized name)
    s3.put_object(Bucket=BUCKET, Key=f'originals/{job_id}/{safe_name}', Body=content)
    db.execute('INSERT INTO jobs (id, status, filename) VALUES (?, ?, ?)', (job_id, 'pending', safe_name))
    # Queue conversion in background
    background_tasks.add_task(convert_and_store, job_id, content, safe_name, target)
    return {'job_id': job_id, 'status': 'pending'}
@app.get('/documents/{job_id}/status')
async def job_status(job_id: str):
    """Return the current state of a conversion job (pending/done/failed).

    Raises a 404 when the job_id is unknown.
    NOTE(review): assumes db rows support mapping-style access
    (e.g. sqlite3.Row) — confirm against get_db().
    """
    db = get_db()
    job = db.execute('SELECT status, output_key, error FROM jobs WHERE id=?', (job_id,)).fetchone()
    if not job:
        # HTTPException must be imported from fastapi — the original import
        # line omitted it, so this raise was a NameError at runtime.
        raise HTTPException(status_code=404, detail='Job not found')
    return {'status': job['status'], 'output_key': job['output_key'], 'error': job['error']}
Pattern 3: Webhook notification on completion
/**
 * After conversion completes in a background job, notify the user on every
 * configured channel.
 *
 * @param {string} userId - Recipient; keys the socket room, push target, and email lookup.
 * @param {string} jobId - Conversion job identifier.
 * @param {string} outputUrl - Download URL of the converted file.
 */
async function notifyConversionComplete(userId, jobId, outputUrl) {
  // Option 1: WebSocket (if user is still on page) — synchronous emit, nothing to await.
  io.to(`user:${userId}`).emit('conversion_complete', { jobId, outputUrl });
  // Options 2 & 3 are independent network calls — run them in parallel
  // instead of awaiting one after the other.
  await Promise.all([
    // Option 2: Push notification (mobile)
    sendPushNotification(userId, {
      title: 'Your file is ready',
      body: 'Your converted document is ready to download',
      data: { jobId, outputUrl }
    }),
    // Option 3: Email
    sendEmail(userId, {
      subject: 'File conversion complete',
      template: 'conversion-ready',
      data: { outputUrl }
    }),
  ]);
}
Input validation before conversion
from pathlib import Path
# Allowlist of accepted extensions (prevents arbitrary file attacks)
ALLOWED_EXTENSIONS = {
    '.heic', '.heif', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.tiff',  # images
    '.mp4', '.mov', '.avi', '.mkv', '.webm',  # video
    '.mp3', '.wav', '.flac', '.aac', '.ogg',  # audio
    '.pdf', '.docx', '.doc', '.rtf', '.odt',  # documents
}
# Max upload size per format category
MAX_SIZE = {
    'image': 10 * 1024 * 1024,     # 10MB
    'audio': 100 * 1024 * 1024,    # 100MB
    'video': 500 * 1024 * 1024,    # 500MB — only on paid plans
    'document': 25 * 1024 * 1024,  # 25MB
}

def validate_upload(filename: str, size: int) -> tuple[bool, str]:
    """Check an upload against the extension allowlist and the size cap
    for its format category.

    Returns an (ok, message) pair; message is "" when the upload passes.
    """
    extension = Path(filename).suffix.lower()
    if extension not in ALLOWED_EXTENSIONS:
        return False, f"File type {extension} is not supported"
    category = get_category(extension)  # your mapping function
    limit = MAX_SIZE[category]
    if size > limit:
        limit_mb = limit // (1024 * 1024)
        return False, f"File too large. {category} files must be under {limit_mb}MB"
    return True, ""
Edge cases and gotchas
- Always store the original. Store the user's original file alongside the converted version. If you re-process later (better quality settings, new target format), you have the source. A user who uploads a HEIC will want their original HEIC back if they ask for a refund or data export.
- Content-Type spoofing. Don't trust the Content-Type header from the upload. A user can rename a malicious file to image.jpg. Validate file type by magic bytes (use python-magic or file-type npm package) rather than extension alone.
- Concurrent upload storms. If 100 users upload simultaneously, 100 concurrent API requests may exceed your plan's concurrency limit. Add a semaphore or queue in front of the conversion calls.
- Partial upload failures. If the upload succeeds but conversion fails, the user sees a broken state. Track job status in your database: pending → processing → done | failed. Always surface the status to the user.
- Filename sanitization. User-supplied filenames may contain path traversal characters (`../`), null bytes, or special characters. Sanitize before using them in API calls or storage keys: `secure_filename()` in Flask/Werkzeug, or a custom regex replacement.
Scaling considerations
At higher volumes, move to a proper job queue:
# Using Celery + Redis for high-volume conversion
from celery import Celery
# Redis as the message broker; no result backend configured — job state
# lives in our own jobs table via update_job_status, not in Celery.
app = Celery('conversion', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3, default_retry_delay=30)
def convert_file_task(self, job_id: str, s3_key: str, target: str):
    """Download the original from S3, convert it via the API, store the output.

    Retries (up to 3 times, 30s apart) on HTTP 5xx from the conversion API.
    Any other failure — 4xx, S3/network errors, or exhausted retries — marks
    the job 'failed' so it never sits in limbo without a status update.
    """
    s3 = boto3.client('s3')  # one client reused for both download and upload
    try:
        # Download from S3
        obj = s3.get_object(Bucket=BUCKET, Key=s3_key)
        file_bytes = obj['Body'].read()
        filename = s3_key.rsplit('/', 1)[-1]
        # Convert via API
        resp = requests.post(
            'https://changethisfile.com/v1/convert',
            headers={'Authorization': f'Bearer {API_KEY}'},
            files={'file': (filename, io.BytesIO(file_bytes))},
            data={'target': target},
            timeout=180,
        )
        resp.raise_for_status()
        # Store output
        out_key = f'converted/{job_id}.{target}'
        s3.put_object(Bucket=BUCKET, Key=out_key, Body=resp.content)
        update_job_status(job_id, 'done', out_key)
    except requests.HTTPError as exc:
        # Retry on 5xx (transient); 4xx means the input is bad — don't retry.
        if exc.response is not None and exc.response.status_code >= 500:
            try:
                raise self.retry(exc=exc)
            except self.MaxRetriesExceededError:
                # Out of retries: fall through and record the failure —
                # the original code let this escape and never updated the job.
                pass
        update_job_status(job_id, 'failed', error=str(exc))
    except Exception as exc:
        # Non-HTTP errors (S3, connection timeouts) previously escaped with
        # the job stuck in its prior status; record them as failures.
        update_job_status(job_id, 'failed', error=str(exc))
The synchronous pattern is two API calls (upload + convert) and 30 lines of code. The async pattern with job tracking is the right architecture for production SaaS — it survives API timeouts, allows retries, and gives users clear status feedback. Get a free API key — free tier covers 1,000 conversions/month for early-stage apps.