import hmac
import hashlib
import time
import base64
def verify_webhook(
    payload: str,
    headers: dict,
    webhook_secret: str,
    *,
    tolerance_seconds: int = 300,
) -> bool:
    """Verify webhook authenticity using an HMAC-SHA256 signature.

    The signed content is ``"{webhook-id}.{webhook-timestamp}.{payload}"``.
    Its HMAC-SHA256 digest, base64-encoded, is compared in constant time
    against every candidate found in the ``webhook-signature`` header.

    Args:
        payload: Raw request body as string, exactly as received.
        headers: Request headers mapping; must provide ``webhook-id``,
            ``webhook-timestamp`` and ``webhook-signature``.
        webhook_secret: Your webhook signing secret from the portal. It is
            base64-decoded to obtain the HMAC key; a leading ``whsec_``
            prefix is stripped first.
        tolerance_seconds: Maximum allowed difference, in either direction,
            between the current time and ``webhook-timestamp``.

    Returns:
        True if the timestamp is within tolerance and at least one
        signature in the header matches, False otherwise (including when a
        header is missing or the timestamp is not an integer).

    Raises:
        binascii.Error: If ``webhook_secret`` is not valid base64. This is
            a configuration problem, so it is not reported as a failed
            verification.
    """
    webhook_id = headers.get('webhook-id')
    webhook_timestamp = headers.get('webhook-timestamp')
    webhook_signature = headers.get('webhook-signature')
    if not all([webhook_id, webhook_timestamp, webhook_signature]):
        return False

    # A malformed timestamp comes from the sender, so treat it as a failed
    # verification rather than letting ValueError escape to the caller.
    try:
        sent_at = int(webhook_timestamp)
    except (TypeError, ValueError):
        return False

    # Check timestamp to prevent replay attacks.
    if abs(int(time.time()) - sent_at) > tolerance_seconds:
        return False

    # The "whsec_" prefix is not part of the base64 key material. The
    # non-strict decoder would drop "_" and keep "whsec", producing the
    # wrong key, so remove the prefix before decoding.
    secret_prefix = "whsec_"
    encoded_key = webhook_secret
    if encoded_key.startswith(secret_prefix):
        encoded_key = encoded_key[len(secret_prefix):]
    key = base64.b64decode(encoded_key)

    # Construct signed content and the expected base64-encoded digest.
    signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"
    expected = base64.b64encode(
        hmac.new(key, signed_content.encode(), hashlib.sha256).digest()
    )

    # The header may carry several space-delimited entries of the form
    # "v1,<signature>"; any single match is sufficient.
    for entry in webhook_signature.split():
        version, comma, candidate = entry.partition(',')
        if not comma:
            # Bare signature without a version prefix.
            candidate = version
        elif version != 'v1':
            # Only the v1 (HMAC-SHA256) scheme is computed here.
            continue
        try:
            # compare_digest rejects non-ASCII str, so compare as bytes; a
            # non-ASCII candidate can never equal a base64 digest anyway.
            candidate_bytes = candidate.encode('ascii')
        except UnicodeEncodeError:
            continue
        # Constant-time comparison
        if hmac.compare_digest(candidate_bytes, expected):
            return True
    return False
# Example usage with Flask
from flask import Flask, request, abort
# Flask application object; the route decorator below registers the
# webhook endpoint on it.
app = Flask(__name__)
# Signing secret that handle_webhook passes to verify_webhook.
# NOTE(review): this value looks like a placeholder — presumably it must be
# replaced with the real secret, ideally loaded from configuration rather
# than hard-coded in source; confirm before deploying.
WEBHOOK_SECRET = "whsec_your_secret_here"
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Authenticate an incoming webhook POST and process its event.

    The raw body text is read first and handed to ``verify_webhook``
    together with the request headers and ``WEBHOOK_SECRET``; JSON parsing
    only happens after the signature check has passed.

    Returns:
        An empty body with HTTP status 200 once the event is handled.

    Aborts:
        401 if signature verification fails.
        400 if the verified body is not a JSON object.
    """
    payload = request.get_data(as_text=True)
    if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
        abort(401)

    # force=True parses the body regardless of Content-Type (the signature
    # already vouches for it); silent=True yields None on invalid JSON
    # instead of raising, so the failure is reported as a clean 400 below.
    event = request.get_json(force=True, silent=True)
    if not isinstance(event, dict):
        abort(400)

    # Process the event. .get() avoids a KeyError (HTTP 500) when the
    # sender omits the "type" field.
    print(f"Received event: {event.get('type')}")
    return '', 200