from flask import Flask, request, abort
import hmac
import hashlib
import time
import json
from pulse import Pulse
app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret_here"
# Initialize the SDK client to fetch job results
client = Pulse(api_key="YOUR_API_KEY")
def verify_webhook(payload, headers, webhook_secret):
"""Verify webhook authenticity using HMAC signature."""
webhook_id = headers.get('webhook-id')
webhook_timestamp = headers.get('webhook-timestamp')
webhook_signature = headers.get('webhook-signature')
if not all([webhook_id, webhook_timestamp, webhook_signature]):
return False
if abs(int(time.time()) - int(webhook_timestamp)) > 300:
return False
signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"
signature = webhook_signature.split(',')[1] if ',' in webhook_signature else webhook_signature
expected = hmac.new(webhook_secret.encode(), signed_content.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(signature, expected)
@app.route('/webhook', methods=['POST'])
def handle_webhook():
payload = request.get_data(as_text=True)
if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
abort(401)
event = json.loads(payload)
event_type = event.get('type')
event_data = event.get('data')
if event_type == 'job.completed':
handle_job_completed(event_data)
elif event_type == 'job.failed':
handle_job_failed(event_data)
elif event_type == 'job.cancelled':
handle_job_cancelled(event_data)
return '', 200
def handle_job_completed(data):
job_id = data['job_id']
print(f"Job {job_id} completed successfully")
# Fetch the full results using the SDK
job_status = client.jobs.get_job(job_id=job_id)
print(f"Result: {job_status.result}")
def handle_job_failed(data):
job_id = data['job_id']
error = data.get('error', 'Unknown error')
print(f"Job {job_id} failed: {error}")
def handle_job_cancelled(data):
job_id = data['job_id']
print(f"Job {job_id} was cancelled")
if __name__ == '__main__':
app.run(port=5000)