Overview

Pulse API provides webhooks for real-time notifications about your asynchronous document processing jobs. Configure webhook endpoints to receive automatic updates when extraction jobs complete, fail, or change status.
Webhook event delivery is currently under development. The configuration portal is fully functional, but events are not yet being sent.

Why Use Webhooks?

Instead of continuously polling the /job/{job_id} endpoint to check job status, webhooks will provide:
  • Real-time updates - Get notified instantly when jobs complete
  • Reduced API calls - No need for repeated polling requests
  • Better scalability - Handle high volumes of concurrent extractions efficiently
  • Automatic retries - Failed deliveries will be retried with exponential backoff
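
Because event delivery is not yet live, polling is still the only way to learn a job's outcome. The following is a minimal sketch of a polling loop with exponential backoff, not an official client. The names wait_for_job and fetch_status and the delay parameters are hypothetical, and the terminal status values are assumed from the event names used in this guide.

```python
import time
from typing import Callable

# Terminal states, assumed from the event names used in this guide
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

def wait_for_job(fetch_status: Callable[[], str],
                 initial_delay: float = 1.0,
                 max_delay: float = 30.0,
                 max_attempts: int = 10,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the job reaches a terminal status, doubling the wait each time.

    fetch_status is any callable returning the current status string,
    for example a wrapper around GET /job/{job_id}.
    """
    delay = initial_delay
    for attempt in range(max_attempts):
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if attempt < max_attempts - 1:
            sleep(delay)
            delay = min(delay * 2, max_delay)  # exponential backoff, capped
    raise TimeoutError(f"Job still not finished after {max_attempts} polls")
```

In practice fetch_status could wrap a requests.get call against /job/{job_id} and return the status field of the JSON response. Passing the sleep function as a parameter keeps the loop easy to exercise without real waiting.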

Setting Up Webhooks

Step 1: Generate Your Portal Link

Use the /webhook endpoint to generate your webhook configuration portal link:
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://dev.api.runpulse.com"

headers = {"x-api-key": API_KEY}
response = requests.post(f"{BASE_URL}/webhook", headers=headers)

if response.status_code == 200:
    portal_link = response.json()['link']
    print(f"Configure your webhooks at: {portal_link}")

Step 2: Configure in Portal

The webhook configuration portal allows you to:
  • Add Multiple Endpoints - Configure different URLs for different event types
  • Set Authentication - Add headers or basic auth to your webhooks
  • Filter Events - Choose which events to receive at each endpoint
  • Test Endpoints - Send test events to verify your setup
  • View Logs - See delivery attempts and debug failed webhooks

Step 3: Implement Your Webhook Handler

Create an endpoint in your application to receive webhook notifications:
webhook_handler.py
from flask import Flask, request, abort
import base64
import hmac
import hashlib
import time
import json

app = Flask(__name__)

# Your webhook secret from the portal
WEBHOOK_SECRET = "your_webhook_secret_here"

def verify_webhook(payload, headers, webhook_secret):
    """Verify webhook authenticity using HMAC signature."""

    webhook_id = headers.get('webhook-id')
    webhook_timestamp = headers.get('webhook-timestamp')
    webhook_signature = headers.get('webhook-signature')

    if not all([webhook_id, webhook_timestamp, webhook_signature]):
        return False

    # Check timestamp to prevent replay attacks (5 minute window)
    try:
        timestamp = int(webhook_timestamp)
    except ValueError:
        return False  # Malformed timestamp header
    if abs(int(time.time()) - timestamp) > 300:
        return False

    # Construct signed content
    signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"

    # Extract signature from header (format: v1,signature)
    signature = webhook_signature.split(',')[1] if ',' in webhook_signature else webhook_signature

    # Compute expected signature. Assumption: the example header under
    # "Webhook Security" carries a Base64 value, so the digest is encoded
    # as Base64 here rather than hex.
    expected = base64.b64encode(
        hmac.new(
            webhook_secret.encode(),
            signed_content.encode(),
            hashlib.sha256
        ).digest()
    ).decode()

    # Constant-time comparison (on bytes, so odd header characters cannot raise)
    return hmac.compare_digest(signature.encode(), expected.encode())

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    payload = request.get_data(as_text=True)
    
    # Verify webhook
    if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
        abort(401)
    
    event = json.loads(payload)
    
    # Handle different event types
    event_type = event.get('type')
    event_data = event.get('data')
    
    if event_type == 'job.completed':
        handle_job_completed(event_data)
    elif event_type == 'job.failed':
        handle_job_failed(event_data)
    elif event_type == 'job.cancelled':
        handle_job_cancelled(event_data)
    
    return '', 200

def handle_job_completed(data):
    job_id = data['job_id']
    pages_processed = data.get('pages_processed', 0)
    processing_time = data.get('processing_time', 0)
    
    print(f"Job {job_id} completed successfully")
    print(f"Processed {pages_processed} pages in {processing_time} seconds")
    
    # Fetch the full results using the job ID
    # Process results, update database, notify users, etc.

def handle_job_failed(data):
    job_id = data['job_id']
    error = data.get('error', 'Unknown error')
    
    print(f"Job {job_id} failed: {error}")
    # Handle failure - retry, notify user, etc.

def handle_job_cancelled(data):
    job_id = data['job_id']
    print(f"Job {job_id} was cancelled")
    # Clean up any resources

if __name__ == '__main__':
    app.run(port=5000)

Webhook Security

Each webhook request includes security headers for verification:
webhook-id: msg_2Jv7pYGL7UwXqF3v6RjLVxQYPZG
webhook-timestamp: 1704067200
webhook-signature: v1,g0hM9SsE+OTPJTjfm/kBRBOlqPmYFYpwTEFfQK6UHdI=
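The signature is an HMAC-SHA256, keyed with your webhook secret, computed over the string {webhook-id}.{webhook-timestamp}.{raw request body}. The header value has the format v1,signature.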
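
To exercise a handler before real events are delivered, it can help to sign a test request yourself. The sketch below builds the three headers for a locally generated body. The function name build_signed_headers is hypothetical, the signed-content layout is copied from the handler in Step 3, and the digest encoding is an assumption: the example signature above looks like Base64, so that is the default, with a hex option in case your handler compares hex digests.

```python
import base64
import hashlib
import hmac
import time
from typing import Dict, Optional

def build_signed_headers(secret: str, webhook_id: str, body: str,
                         timestamp: Optional[int] = None,
                         encoding: str = "base64") -> Dict[str, str]:
    """Return webhook-id, webhook-timestamp and webhook-signature for a test body."""
    if timestamp is None:
        timestamp = int(time.time())
    # Same layout the Step 3 handler verifies: id.timestamp.body
    signed_content = f"{webhook_id}.{timestamp}.{body}"
    mac = hmac.new(secret.encode(), signed_content.encode(), hashlib.sha256)
    if encoding == "base64":
        signature = base64.b64encode(mac.digest()).decode()
    elif encoding == "hex":
        signature = mac.hexdigest()
    else:
        raise ValueError(f"Unsupported encoding: {encoding}")
    return {
        "webhook-id": webhook_id,
        "webhook-timestamp": str(timestamp),
        "webhook-signature": f"v1,{signature}",
    }
```

Send the returned headers together with exactly the same body string; any re-serialization of the JSON between signing and sending changes the bytes and invalidates the signature.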

Webhook Events (Coming Soon)

Once enabled, you’ll receive events for job status changes:

Event Types

Event            Description
job.created      New async job created
job.processing   Job started processing
job.completed    Job completed successfully
job.failed       Job failed with error
job.cancelled    Job was cancelled

Job Completed Event

{
  "type": "job.completed",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "completed",
    "pages_processed": 25,
    "processing_time": 12.5
  }
}

Job Failed Event

{
  "type": "job.failed",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "failed",
    "error": "Document processing failed: Invalid PDF structure"
  }
}
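
The two payloads above share an envelope (type, timestamp, data) and differ only in the extra fields inside data. As an illustration, the sketch below parses a body into a small typed record and rejects malformed input early. WebhookEvent and parse_event are hypothetical names, and the field set is assumed from the two example payloads only; real events may carry more fields.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict

@dataclass(frozen=True)
class WebhookEvent:
    type: str
    timestamp: str
    job_id: str
    status: str
    details: Dict[str, Any]  # remaining data fields, e.g. pages_processed or error

def parse_event(raw_body: str) -> WebhookEvent:
    """Parse a webhook body into a WebhookEvent; raise ValueError if malformed."""
    try:
        payload = json.loads(raw_body)
        data = dict(payload["data"])  # copy so the pops below leave payload intact
        return WebhookEvent(
            type=payload["type"],
            timestamp=payload["timestamp"],
            job_id=data.pop("job_id"),
            status=data.pop("status"),
            details=data,
        )
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        raise ValueError(f"Malformed webhook payload: {exc!r}") from exc
```

Keeping unknown fields in details means a handler written against today's examples does not break if new fields are added later.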

Example Implementations

Express.js Webhook Handler

const express = require('express');
const crypto = require('crypto');

const app = express();
app.use(express.raw({ type: 'application/json' }));

function verifyWebhook(payload, headers, secret) {
  const webhookId = headers['webhook-id'];
  const webhookTimestamp = headers['webhook-timestamp'];
  const webhookSignature = headers['webhook-signature'];
  
  if (!webhookId || !webhookTimestamp || !webhookSignature) {
    return false;
  }
  
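  // Check timestamp (5 minute window); reject non-numeric values
  const currentTime = Math.floor(Date.now() / 1000);
  const timestamp = Number.parseInt(webhookTimestamp, 10);
  if (Number.isNaN(timestamp) || Math.abs(currentTime - timestamp) > 300) {
    return false;
  }
  
  // Construct signed content
  const signedContent = `${webhookId}.${webhookTimestamp}.${payload}`;
  
  // Extract signature
  const signature = webhookSignature.includes(',') 
    ? webhookSignature.split(',')[1] 
    : webhookSignature;
  
  // Compute expected signature. Assumption: Base64, to match the
  // example webhook-signature header shown under "Webhook Security"
  const expected = crypto
    .createHmac('sha256', secret)
    .update(signedContent)
    .digest('base64');
  
  // Constant-time comparison; timingSafeEqual throws when the lengths
  // differ, so compare lengths first
  const signatureBuf = Buffer.from(signature);
  const expectedBuf = Buffer.from(expected);
  return (
    signatureBuf.length === expectedBuf.length &&
    crypto.timingSafeEqual(signatureBuf, expectedBuf)
  );
}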

app.post('/webhook', (req, res) => {
  const payload = req.body.toString();
  
  // Verify webhook signature
  if (!verifyWebhook(payload, req.headers, process.env.WEBHOOK_SECRET)) {
    return res.status(401).send('Unauthorized');
  }
  
  const event = JSON.parse(payload);
  
  // Handle different event types
  switch (event.type) {
    case 'job.completed':
      console.log(`Job ${event.data.job_id} completed`);
      // Process completed job
      break;
    case 'job.failed':
      console.error(`Job ${event.data.job_id} failed: ${event.data.error}`);
      // Handle failure
      break;
  }
  
  res.status(200).send('OK');
});

app.listen(3000, () => {
  console.log('Webhook handler listening on port 3000');
});

Complete Integration Example

Here’s a complete example integrating async extraction with webhook handling:
complete_integration.py
import requests
import json
from typing import Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class PulseJob:
    job_id: str
    file_path: str
    schema: Optional[Dict]
    created_at: datetime
    status: str = "pending"

class PulseWebhookClient:
    """Complete Pulse API client with webhook support."""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://dev.api.runpulse.com"
        self.headers = {"x-api-key": api_key}
        self.jobs = {}  # In production, use a proper database
    
    def setup_webhooks(self) -> str:
        """Get webhook portal link for configuration."""
        response = requests.post(
            f"{self.base_url}/webhook",
            headers=self.headers
        )
        if response.status_code == 200:
            return response.json()['link']
        raise Exception(f"Failed to get webhook portal: {response.text}")
    
    def extract_async_with_webhook(self, 
                                  file_path: str,
                                  schema: Optional[Dict] = None,
                                  pages: Optional[str] = None) -> str:
        """
        Start async extraction that will notify via webhook when complete.
        
        Returns:
            job_id for tracking
        """
        url = f"{self.base_url}/extract_async"
        
        with open(file_path, 'rb') as f:
            files = {'file': f}
            data = {}
            
            if schema:
                data['schema'] = json.dumps(schema)
            if pages:
                data['pages'] = pages
            
            response = requests.post(
                url, 
                files=files, 
                data=data, 
                headers=self.headers
            )
        
        if response.status_code == 200:
            job_info = response.json()
            job_id = job_info['job_id']
            
            # Store job info for webhook correlation
            job = PulseJob(
                job_id=job_id,
                file_path=file_path,
                schema=schema,
                created_at=datetime.now()
            )
            self.jobs[job_id] = job
            
            return job_id
        
        raise Exception(f"Failed to start extraction: {response.text}")
    
    def get_job_result(self, job_id: str) -> Dict[str, Any]:
        """
        Fetch job results after webhook notification.
        For large documents, this handles the S3 URL response.
        """
        url = f"{self.base_url}/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        
        if response.status_code == 200:
            job_data = response.json()
            
            if job_data['status'] == 'completed':
                result = job_data['result']
                
                # Handle large document URL response
                if result.get('is_url'):
                    content_response = requests.get(result['url'])
                    return content_response.json()
                
                return result
            
            raise Exception(f"Job not completed: {job_data['status']}")
        
        raise Exception(f"Failed to get job result: {response.text}")

# Usage example
if __name__ == "__main__":
    client = PulseWebhookClient(api_key="YOUR_API_KEY")
    
    # One-time setup: Configure webhooks
    portal_link = client.setup_webhooks()
    print(f"Configure webhooks at: {portal_link}")
    print("Add your webhook endpoint URL in the portal")
    
    # Extract document - webhook will notify when complete
    job_id = client.extract_async_with_webhook(
        "large_document.pdf",
        schema={
            "title": "string",
            "sections": [{
                "heading": "string",
                "content": "string"
            }]
        },
        pages="1-100"
    )
    
    print(f"Extraction started: {job_id}")
    print("You'll receive a webhook notification when complete!")

Best Practices

1. Idempotent Processing

Design your webhook handlers to be idempotent, as webhooks may be delivered multiple times:
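processed_events = set()  # In production, use persistent storage

def handle_webhook_event(event, event_id):
    """Process an event once.

    event_id is the value of the webhook-id request header; the example
    payloads in this guide carry no id field of their own.
    """
    
    # Check if already processed
    if event_id in processed_events:
        print(f"Event {event_id} already processed, skipping")
        return
    
    # Process the event (process_event stands for your application logic)
    process_event(event)
    
    # Mark as processed
    processed_events.add(event_id)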
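
An in-memory set is lost on restart and is not shared between worker processes. As one possible form of the persistent storage mentioned above, the sketch below records handled IDs in SQLite. ProcessedEventStore, its claim method, and the table name are hypothetical; any database with a unique constraint can play the same role.

```python
import sqlite3

class ProcessedEventStore:
    """Records handled event IDs in SQLite so the record survives restarts."""

    def __init__(self, path: str = "processed_events.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)"
        )
        self.conn.commit()

    def claim(self, event_id: str) -> bool:
        """Return True the first time an ID is seen, False for any repeat."""
        # The primary key makes the insert a no-op for an ID already stored
        cursor = self.conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        )
        self.conn.commit()
        return cursor.rowcount == 1
```

Call claim before processing and skip the event when it returns False. Two caveats: claiming first means an event whose processing later fails will not be picked up again unless you delete its row, and a sqlite3 connection should not be shared across threads, so a threaded server needs one connection per thread or request.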

2. Quick Response Times

Respond to webhooks quickly (within 5 seconds) to avoid timeouts:
from queue import Queue
from threading import Thread

# Background processing queue
task_queue = Queue()

def process_webhook_async():
    """Background worker to process webhook events."""
    while True:
        event = task_queue.get()
        try:
            process_event(event)
        except Exception as e:
            print(f"Error processing event: {e}")
        task_queue.task_done()

# Start background worker
Thread(target=process_webhook_async, daemon=True).start()

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    payload = request.get_data(as_text=True)
    
    # Quickly validate and acknowledge
    if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
        return '', 401
    
    # Queue for async processing
    task_queue.put(json.loads(payload))
    
    # Return immediately
    return '', 200

3. Error Handling

Implement comprehensive error handling:
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    try:
        payload = request.get_data(as_text=True)
        
        # Verify webhook
        if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
            app.logger.error("Webhook verification failed")
            return '', 401
        
        event = json.loads(payload)
        process_event(event)
        
        return '', 200
        
    except json.JSONDecodeError as e:
        app.logger.error(f"Invalid JSON payload: {e}")
        return '', 400
        
    except Exception as e:
        app.logger.error(f"Unexpected error: {e}")
        # Return 500 to trigger retry
        return '', 500

Testing Webhooks Locally

Use ngrok or similar tools to test webhooks during development:
# Start your local server
python webhook_handler.py

# In another terminal, expose it to the internet
ngrok http 5000

# Use the ngrok URL in the webhook portal
# Example: https://abc123.ngrok.io/webhook

Troubleshooting

Common Issues

  1. Webhook signature verification fails
    • Ensure you’re using the correct webhook secret from the portal
    • Check that you’re passing the raw request body for verification
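    • Verify the webhook-id, webhook-timestamp, and webhook-signature headers reach your verification code (HTTP header names are case-insensitive, but lookups on a plain dict copy of them are case-sensitive)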
  2. Webhooks not being received
    • Confirm your endpoint is publicly accessible
    • Check the portal logs for delivery attempts
    • Ensure your endpoint returns 2xx status codes
  3. Replay attacks
    • The 5-minute timestamp window rejects replays of requests older than five minutes
    • Store and check event IDs to reject replays and duplicate deliveries inside that window
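
The two replay checks above can be combined in one small guard. This is a sketch under assumptions, not part of the Pulse API: ReplayGuard and accept are hypothetical names, the 300-second window mirrors the handler examples, and the guard must run only after the signature has been verified, since the timestamp is trustworthy only as part of the signed content.

```python
import time
from typing import Dict, Optional

class ReplayGuard:
    """Rejects stale timestamps and IDs already seen inside the window."""

    def __init__(self, window_seconds: int = 300):
        self.window = window_seconds
        self._seen: Dict[str, int] = {}  # webhook-id -> timestamp

    def accept(self, webhook_id: str, timestamp: int,
               now: Optional[int] = None) -> bool:
        now = int(time.time()) if now is None else now
        # Forget IDs that have aged out; the window check below would
        # reject a replay of them anyway, so memory stays bounded
        self._seen = {i: t for i, t in self._seen.items()
                      if now - t <= self.window}
        if abs(now - timestamp) > self.window:
            return False  # too old, or too far in the future
        if webhook_id in self._seen:
            return False  # same message replayed inside the window
        self._seen[webhook_id] = timestamp
        return True
```

This guards against replays only. Retried deliveries may arrive later than the window, so deduplicating them still calls for the persistent event-ID storage recommended under Best Practices.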