Verifying Signatures

Every Warpweb webhook is signed with HMAC-SHA256. Verifying that signature is the single most important thing to get right: without it, anyone who learns your endpoint URL can forge submissions.

Copy a snippet for your stack below, then read the scheme and common mistakes sections if anything surprises you.

Node.js (Express)

import crypto from 'node:crypto'
import express from 'express'
 
const app = express()
const WEBHOOK_SECRET = process.env.WARPWEB_WEBHOOK_SECRET
const REPLAY_WINDOW_SECONDS = 300
 
app.post(
  '/webhooks/warpweb-leads',
  express.raw({ type: 'application/json' }), // raw, not json
  (req, res) => {
    const signature = req.headers['x-warpweb-signature']
    const timestamp = req.headers['x-warpweb-timestamp']
 
    if (!signature || !signature.startsWith('sha256=') || !timestamp) {
      return res.status(401).send('missing signature or timestamp')
    }
 
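    // Replay protection. A non-numeric timestamp becomes NaN, which would
    // slip past the window comparison, so reject it explicitly.
    const now = Math.floor(Date.now() / 1000)
    const ts = Number(timestamp)
    if (!Number.isFinite(ts) || Math.abs(now - ts) > REPLAY_WINDOW_SECONDS) {
      return res.status(401).send('stale timestamp')
    }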
 
    const signingString = `${timestamp}.${req.body.toString('utf8')}`
    const expected = crypto
      .createHmac('sha256', WEBHOOK_SECRET)
      .update(signingString)
      .digest('hex')
 
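    const provided = signature.slice('sha256='.length)
    // Reject non-hex input up front: Buffer.from(..., 'hex') silently truncates
    // it, and timingSafeEqual throws when the two buffer lengths differ.
    const valid =
      /^[0-9a-f]{64}$/i.test(provided) &&
      crypto.timingSafeEqual(
        Buffer.from(provided, 'hex'),
        Buffer.from(expected, 'hex'),
      )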
 
    if (!valid) return res.status(401).send('invalid signature')
 
    const payload = JSON.parse(req.body.toString('utf8'))
    // ...handle the event
    res.status(200).send('ok')
  },
)

Node.js (Next.js App Router)

// app/api/webhooks/warpweb-leads/route.ts
import crypto from 'node:crypto'
import { NextRequest, NextResponse } from 'next/server'
 
const WEBHOOK_SECRET = process.env.WARPWEB_WEBHOOK_SECRET!
const REPLAY_WINDOW_SECONDS = 300
 
export async function POST(req: NextRequest) {
  const rawBody = await req.text()
  const signature = req.headers.get('x-warpweb-signature') ?? ''
  const timestamp = req.headers.get('x-warpweb-timestamp') ?? ''
 
  if (!signature.startsWith('sha256=') || !timestamp) {
    return new NextResponse('missing signature or timestamp', { status: 401 })
  }
 
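  // A non-numeric timestamp becomes NaN, which would slip past the window
  // comparison, so reject it explicitly.
  const now = Math.floor(Date.now() / 1000)
  const ts = Number(timestamp)
  if (!Number.isFinite(ts) || Math.abs(now - ts) > REPLAY_WINDOW_SECONDS) {
    return new NextResponse('stale timestamp', { status: 401 })
  }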
 
  const signingString = `${timestamp}.${rawBody}`
  const expected = crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(signingString, 'utf8')
    .digest('hex')
 
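  const provided = signature.slice('sha256='.length)
  // Reject non-hex input up front: Buffer.from(..., 'hex') silently truncates
  // it, and timingSafeEqual throws when the two buffer lengths differ.
  const valid =
    /^[0-9a-f]{64}$/i.test(provided) &&
    crypto.timingSafeEqual(
      Buffer.from(provided, 'hex'),
      Buffer.from(expected, 'hex'),
    )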
 
  if (!valid) return new NextResponse('invalid signature', { status: 401 })
 
  const payload = JSON.parse(rawBody)
  // ...handle the event
 
  return NextResponse.json({ ok: true })
}

Python (Flask)

import hmac
import hashlib
import os
import time
from flask import Flask, request, abort
 
app = Flask(__name__)
WEBHOOK_SECRET = os.environ['WARPWEB_WEBHOOK_SECRET'].encode()
REPLAY_WINDOW_SECONDS = 300
 
@app.post('/webhooks/warpweb-leads')
def warpweb_webhook():
    signature = request.headers.get('X-Warpweb-Signature', '')
    timestamp = request.headers.get('X-Warpweb-Timestamp', '')
 
    if not signature.startswith('sha256=') or not timestamp:
        abort(401, 'missing signature or timestamp')
 
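    # A non-numeric timestamp should produce a 401, not an unhandled ValueError
    try:
        ts = int(timestamp)
    except ValueError:
        abort(401, 'invalid timestamp')
    if abs(int(time.time()) - ts) > REPLAY_WINDOW_SECONDS:
        abort(401, 'stale timestamp')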
 
    raw_body = request.get_data()  # raw bytes, do not use request.json
    # Sign the bytes directly; a decode/encode round trip fails on invalid UTF-8
    signing_string = timestamp.encode() + b'.' + raw_body
    expected = hmac.new(WEBHOOK_SECRET, signing_string, hashlib.sha256).hexdigest()
    provided = signature[len('sha256='):]

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input
    if not hmac.compare_digest(provided.encode(), expected.encode()):
        abort(401, 'invalid signature')
 
    payload = request.get_json()
    # ...handle the event
    return {'ok': True}, 200

Python (FastAPI)

import hmac
import hashlib
import json
import os
import time
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
WEBHOOK_SECRET = os.environ['WARPWEB_WEBHOOK_SECRET'].encode()
REPLAY_WINDOW_SECONDS = 300

@app.post('/webhooks/warpweb-leads')
async def warpweb_webhook(request: Request):
    raw_body = await request.body()
    signature = request.headers.get('x-warpweb-signature', '')
    timestamp = request.headers.get('x-warpweb-timestamp', '')

    if not signature.startswith('sha256=') or not timestamp:
        raise HTTPException(status_code=401, detail='missing signature or timestamp')

    # A non-numeric timestamp should produce a 401, not an unhandled ValueError
    try:
        ts = int(timestamp)
    except ValueError:
        raise HTTPException(status_code=401, detail='invalid timestamp')
    if abs(int(time.time()) - ts) > REPLAY_WINDOW_SECONDS:
        raise HTTPException(status_code=401, detail='stale timestamp')

    # Sign the bytes directly; a decode/encode round trip fails on invalid UTF-8
    signing_string = timestamp.encode() + b'.' + raw_body
    expected = hmac.new(WEBHOOK_SECRET, signing_string, hashlib.sha256).hexdigest()
    provided = signature[len('sha256='):]

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input
    if not hmac.compare_digest(provided.encode(), expected.encode()):
        raise HTTPException(status_code=401, detail='invalid signature')

    payload = json.loads(raw_body)
    # ...handle the event
    return {'ok': True}

Shell with openssl (one-off verification)

Useful for debugging in production or in a Bash-only environment:

# NOTE: the elisions below (`...`) are placeholders for illustration only.
# This snippet won't run as-is — paste your real timestamp + raw body bytes
# before running. The signature depends on byte-exact body content, so make
# sure you capture the body before any JSON parsing or re-serialization.
TS="1716045731"
RAW_BODY='{"event_id":"7d0b368c-...","type":"form.submit","site_id":"...","occurred_at":"2026-05-17T15:42:11Z","payload":{...}}'
 
EXPECTED=$(printf '%s.%s' "$TS" "$RAW_BODY" | openssl dgst -sha256 -hmac "$WARPWEB_WEBHOOK_SECRET" | awk '{print $2}')
 
echo "sha256=$EXPECTED"
# Compare to the X-Warpweb-Signature header value

The scheme

  • Algorithm: HMAC-SHA256
  • Key: the signing secret you stored at configure time (secret_issued from POST /v1/sites/:id/webhooks/forms, or secret_plaintext from PUT /v1/customer/webhook)
  • Input: ${timestamp}.${raw_body} — the timestamp from X-Warpweb-Timestamp, a literal ., then the raw request body bytes (before any JSON parsing or re-serialization)
  • Format: hex-encoded digest, prefixed with sha256=
  • Header: X-Warpweb-Signature
  • Replay window: reject any request whose X-Warpweb-Timestamp is more than 300 seconds (5 minutes) from your server’s wall clock
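
The scheme above can also be written as a framework-free function, which is convenient for unit tests. This is a minimal sketch rather than an official Warpweb helper: the names sign and verify, the now parameter, and the example secret are assumptions made for illustration.

```python
import hashlib
import hmac
import time
from typing import Optional

REPLAY_WINDOW_SECONDS = 300
PREFIX = "sha256="


def sign(secret: bytes, timestamp: str, raw_body: bytes) -> str:
    """Build the header value: sha256=<hex HMAC of timestamp + "." + raw body>."""
    message = timestamp.encode() + b"." + raw_body
    return PREFIX + hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify(secret: bytes, timestamp: str, raw_body: bytes,
           signature: str, now: Optional[float] = None) -> bool:
    """Return True only for a fresh, correctly signed request."""
    if not signature.startswith(PREFIX):
        return False
    # Only plain ASCII digits are accepted as a timestamp
    if not (timestamp.isascii() and timestamp.isdigit()):
        return False
    current = time.time() if now is None else now
    if abs(int(current) - int(timestamp)) > REPLAY_WINDOW_SECONDS:
        return False
    expected = sign(secret, timestamp, raw_body)
    # Constant-time comparison over bytes
    return hmac.compare_digest(signature.encode(), expected.encode())


# Usage with placeholder values (not a real Warpweb secret or payload)
secret = b"example-secret"
body = b'{"type":"form.submit"}'
header = sign(secret, "1716045731", body)
print(verify(secret, "1716045731", body, header, now=1716045731))         # True
print(verify(secret, "1716045731", body + b" ", header, now=1716045731))  # False
print(verify(secret, "1716045731", body, header, now=1716045731 + 301))   # False
```

Passing now explicitly keeps the replay-window check deterministic in tests; in a request handler it can be left out so the server's wall clock is used.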

Two things will trip you up if your snippet drifts from the examples above:

  • Verify against the raw body, not parsed JSON. Parsing and re-serializing can change whitespace, key order, and escaping, and any changed byte breaks the signature. In Express, use express.raw() on the webhook route, not express.json(). In Next.js App Router, read req.text() (or req.arrayBuffer()) and JSON.parse() only after verification.
  • Use a constant-time comparison: crypto.timingSafeEqual in Node.js, hmac.compare_digest in Python. A plain === or == can reveal, through response timing, how many leading characters matched.
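
To see why the raw bytes matter, the following sketch signs a body twice: once as received and once after a parse and re-serialize round trip. The secret, timestamp, and body are made-up values, and hex_hmac is a hypothetical helper name.

```python
import hashlib
import hmac
import json


def hex_hmac(secret: bytes, timestamp: str, body: bytes) -> str:
    """Hex HMAC-SHA256 over timestamp + "." + body."""
    return hmac.new(secret, timestamp.encode() + b"." + body, hashlib.sha256).hexdigest()


secret = b"example-secret"   # placeholder, not a real secret
timestamp = "1716045731"
raw_body = b'{"type": "form.submit","payload":{"b":1,"a":2}}'

# Round trip through the JSON parser: same data, different bytes
reserialized = json.dumps(json.loads(raw_body)).encode()

print(json.loads(raw_body) == json.loads(reserialized))  # True: same data
print(raw_body == reserialized)                          # False: spacing changed
print(hex_hmac(secret, timestamp, raw_body)
      == hex_hmac(secret, timestamp, reserialized))      # False: signature breaks
```

The parsed objects are equal, but json.dumps normalizes the separators, so the bytes and therefore the HMAC differ.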

Common mistakes

  • Forgetting the timestamp prefix. The signature is over ${timestamp}.${raw_body}, not just the body. Both pieces must be included with the literal . separator.
  • Skipping replay protection. Without the timestamp-window check, a captured request can be replayed indefinitely. Always enforce the ±300s window.
  • Parsing JSON before computing the signature. Re-serialization is not byte-identical. Always use the raw bytes.
  • Using req.body after a JSON middleware ran. In Express, express.json() consumes the stream and replaces req.body with the parsed object, so the raw bytes are no longer available. Use express.raw() on the webhook route.
  • Trimming whitespace. Don’t. The body is signed exactly as sent.
  • Comparing with === or ==. Leaks timing information about partial matches. Use crypto.timingSafeEqual / hmac.compare_digest.
  • Truncating the hex string. The full 64-character hex digest is the signature.
  • Hashing the secret. The secret is the HMAC key, used as-is, not part of the message. Don’t hash it first and don’t append it to the signing string.

Rotating the secret

  • Per-site form webhook: call POST /v1/sites/:id/webhooks/forms with "rotate_secret": true.
  • Customer-level lifecycle webhook: call POST /v1/customer/webhook/regenerate-secret.

The old secret is invalidated immediately; the new plaintext is returned once in the response. In-flight retries are re-signed with the new secret on the next attempt.

For zero-downtime rotation, support both old and new secrets on your receiver for a few minutes while the rotation propagates.
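
One way to accept both secrets during the overlap is to check the signature against each candidate key. This is a sketch under assumptions: matches_any_secret is a hypothetical helper, the secrets shown are placeholders, and the replay-window check is expected to run separately before it.

```python
import hashlib
import hmac
from typing import Iterable


def matches_any_secret(secrets: Iterable[bytes], timestamp: str,
                       raw_body: bytes, signature: str) -> bool:
    """True if the signature verifies under at least one accepted secret."""
    message = timestamp.encode() + b"." + raw_body
    provided = signature.encode()
    ok = False
    for secret in secrets:
        expected = "sha256=" + hmac.new(secret, message, hashlib.sha256).hexdigest()
        # No early exit: every candidate is checked, each in constant time
        ok |= hmac.compare_digest(provided, expected.encode())
    return ok


# Usage with placeholder secrets: accept old and new during the overlap
old_secret, new_secret = b"old-example-secret", b"new-example-secret"
body = b'{"type":"form.submit"}'
message = b"1716045731." + body
header = "sha256=" + hmac.new(new_secret, message, hashlib.sha256).hexdigest()
print(matches_any_secret([new_secret, old_secret], "1716045731", body, header))  # True
print(matches_any_secret([old_secret], "1716045731", body, header))              # False
```

Once the overlap period has passed, drop the old secret from the list so that only the new one is accepted.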