Verifying Signatures
Every Warpweb webhook is signed with HMAC-SHA256. This is the single most important thing to get right — without verification, anyone who learns your URL can forge submissions.
Copy a snippet for your stack below, then read the scheme and common mistakes sections if anything surprises you.
Node.js (Express)
import crypto from 'node:crypto'
import express from 'express'
const app = express()
const WEBHOOK_SECRET = process.env.WARPWEB_WEBHOOK_SECRET
const REPLAY_WINDOW_SECONDS = 300
app.post(
'/webhooks/warpweb-leads',
express.raw({ type: 'application/json' }), // raw, not json
(req, res) => {
const signature = req.headers['x-warpweb-signature']
const timestamp = req.headers['x-warpweb-timestamp']
if (!signature || !signature.startsWith('sha256=') || !timestamp) {
return res.status(401).send('missing signature or timestamp')
}
// Replay protection
const now = Math.floor(Date.now() / 1000)
if (Math.abs(now - Number(timestamp)) > REPLAY_WINDOW_SECONDS) {
return res.status(401).send('stale timestamp')
}
const signingString = `${timestamp}.${req.body.toString('utf8')}`
const expected = crypto
.createHmac('sha256', WEBHOOK_SECRET)
.update(signingString)
.digest('hex')
const provided = signature.slice('sha256='.length)
const valid =
provided.length === expected.length &&
crypto.timingSafeEqual(
Buffer.from(provided, 'hex'),
Buffer.from(expected, 'hex'),
)
if (!valid) return res.status(401).send('invalid signature')
const payload = JSON.parse(req.body.toString('utf8'))
// ...handle the event
res.status(200).send('ok')
},
)Node.js (Next.js App Router)
// app/api/webhooks/warpweb-leads/route.ts
import crypto from 'node:crypto'
import { NextRequest, NextResponse } from 'next/server'
const WEBHOOK_SECRET = process.env.WARPWEB_WEBHOOK_SECRET!
const REPLAY_WINDOW_SECONDS = 300
export async function POST(req: NextRequest) {
const rawBody = await req.text()
const signature = req.headers.get('x-warpweb-signature') ?? ''
const timestamp = req.headers.get('x-warpweb-timestamp') ?? ''
if (!signature.startsWith('sha256=') || !timestamp) {
return new NextResponse('missing signature or timestamp', { status: 401 })
}
const now = Math.floor(Date.now() / 1000)
if (Math.abs(now - Number(timestamp)) > REPLAY_WINDOW_SECONDS) {
return new NextResponse('stale timestamp', { status: 401 })
}
const signingString = `${timestamp}.${rawBody}`
const expected = crypto
.createHmac('sha256', WEBHOOK_SECRET)
.update(signingString, 'utf8')
.digest('hex')
const provided = signature.slice('sha256='.length)
const valid =
provided.length === expected.length &&
crypto.timingSafeEqual(
Buffer.from(provided, 'hex'),
Buffer.from(expected, 'hex'),
)
if (!valid) return new NextResponse('invalid signature', { status: 401 })
const payload = JSON.parse(rawBody)
// ...handle the event
return NextResponse.json({ ok: true })
}Python (Flask)
import hmac
import hashlib
import os
import time
from flask import Flask, request, abort
app = Flask(__name__)
WEBHOOK_SECRET = os.environ['WARPWEB_WEBHOOK_SECRET'].encode()
REPLAY_WINDOW_SECONDS = 300
@app.post('/webhooks/warpweb-leads')
def warpweb_webhook():
signature = request.headers.get('X-Warpweb-Signature', '')
timestamp = request.headers.get('X-Warpweb-Timestamp', '')
if not signature.startswith('sha256=') or not timestamp:
abort(401, 'missing signature or timestamp')
if abs(int(time.time()) - int(timestamp)) > REPLAY_WINDOW_SECONDS:
abort(401, 'stale timestamp')
raw_body = request.get_data() # raw bytes, do not use request.json
signing_string = f"{timestamp}.{raw_body.decode('utf-8')}".encode()
expected = hmac.new(WEBHOOK_SECRET, signing_string, hashlib.sha256).hexdigest()
provided = signature[len('sha256='):]
if not hmac.compare_digest(provided, expected):
abort(401, 'invalid signature')
payload = request.get_json()
# ...handle the event
return {'ok': True}, 200Python (FastAPI)
import hmac
import hashlib
import os
import time
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
WEBHOOK_SECRET = os.environ['WARPWEB_WEBHOOK_SECRET'].encode()
REPLAY_WINDOW_SECONDS = 300
@app.post('/webhooks/warpweb-leads')
async def warpweb_webhook(request: Request):
raw_body = await request.body()
signature = request.headers.get('x-warpweb-signature', '')
timestamp = request.headers.get('x-warpweb-timestamp', '')
if not signature.startswith('sha256=') or not timestamp:
raise HTTPException(status_code=401, detail='missing signature or timestamp')
if abs(int(time.time()) - int(timestamp)) > REPLAY_WINDOW_SECONDS:
raise HTTPException(status_code=401, detail='stale timestamp')
signing_string = f"{timestamp}.{raw_body.decode('utf-8')}".encode()
expected = hmac.new(WEBHOOK_SECRET, signing_string, hashlib.sha256).hexdigest()
provided = signature[len('sha256='):]
if not hmac.compare_digest(provided, expected):
raise HTTPException(status_code=401, detail='invalid signature')
import json
payload = json.loads(raw_body)
# ...handle the event
return {'ok': True}curl (one-off verification)
Useful for debugging in production or in a Bash-only environment:
# NOTE: the elisions below (`...`) are placeholders for illustration only.
# This snippet won't run as-is — paste your real timestamp + raw body bytes
# before running. The signature depends on byte-exact body content, so make
# sure you capture the body before any JSON parsing or re-serialization.
TS="1716045731"
RAW_BODY='{"event_id":"7d0b368c-...","type":"form.submit","site_id":"...","occurred_at":"2026-05-17T15:42:11Z","payload":{...}}'
EXPECTED=$(printf '%s.%s' "$TS" "$RAW_BODY" | openssl dgst -sha256 -hmac "$WARPWEB_WEBHOOK_SECRET" | awk '{print $2}')
echo "sha256=$EXPECTED"
# Compare to the X-Warpweb-Signature header valueThe scheme
- Algorithm: HMAC-SHA256
- Key: the signing secret you stored at configure time (
secret_issuedfromPOST /v1/sites/:id/webhooks/forms, orsecret_plaintextfromPUT /v1/customer/webhook) - Input:
${timestamp}.${raw_body}— the timestamp fromX-Warpweb-Timestamp, a literal., then the raw request body bytes (before any JSON parsing or re-serialization) - Format: hex-encoded digest, prefixed with
sha256= - Header:
X-Warpweb-Signature - Replay window: reject any request whose
X-Warpweb-Timestampis more than 300 seconds (5 minutes) from your server’s wall clock
Two things will trip you up if your snippet drifts from the examples above:
- Verify against the raw body, not parsed JSON. Parsing strips whitespace, re-orders keys, and breaks the signature. In Express, use
express.raw()on the webhook route, notexpress.json(). In Next.js App Router, readreq.text()(orreq.arrayBuffer()) andJSON.parse()only after verification. - Use a constant-time comparison.
crypto.timingSafeEqual/hmac.compare_digest. Plain===leaks how many characters matched.
Common mistakes
- Forgetting the timestamp prefix. The signature is over
${timestamp}.${raw_body}, not just the body. Both pieces must be included with the literal.separator. - Skipping replay protection. Without the timestamp-window check, a captured request can be replayed indefinitely. Always enforce the ±300s window.
- Parsing JSON before computing the signature. Re-serialization is not byte-identical. Always use the raw bytes.
- Using
req.bodyafter a JSON middleware ran. In Express,express.json()consumes the stream. Useexpress.raw()on the webhook route. - Trimming whitespace. Don’t. The body is signed exactly as sent.
- Comparing with
===or==. Leaks timing information about partial matches. Usecrypto.timingSafeEqual/hmac.compare_digest. - Truncating the hex string. The full 64-character hex digest is the signature.
- Hashing the secret. The secret is the HMAC key, not part of the message. Don’t include it in the message.
Rotating the secret
- Per-site form webhook: call
POST /v1/sites/:id/webhooks/formswith"rotate_secret": true. - Customer-level lifecycle webhook: call
POST /v1/customer/webhook/regenerate-secret.
The old secret is invalidated immediately; the new plaintext is returned once in the response. In-flight retries are re-signed with the new secret on the next attempt.
For zero-downtime rotation, support both old and new secrets on your receiver for a few minutes while the rotation propagates.