Webhook Security Best Practices


Why Webhook Security Matters

Webhooks are HTTP callbacks that allow services to push real-time events to your application. Unlike APIs where you initiate the request, webhooks are delivered to a public endpoint that anyone with the URL can call. Without proper security, an attacker can replay webhook events, send fake event data, or probe your internal infrastructure.

Threat Model

| Threat | Impact | Likelihood |
|--------|--------|------------|
| Fake webhook events | Application processes false data | High |
| Replay attacks | Duplicate processing of legitimate events | Medium |
| Payload tampering | Corrupted data processed as valid | Medium |
| Reconnaissance | Attacker probes internal network | Low |
| DDoS via webhooks | Service overload from fake events | Medium |

Defense 1: Signature Verification

Most webhook providers sign their payloads with a shared secret. Your endpoint must verify this signature against the raw request body before processing the event.

Stripe-Style Signatures




import hmac


import hashlib




def verify_stripe_signature(payload, sig_header, secret):
    """Verify a Stripe-style webhook signature.

    The header is expected to look like ``t=<unix seconds>,v1=<hex hmac>``.
    The HMAC-SHA256 is computed over ``"<t>.<payload>"`` with ``secret`` as key.

    Args:
        payload: Raw request body, as ``str`` or bytes-like. Bytes are used
            verbatim so the MAC covers exactly what was received.
        sig_header: Value of the signature header.
        secret: Shared signing secret (``str`` or ``bytes``).

    Returns:
        True when the timestamp is within 5 minutes of the local clock and at
        least one ``v1`` entry matches the computed signature; False for any
        malformed, stale, or mismatching header. Never raises on bad headers.
    """
    # Local import: the surrounding snippet only imports hmac and hashlib.
    import time

    if not isinstance(sig_header, str) or not sig_header:
        return False

    # Signature format: t=timestamp,v1=signature
    # Several v1 entries may be present; any one of them matching is enough.
    timestamp = None
    candidates = []
    for item in sig_header.split(','):
        key, sep, value = item.partition('=')
        if not sep:
            # Item without '=' is malformed; ignore it instead of crashing.
            continue
        if key == 't':
            timestamp = value
        elif key == 'v1':
            candidates.append(value)

    if timestamp is None or not candidates:
        return False

    # Prevent replay: signature must be within 5 minutes
    try:
        signed_at = int(timestamp)
    except ValueError:
        return False
    if abs(signed_at - time.time()) > 300:
        return False

    # Compute expected signature over the exact bytes that were sent.
    if isinstance(payload, (bytes, bytearray, memoryview)):
        body = bytes(payload)
    elif isinstance(payload, str):
        body = payload.encode()
    else:
        body = str(payload).encode()
    signed_payload = f"{timestamp}.".encode() + body

    key_bytes = secret.encode() if isinstance(secret, str) else secret
    computed_sig = hmac.new(key_bytes, signed_payload, hashlib.sha256).hexdigest()

    # Constant-time comparison on bytes: compare_digest rejects non-ASCII str.
    computed_bytes = computed_sig.encode()
    matches = [
        hmac.compare_digest(computed_bytes, candidate.encode('utf-8', 'replace'))
        for candidate in candidates
    ]
    return any(matches)





GitHub-Style Signatures




const crypto = require('crypto');




/**
 * Verify a GitHub-style `X-Hub-Signature-256` header.
 *
 * The HMAC is computed over the request body exactly as received. When
 * `req.body` is a Buffer or string (e.g. produced by `express.raw`), those
 * bytes are used directly; re-serializing them with `JSON.stringify` would
 * not reproduce the signed bytes. A parsed object body is still serialized
 * with `JSON.stringify` for backward compatibility.
 *
 * @param {{headers: Object, body: (Buffer|string|Object)}} req - Incoming request.
 * @param {string} secret - Shared webhook secret.
 * @returns {boolean} True only when the header matches the computed signature.
 */
function verifyGitHubSignature(req, secret) {
  const signature = req.headers['x-hub-signature-256'];
  if (typeof signature !== 'string' || signature.length === 0) return false;

  const { body } = req;
  if (body == null) return false;

  const payload =
    Buffer.isBuffer(body) || typeof body === 'string' ? body : JSON.stringify(body);

  const digest = crypto.createHmac('sha256', secret).update(payload).digest('hex');
  const computed = `sha256=${digest}`;

  // Constant-time comparison. timingSafeEqual throws when the lengths differ,
  // so a wrong-length header is rejected up front instead of crashing.
  const received = Buffer.from(signature);
  const expected = Buffer.from(computed);
  if (received.length !== expected.length) return false;

  return crypto.timingSafeEqual(received, expected);
}




// Express middleware


// Express route for GitHub deliveries. The raw-body parser is mounted on this
// route so the handler receives the JSON body unparsed.
app.post(
  '/webhooks/github',
  express.raw({ type: 'application/json' }),
  function handleGitHubWebhook(req, res) {
    const { GITHUB_WEBHOOK_SECRET: webhookSecret } = process.env;
    const rejected = !verifyGitHubSignature(req, webhookSecret);

    if (rejected) {
      return res.status(401).send('Invalid signature');
    }

    // Process webhook
    res.status(200).send('OK');
  }
);





Generic Verification Middleware




from functools import wraps


from flask import request, abort


import hmac


import hashlib




def verify_webhook(secret):
    """Decorator factory that verifies an HMAC-SHA256 webhook signature.

    The wrapped view runs only when the ``X-Webhook-Signature`` header equals
    the hex HMAC-SHA256 of the raw request body keyed with ``secret``.

    Args:
        secret: Shared signing secret as ``str``.

    Returns:
        A decorator. The decorated view aborts with 401 when the header is
        missing or does not match; otherwise it returns the view's result.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            payload = request.get_data()
            received_sig = request.headers.get('X-Webhook-Signature')

            if not received_sig:
                abort(401, 'Missing signature')

            computed_sig = hmac.new(
                secret.encode(),
                payload,
                hashlib.sha256
            ).hexdigest()

            # Compare as bytes: compare_digest raises TypeError on str values
            # containing non-ASCII characters, which would turn a bogus header
            # into a server error instead of a 401.
            signature_ok = hmac.compare_digest(
                computed_sig.encode(),
                received_sig.encode('utf-8', 'replace'),
            )
            if not signature_ok:
                abort(401, 'Invalid signature')

            return f(*args, **kwargs)
        return decorated_function
    return decorator




@app.route('/webhooks/custom', methods=['POST'])
@verify_webhook(WEBHOOK_SECRET)
def handle_webhook():
    """Handle a POST to /webhooks/custom after the signature decorator has run."""
    verified_event = request.json
    # Process the verified event
    status_code = 200
    return 'OK', status_code





Defense 2: Replay Protection

Timestamp-Based Prevention

Include a timestamp in the signed payload and reject webhooks outside a time window:




/**
 * Verify a timestamped HMAC-SHA256 signature and reject stale deliveries.
 *
 * Expected header format: `ts=<epoch milliseconds>,v1=<hex hmac>`. The
 * timestamp is compared against `Date.now()`, so it must be in milliseconds,
 * e.g. `ts=1700000000000,v1=sig_here`. The HMAC covers
 * `"<ts>.<JSON.stringify(payload)>"`.
 *
 * @param {*} payload - Event payload; serialized with `JSON.stringify` before signing.
 * @param {string} signature - Signature header value.
 * @param {string} secret - Shared signing secret.
 * @param {number} [toleranceMs=300000] - Maximum allowed clock distance in ms.
 * @returns {boolean} True when the signature matches; false when the header is
 *   malformed or the signature does not match.
 * @throws {Error} 'Webhook replay detected' when the timestamp is outside the window.
 */
function verifyWithReplayPrevention(payload, signature, secret, toleranceMs = 300000) {
  if (typeof signature !== 'string') return false;

  const TS_PREFIX = 'ts=';
  const SIG_PREFIX = 'v1=';
  const fields = signature.split(',');
  const tsField = fields.find((field) => field.startsWith(TS_PREFIX));
  const sigField = fields.find((field) => field.startsWith(SIG_PREFIX));

  // A header missing either part is malformed, not an exceptional condition.
  if (tsField === undefined || sigField === undefined) return false;

  const timestamp = Number.parseInt(tsField.slice(TS_PREFIX.length), 10);
  // NaN would make the window comparison below evaluate to false and slip through.
  if (!Number.isFinite(timestamp)) return false;

  // Reject if outside tolerance window
  if (Math.abs(Date.now() - timestamp) > toleranceMs) {
    throw new Error('Webhook replay detected');
  }

  // Verify signature includes timestamp
  const signedContent = `${timestamp}.${JSON.stringify(payload)}`;
  const expected = Buffer.from(
    crypto.createHmac('sha256', secret).update(signedContent).digest('hex')
  );
  const received = Buffer.from(sigField.slice(SIG_PREFIX.length));

  // timingSafeEqual throws on unequal lengths, so check the length first.
  if (expected.length !== received.length) return false;

  return crypto.timingSafeEqual(expected, received);
}





Idempotency Keys

Track processed webhook IDs to prevent duplicate processing:




// Event IDs already handled by this process. In-memory only: the record is
// lost on restart and is not shared between instances.
const processedEvents = new Set();

app.post('/webhooks/stripe', async (req, res) => {
  // Deduplicate on the event's own `id`, not on the `t=` field of the
  // Stripe-Signature header. `t=` is the signing time: unrelated events signed
  // in the same second would collide, and a redelivery of the same event is
  // signed again with a new timestamp, so it would not be recognised.
  let event = req.body;
  if (Buffer.isBuffer(event) || typeof event === 'string') {
    // Body arrived unparsed (e.g. via a raw-body parser); decode it here.
    try {
      event = JSON.parse(event.toString('utf8'));
    } catch {
      return res.status(400).json({ status: 'invalid_json' });
    }
  }

  const eventId = event?.id;
  if (typeof eventId !== 'string' || eventId.length === 0) {
    return res.status(400).json({ status: 'missing_event_id' });
  }

  if (processedEvents.has(eventId)) {
    // Already processed, return success to avoid retries
    return res.status(200).json({ status: 'already_processed' });
  }

  // Verify and process. The signature must be verified before the ID is
  // recorded, otherwise unauthenticated requests could pre-mark real event IDs.
  processedEvents.add(eventId);

  // Clean old entries periodically (24 hours after first sight)
  setTimeout(() => processedEvents.delete(eventId), 86400000);

  // Acknowledge the delivery; without a response the sender times out and retries.
  return res.status(200).json({ status: 'processed' });
});





Defense 3: IP Allowlisting

When possible, restrict webhook endpoints to known provider IPs:




# Provider name -> list of CIDR ranges accepted for that provider's webhooks.
# NOTE(review): these look like illustrative values; confirm them against each
# provider's currently published IP ranges before relying on them.
ALLOWED_IPS = {
    'stripe': ['3.18.12.63/32', '3.130.192.231/32'],
    'github': ['192.30.252.0/22', '185.199.108.0/22'],
    'slack': ['52.89.214.238/32', '54.70.199.8/32'],
}


def ip_allowed(provider, request_ip):
    """Return True when ``request_ip`` falls inside one of ``provider``'s ranges.

    Args:
        provider: Key into ``ALLOWED_IPS``.
        request_ip: Textual IPv4/IPv6 address of the peer.

    Returns:
        True if the address is inside any configured CIDR for the provider.
        False for unknown providers, for addresses outside every range, and
        for values that are not a valid IP address (including ``None``).
    """
    import ipaddress

    cidrs = ALLOWED_IPS.get(provider, [])
    if not cidrs:
        return False

    # Parse once, and treat unparseable input as "not allowed" rather than
    # letting ValueError escape into the request handler.
    try:
        address = ipaddress.ip_address(request_ip)
    except ValueError:
        return False

    # An IPv4 peer seen through a dual-stack socket appears as ::ffff:a.b.c.d;
    # unwrap it so it can match the IPv4 ranges.
    mapped = getattr(address, 'ipv4_mapped', None)
    if mapped is not None:
        address = mapped

    return any(address in ipaddress.ip_network(cidr) for cidr in cidrs)




@app.before_request
def restrict_webhook_ips():
    """Abort with 403 when a /webhooks/ request comes from a non-allowlisted IP."""
    if not request.path.startswith('/webhooks/'):
        return

    # '/webhooks/<provider>/...' splits into ['', 'webhooks', '<provider>', ...]
    provider = request.path.split('/')[2]
    if ip_allowed(provider, request.remote_addr):
        return

    abort(403, 'IP not allowed')





Defense 4: Payload Validation

Validate the webhook payload schema before processing:




from pydantic import BaseModel, ValidationError




class GitHubPushEvent(BaseModel):
    """Pydantic schema used to validate a GitHub push webhook payload.

    ``ref``, ``repository``, ``commits`` and ``sender`` have no default, so they
    must be supplied; ``forced`` defaults to ``False`` when absent.
    """
    # NOTE(review): field meanings are inferred from their names; confirm
    # against GitHub's push event payload documentation.
    ref: str
    # Untyped dict: nested contents are not checked by this model.
    repository: dict
    # Untyped list: element contents are not checked by this model.
    commits: list
    # Untyped dict: nested contents are not checked by this model.
    sender: dict
    # Optional flag; False when the payload omits it.
    forced: bool = False




def validate_webhook_payload(provider, payload):
    """Check a webhook payload against the schema registered for ``provider``.

    Args:
        provider: One of ``'github'``, ``'stripe'``, ``'slack'``.
        payload: Decoded request body; expected to be a mapping of field
            names to values.

    Returns:
        True when the provider's model accepts the payload. False when the
        provider is unknown, the payload is not a mapping (e.g. ``None``, a
        list, or a string from decoded JSON), or the model raises
        ``ValidationError``.
    """
    from collections.abc import Mapping

    # NOTE(review): StripeEvent and SlackEvent are not defined in the visible
    # part of this file; they must be importable wherever this runs.
    validators = {
        'github': GitHubPushEvent,
        'stripe': StripeEvent,
        'slack': SlackEvent
    }

    validator = validators.get(provider)
    if validator is None:
        return False

    # `validator(**payload)` needs a mapping; anything else would raise
    # TypeError before validation starts, so reject it as invalid.
    if not isinstance(payload, Mapping):
        return False

    try:
        validator(**payload)
    except ValidationError:
        return False
    return True





Implementation Checklist

| Security Control | Implementation | Priority |
|-----------------|----------------|----------|
| HMAC signature verification | Verify every webhook | Critical |
| Replay protection | Timestamp + tolerance | Critical |
| Idempotency | Track event IDs | High |
| IP allowlisting | Restrict by provider CIDR | High |
| Payload validation | Schema validation | High |
| Rate limiting | Per-IP and per-provider | Medium |
| HTTPS only | Reject HTTP | Critical |
| Error handling | Generic error messages | Medium |

Summary

Webhook endpoints are publicly accessible by design, making them an attractive target. Always verify payload signatures using HMAC-SHA256, implement replay protection with timestamps, track idempotency keys to prevent duplicate processing, validate payload structure, and restrict by IP when possible. These controls turn a publicly accessible endpoint into a secure integration point that only processes legitimate webhook events.