
Overview

The Pulse API provides webhooks for real-time notifications about your asynchronous document processing jobs. Configure webhook endpoints to receive automatic updates when extraction jobs complete, fail, or change status.
Webhook event delivery is currently under development. The configuration portal is fully functional, but events are not yet being sent.

Why Use Webhooks?

Instead of continuously polling the /job/{job_id} endpoint to check job status, webhooks will provide:
  • Real-time updates - Get notified instantly when jobs complete
  • Reduced API calls - No need for repeated polling requests
  • Better scalability - Handle high volumes of concurrent extractions efficiently
  • Automatic retries - Failed deliveries will be retried with exponential backoff
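Until event delivery is switched on, polling is still how you track an async job. The sketch below polls with exponential backoff. It assumes client.jobs.get_job(job_id=...) returns an object with a status attribute, as in the examples on this page, and that completed, failed and cancelled are the terminal status values (inferred from the event types, not confirmed). wait_for_job is a hypothetical helper, not part of the SDK.

```python
import time

# Assumed terminal statuses, inferred from the job.* event types
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

def wait_for_job(client, job_id, timeout=600.0, initial_delay=1.0, max_delay=30.0):
    """Poll a job until it reaches a terminal status or the timeout expires.

    The delay between polls doubles after each check, capped at max_delay.
    Returns the last job status object fetched.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        job_status = client.jobs.get_job(job_id=job_id)
        if job_status.status in TERMINAL_STATUSES:
            return job_status
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Job {job_id} still {job_status.status} after {timeout}s")
        # Never sleep past the deadline
        time.sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay)
```

For example, wait_for_job(client, job_id) blocks until the job finishes or ten minutes pass. The growing delay keeps request volume low for long jobs, which is the cost webhooks remove entirely.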

Setting Up Webhooks

Step 1: Generate a Portal Link

Use the webhook endpoint to generate your webhook configuration portal link:
from pulse import Pulse

client = Pulse(api_key="YOUR_API_KEY")

# Get the webhook portal link
response = client.webhooks.create_webhook_link()
print(f"Configure your webhooks at: {response.link}")

Step 2: Configure in Portal

The webhook configuration portal allows you to:
  • Add Multiple Endpoints - Configure different URLs for different event types
  • Set Authentication - Add headers or basic auth to your webhooks
  • Filter Events - Choose which events to receive at each endpoint
  • Test Endpoints - Send test events to verify your setup
  • View Logs - See delivery attempts and debug failed webhooks
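If you enable basic auth in the portal, check the credentials before doing anything else with the request. A minimal standard-library sketch, assuming the portal sends them as a standard HTTP Basic Authorization header; check_basic_auth is a hypothetical helper, and the username and password are whatever you configure in the portal.

```python
import base64
import hmac

def check_basic_auth(authorization_header, username, password):
    """Return True if an HTTP Basic Authorization header matches the credentials."""
    if not authorization_header:
        return False
    scheme, _, encoded = authorization_header.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        return False
    try:
        decoded = base64.b64decode(encoded, validate=True).decode("utf-8")
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses
        return False
    supplied_user, sep, supplied_password = decoded.partition(":")
    if not sep:
        return False
    # Constant-time comparisons; evaluate both so timing does not reveal which failed
    user_ok = hmac.compare_digest(supplied_user.encode(), username.encode())
    password_ok = hmac.compare_digest(supplied_password.encode(), password.encode())
    return user_ok and password_ok
```

In the Flask handler from Step 3, call it first, for example: if not check_basic_auth(request.headers.get('Authorization'), USERNAME, PASSWORD): abort(401), where USERNAME and PASSWORD are your own placeholders.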

Step 3: Implement Your Webhook Handler

Create an endpoint in your application to receive webhook notifications:
from flask import Flask, request, abort
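import base64
import hmac
import hashlib
import time
import json
from pulse import Pulse

app = Flask(__name__)

WEBHOOK_SECRET = "your_webhook_secret_here"

# Initialize the SDK client to fetch job results
client = Pulse(api_key="YOUR_API_KEY")

def verify_webhook(payload, headers, webhook_secret):
    """Verify webhook authenticity using HMAC signature."""
    webhook_id = headers.get('webhook-id')
    webhook_timestamp = headers.get('webhook-timestamp')
    webhook_signature = headers.get('webhook-signature')

    if not all([webhook_id, webhook_timestamp, webhook_signature]):
        return False

    # Reject stale or malformed timestamps (5-minute window)
    try:
        if abs(int(time.time()) - int(webhook_timestamp)) > 300:
            return False
    except ValueError:
        return False

    # Assumption: a secret starting with "whsec_" (the Standard Webhooks
    # convention) is a base64-encoded key; any other secret is used as raw bytes
    if webhook_secret.startswith('whsec_'):
        key = base64.b64decode(webhook_secret[len('whsec_'):])
    else:
        key = webhook_secret.encode()

    # The header carries a base64 digest (as in the example webhook-signature
    # header), not a hex digest
    signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"
    digest = hmac.new(key, signed_content.encode(), hashlib.sha256).digest()
    expected = base64.b64encode(digest).decode()

    # Accept a match on any space-separated "v1,<signature>" entry
    for entry in webhook_signature.split():
        signature = entry.split(',', 1)[-1]
        if hmac.compare_digest(signature.encode(), expected.encode()):
            return True
    return False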

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    payload = request.get_data(as_text=True)
    
    if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
        abort(401)
    
    event = json.loads(payload)
    event_type = event.get('type')
    event_data = event.get('data')
    
    if event_type == 'job.completed':
        handle_job_completed(event_data)
    elif event_type == 'job.failed':
        handle_job_failed(event_data)
    elif event_type == 'job.cancelled':
        handle_job_cancelled(event_data)
    
    return '', 200

def handle_job_completed(data):
    job_id = data['job_id']
    print(f"Job {job_id} completed successfully")
    
    # Fetch the full results using the SDK
    job_status = client.jobs.get_job(job_id=job_id)
    print(f"Result: {job_status.result}")

def handle_job_failed(data):
    job_id = data['job_id']
    error = data.get('error', 'Unknown error')
    print(f"Job {job_id} failed: {error}")

def handle_job_cancelled(data):
    job_id = data['job_id']
    print(f"Job {job_id} was cancelled")

if __name__ == '__main__':
    app.run(port=5000)

Webhook Security

Each webhook request includes security headers for verification:
webhook-id: msg_2Jv7pYGL7UwXqF3v6RjLVxQYPZG
webhook-timestamp: 1704067200
webhook-signature: v1,g0hM9SsE+OTPJTjfm/kBRBOlqPmYFYpwTEFfQK6UHdI=
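The value after the v1, prefix is the base64-encoded HMAC-SHA256 digest of the string {webhook-id}.{webhook-timestamp}.{raw request body}, computed with your webhook secret. Verify against the raw body exactly as received; re-serialized JSON may not match byte for byte.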
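To exercise your verification code before events are delivered, you can produce the same headers yourself. A sketch of the signing side, assuming the scheme implied by the handler and the example headers (HMAC-SHA256 over id.timestamp.body, base64-encoded, prefixed with v1,). sign_webhook is a hypothetical test helper, and key must be the same HMAC key bytes your handler uses.

```python
import base64
import hashlib
import hmac
import time

def sign_webhook(key, msg_id, payload, timestamp=None):
    """Return webhook-* headers for payload, signed with the HMAC key bytes."""
    if timestamp is None:
        timestamp = int(time.time())
    # Signed content is "{webhook-id}.{webhook-timestamp}.{raw body}"
    signed_content = f"{msg_id}.{timestamp}.{payload}".encode()
    digest = hmac.new(key, signed_content, hashlib.sha256).digest()
    return {
        "webhook-id": msg_id,
        "webhook-timestamp": str(timestamp),
        # Base64-encoded digest with the version prefix, as in the example header
        "webhook-signature": "v1," + base64.b64encode(digest).decode(),
    }
```

With Flask's test client you can then post a job.failed payload (so the handler makes no SDK call): app.test_client().post('/webhook', data=payload, headers=sign_webhook(key, 'msg_test', payload)). The default timestamp is the current time, so the 5-minute check passes.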

Webhook Events (Coming Soon)

Once enabled, you’ll receive events for job status changes:

Event Types

Event            Description
job.created      New async job created
job.processing   Job started processing
job.completed    Job completed successfully
job.failed       Job failed with error
job.cancelled    Job was cancelled
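A dispatch table keeps the handler flat as event types are added, and lets you acknowledge types you do not handle instead of failing on them. A sketch; the handler functions are hypothetical placeholders that return a message string, and the data fields they read are those shown in the example payloads.

```python
def on_status_change(data):
    """job.created / job.processing: record progress."""
    return f"Job {data['job_id']}: {data.get('status', 'status changed')}"

def on_completed(data):
    return f"Job {data['job_id']} completed"

def on_failed(data):
    return f"Job {data['job_id']} failed: {data.get('error', 'Unknown error')}"

def on_cancelled(data):
    return f"Job {data['job_id']} was cancelled"

# One entry per event type in the table above
EVENT_HANDLERS = {
    "job.created": on_status_change,
    "job.processing": on_status_change,
    "job.completed": on_completed,
    "job.failed": on_failed,
    "job.cancelled": on_cancelled,
}

def dispatch(event):
    """Run the handler for event['type']; return None for unknown types."""
    handler = EVENT_HANDLERS.get(event.get("type"))
    if handler is None:
        return None  # Unknown type: still acknowledge with 200, just ignore it
    return handler(event.get("data") or {})
```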

Job Completed Event

{
  "type": "job.completed",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "completed",
    "pages_processed": 25,
    "processing_time": 12.5
  }
}

Job Failed Event

{
  "type": "job.failed",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "failed",
    "error": "Document processing failed: Invalid PDF structure"
  }
}
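Because the data fields differ by event type, a small typed wrapper makes the optional ones explicit. A sketch based only on the fields shown in the two examples; JobEvent and parse_job_event are hypothetical names, and any field missing from a payload is left as None.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class JobEvent:
    event_type: str
    timestamp: datetime
    job_id: str
    status: Optional[str] = None
    error: Optional[str] = None              # job.failed
    pages_processed: Optional[int] = None    # job.completed
    processing_time: Optional[float] = None  # job.completed (units not documented)

def parse_job_event(raw: str) -> JobEvent:
    """Parse a webhook body shaped like the examples above into a JobEvent."""
    event = json.loads(raw)
    data = event.get("data") or {}
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11
    timestamp = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    return JobEvent(
        event_type=event["type"],
        timestamp=timestamp,
        job_id=data["job_id"],
        status=data.get("status"),
        error=data.get("error"),
        pages_processed=data.get("pages_processed"),
        processing_time=data.get("processing_time"),
    )
```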

Example Implementations

Webhook Handlers

from flask import Flask, request, abort
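import base64
import hmac
import hashlib
import time
import json
from pulse import Pulse

app = Flask(__name__)

WEBHOOK_SECRET = "your_webhook_secret_here"

# Initialize Pulse client for fetching results
client = Pulse(api_key="YOUR_API_KEY")

def verify_webhook(payload, headers, secret):
    """Verify webhook authenticity using HMAC signature."""
    webhook_id = headers.get('webhook-id')
    webhook_timestamp = headers.get('webhook-timestamp')
    webhook_signature = headers.get('webhook-signature')

    if not all([webhook_id, webhook_timestamp, webhook_signature]):
        return False

    # Check timestamp (5 minute window); reject malformed values
    try:
        if abs(int(time.time()) - int(webhook_timestamp)) > 300:
            return False
    except ValueError:
        return False

    # Assumption: "whsec_"-prefixed secrets (Standard Webhooks convention) are
    # base64-encoded keys; any other secret is used as raw bytes
    if secret.startswith('whsec_'):
        key = base64.b64decode(secret[len('whsec_'):])
    else:
        key = secret.encode()

    # The signature header is base64-encoded, so compare against a base64 digest
    signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"
    digest = hmac.new(key, signed_content.encode(), hashlib.sha256).digest()
    expected = base64.b64encode(digest).decode()

    # Accept any of the space-separated "v1,<signature>" entries
    for entry in webhook_signature.split():
        signature = entry.split(',', 1)[-1]
        if hmac.compare_digest(signature.encode(), expected.encode()):
            return True
    return False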

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    payload = request.get_data(as_text=True)
    
    if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
        abort(401)
    
    event = json.loads(payload)
    event_type = event.get('type')
    event_data = event.get('data')
    
    if event_type == 'job.completed':
        job_id = event_data['job_id']
        # Fetch full results using SDK
        job_status = client.jobs.get_job(job_id=job_id)
        print(f"Job {job_id} completed: {job_status.result}")
    elif event_type == 'job.failed':
        print(f"Job {event_data['job_id']} failed: {event_data.get('error')}")
    
    return '', 200

if __name__ == '__main__':
    app.run(port=5000)

Complete Integration Example

Here’s a complete example integrating async extraction with webhook handling:
from pulse import Pulse

client = Pulse(api_key="YOUR_API_KEY")

# One-time setup: Configure webhooks
portal_response = client.webhooks.create_webhook_link()
print(f"Configure webhooks at: {portal_response.link}")
print("Add your webhook endpoint URL in the portal")

# Define extraction schema
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "sections": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "heading": {"type": "string"},
                    "content": {"type": "string"}
                }
            }
        }
    }
}

# Submit async extraction - webhook will notify when complete
submission = client.extract_async(
    file_url="https://www.impact-bank.com/user/file/dummy_statement.pdf",
    structured_output={"schema": schema},
    pages="1-100"
)

print(f"Extraction started: {submission.job_id}")
print("Once webhook delivery is enabled, you'll get a notification when the job completes")

# When webhook fires, fetch the results
def handle_webhook_completion(job_id: str):
    """Called when webhook notification is received."""
    job_status = client.jobs.get_job(job_id=job_id)
    
    if job_status.status == "completed":
        print(f"Job {job_id} completed!")
        print(f"Result: {job_status.result}")
    else:
        print(f"Job {job_id} status: {job_status.status}")

Best Practices

1. Idempotent Processing

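Design your webhook handlers to be idempotent, as webhooks may be delivered multiple times. The example payloads have no top-level id field, so key deduplication on the webhook-id header instead:
# In production, use persistent storage shared by all workers
processed_ids = set()

def process_event(event):
    """Placeholder for your application's event logic (hypothetical)."""
    print(f"Processing {event.get('type')}")

def handle_webhook_event(event, webhook_id):
    """Process each message at most once, keyed on the webhook-id header.

    Assumes webhook-id stays the same across redeliveries of one message.
    """
    # Check if already processed
    if webhook_id in processed_ids:
        print(f"Message {webhook_id} already processed, skipping")
        return

    # Process the event, then mark it as processed
    process_event(event)
    processed_ids.add(webhook_id)

# In the Flask route: handle_webhook_event(event, request.headers.get('webhook-id'))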
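The in-memory set above is lost on restart and is not shared between worker processes. A minimal persistent alternative uses the standard library's sqlite3, where a primary-key constraint makes the check-and-record step a single atomic statement; the table and function names are hypothetical.

```python
import sqlite3

def open_store(path=":memory:"):
    """Open a SQLite database holding the webhook-id values already processed."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_messages (webhook_id TEXT PRIMARY KEY)"
    )
    conn.commit()
    return conn

def claim_message(conn, webhook_id):
    """Record webhook_id and return True only the first time it is seen.

    INSERT OR IGNORE skips the row when the primary key already exists,
    so rowcount is 1 for a new id and 0 for a duplicate.
    """
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_messages (webhook_id) VALUES (?)",
        (webhook_id,),
    )
    conn.commit()
    return cur.rowcount == 1
```

Use a file path such as open_store("webhooks.db") in practice, and open one connection per worker thread, since sqlite3 connections refuse cross-thread use by default. Claiming before processing gives at-most-once handling (a crash mid-processing drops the event); recording the id only after processing succeeds gives at-least-once handling, which is safer when the processing itself is idempotent.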

2. Quick Response Times

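Respond to webhooks quickly (within 5 seconds) to avoid timeouts. Verify the signature, queue the event, and return 200 right away; do slower work, such as fetching job results, in a background worker:
import json
from queue import Queue
from threading import Thread

from flask import request, abort

# Assumes app, verify_webhook, WEBHOOK_SECRET and process_event are
# defined as in the examples above.
# Note: an in-memory queue loses pending events if the process exits;
# use a durable queue in production.

# Background processing queue
task_queue = Queue()

def process_webhook_async():
    """Background worker to process webhook events."""
    while True:
        event = task_queue.get()
        try:
            process_event(event)
        except Exception as e:
            print(f"Error processing event: {e}")
        finally:
            task_queue.task_done()

# Start background worker
Thread(target=process_webhook_async, daemon=True).start()

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    payload = request.get_data(as_text=True)

    # Quickly validate before accepting the event
    if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
        abort(401)

    # Queue for background processing
    task_queue.put(json.loads(payload))

    # Return immediately
    return '', 200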

3. Error Handling

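Handle each failure mode explicitly and return a status code that reflects it:
# Assumes app, verify_webhook, WEBHOOK_SECRET and process_event are
# defined as in the examples above
@app.route('/webhook', methods=['POST'])
def handle_webhook():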
    try:
        payload = request.get_data(as_text=True)
        
        # Verify webhook
        if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
            app.logger.error("Webhook verification failed")
            return '', 401
        
        event = json.loads(payload)
        process_event(event)
        
        return '', 200
        
    except json.JSONDecodeError as e:
        app.logger.error(f"Invalid JSON payload: {e}")
        return '', 400
        
    except Exception as e:
        app.logger.error(f"Unexpected error: {e}")
        # Return 500 to trigger retry
        return '', 500

Testing Webhooks Locally

Use ngrok or similar tools to test webhooks during development:
# Start your local server
python webhook_handler.py

# In another terminal, expose it to the internet
ngrok http 5000

# Use the ngrok URL in the webhook portal
# Example: https://abc123.ngrok.io/webhook
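Before relying on the portal, you can confirm that the public URL reaches your handler. This sketch POSTs an unsigned dummy body; with the signature check from Step 3 in place, a 401 response shows the tunnel works and unsigned requests are rejected. probe_endpoint is a hypothetical helper using only the standard library.

```python
import urllib.error
import urllib.request

def probe_endpoint(url, timeout=10):
    """POST an unsigned dummy event and return the HTTP status code.

    A 401 means the endpoint is reachable and rejecting unsigned requests.
    """
    req = urllib.request.Request(
        url,
        data=b'{"type": "probe"}',
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # urlopen raises for non-2xx responses; the status code is on the error
        return err.code
```

For example, probe_endpoint("https://abc123.ngrok.io/webhook") should return 401. A URLError or a 404 points to the tunnel or the route path; a 200 means unsigned requests are being accepted.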

Troubleshooting

Common Issues

  1. Webhook signature verification fails
    • Ensure you’re using the correct webhook secret from the portal
    • Check that you’re passing the raw request body for verification
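    • Make sure all three webhook-* headers reach your handler; HTTP header names are case-insensitive, so normalize the keys if you copy headers into a plain dict
    • Compare against the base64-encoded HMAC digest (the value after v1,), not a hex digest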
  2. Webhooks not being received
    • Confirm your endpoint is publicly accessible
    • Check the portal logs for delivery attempts
    • Ensure your endpoint returns 2xx status codes
  3. Replay attacks
    • The 5-minute timestamp window limits how long a captured request can be replayed
    • Store and check webhook-id values to reject replays within that window and avoid duplicate processing