Overview
Pulse API is designed to handle documents of any size, from single-page memos to thousand-page reports. This guide covers strategies for efficiently processing large documents while maintaining accuracy and minimizing costs.
Very large files are defined as documents with 100+ pages, which require special handling for optimal performance.
Size Thresholds
Understanding these thresholds helps you choose the right processing strategy:
| Document Size | Processing Method | Response Type | Recommended Approach |
|---|---|---|---|
| < 50 pages | Synchronous | Direct JSON | Use /extract endpoint |
| 50-70 pages | Synchronous/Async | Direct JSON | Consider async for reliability |
| > 70 pages | Either | S3 URL | Results delivered via URL |
| > 100 pages | Async recommended | S3 URL | Use /extract_async |
| > 500 pages | Async required | S3 URL | Consider page ranges |
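To route documents automatically against these thresholds, a minimal sketch might look like the following, assuming you can count pages locally with pypdf (the helper and its return values are illustrative, not part of the Pulse API):

from pypdf import PdfReader  # assumption: pypdf is available for local page counting

def choose_processing_options(file_path):
    """Pick extract options from the thresholds in the table above (illustrative only)."""
    page_count = len(PdfReader(file_path).pages)
    if page_count < 50:
        return {"async_mode": False}                   # synchronous /extract, direct JSON
    if page_count <= 100:
        return {"async_mode": True}                    # async recommended; result may arrive as an S3 URL
    return {"async_mode": True, "pages": "1-100"}      # very large: async plus page ranges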
Automatic Optimizations
Smart Async Switching
The production client automatically switches to async mode for large files:
# Automatic async detection
client = PulseAPIClient(api_key="YOUR_API_KEY")

# Files > 10MB automatically use async
result = client.extract(file_path="large_document.pdf")  # Auto-async

# Or force async mode
result = client.extract(file_path="document.pdf", async_mode=True)
S3 URL Responses
For documents over 70 pages, results are automatically delivered via S3 URL:
import requests

result = client.extract(file_path="large_report.pdf")

# Check if the result is a URL
if isinstance(result, dict) and result.get('is_url'):
    # Fetch the content from S3
    content_response = requests.get(result['url'])
    actual_content = content_response.json()
Processing Strategies
1. Full Document Processing
Process entire documents when you need complete context:
# Process complete document asynchronously
result = client.extract(
    file_path="annual_report.pdf",
    async_mode=True,
    schema={
        "executive_summary": "string",
        "financial_highlights": {
            "revenue": "float",
            "profit": "float",
            "growth_rate": "float"
        },
        "key_risks": ["string"]
    }
)
2. Page Range Processing
Extract specific sections to reduce processing time:
# Process only specific pages
result = client.extract(
    file_path="manual.pdf",
    pages="1-10,50-55,100",  # First 10 pages, pages 50-55, and page 100
    schema={"introduction": "string", "specifications": {}}
)
Page Range Syntax (see the sketch after this list):
- Single page: "5"
- Range: "10-20"
- Multiple ranges: "1-5,10-15,20"
- Mixed: "1,3,5-10,15"
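If you build page-range strings dynamically, a small client-side check like this sketch can catch malformed ranges before calling the API (validate_page_range is a hypothetical helper, not part of the Pulse API):

import re

def validate_page_range(pages):
    """Return True if the string follows the page-range syntax listed above."""
    token = re.compile(r"^\d+(-\d+)?$")  # a single page or a low-high range
    return all(token.match(part.strip()) for part in pages.split(","))

validate_page_range("1,3,5-10,15")  # True
validate_page_range("5-")           # False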
3. Chunked Processing
For very large documents, process in chunks:
def process_document_in_chunks(file_path, chunk_size=50):
    """Process a large document in fixed-size page chunks."""
    # Optionally probe the first page (the total page count may need to be checked separately)
    first_page = client.extract(file_path=file_path, pages="1")

    results = []
    page_num = 1
    while True:
        end_page = page_num + chunk_size - 1
        page_range = f"{page_num}-{end_page}"
        try:
            chunk_result = client.extract(
                file_path=file_path,
                pages=page_range,
                async_mode=True
            )
            results.append(chunk_result)
            page_num = end_page + 1
        except Exception as e:
            if "Invalid page range" in str(e):
                break  # Reached the end of the document
            raise
    return results

# Process a 500-page document in 50-page chunks
chunks = process_document_in_chunks("huge_document.pdf", chunk_size=50)
4. Selective Extraction
Extract only what you need to minimize processing:
# Extract only table of contents first
toc_result = client.extract(
    file_path="technical_manual.pdf",
    pages="1-5",
    schema={"sections": [{"title": "string", "page": "integer"}]}
)

# Then extract specific sections based on TOC
for section in toc_result["sections"]:
    if section["title"] == "Technical Specifications":
        specs = client.extract(
            file_path="technical_manual.pdf",
            pages=str(section["page"]),
            schema={"specifications": {}}
        )
Async Processing Deep Dive
Starting Async Jobs
import json
import requests

# Start async extraction
response = requests.post(
    "https://dev.api.runpulse.com/extract_async",
    headers={"x-api-key": API_KEY},
    files={"file": open("large_doc.pdf", "rb")},
    data={
        "schema": json.dumps(schema),
        "pages": "1-100"
    }
)

job_info = response.json()
# {"job_id": "123e4567-e89b-12d3-a456-426614174000", "status": "pending"}
Polling for Completion
def poll_job_with_backoff(job_id, max_wait=600):
    """Poll for completion with exponential backoff."""
    wait_time = 2
    total_waited = 0
    while total_waited < max_wait:
        response = requests.get(
            f"https://dev.api.runpulse.com/job/{job_id}",
            headers={"x-api-key": API_KEY}
        )
        status = response.json()["status"]

        if status == "completed":
            return response.json()["result"]
        elif status in ["failed", "cancelled"]:
            raise Exception(f"Job {status}")

        time.sleep(wait_time)
        total_waited += wait_time
        wait_time = min(wait_time * 1.5, 30)  # Cap the wait at 30 seconds

    raise TimeoutError("Job timed out")
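Putting the two snippets together, you start the job and then poll with the returned job_id (the max_wait value here is illustrative):

# Wait for the job started above to finish, then use its result
result = poll_job_with_backoff(job_info["job_id"], max_wait=900)
print(result)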
Parallel Processing
Process multiple large documents simultaneously:
import concurrent.futures

def process_document_async(file_path, schema):
    """Start async processing for a single document."""
    return client.extract(
        file_path=file_path,
        schema=schema,
        async_mode=True
    )

# Process multiple documents in parallel
documents = [
    ("report1.pdf", schema1),
    ("report2.pdf", schema2),
    ("report3.pdf", schema3)
]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for file_path, schema in documents:
        future = executor.submit(process_document_async, file_path, schema)
        futures.append((file_path, future))

    # Collect results
    results = {}
    for file_path, future in futures:
        try:
            results[file_path] = future.result()
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
Optimization Techniques
1. Memory Management
For very large responses:
import requests
import json
def stream_large_result(url):
    """Stream large JSON responses to avoid memory issues."""
    response = requests.get(url, stream=True)
    response.raise_for_status()

    # Parse JSON objects incrementally as chunks arrive
    parser = json.JSONDecoder()
    buffer = ""
    for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
        buffer += chunk
        while buffer:
            try:
                obj, idx = parser.raw_decode(buffer)
                yield obj
                buffer = buffer[idx:].lstrip()
            except json.JSONDecodeError:
                break
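As a usage sketch, you can feed an S3 result URL from an earlier extraction into this generator (the is_url/url fields follow the response examples above):

result = client.extract(file_path="large_report.pdf", async_mode=True)

if isinstance(result, dict) and result.get("is_url"):
    # Objects are yielded as they are decoded from the streamed response
    for obj in stream_large_result(result["url"]):
        print(type(obj))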
2. Cost Optimization
Minimize pages processed to reduce costs:
# Option 1: Extract summary first
summary = client.extract(
    file_path="report.pdf",
    pages="1-3"
)

# Option 2: Use custom chunk size
result = client.extract(
    file_path="report.pdf",
    chunk_size=5000,  # Larger chunks for better context
    pages="1-50"      # Process only relevant section
)
3. Performance Monitoring
Track processing performance:
import os
import time

class PerformanceMonitor:
    def __init__(self):
        self.metrics = []

    def process_with_metrics(self, file_path, **kwargs):
        start_time = time.time()
        file_size = os.path.getsize(file_path) / (1024 * 1024)  # MB

        result = client.extract(file_path=file_path, **kwargs)

        elapsed = time.time() - start_time
        self.metrics.append({
            "file": file_path,
            "size_mb": file_size,
            "time_seconds": elapsed,
            "pages": kwargs.get("pages", "all"),
            "mb_per_second": file_size / elapsed if elapsed else None
        })
        return result

monitor = PerformanceMonitor()
result = monitor.process_with_metrics("large_doc.pdf", async_mode=True)
Common Patterns
Legal Document Processing
# Process large legal documents with navigation
def process_legal_document(file_path):
    # First, extract the table of contents
    toc = client.extract(
        file_path=file_path,
        pages="1-10",
        schema={
            "sections": [{
                "number": "string",
                "title": "string",
                "page": "integer"
            }]
        }
    )

    # Then extract specific sections
    sections = {}
    for section in toc["sections"]:
        if section["title"] in ["Terms and Conditions", "Liability"]:
            content = client.extract(
                file_path=file_path,
                pages=f"{section['page']}-{section['page'] + 10}",
                schema={"content": "string", "key_terms": ["string"]}
            )
            sections[section["title"]] = content
    return sections
Financial Report Analysis
# Extract key data from annual reports
def analyze_annual_report(file_path):
    # Process in stages
    stages = [
        ("1-5", {"executive_summary": "string"}),
        ("10-30", {"financial_statements": {}}),
        ("50-70", {"risk_factors": ["string"]}),
        ("100-120", {"future_outlook": "string"})
    ]

    results = {}
    for pages, schema in stages:
        stage_result = client.extract(
            file_path=file_path,
            pages=pages,
            schema=schema,
            async_mode=True
        )
        results.update(stage_result)
    return results
Error Handling
Handling Large Document Errors
def robust_large_document_processing(file_path, max_retries=3):
    """Process large documents with retry logic."""
    for attempt in range(max_retries):
        try:
            # Try async processing
            result = client.extract(
                file_path=file_path,
                async_mode=True,
                timeout=600  # 10 minutes
            )

            # Handle S3 URL response
            if result.get('is_url'):
                # Retry fetching from S3 if needed
                for _ in range(3):
                    try:
                        response = requests.get(result['url'], timeout=60)
                        return response.json()
                    except requests.exceptions.Timeout:
                        time.sleep(5)
                        continue
            return result
        except TimeoutError:
            if attempt < max_retries - 1:
                print(f"Timeout on attempt {attempt + 1}, retrying...")
                time.sleep(10 * (attempt + 1))  # Back off longer on each retry
            else:
                raise
Best Practices Summary
Use Async for Large Files
- Always use async processing for files > 50 pages
- Let the client auto-detect when to use async
- Implement proper polling with backoff

Process Only the Pages You Need
- Extract the TOC first to navigate large documents
- Use chunking for very large documents

Handle S3 URL Responses
- Check for is_url in responses
- Download results promptly (URLs expire in 24 hours)
- Implement retry logic for S3 downloads (see the sketch below)
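The S3 handling points above can be wrapped in one small helper; this is a minimal sketch (the fetch_result name and retry policy are illustrative, while the is_url/url fields follow the earlier response examples):

import time
import requests

def fetch_result(result, retries=3, timeout=60):
    """Return extraction content, downloading from S3 with retries when the result is a URL."""
    if not (isinstance(result, dict) and result.get("is_url")):
        return result
    for attempt in range(retries):
        try:
            response = requests.get(result["url"], timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(2 ** attempt)  # retry with backoff; download promptly since URLs expire after 24 hours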
Next Steps