Secure Webhook Handling¶
Learn how to securely receive and process asynchronous events from Circuit.
Overview¶
Circuit uses webhooks to notify your application when:
- Data ingestion completes (202 Accepted → completed/failed)
- Billing events occur (subscription changes, payment status)
- Compliance events trigger (data deletion requests)
The 202 Accepted Pattern¶
When you call /ingest, you receive an immediate 202 Accepted response with a batch ID:
This means the data was received and queued for processing. Processing happens asynchronously.
Polling vs Webhooks¶
Polling (simple but inefficient):
import time
batch_id = result.batch_id
while True:
status = client.get_ingestion_status(batch_id)
if status.status in ["completed", "failed"]:
break
time.sleep(5) # Don't hammer the API
Webhooks (recommended for production):
Register a webhook endpoint and receive a POST when processing completes.
Setting Up Webhooks¶
Step 1: Create an Endpoint¶
Create an endpoint in your application to receive webhook events:
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
app = FastAPI()
WEBHOOK_SECRET = "whsec_your-secret-here"
@app.post("/webhooks/circuit")
async def handle_circuit_webhook(request: Request):
# Get the signature from headers
signature = request.headers.get("X-Circuit-Signature")
timestamp = request.headers.get("X-Circuit-Timestamp")
if not signature or not timestamp:
raise HTTPException(status_code=400, detail="Missing signature")
# Get the raw body
body = await request.body()
# Verify the signature
if not verify_signature(body, timestamp, signature):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse and handle the event
event = await request.json()
await process_event(event)
return {"received": True}
def verify_signature(body: bytes, timestamp: str, signature: str) -> bool:
"""Verify the webhook signature."""
payload = f"{timestamp}.{body.decode()}"
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
async def process_event(event: dict):
"""Process the webhook event."""
event_type = event.get("type")
if event_type == "ingestion.completed":
batch_id = event["data"]["batch_id"]
records_processed = event["data"]["records_processed"]
print(f"Ingestion {batch_id} completed: {records_processed} records")
elif event_type == "ingestion.failed":
batch_id = event["data"]["batch_id"]
errors = event["data"]["errors"]
print(f"Ingestion {batch_id} failed: {errors}")
elif event_type == "billing.low_balance":
balance = event["data"]["current_balance"]
print(f"Low balance warning: {balance} credits remaining")
import express from 'express';
import crypto from 'crypto';
const app = express();
const WEBHOOK_SECRET = 'whsec_your-secret-here';
// Use raw body for signature verification
app.use('/webhooks/circuit', express.raw({ type: 'application/json' }));
app.post('/webhooks/circuit', (req, res) => {
const signature = req.headers['x-circuit-signature'] as string;
const timestamp = req.headers['x-circuit-timestamp'] as string;
if (!signature || !timestamp) {
return res.status(400).json({ error: 'Missing signature' });
}
// Verify signature
if (!verifySignature(req.body, timestamp, signature)) {
return res.status(401).json({ error: 'Invalid signature' });
}
const event = JSON.parse(req.body.toString());
processEvent(event);
res.json({ received: true });
});
function verifySignature(
body: Buffer,
timestamp: string,
signature: string
): boolean {
const payload = `${timestamp}.${body.toString()}`;
const expected = crypto
.createHmac('sha256', WEBHOOK_SECRET)
.update(payload)
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(`sha256=${expected}`),
Buffer.from(signature)
);
}
function processEvent(event: Record<string, any>) {
const eventType = event.type;
switch (eventType) {
case 'ingestion.completed':
console.log(`Ingestion ${event.data.batch_id} completed`);
break;
case 'ingestion.failed':
console.log(`Ingestion ${event.data.batch_id} failed`);
break;
case 'billing.low_balance':
console.log(`Low balance: ${event.data.current_balance}`);
break;
}
}
Step 2: Register Your Endpoint¶
Register your webhook URL in the Circuit dashboard or via API:
curl -X POST https://api.circuit-kyc.com/api/v1/webhooks \
-H "X-API-Key: $CIRCUIT_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"url": "https://your-app.com/webhooks/circuit",
"events": ["ingestion.completed", "ingestion.failed", "billing.low_balance"]
}'
Step 3: Store Your Webhook Secret¶
You'll receive a webhook secret (whsec_...). Store this securely:
Event Types¶
Ingestion Events¶
| Event | Description |
|---|---|
ingestion.accepted |
Batch received and queued |
ingestion.processing |
Processing started |
ingestion.completed |
All records processed |
ingestion.failed |
Processing failed |
Example payload:
{
"id": "evt_abc123",
"type": "ingestion.completed",
"created_at": "2024-01-15T10:30:00Z",
"data": {
"batch_id": "batch_xyz789",
"records_received": 100,
"records_processed": 98,
"records_failed": 2,
"errors": [
{"record_index": 45, "error": "Invalid email format"},
{"record_index": 67, "error": "Missing required field: kyc_level"}
]
}
}
Billing Events¶
| Event | Description |
|---|---|
billing.low_balance |
Credits below warning threshold (20%) |
billing.critical_balance |
Credits below critical threshold (5%) |
billing.credits_purchased |
Credits added to account |
billing.subscription_changed |
Tier upgraded/downgraded |
Signature Verification¶
Always verify webhook signatures to ensure the request came from Circuit.
The signature is computed as:
Where:
- webhook_secret is your whsec_... secret
- timestamp is the Unix timestamp from X-Circuit-Timestamp
- body is the raw request body
Preventing Replay Attacks¶
Check that the timestamp is recent (within 5 minutes):
import time
def is_timestamp_valid(timestamp: str, tolerance: int = 300) -> bool:
"""Check if timestamp is within tolerance (default 5 minutes)."""
try:
ts = int(timestamp)
now = int(time.time())
return abs(now - ts) < tolerance
except ValueError:
return False
Best Practices¶
1. Return 200 Quickly¶
Acknowledge the webhook immediately, then process asynchronously:
@app.post("/webhooks/circuit")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
event = await request.json()
# Add to background queue - don't block
background_tasks.add_task(process_event, event)
return {"received": True}
2. Handle Duplicates¶
Circuit may retry failed deliveries. Use the event ID for idempotency:
processed_events = set() # Use Redis in production
async def process_event(event: dict):
event_id = event["id"]
if event_id in processed_events:
return # Already processed
processed_events.add(event_id)
# Process the event...
3. Log Everything¶
Log webhook events for debugging:
import logging
logger = logging.getLogger("webhooks")
async def process_event(event: dict):
logger.info(
"Webhook received",
extra={
"event_id": event["id"],
"event_type": event["type"],
}
)
4. Test with CLI¶
Use the Circuit CLI to send test webhooks:
Troubleshooting¶
Webhook not received¶
- Check your endpoint is publicly accessible
- Verify the URL is correct in the dashboard
- Check your firewall allows Circuit IPs
Signature verification fails¶
- Use the raw body (not parsed JSON) for verification
- Check your webhook secret is correct
- Ensure timestamp is included in the signature payload
Events processed multiple times¶
- Implement idempotency using event IDs
- Use a database or Redis to track processed events