Overview

The Pulse API handles documents of any size, from single-page memos to thousand-page reports. This guide covers strategies for processing large documents efficiently while maintaining accuracy and minimizing cost. It treats documents of 100 or more pages as very large; these need special handling for good performance.

Size Thresholds

Understanding these thresholds helps you choose the right processing strategy:
| Document Size | Processing Method | Response Type | Recommended Approach |
|---|---|---|---|
| < 50 pages | Synchronous | Direct JSON | Use /extract endpoint |
| 50-70 pages | Synchronous/Async | Direct JSON | Consider async for reliability |
| > 70 pages | Either | /large_results/{job_id} link | Results delivered via one-time download link |
| > 100 pages | Async recommended | /large_results/{job_id} link | Use /extract with async: true |
| > 500 pages | Async required | /large_results/{job_id} link | Consider page ranges |
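
If you already know a document's page count, the thresholds above can be encoded as a small lookup. This is only a sketch: `Strategy` and `choose_strategy` are hypothetical names, not part of the Pulse client, and the boundaries are copied from the table. It looks at page count only, so it does not model the separate payload-size trigger for download links.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Strategy:
    mode: str                # "sync", "either", "async-recommended" or "async-required"
    large_result_link: bool  # True when the table says results arrive via /large_results/{job_id}
    note: str                # recommended approach from the table


def choose_strategy(page_count: int) -> Strategy:
    """Map a page count onto the rows of the size-threshold table (hypothetical helper)."""
    if page_count < 1:
        raise ValueError("page_count must be at least 1")
    if page_count > 500:
        return Strategy("async-required", True, "consider page ranges")
    if page_count > 100:
        return Strategy("async-recommended", True, "use /extract with async: true")
    if page_count > 70:
        return Strategy("either", True, "results delivered via one-time download link")
    if page_count >= 50:
        return Strategy("either", False, "consider async for reliability")
    return Strategy("sync", False, "use /extract endpoint")
```

For example, `choose_strategy(120).mode` evaluates to `"async-recommended"`.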

Automatic Optimizations

Smart Async Switching

The production client automatically switches to async mode for large files:
# Automatic async detection
client = Pulse(api_key="YOUR_API_KEY")

# Files > 10MB automatically use async
result = client.extract(file_url="large_document.pdf")  # Auto-async

# Or force async mode
result = client.extract(file_url="document.pdf", async_=True)
For documents over 70 pages — or any response payload above the 5 MB inline threshold — results are returned as a one-time download link of the form https://api.runpulse.com/large_results/{job_id}:
import os
import requests

API_KEY = os.environ["PULSE_API_KEY"]

result = client.extract(file_url="large_report.pdf")

if isinstance(result, dict) and result.get("is_url"):
    # One-time-use link. Authenticate with your API key and fetch immediately.
    content_response = requests.get(
        result["url"],
        headers={"x-api-key": API_KEY},
    )
    content_response.raise_for_status()
    actual_content = content_response.json()
/large_results/{job_id} links are single-use and expire 1 hour after the job completes. Once a successful download has streamed (or the hour has elapsed), the link returns 410 Gone. Always persist the payload to your own storage on first read.
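
Because the link works only once, it can help to stream the payload straight to disk and treat 410 as a distinct, non-retryable condition. The sketch below assumes the `x-api-key` header and 410 behavior described above. `download_once`, `LinkConsumedError`, and the injectable `http_get` parameter are hypothetical names introduced for illustration, not part of the Pulse client.

```python
from pathlib import Path


class LinkConsumedError(RuntimeError):
    """The one-time link was already used or has expired (HTTP 410)."""


def download_once(url, api_key, dest, http_get=None):
    """Stream a one-time large-result link to `dest` and return the final path.

    `http_get` defaults to `requests.get`; it is injectable so the helper
    can be exercised without network access.
    """
    if http_get is None:
        import requests  # imported lazily; only needed for real downloads
        http_get = requests.get

    response = http_get(url, headers={"x-api-key": api_key}, stream=True, timeout=60)
    if response.status_code == 410:
        # Retrying cannot help: the link is gone for good.
        raise LinkConsumedError(f"link already consumed or expired: {url}")
    response.raise_for_status()

    dest = Path(dest)
    partial = dest.with_name(dest.name + ".part")
    with open(partial, "wb") as fh:
        for chunk in response.iter_content(chunk_size=64 * 1024):
            fh.write(chunk)
    partial.replace(dest)  # rename only after the whole payload is on disk
    return dest
```

Writing to a `.part` file first means a half-written download is never mistaken for a complete result.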

Processing Strategies

1. Full Document Processing

Process entire documents when you need complete context:
# Process complete document asynchronously
result = client.extract(
    file_url="annual_report.pdf",
    async_=True,
    schema={
        "executive_summary": "string",
        "financial_highlights": {
            "revenue": "float",
            "profit": "float",
            "growth_rate": "float"
        },
        "key_risks": ["string"]
    }
)

2. Page Range Processing

Extract specific sections to reduce processing time:
# Process only specific pages
result = client.extract(
    file_url="manual.pdf",
    pages="1-10,50-55,100",  # First 10 pages, pages 50-55, and page 100
    schema={"introduction": "string", "specifications": {}}
)
Page Range Syntax (1-indexed, page 1 is the first page):
  • Single page: "5" — extracts the 5th page
  • Range: "10-20" — extracts pages 10 through 20 (inclusive)
  • Multiple ranges: "1-5,10-15,20" — extracts pages 1-5, 10-15, and page 20
  • Mixed: "1,3,5-10,15" — extracts pages 1, 3, 5-10, and 15
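
To validate a page spec locally before sending it, or to count how many pages it covers, the syntax above can be expanded with a short parser. This is a sketch of the documented syntax only: `parse_page_ranges` is a hypothetical helper, and how the API itself treats overlapping or out-of-order ranges is not covered here.

```python
def parse_page_ranges(spec):
    """Expand a spec such as "1-5,10-15,20" into a sorted list of 1-indexed pages."""
    pages = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            raise ValueError(f"empty entry in page spec: {spec!r}")
        start_text, dash, end_text = part.partition("-")
        start = int(start_text)                 # raises ValueError on non-numeric input
        end = int(end_text) if dash else start  # a single page is a range of length one
        if start < 1 or end < start:
            raise ValueError(f"invalid page range: {part!r}")
        pages.update(range(start, end + 1))     # ranges are inclusive
    return sorted(pages)


page_count = len(parse_page_ranges("1-10,50-55,100"))  # 10 + 6 + 1 pages
```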

3. Chunked Processing

For very large documents, process in chunks:
def process_document_in_chunks(file_path, chunk_size=50, max_chunks=1000):
    """Process a large document in fixed-size page chunks.

    The total page count is not known up front, so this keeps requesting
    chunks until the API rejects a range. It assumes a range that starts
    past the last page fails with "Invalid page range".
    """

    results = []
    page_num = 1

    # max_chunks is a safety cap so an unexpected response cannot loop forever
    for _ in range(max_chunks):
        end_page = page_num + chunk_size - 1
        page_range = f"{page_num}-{end_page}"

        try:
            chunk_result = client.extract(
                file_url=file_path,
                pages=page_range,
                async_=True
            )
        except Exception as e:
            if "Invalid page range" in str(e):
                break  # Reached end of document
            raise

        results.append(chunk_result)
        page_num = end_page + 1

    return results

# Process 500-page document in 50-page chunks
chunks = process_document_in_chunks("huge_document.pdf", chunk_size=50)
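
If the page count is already known, for example from a local PDF library, the chunk ranges can be computed up front instead of probing until a request fails. A minimal sketch follows; `chunk_page_ranges` is a hypothetical helper that only builds the `pages` strings.

```python
def chunk_page_ranges(total_pages, chunk_size=50):
    """Return page-range strings that cover pages 1..total_pages without overlap."""
    if total_pages < 1 or chunk_size < 1:
        raise ValueError("total_pages and chunk_size must be positive")
    ranges = []
    for start in range(1, total_pages + 1, chunk_size):
        end = min(start + chunk_size - 1, total_pages)  # clamp the final chunk
        ranges.append(f"{start}-{end}" if end > start else str(start))
    return ranges


ranges = chunk_page_ranges(120, chunk_size=50)
```

Each string can then be passed as the `pages` argument of one extraction request.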

4. Selective Extraction

Extract only what you need to minimize processing:
# Extract only table of contents first
toc_result = client.extract(
    file_url="technical_manual.pdf",
    pages="1-5",
    schema={"sections": [{"title": "string", "page": "integer"}]}
)

# Then extract specific sections based on TOC
for section in toc_result["sections"]:
    if section["title"] == "Technical Specifications":
        specs = client.extract(
            file_url="technical_manual.pdf",
            pages=str(section["page"]),
            schema={"specifications": {}}
        )
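
A table of contents usually gives only each section's starting page. One way to get a full range per section is to end it just before the next section begins. The sketch below assumes the TOC page numbers line up with the document's 1-indexed pages, which printed page numbers often do not. `section_page_ranges` is a hypothetical helper.

```python
def section_page_ranges(sections, last_page):
    """Map each section title to a pages string that ends before the next section starts."""
    ordered = sorted(sections, key=lambda s: s["page"])
    ranges = {}
    for current, following in zip(ordered, ordered[1:] + [None]):
        start = current["page"]
        end = following["page"] - 1 if following else last_page
        end = max(end, start)  # two sections may start on the same page
        ranges[current["title"]] = f"{start}-{end}" if end > start else str(start)
    return ranges


toc = [
    {"title": "Introduction", "page": 1},
    {"title": "Technical Specifications", "page": 40},
    {"title": "Appendix", "page": 55},
]
ranges = section_page_ranges(toc, last_page=60)
```

Sections that share a title overwrite one another in the result, so key by section number instead if titles repeat.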

Async Processing Deep Dive

Starting Async Jobs

# Start async extraction (use /extract with async flag)
# Open the file in a context manager so the handle is closed after upload
with open("large_doc.pdf", "rb") as f:
    response = requests.post(
        "https://api.runpulse.com/extract",
        headers={"x-api-key": API_KEY},
        files={"file": f},
        data={
            "pages": "1-100",
            "async": "true"
        }
    )
response.raise_for_status()

job_info = response.json()
# {"job_id": "123e4567-e89b-12d3-a456-426614174000", "status": "pending", "message": "Document processing started"}

# After extraction completes, apply schema via /schema endpoint:
# schema_response = requests.post(
#     "https://api.runpulse.com/schema",
#     headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
#     json={
#         "extraction_id": result["extraction_id"],
#         "schema_config": {"input_schema": schema}
#     }
# )

Polling for Completion

import time

def poll_job_with_backoff(job_id, max_wait=600):
    """Poll a job until it finishes, backing off exponentially between checks."""

    wait_time = 2
    total_waited = 0

    while total_waited < max_wait:
        response = requests.get(
            f"https://api.runpulse.com/job/{job_id}",
            headers={"x-api-key": API_KEY},
            timeout=30,
        )
        response.raise_for_status()
        job = response.json()
        status = job["status"]

        if status == "completed":
            return job["result"]
        if status in ("failed", "cancelled"):
            raise RuntimeError(f"Job {job_id} {status}")

        time.sleep(wait_time)
        total_waited += wait_time
        wait_time = min(wait_time * 1.5, 30)  # Cap the interval at 30 seconds

    raise TimeoutError(f"Job {job_id} did not complete within {max_wait} seconds")
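
To see what this schedule looks like in practice, the sleep intervals can be generated on their own, with no network calls. `backoff_delays` is a hypothetical helper that mirrors the loop's arithmetic (start at 2 seconds, multiply by 1.5, cap at 30).

```python
from itertools import islice


def backoff_delays(initial=2.0, factor=1.5, cap=30.0, max_wait=600.0):
    """Yield successive sleep intervals until the time waited reaches max_wait."""
    delay = initial
    waited = 0.0
    while waited < max_wait:
        yield delay
        waited += delay
        delay = min(delay * factor, cap)  # grow geometrically, then hold at the cap


first_delays = list(islice(backoff_delays(), 8))
# first_delays == [2.0, 3.0, 4.5, 6.75, 10.125, 15.1875, 22.78125, 30.0]
```

With the defaults, the schedule makes 25 status checks before the ten-minute budget is spent, most of them at the 30-second cap.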

Parallel Processing

Process multiple large documents simultaneously:
import concurrent.futures

def process_document_async(file_path, schema):
    """Start async processing for a single document."""
    return client.extract(
        file_url=file_path,
        schema=schema,
        async_=True
    )

# Process multiple documents in parallel
documents = [
    ("report1.pdf", schema1),
    ("report2.pdf", schema2),
    ("report3.pdf", schema3)
]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    
    for file_path, schema in documents:
        future = executor.submit(process_document_async, file_path, schema)
        futures.append((file_path, future))
    
    # Collect results
    results = {}
    for file_path, future in futures:
        try:
            results[file_path] = future.result()
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
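
The loop above collects results in submission order, so one slow document delays the reporting of the rest. A variant using `concurrent.futures.as_completed` handles each result as soon as it is ready and keeps failures separate. In this sketch, `run_in_parallel` is a hypothetical helper and `worker` stands in for a function such as `process_document_async`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_parallel(worker, jobs, max_workers=5):
    """Call worker(*args) for each named job; return (results, errors) keyed by job name."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_name = {
            executor.submit(worker, *args): name for name, args in jobs.items()
        }
        for future in as_completed(future_to_name):  # yields futures as they finish
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # keep going; record the failure for this job only
                errors[name] = exc
    return results, errors
```

A call might look like `run_in_parallel(process_document_async, {"report1.pdf": ("report1.pdf", schema1)})`, with one entry per document.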

Optimization Techniques

1. Memory Management

For very large responses:
import requests
import json

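def stream_large_result(url):
    """Yield each complete top-level JSON value from a streamed response.

    This helps when the payload is a sequence of JSON values. A single large
    JSON object is still buffered in full before it can be decoded.
    """

    # Large-result links must be fetched with the API key
    response = requests.get(url, headers={"x-api-key": API_KEY}, stream=True)
    response.raise_for_status()
    # Without an encoding, iter_content(decode_unicode=True) yields bytes
    response.encoding = response.encoding or "utf-8"

    parser = json.JSONDecoder()
    buffer = ""

    for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
        buffer = (buffer + chunk).lstrip()
        while buffer:
            try:
                obj, idx = parser.raw_decode(buffer)
            except json.JSONDecodeError:
                break  # Incomplete value; wait for more data
            yield obj
            buffer = buffer[idx:].lstrip()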

2. Cost Optimization

Minimize pages processed to reduce costs:
# Option 1: Extract summary first
summary = client.extract(
    file_url="report.pdf",
    pages="1-3"
)

# Option 2: Use custom chunk size
result = client.extract(
    file_url="report.pdf",
    chunk_size=5000,  # Larger chunks for better context
    pages="1-50"  # Process only relevant section
)

3. Performance Monitoring

Track processing performance:
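import os
import time

class PerformanceMonitor:
    def __init__(self):
        self.metrics = []

    def process_with_metrics(self, file_path, page_count=None, **kwargs):
        """Time one extraction. Pass page_count, if known, to record throughput."""
        start_time = time.time()
        file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB

        result = client.extract(file_url=file_path, **kwargs)

        elapsed = time.time() - start_time
        self.metrics.append({
            "file": file_path,
            "size_mb": file_size,
            "time_seconds": elapsed,
            "pages": kwargs.get("pages", "all"),
            # The pages argument is a string, so throughput needs a numeric count
            "pages_per_second": page_count / elapsed if page_count and elapsed > 0 else None
        })

        return result

monitor = PerformanceMonitor()
# page_count=250 is an illustrative value for this document
result = monitor.process_with_metrics("large_doc.pdf", page_count=250, async_=True)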
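
Once several documents have been recorded, the per-document entries can be rolled up into a short report. This sketch relies only on the `file`, `size_mb`, and `time_seconds` keys recorded above. `summarize_metrics` is a hypothetical helper.

```python
def summarize_metrics(metrics):
    """Aggregate per-document timing records into a small summary dict."""
    if not metrics:
        return {"documents": 0}
    total_seconds = sum(m["time_seconds"] for m in metrics)
    total_mb = sum(m["size_mb"] for m in metrics)
    slowest = max(metrics, key=lambda m: m["time_seconds"])
    return {
        "documents": len(metrics),
        "total_seconds": total_seconds,
        "mean_seconds": total_seconds / len(metrics),
        # Overall throughput; None if no time was recorded
        "mb_per_second": total_mb / total_seconds if total_seconds > 0 else None,
        "slowest_file": slowest["file"],
    }
```

Calling `summarize_metrics(monitor.metrics)` after a batch gives a quick view of where the time went.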

Common Patterns

Legal Document Processing

# Process large legal documents with navigation
def process_legal_document(file_path):
    # First, extract table of contents
    toc = client.extract(
        file_url=file_path,
        pages="1-10",
        schema={
            "sections": [{
                "number": "string",
                "title": "string", 
                "page": "integer"
            }]
        }
    )
    
    # Then extract specific sections
    sections = {}
    for section in toc["sections"]:
        if section["title"] in ["Terms and Conditions", "Liability"]:
            content = client.extract(
                file_url=file_path,
                pages=f"{section['page']}-{section['page']+10}",
                schema={"content": "string", "key_terms": ["string"]}
            )
            sections[section["title"]] = content
    
    return sections

Financial Report Analysis

# Extract key data from annual reports
def analyze_annual_report(file_path):
    # Process in stages
    stages = [
        ("1-5", {"executive_summary": "string"}),
        ("10-30", {"financial_statements": {}}),
        ("50-70", {"risk_factors": ["string"]}),
        ("100-120", {"future_outlook": "string"})
    ]
    
    results = {}
    for pages, schema in stages:
        stage_result = client.extract(
            file_url=file_path,
            pages=pages,
            schema=schema,
            async_=True
        )
        results.update(stage_result)
    
    return results

Error Handling

Handling Large Document Errors

def robust_large_document_processing(file_path, max_retries=3):
    """Process large documents with retry logic."""

    for attempt in range(max_retries):
        try:
            result = client.extract(
                file_url=file_path,
                async_=True,
                # Per-request timeout. The Python SDK exposes timeout via the
                # `request_options` dict, not as a top-level kwarg.
                request_options={"timeout_in_seconds": 600},  # 10 minutes
            )

            # Handle one-time large-result download link
            if result.get("is_url"):
                # Single-use link — fetch once and persist immediately.
                # Retry only on transient network errors, not on 410 (already consumed/expired).
                response = requests.get(
                    result["url"],
                    headers={"x-api-key": API_KEY},
                    timeout=60,
                )
                response.raise_for_status()
                return response.json()

            return result

        except TimeoutError:
            if attempt < max_retries - 1:
                print(f"Timeout on attempt {attempt + 1}, retrying...")
                time.sleep(10 * (attempt + 1))  # Linear backoff: 10s, 20s, ...
            else:
                raise
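
The retry loop can also be factored into a reusable helper whose delay doubles on each attempt. This is a sketch rather than part of the Pulse client: `retry_with_backoff` and its parameters are hypothetical, and `sleep` is injectable so the schedule can be checked without waiting.

```python
import time


def retry_with_backoff(operation, max_retries=3, base_delay=10.0,
                       retry_on=(TimeoutError,), sleep=time.sleep):
    """Call operation() until it succeeds, doubling the delay after each retryable failure."""
    for attempt in range(max_retries):
        try:
            return operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts; surface the last error
            sleep(base_delay * 2 ** attempt)  # base_delay, then 2x, then 4x, ...
```

Exceptions not listed in `retry_on` propagate immediately. That is the behavior you want for a 410 on a consumed download link, where retrying cannot succeed.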

Best Practices Summary

  • Prefer async processing for documents over 50 pages; it is recommended above 100 pages and required above 500
  • Let the client auto-detect when to use async
  • Implement proper polling with backoff
  • Process only the pages you need
  • Extract TOC first to navigate large documents
  • Use chunking for very large documents
  • Track processing times and costs
  • Adjust chunk sizes based on document type
  • Use concurrent processing for multiple files

Next Steps

  • Async Processing: async flag, polling, and webhooks
  • Error Handling: handle errors in large documents