> ## Documentation Index
> Fetch the complete documentation index at: https://docs.runpulse.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Webhooks

> Real-time notifications for asynchronous job processing

## Overview

Pulse API provides webhooks for real-time notifications about your asynchronous document processing jobs. Configure webhook endpoints to receive automatic updates when extraction jobs complete, fail, or change status.

## Why Use Webhooks?

Instead of continuously polling the `/job/{job_id}` endpoint to check job status, webhooks will provide:

* **Real-time updates** - Get notified instantly when jobs complete
* **Reduced API calls** - No need for repeated polling requests
* **Better scalability** - Handle high volumes of concurrent extractions efficiently
* **Automatic retries** - Failed deliveries will be retried with exponential backoff

## Setting Up Webhooks

### Step 1: Get Your Portal Link

Use the webhook endpoint to generate your webhook configuration portal link:

<CodeGroup>
  ```python Python theme={null}
  from pulse import Pulse

  client = Pulse(api_key="YOUR_API_KEY")

  # Get the webhook portal link
  response = client.webhooks.create_webhook_link()
  print(f"Configure your webhooks at: {response.link}")
  ```

  ```typescript TypeScript theme={null}
  import { PulseClient } from 'pulse-ts-sdk';

  const client = new PulseClient({ 
      apiKey: 'YOUR_API_KEY'
  });

  // Get the webhook portal link
  const response = await client.webhooks.createWebhookLink();
  console.log(`Configure your webhooks at: ${response.link}`);
  ```

  ```bash curl theme={null}
  curl -X POST https://api.runpulse.com/webhook \
    -H "x-api-key: YOUR_API_KEY"

  # Response: {"link": "https://app.svix.com/..."}
  ```
</CodeGroup>

### Step 2: Configure in Portal

The webhook configuration portal allows you to:

* **Add Multiple Endpoints** - Configure different URLs for different event types
* **Set Authentication** - Add headers or basic auth to your webhooks
* **Filter Events** - Choose which events to receive at each endpoint
* **Test Endpoints** - Send test events to verify your setup
* **View Logs** - See delivery attempts and debug failed webhooks

### Step 3: Implement Your Webhook Handler

Create an endpoint in your application to receive webhook notifications:

<CodeGroup>
  ```python Python (Flask) theme={null}
  from flask import Flask, request, abort
  import hmac
  import hashlib
  import time
  import json
  from pulse import Pulse

  app = Flask(__name__)

  WEBHOOK_SECRET = "your_webhook_secret_here"

  # Initialize the SDK client to fetch job results
  client = Pulse(api_key="YOUR_API_KEY")

  def verify_webhook(payload, headers, webhook_secret):
      """Verify webhook authenticity using HMAC signature."""
      webhook_id = headers.get('webhook-id')
      webhook_timestamp = headers.get('webhook-timestamp')
      webhook_signature = headers.get('webhook-signature')
      
      if not all([webhook_id, webhook_timestamp, webhook_signature]):
          return False
      
      if abs(int(time.time()) - int(webhook_timestamp)) > 300:
          return False
      
      signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"
      signature = webhook_signature.split(',')[1] if ',' in webhook_signature else webhook_signature
      expected = hmac.new(webhook_secret.encode(), signed_content.encode(), hashlib.sha256).hexdigest()
      
      return hmac.compare_digest(signature, expected)

  @app.route('/webhook', methods=['POST'])
  def handle_webhook():
      payload = request.get_data(as_text=True)
      
      if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
          abort(401)
      
      event = json.loads(payload)
      event_type = event.get('type')
      event_data = event.get('data')
      
      if event_type == 'job.completed':
          handle_job_completed(event_data)
      elif event_type == 'job.failed':
          handle_job_failed(event_data)
      elif event_type == 'job.cancelled':
          handle_job_cancelled(event_data)
      
      return '', 200

  def handle_job_completed(data):
      job_id = data['job_id']
      print(f"Job {job_id} completed successfully")
      
      # Fetch the full results using the SDK
      job_status = client.jobs.get_job(job_id=job_id)
      print(f"Result: {job_status.result}")

  def handle_job_failed(data):
      job_id = data['job_id']
      error = data.get('error', 'Unknown error')
      print(f"Job {job_id} failed: {error}")

  def handle_job_cancelled(data):
      job_id = data['job_id']
      print(f"Job {job_id} was cancelled")

  if __name__ == '__main__':
      app.run(port=5000)
  ```

  ```typescript TypeScript (Express) theme={null}
  import express from 'express';
  import crypto from 'crypto';
  import { PulseClient } from 'pulse-ts-sdk';

  const app = express();
  app.use(express.raw({ type: 'application/json' }));

  const WEBHOOK_SECRET = "your_webhook_secret_here";

  // Initialize the SDK client to fetch job results
  const client = new PulseClient({ 
      apiKey: 'YOUR_API_KEY'
  });

  function verifyWebhook(payload: string, headers: any, secret: string): boolean {
      const webhookId = headers['webhook-id'];
      const webhookTimestamp = headers['webhook-timestamp'];
      const webhookSignature = headers['webhook-signature'];
      
      if (!webhookId || !webhookTimestamp || !webhookSignature) return false;
      if (Math.abs(Date.now() / 1000 - parseInt(webhookTimestamp)) > 300) return false;
      
      const signedContent = `${webhookId}.${webhookTimestamp}.${payload}`;
      const signature = webhookSignature.includes(',') 
          ? webhookSignature.split(',')[1] : webhookSignature;
      const expected = crypto.createHmac('sha256', secret).update(signedContent).digest('hex');
      
      return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
  }

  app.post('/webhook', async (req, res) => {
      const payload = req.body.toString();
      
      if (!verifyWebhook(payload, req.headers, WEBHOOK_SECRET)) {
          return res.status(401).send('Unauthorized');
      }
      
      const event = JSON.parse(payload);
      const { type, data } = event;
      
      if (type === 'job.completed') {
          console.log(`Job ${data.job_id} completed`);
          // Fetch full results using SDK
          const jobStatus = await client.jobs.getJob({ jobId: data.job_id });
          console.log('Result:', jobStatus.result);
      } else if (type === 'job.failed') {
          console.log(`Job ${data.job_id} failed: ${data.error}`);
      } else if (type === 'job.cancelled') {
          console.log(`Job ${data.job_id} was cancelled`);
      }
      
      res.status(200).send('OK');
  });

  app.listen(5000, () => console.log('Webhook handler listening on port 5000'));
  ```
</CodeGroup>

## Webhook Security

Each webhook request includes security headers for verification:

```
webhook-id: msg_2Jv7pYGL7UwXqF3v6RjLVxQYPZG
webhook-timestamp: 1704067200
webhook-signature: v1,g0hM9SsE+OTPJTjfm/kBRBOlqPmYFYpwTEFfQK6UHdI=
```

The signature is computed using HMAC-SHA256 with your webhook secret.

## Webhook Events (Coming Soon)

Once enabled, you'll receive events for job status changes:

### Event Types

| Event            | Description                |
| ---------------- | -------------------------- |
| `job.created`    | New async job created      |
| `job.processing` | Job started processing     |
| `job.completed`  | Job completed successfully |
| `job.failed`     | Job failed with error      |
| `job.cancelled`  | Job was cancelled          |

### Job Completed Event

```json theme={null}
{
  "type": "job.completed",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "completed",
    "pages_processed": 25,
    "processing_time": 12.5
  }
}
```

### Job Failed Event

```json theme={null}
{
  "type": "job.failed",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "failed",
    "error": "Document processing failed: Invalid PDF structure"
  }
}
```

## Example Implementations

### Webhook Handlers

<CodeGroup>
  ```python Python (Flask) theme={null}
  from flask import Flask, request, abort
  import hmac
  import hashlib
  import time
  import json
  from pulse import Pulse

  app = Flask(__name__)

  WEBHOOK_SECRET = "your_webhook_secret_here"

  # Initialize Pulse client for fetching results
  client = Pulse(api_key="YOUR_API_KEY")

  def verify_webhook(payload, headers, secret):
      """Verify webhook authenticity using HMAC signature."""
      webhook_id = headers.get('webhook-id')
      webhook_timestamp = headers.get('webhook-timestamp')
      webhook_signature = headers.get('webhook-signature')
      
      if not all([webhook_id, webhook_timestamp, webhook_signature]):
          return False
      
      # Check timestamp (5 minute window)
      if abs(int(time.time()) - int(webhook_timestamp)) > 300:
          return False
      
      signed_content = f"{webhook_id}.{webhook_timestamp}.{payload}"
      signature = webhook_signature.split(',')[1] if ',' in webhook_signature else webhook_signature
      
      expected = hmac.new(secret.encode(), signed_content.encode(), hashlib.sha256).hexdigest()
      return hmac.compare_digest(signature, expected)

  @app.route('/webhook', methods=['POST'])
  def handle_webhook():
      payload = request.get_data(as_text=True)
      
      if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
          abort(401)
      
      event = json.loads(payload)
      event_type = event.get('type')
      event_data = event.get('data')
      
      if event_type == 'job.completed':
          job_id = event_data['job_id']
          # Fetch full results using SDK
          job_status = client.jobs.get_job(job_id=job_id)
          print(f"Job {job_id} completed: {job_status.result}")
      elif event_type == 'job.failed':
          print(f"Job {event_data['job_id']} failed: {event_data.get('error')}")
      
      return '', 200

  if __name__ == '__main__':
      app.run(port=5000)
  ```

  ```typescript TypeScript (Express) theme={null}
  import express from 'express';
  import crypto from 'crypto';
  import { PulseClient } from 'pulse-ts-sdk';

  const app = express();
  app.use(express.raw({ type: 'application/json' }));

  const WEBHOOK_SECRET = "YOUR_WEBHOOK_SECRET";

  // Initialize Pulse client for fetching results
  const client = new PulseClient({ 
      apiKey: "YOUR_API_KEY"
  });

  function verifyWebhook(payload: string, headers: any, secret: string): boolean {
    const webhookId = headers['webhook-id'];
    const webhookTimestamp = headers['webhook-timestamp'];
    const webhookSignature = headers['webhook-signature'];
    
    if (!webhookId || !webhookTimestamp || !webhookSignature) {
      return false;
    }
    
    // Check timestamp (5 minute window)
    const currentTime = Math.floor(Date.now() / 1000);
    if (Math.abs(currentTime - parseInt(webhookTimestamp)) > 300) {
      return false;
    }
    
    const signedContent = `${webhookId}.${webhookTimestamp}.${payload}`;
    const signature = webhookSignature.includes(',') 
      ? webhookSignature.split(',')[1] 
      : webhookSignature;
    
    const expected = crypto
      .createHmac('sha256', secret)
      .update(signedContent)
      .digest('hex');
    
      return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
  }

  app.post('/webhook', async (req, res) => {
    const payload = req.body.toString();
    
      if (!verifyWebhook(payload, req.headers, WEBHOOK_SECRET)) {
      return res.status(401).send('Unauthorized');
    }
    
    const event = JSON.parse(payload);
    
    switch (event.type) {
      case 'job.completed':
              const jobId = event.data.job_id;
              // Fetch full results using SDK
              const jobStatus = await client.jobs.getJob({ jobId });
              console.log(`Job ${jobId} completed:`, jobStatus.result);
        break;
      case 'job.failed':
              console.error(`Job ${event.data.job_id} failed:`, event.data.error);
        break;
    }
    
    res.status(200).send('OK');
  });

  app.listen(3000, () => {
    console.log('Webhook handler listening on port 3000');
  });
  ```
</CodeGroup>

### Complete Integration Example

Here's a complete example integrating async extraction with webhook handling:

<CodeGroup>
  ```python Python theme={null}
  from pulse import Pulse

  client = Pulse(api_key="YOUR_API_KEY")

  # One-time setup: Configure webhooks
  portal_response = client.webhooks.create_webhook_link()
  print(f"Configure webhooks at: {portal_response.link}")
  print("Add your webhook endpoint URL in the portal")

  # Define extraction schema
  schema = {
      "type": "object",
      "properties": {
          "title": {"type": "string"},
          "sections": {
              "type": "array",
              "items": {
                  "type": "object",
                  "properties": {
                      "heading": {"type": "string"},
                      "content": {"type": "string"}
                  }
              }
          }
      }
  }

  # Submit async extraction - webhook will notify when complete
  submission = client.extract(
      file_url="https://www.impact-bank.com/user/file/dummy_statement.pdf",
      pages="1-100",
      async_=True
  )

  # After extraction completes (via webhook), apply schema via /schema endpoint:
  # schema_result = client.schema(
  #     extraction_id=submission.extraction_id,
  #     schema_config={"input_schema": schema}
  # )

  print(f"Extraction started: {submission.job_id}")
  print("You'll receive a webhook notification when complete!")

  # When webhook fires, fetch the results
  def handle_webhook_completion(job_id: str):
      """Called when webhook notification is received."""
      job_status = client.jobs.get_job(job_id=job_id)
      
      if job_status.status == "completed":
          print(f"Job {job_id} completed!")
          print(f"Result: {job_status.result}")
      else:
          print(f"Job {job_id} status: {job_status.status}")
  ```

  ```typescript TypeScript theme={null}
  import { PulseClient } from 'pulse-ts-sdk';

  const client = new PulseClient({ 
      apiKey: 'YOUR_API_KEY'
  });

  // One-time setup: Configure webhooks
  const portalResponse = await client.webhooks.createWebhookLink();
  console.log(`Configure webhooks at: ${portalResponse.link}`);
  console.log("Add your webhook endpoint URL in the portal");

  // Define extraction schema
  const schema = {
      type: "object",
      properties: {
          title: { type: "string" },
          sections: {
              type: "array",
              items: {
                  type: "object",
                  properties: {
                      heading: { type: "string" },
                      content: { type: "string" }
                  }
              }
          }
      }
  };

  // Submit async extraction - webhook will notify when complete
  const submission = await client.extract({
      fileUrl: "https://www.impact-bank.com/user/file/dummy_statement.pdf",
      pages: "1-100",
      async: true
  });

  // After extraction completes (via webhook), apply schema:
  // const schemaResult = await client.schema({
  //     extractionId: submission.extraction_id,
  //     schemaConfig: { schema }
  // });

  console.log(`Extraction started: ${submission.job_id}`);
  console.log("You'll receive a webhook notification when complete!");

  // When webhook fires, fetch the results
  async function handleWebhookCompletion(jobId: string) {
      const jobStatus = await client.jobs.getJob({ jobId });
      
      if (jobStatus.status === "completed") {
          console.log(`Job ${jobId} completed!`);
          console.log(`Result: ${JSON.stringify(jobStatus.result)}`);
      } else {
          console.log(`Job ${jobId} status: ${jobStatus.status}`);
      }
  }
  ```
</CodeGroup>

## Best Practices

### 1. Idempotent Processing

Design your webhook handlers to be idempotent, as webhooks may be delivered multiple times:

```python theme={null}
processed_events = set()  # In production, use persistent storage

def handle_webhook_event(event):
    event_id = event.get('id')
    
    # Check if already processed
    if event_id in processed_events:
        print(f"Event {event_id} already processed, skipping")
        return
    
    # Process the event
    process_event(event)
    
    # Mark as processed
    processed_events.add(event_id)
```

### 2. Quick Response Times

Respond to webhooks quickly (within 5 seconds) to avoid timeouts:

```python theme={null}
from queue import Queue
from threading import Thread

# Background processing queue
task_queue = Queue()

def process_webhook_async():
    """Background worker to process webhook events."""
    while True:
        event = task_queue.get()
        try:
            process_event(event)
        except Exception as e:
            print(f"Error processing event: {e}")
        task_queue.task_done()

# Start background worker
Thread(target=process_webhook_async, daemon=True).start()

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    # Quickly validate and acknowledge
    event = json.loads(request.get_data(as_text=True))
    
    # Queue for async processing
    task_queue.put(event)
    
    # Return immediately
    return '', 200
```

### 3. Error Handling

Implement comprehensive error handling:

```python theme={null}
def handle_webhook():
    try:
        payload = request.get_data(as_text=True)
        
        # Verify webhook
        if not verify_webhook(payload, request.headers, WEBHOOK_SECRET):
            app.logger.error("Webhook verification failed")
            return '', 401
        
        event = json.loads(payload)
        process_event(event)
        
        return '', 200
        
    except json.JSONDecodeError as e:
        app.logger.error(f"Invalid JSON payload: {e}")
        return '', 400
        
    except Exception as e:
        app.logger.error(f"Unexpected error: {e}")
        # Return 500 to trigger retry
        return '', 500
```

## Testing Webhooks Locally

Use [ngrok](https://ngrok.com/) or similar tools to test webhooks during development:

```bash theme={null}
# Start your local server
python webhook_handler.py

# In another terminal, expose it to the internet
ngrok http 5000

# Use the ngrok URL in the webhook portal
# Example: https://abc123.ngrok.io/webhook
```

## Troubleshooting

### Common Issues

1. **Webhook signature verification fails**
   * Ensure you're using the correct webhook secret from the portal
   * Check that you're passing the raw request body for verification
   * Verify headers are being passed correctly (case-sensitive)

2. **Webhooks not being received**
   * Confirm your endpoint is publicly accessible
   * Check the portal logs for delivery attempts
   * Ensure your endpoint returns 2xx status codes

3. **Replay attacks**
   * The 5-minute timestamp window prevents replay attacks
   * Store and check event IDs to prevent duplicate processing

## Related Resources

<CardGroup cols={2}>
  <Card title="Async Processing" icon="clock" href="/api-reference/async-processing">
    Learn about asynchronous document processing
  </Card>

  <Card title="API Reference" icon="book" href="/api-reference/endpoint/webhook">
    View the webhook endpoint documentation
  </Card>

  <Card title="Poll Endpoint" icon="sync" href="/api-reference/endpoint/poll">
    Alternative method for checking job status
  </Card>

  <Card title="Error Handling" icon="triangle-exclamation" href="/advanced/error-handling">
    Handle errors in your integration
  </Card>
</CardGroup>
