Overview

Pulse API is designed to handle documents of any size, from single-page memos to thousand-page reports. This guide covers strategies for efficiently processing large documents while maintaining accuracy and minimizing costs.

Size Thresholds

Understanding these thresholds helps you choose the right processing strategy:
| Document Size | Processing Method | Response Type | Recommended Approach           |
|---------------|-------------------|---------------|--------------------------------|
| < 50 pages    | Synchronous       | Direct JSON   | Use /extract endpoint          |
| 50-70 pages   | Synchronous/Async | Direct JSON   | Consider async for reliability |
| > 70 pages    | Either            | S3 URL        | Results delivered via URL      |
| > 100 pages   | Async recommended | S3 URL        | Use /extract_async             |
| > 500 pages   | Async required    | S3 URL        | Consider page ranges           |
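
If you need to pick a strategy programmatically, the thresholds above can be encoded directly. A minimal sketch, assuming you already know the page count; the helper name and return values are illustrative, not part of the client:
def recommended_strategy(page_count):
    """Map a page count to the thresholds in the table above (illustrative helper)."""
    if page_count < 50:
        return {"mode": "sync", "endpoint": "/extract", "response": "direct JSON"}
    if page_count <= 70:
        return {"mode": "sync or async", "endpoint": "/extract", "response": "direct JSON"}
    if page_count <= 100:
        return {"mode": "sync or async", "endpoint": "/extract", "response": "S3 URL"}
    if page_count <= 500:
        return {"mode": "async recommended", "endpoint": "/extract_async", "response": "S3 URL"}
    # Beyond 500 pages async is required; also consider splitting into page ranges
    return {"mode": "async required", "endpoint": "/extract_async", "response": "S3 URL"}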

Automatic Optimizations

Smart Async Switching

The production client automatically switches to async mode for large files:
# Automatic async detection
client = PulseAPIClient(api_key="YOUR_API_KEY")

# Files > 10MB automatically use async
result = client.extract(file_path="large_document.pdf")  # Auto-async

# Or force async mode
result = client.extract(file_path="document.pdf", async_mode=True)

S3 URL Responses

For documents over 70 pages, results are automatically delivered via S3 URL:
result = client.extract(file_path="large_report.pdf")

# Check if result is a URL
if isinstance(result, dict) and result.get('is_url'):
    # Fetch from S3
    content_response = requests.get(result['url'])
    actual_content = content_response.json()

Processing Strategies

1. Full Document Processing

Process entire documents when you need complete context:
# Process complete document asynchronously
result = client.extract(
    file_path="annual_report.pdf",
    async_mode=True,
    schema={
        "executive_summary": "string",
        "financial_highlights": {
            "revenue": "float",
            "profit": "float",
            "growth_rate": "float"
        },
        "key_risks": ["string"]
    }
)

2. Page Range Processing

Extract specific sections to reduce processing time:
# Process only specific pages
result = client.extract(
    file_path="manual.pdf",
    pages="1-10,50-55,100",  # First 10 pages, pages 50-55, and page 100
    schema={"introduction": "string", "specifications": {}}
)
Page Range Syntax:
  • Single page: "5"
  • Range: "10-20"
  • Multiple ranges: "1-5,10-15,20"
  • Mixed: "1,3,5-10,15"
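
If you build page-range strings programmatically, a small validator can catch malformed input before an API call is spent on it. A minimal sketch, not part of the client:
import re

def is_valid_page_range(pages):
    """Validate a page-range string such as "1,3,5-10,15" (illustrative helper)."""
    part_pattern = re.compile(r"^\d+(-\d+)?$")
    for part in pages.split(","):
        part = part.strip()
        if not part_pattern.match(part):
            return False
        if "-" in part:
            start, end = (int(x) for x in part.split("-"))
            if start < 1 or start > end:
                return False
        elif int(part) < 1:
            return False
    return True

assert is_valid_page_range("1-5,10-15,20")
assert not is_valid_page_range("10-5")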

3. Chunked Processing

For very large documents, process in chunks:
def process_document_in_chunks(file_path, chunk_size=50):
    """Process a large document in fixed-size page chunks."""
    
    # Total pages can also be determined locally up front (see the sketch
    # after this example); here we simply stop once the API rejects a
    # page range that runs past the end of the document.
    results = []
    page_num = 1
    
    while True:
        end_page = page_num + chunk_size - 1
        page_range = f"{page_num}-{end_page}"
        
        try:
            chunk_result = client.extract(
                file_path=file_path,
                pages=page_range,
                async_mode=True
            )
            results.append(chunk_result)
            page_num = end_page + 1
        except Exception as e:
            if "Invalid page range" in str(e):
                break  # Reached end of document
            raise
    
    return results

# Process 500-page document in 50-page chunks
chunks = process_document_in_chunks("huge_document.pdf", chunk_size=50)
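
If you prefer to know the total page count up front instead of looping until the API rejects a range, you can read it from the file locally. A sketch using the pypdf library, which is an assumption here (any PDF library that reports page counts works):
from pypdf import PdfReader  # assumption: pip install pypdf

def count_pages(file_path):
    """Return the number of pages in a local PDF."""
    return len(PdfReader(file_path).pages)

total_pages = count_pages("huge_document.pdf")
page_ranges = [
    f"{start}-{min(start + 49, total_pages)}"
    for start in range(1, total_pages + 1, 50)
]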

4. Selective Extraction

Extract only what you need to minimize processing:
# Extract only table of contents first
toc_result = client.extract(
    file_path="technical_manual.pdf",
    pages="1-5",
    schema={"sections": [{"title": "string", "page": "integer"}]}
)

# Then extract specific sections based on TOC
for section in toc_result["sections"]:
    if section["title"] == "Technical Specifications":
        specs = client.extract(
            file_path="technical_manual.pdf",
            pages=str(section["page"]),
            schema={"specifications": {}}
        )

Async Processing Deep Dive

Starting Async Jobs

import json
import requests

# Start async extraction
with open("large_doc.pdf", "rb") as f:
    response = requests.post(
        "https://api.runpulse.com/extract_async",
        headers={"x-api-key": API_KEY},
        files={"file": f},
        data={
            "schema": json.dumps(schema),
            "pages": "1-100"
        }
    )

job_info = response.json()
# {"job_id": "123e4567-e89b-12d3-a456-426614174000", "status": "pending"}

Polling for Completion

import time
import requests

def poll_job_with_backoff(job_id, max_wait=600):
    """Poll with exponential backoff."""
    
    wait_time = 2
    total_waited = 0
    
    while total_waited < max_wait:
        response = requests.get(
            f"https://api.runpulse.com/job/{job_id}",
            headers={"x-api-key": API_KEY}
        )
        
        status = response.json()["status"]
        
        if status == "completed":
            return response.json()["result"]
        elif status in ["failed", "cancelled"]:
            raise Exception(f"Job {status}")
        
        time.sleep(wait_time)
        total_waited += wait_time
        wait_time = min(wait_time * 1.5, 30)  # Max 30 seconds
    
    raise TimeoutError("Job timed out")
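
Putting the two steps together, using the job_id returned by /extract_async above:
extraction = poll_job_with_backoff(job_info["job_id"])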

Parallel Processing

Process multiple large documents simultaneously:
import concurrent.futures

def process_document_async(file_path, schema):
    """Start async processing for a single document."""
    return client.extract(
        file_path=file_path,
        schema=schema,
        async_mode=True
    )

# Process multiple documents in parallel
documents = [
    ("report1.pdf", schema1),
    ("report2.pdf", schema2),
    ("report3.pdf", schema3)
]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    
    for file_path, schema in documents:
        future = executor.submit(process_document_async, file_path, schema)
        futures.append((file_path, future))
    
    # Collect results
    results = {}
    for file_path, future in futures:
        try:
            results[file_path] = future.result()
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")

Optimization Techniques

1. Memory Management

For very large responses:
import requests
import json

def stream_large_result(url):
    """Stream large JSON responses to avoid memory issues."""
    
    response = requests.get(url, stream=True)
    response.raise_for_status()
    
    # Incrementally decode JSON values as chunks arrive; note that a single
    # top-level object is still held in the buffer until it is complete
    parser = json.JSONDecoder()
    buffer = ""
    
    for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
        buffer += chunk
        while buffer:
            try:
                obj, idx = parser.raw_decode(buffer)
                yield obj
                buffer = buffer[idx:].lstrip()
            except json.JSONDecodeError:
                break
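
Usage, combined with the S3 URL response pattern shown earlier (the is_url and url keys follow that example; handle_record is a placeholder for your own logic):
result = client.extract(file_path="large_report.pdf")

if isinstance(result, dict) and result.get('is_url'):
    for record in stream_large_result(result['url']):
        handle_record(record)  # placeholder for your own handling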

2. Cost Optimization

Minimize pages processed to reduce costs:
# Option 1: Extract summary first
summary = client.extract(
    file_path="report.pdf",
    pages="1-3"
)

# Option 2: Use custom chunk size
result = client.extract(
    file_path="report.pdf",
    chunk_size=5000,  # Larger chunks for better context
    pages="1-50"  # Process only relevant section
)

3. Performance Monitoring

Track processing performance:
import os
import time

class PerformanceMonitor:
    def __init__(self):
        self.metrics = []
    
    def process_with_metrics(self, file_path, **kwargs):
        start_time = time.time()
        file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB
        
        result = client.extract(file_path=file_path, **kwargs)
        
        elapsed = time.time() - start_time
        self.metrics.append({
            "file": file_path,
            "size_mb": file_size,
            "time_seconds": elapsed,
            "mb_per_second": file_size / elapsed,
            "pages": kwargs.get("pages", "all")
        })
        
        return result

monitor = PerformanceMonitor()
result = monitor.process_with_metrics("large_doc.pdf", async_mode=True)

Common Patterns

Legal Document Processing

# Process large legal documents with navigation
def process_legal_document(file_path):
    # First, extract table of contents
    toc = client.extract(
        file_path=file_path,
        pages="1-10",
        schema={
            "sections": [{
                "number": "string",
                "title": "string", 
                "page": "integer"
            }]
        }
    )
    
    # Then extract specific sections
    sections = {}
    for section in toc["sections"]:
        if section["title"] in ["Terms and Conditions", "Liability"]:
            content = client.extract(
                file_path=file_path,
                pages=f"{section['page']}-{section['page']+10}",
                schema={"content": "string", "key_terms": ["string"]}
            )
            sections[section["title"]] = content
    
    return sections

Financial Report Analysis

# Extract key data from annual reports
def analyze_annual_report(file_path):
    # Process in stages
    stages = [
        ("1-5", {"executive_summary": "string"}),
        ("10-30", {"financial_statements": {}}),
        ("50-70", {"risk_factors": ["string"]}),
        ("100-120", {"future_outlook": "string"})
    ]
    
    results = {}
    for pages, schema in stages:
        stage_result = client.extract(
            file_path=file_path,
            pages=pages,
            schema=schema,
            async_mode=True
        )
        results.update(stage_result)
    
    return results

Error Handling

Handling Large Document Errors

def robust_large_document_processing(file_path, max_retries=3):
    """Process large documents with retry logic."""
    
    for attempt in range(max_retries):
        try:
            # Try async processing
            result = client.extract(
                file_path=file_path,
                async_mode=True,
                timeout=600  # 10 minutes
            )
            
            # Handle S3 URL response
            if result.get('is_url'):
                # Retry fetching from S3 if needed
                for _ in range(3):
                    try:
                        response = requests.get(result['url'], timeout=60)
                        return response.json()
                    except requests.exceptions.Timeout:
                        time.sleep(5)
                        continue
            
            return result
            
        except TimeoutError:
            if attempt < max_retries - 1:
                print(f"Timeout on attempt {attempt + 1}, retrying...")
                time.sleep(10 * (attempt + 1))  # Wait longer before each retry
            else:
                raise
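
A usage sketch (the file name is illustrative):
data = robust_large_document_processing("huge_document.pdf")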

Best Practices Summary

Next Steps