Prerequisites

Before you begin, make sure you have:

API Key: Get your API key from runpulse.com/pricing-studio-pulse

Python 3.7+: Install Python and the requests library

Step 1: Install Required Libraries

pip install requests
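If you prefer not to hard-code the API key in your scripts, one option is to keep it in an environment variable and read it at startup. A minimal sketch; the variable name PULSE_API_KEY is only an example:

import os

# Read the key from an environment variable instead of hard-coding it;
# fall back to a placeholder so the later examples still run as written.
API_KEY = os.environ.get("PULSE_API_KEY", "YOUR_API_KEY")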

Step 2: Basic Document Extraction

Here’s a complete Python script for extracting content from a document:
basic_extraction.py
import requests
import json

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://dev.api.runpulse.com"

def extract_document_from_file(file_path, schema=None, pages=None):
    """
    Extract content from a local file using the /extract endpoint.
    
    Args:
        file_path: Path to the local file
        schema: Optional JSON schema for structured extraction
        pages: Optional page range (e.g., "1-5")
    
    Returns:
        Extracted content or S3 URL for large documents
    """
    url = f"{BASE_URL}/extract"
    headers = {"x-api-key": API_KEY}
    
    with open(file_path, 'rb') as f:
        files = {'file': f}
        data = {}
        
        if schema:
            data['schema'] = json.dumps(schema)
        if pages:
            data['pages'] = pages
            
        response = requests.post(url, files=files, data=data, headers=headers)
    
    if response.status_code == 200:
        result = response.json()
        
        # Check if result is returned as URL (for documents > 70 pages)
        if result.get('is_url'):
            print(f"Large document detected. Fetching from: {result['url']}")
            # Fetch the actual content from S3
            content_response = requests.get(result['url'])
            return content_response.json()
        else:
            return result
    else:
        print(f"Error: {response.status_code}")
        return response.json()

# Example usage
if __name__ == "__main__":
    # Define extraction schema
    schema = {
        "invoice_number": "string",
        "date": "date",
        "total": "float",
        "line_items": [{
            "description": "string",
            "quantity": "integer",
            "price": "float"
        }]
    }
    
    # Extract document
    result = extract_document_from_file(
        "invoice.pdf",
        schema=schema,
        pages="1-5"
    )
    
    print(json.dumps(result, indent=2))

Step 3: Using /convert When You Don’t Have a Presigned URL

If you need to upload a file before processing it, use the /convert endpoint:
upload_and_extract.py
import json
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://dev.api.runpulse.com"

def upload_and_extract(file_path, schema=None):
    """
    Upload a file to S3 first, then extract content.
    Use this when:
    - You want to reuse the uploaded file multiple times
    - You need to share the file URL with other services
    - You're processing the same file with different parameters
    
    Args:
        file_path: Path to the local file
        schema: Optional JSON schema for structured extraction
    
    Returns:
        Extracted content
    """
    
    # Step 1: Upload file to S3
    upload_url = f"{BASE_URL}/convert"
    headers = {"x-api-key": API_KEY}
    
    with open(file_path, 'rb') as f:
        files = {'file': f}
        upload_response = requests.post(upload_url, files=files, headers=headers)
    
    if upload_response.status_code != 200:
        print(f"Upload failed: {upload_response.status_code}")
        return None
    
    upload_result = upload_response.json()
    file_url = upload_result['s3_object_url']
    print(f"File uploaded to: {file_url}")
    
    # Step 2: Extract content using the uploaded file URL
    extract_url = f"{BASE_URL}/extract"
    extract_data = {
        "file-url": file_url
    }
    
    if schema:
        extract_data["schema"] = schema
    
    extract_response = requests.post(
        extract_url,
        json=extract_data,
        headers=headers
    )
    
    if extract_response.status_code == 200:
        result = extract_response.json()
        
        # Handle large document response
        if result.get('is_url'):
            print(f"Large document detected. Fetching from: {result['url']}")
            content_response = requests.get(result['url'])
            return content_response.json()
        else:
            return result
    else:
        print(f"Extraction failed: {extract_response.status_code}")
        return extract_response.json()

# Example usage
result = upload_and_extract("large_document.pdf", schema={"title": "string", "content": "string"})
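Because /convert returns a reusable S3 URL, you can upload a document once and extract from it several times with different schemas or page ranges. A minimal sketch built on the same two endpoints; the file name and schemas are only illustrative:

# Upload once, then reuse the returned URL for multiple extraction passes
headers = {"x-api-key": API_KEY}

with open("contract.pdf", 'rb') as f:
    upload = requests.post(f"{BASE_URL}/convert", files={'file': f}, headers=headers)
file_url = upload.json()['s3_object_url']

# First pass: pull top-level metadata
meta = requests.post(
    f"{BASE_URL}/extract",
    json={"file-url": file_url, "schema": {"title": "string", "date": "date"}},
    headers=headers,
).json()

# Second pass: pull detailed sections from the same uploaded file
sections = requests.post(
    f"{BASE_URL}/extract",
    json={"file-url": file_url, "schema": {"sections": [{"title": "string", "content": "string"}]}},
    headers=headers,
).json()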

Step 4: Asynchronous Processing for Large Documents

For documents over 50 pages or when processing multiple files, use async extraction:
async_extraction.py
import json
import time
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://dev.api.runpulse.com"

def extract_async_with_polling(file_path, schema=None, pages=None):
    """
    Extract content asynchronously with automatic polling.
    Recommended for:
    - Documents over 50 pages
    - Multiple concurrent extractions
    - Long-running processes
    
    Args:
        file_path: Path to the local file
        schema: Optional JSON schema for structured extraction
        pages: Optional page range
    
    Returns:
        Extracted content
    """
    
    # Step 1: Start async extraction
    url = f"{BASE_URL}/extract_async"
    headers = {"x-api-key": API_KEY}
    
    with open(file_path, 'rb') as f:
        files = {'file': f}
        data = {}
        
        if schema:
            data['schema'] = json.dumps(schema)
        if pages:
            data['pages'] = pages
            
        response = requests.post(url, files=files, data=data, headers=headers)
    
    if response.status_code != 200:
        print(f"Failed to start extraction: {response.status_code}")
        return response.json()
    
    job_info = response.json()
    job_id = job_info['job_id']
    print(f"Job started: {job_id}")
    
    # Step 2: Poll for job completion
    poll_url = f"{BASE_URL}/job/{job_id}"
    max_attempts = 60  # Maximum 5 minutes (60 * 5 seconds)
    attempt = 0
    
    while attempt < max_attempts:
        time.sleep(5)  # Wait 5 seconds between polls
        poll_response = requests.get(poll_url, headers=headers)
        
        if poll_response.status_code != 200:
            print(f"Failed to poll job: {poll_response.status_code}")
            return None
        
        job_status = poll_response.json()
        status = job_status['status']
        
        print(f"Job status: {status}")
        
        if status == 'completed':
            result = job_status['result']
            
            # Handle large document response
            if result.get('is_url'):
                print(f"Large document detected. Fetching from: {result['url']}")
                content_response = requests.get(result['url'])
                return content_response.json()
            else:
                return result
                
        elif status == 'failed':
            print(f"Job failed: {job_status.get('error', 'Unknown error')}")
            return None
            
        elif status == 'cancelled':
            print("Job was cancelled")
            return None
        
        attempt += 1
    
    print("Job timed out")
    return None

# Example usage for large document
result = extract_async_with_polling(
    "large_report.pdf",
    schema={"sections": [{"title": "string", "content": "string"}]},
    pages="1-100"
)
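Each call to extract_async_with_polling blocks while it polls, so to process several documents at once you can run the calls in a thread pool. A minimal sketch using the standard library; the file names and worker count are illustrative:

from concurrent.futures import ThreadPoolExecutor

# Hypothetical batch of documents to process concurrently
documents = ["report_q1.pdf", "report_q2.pdf", "report_q3.pdf"]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Each worker runs its own upload-and-poll loop independently
    results = list(executor.map(extract_async_with_polling, documents))

for path, result in zip(documents, results):
    print(path, "->", "ok" if result else "failed")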

Step 5: Production-Ready Client

Here’s a complete production-ready class that handles all scenarios:
pulse_client.py
import requests
import json
import time
from typing import Optional, Dict, Any
from pathlib import Path

class PulseAPIClient:
    """Production-ready client for Pulse API."""
    
    def __init__(self, api_key: str, base_url: str = "https://dev.api.runpulse.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"x-api-key": api_key}
    
    def extract(self, 
                file_path: Optional[str] = None,
                file_url: Optional[str] = None,
                schema: Optional[Dict] = None,
                pages: Optional[str] = None,
                extract_figure: bool = False,
                async_mode: bool = False) -> Dict[str, Any]:
        """
        Smart extraction that automatically handles different scenarios.
        
        Args:
            file_path: Local file path
            file_url: Remote file URL
            schema: Extraction schema
            pages: Page range (e.g., "1-5")
            extract_figure: Extract figures from document
            async_mode: Use async processing
        
        Returns:
            Extracted content
        """
        
        # Determine if we should use async based on file size or explicit request
        if file_path and not async_mode:
            file_size_mb = Path(file_path).stat().st_size / (1024 * 1024)
            if file_size_mb > 10:  # Auto-switch to async for files > 10MB
                print(f"Large file detected ({file_size_mb:.1f}MB), using async mode")
                async_mode = True
        
        # Choose endpoint
        endpoint = "/extract_async" if async_mode else "/extract"
        
        # Prepare request
        if file_path:
            return self._extract_from_file(endpoint, file_path, schema, pages, extract_figure)
        elif file_url:
            return self._extract_from_url(endpoint, file_url, schema, pages, extract_figure)
        else:
            raise ValueError("Either file_path or file_url must be provided")
    
    def _extract_from_file(self, endpoint: str, file_path: str, 
                          schema: Optional[Dict], pages: Optional[str],
                          extract_figure: bool) -> Dict[str, Any]:
        """Extract from local file."""
        
        url = f"{self.base_url}{endpoint}"
        
        with open(file_path, 'rb') as f:
            files = {'file': f}
            data = {}
            
            if schema:
                data['schema'] = json.dumps(schema)
            if pages:
                data['pages'] = pages
            if extract_figure:
                data['extract_figure'] = 'true'
            
            response = requests.post(url, files=files, data=data, headers=self.headers)
        
        if response.status_code != 200:
            raise Exception(f"Extraction failed: {response.text}")
        
        result = response.json()
        
        # Handle async response
        if endpoint == "/extract_async":
            return self._wait_for_job(result['job_id'])
        
        # Handle large document URL response
        if result.get('is_url'):
            return self._fetch_from_url(result['url'])
        
        return result
    
    def _extract_from_url(self, endpoint: str, file_url: str,
                         schema: Optional[Dict], pages: Optional[str],
                         extract_figure: bool) -> Dict[str, Any]:
        """Extract from URL."""
        
        url = f"{self.base_url}{endpoint}"
        data = {"file-url": file_url}
        
        if schema:
            data['schema'] = schema
        if pages:
            data['pages'] = pages
        if extract_figure:
            data['extract_figure'] = extract_figure
        
        response = requests.post(url, json=data, headers=self.headers)
        
        if response.status_code != 200:
            raise Exception(f"Extraction failed: {response.text}")
        
        result = response.json()
        
        # Handle async response
        if endpoint == "/extract_async":
            return self._wait_for_job(result['job_id'])
        
        # Handle large document URL response
        if result.get('is_url'):
            return self._fetch_from_url(result['url'])
        
        return result
    
    def _wait_for_job(self, job_id: str, timeout: int = 300) -> Dict[str, Any]:
        """Poll for async job completion."""
        
        poll_url = f"{self.base_url}/job/{job_id}"
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            time.sleep(5)
            
            response = requests.get(poll_url, headers=self.headers)
            if response.status_code != 200:
                raise Exception(f"Failed to poll job: {response.text}")
            
            job_status = response.json()
            status = job_status['status']
            
            if status == 'completed':
                result = job_status['result']
                if result.get('is_url'):
                    return self._fetch_from_url(result['url'])
                return result
            elif status in ['failed', 'cancelled']:
                raise Exception(f"Job {status}: {job_status.get('error', 'Unknown error')}")
        
        raise TimeoutError(f"Job {job_id} timed out after {timeout} seconds")
    
    def _fetch_from_url(self, url: str) -> Dict[str, Any]:
        """Fetch content from S3 URL."""
        
        response = requests.get(url)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch content from URL: {response.status_code}")
        return response.json()
    
    def upload_file(self, file_path: str) -> str:
        """Upload file to S3 and return URL."""
        
        url = f"{self.base_url}/convert"
        
        with open(file_path, 'rb') as f:
            files = {'file': f}
            response = requests.post(url, files=files, headers=self.headers)
        
        if response.status_code != 200:
            raise Exception(f"Upload failed: {response.text}")
        
        return response.json()['s3_object_url']
    
    def cancel_job(self, job_id: str) -> bool:
        """Cancel a running job."""
        
        url = f"{self.base_url}/cancel/{job_id}"
        response = requests.post(url, headers=self.headers)
        
        return response.status_code == 200

# Example usage
if __name__ == "__main__":
    client = PulseAPIClient(api_key="YOUR_API_KEY")
    
    # Example 1: Simple extraction
    result = client.extract(file_path="document.pdf")
    
    # Example 2: Extract with schema
    schema = {
        "invoice_number": "string",
        "total": "float",
        "date": "date"
    }
    result = client.extract(file_path="invoice.pdf", schema=schema)
    
    # Example 3: Force async mode for large file
    result = client.extract(file_path="large_report.pdf", async_mode=True)
    
    # Example 4: Extract from URL
    result = client.extract(file_url="https://example.com/document.pdf")
    
    print(json.dumps(result, indent=2))
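The client also exposes upload_file and cancel_job, which the examples above don't cover. A short sketch using the same client instance; the file name is illustrative, and the commented cancel call assumes you track job IDs yourself, since extract blocks until the job finishes:

# Example 5: Upload once, then extract from the returned S3 URL
file_url = client.upload_file("shared_document.pdf")
result = client.extract(file_url=file_url, schema={"title": "string"})

# Example 6: Cancel a job you started elsewhere (requires its job_id)
# cancelled = client.cancel_job(job_id)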

Common Use Cases

Next Steps