Get up and running with Pulse API in 5 minutes
pip install requests
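The examples below hardcode the API key for brevity. In practice you may prefer to load it from an environment variable so it stays out of source control; here is a minimal sketch (the PULSE_API_KEY variable name is just an example, not something the API requires):

import os

# Sketch: read the key from an environment variable instead of hardcoding it.
# PULSE_API_KEY is an arbitrary name chosen for this example.
API_KEY = os.environ.get("PULSE_API_KEY", "YOUR_API_KEY")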
import requests
import json
import time

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://dev.api.runpulse.com"

def extract_document_from_file(file_path, schema=None, pages=None):
    """
    Extract content from a local file using the /extract endpoint.

    Args:
        file_path: Path to the local file
        schema: Optional JSON schema for structured extraction
        pages: Optional page range (e.g., "1-5")

    Returns:
        Extracted content or S3 URL for large documents
    """
    url = f"{BASE_URL}/extract"
    headers = {"x-api-key": API_KEY}

    with open(file_path, 'rb') as f:
        files = {'file': f}
        data = {}
        if schema:
            data['schema'] = json.dumps(schema)
        if pages:
            data['pages'] = pages

        response = requests.post(url, files=files, data=data, headers=headers)

    if response.status_code == 200:
        result = response.json()
        # Check if the result is returned as a URL (for documents > 70 pages)
        if result.get('is_url'):
            print(f"Large document detected. Fetching from: {result['url']}")
            # Fetch the actual content from S3
            content_response = requests.get(result['url'])
            return content_response.json()
        else:
            return result
    else:
        print(f"Error: {response.status_code}")
        return response.json()

# Example usage
if __name__ == "__main__":
    # Define the extraction schema
    schema = {
        "invoice_number": "string",
        "date": "date",
        "total": "float",
        "line_items": [{
            "description": "string",
            "quantity": "integer",
            "price": "float"
        }]
    }

    # Extract the document
    result = extract_document_from_file(
        "invoice.pdf",
        schema=schema,
        pages="1-5"
    )
    print(json.dumps(result, indent=2))
If you want to upload the file to S3 first and reuse it across requests, use the /convert endpoint before calling /extract:
def upload_and_extract(file_path, schema=None):
    """
    Upload a file to S3 first, then extract content.

    Use this when:
    - You want to reuse the uploaded file multiple times
    - You need to share the file URL with other services
    - You're processing the same file with different parameters

    Args:
        file_path: Path to the local file
        schema: Optional JSON schema for structured extraction

    Returns:
        Extracted content
    """
    # Step 1: Upload the file to S3
    upload_url = f"{BASE_URL}/convert"
    headers = {"x-api-key": API_KEY}

    with open(file_path, 'rb') as f:
        files = {'file': f}
        upload_response = requests.post(upload_url, files=files, headers=headers)

    if upload_response.status_code != 200:
        print(f"Upload failed: {upload_response.status_code}")
        return None

    upload_result = upload_response.json()
    file_url = upload_result['s3_object_url']
    print(f"File uploaded to: {file_url}")

    # Step 2: Extract content using the uploaded file URL
    extract_url = f"{BASE_URL}/extract"
    extract_data = {
        "file-url": file_url
    }
    if schema:
        extract_data["schema"] = schema

    extract_response = requests.post(
        extract_url,
        json=extract_data,
        headers=headers
    )

    if extract_response.status_code == 200:
        result = extract_response.json()
        # Handle large document response
        if result.get('is_url'):
            print(f"Large document detected. Fetching from: {result['url']}")
            content_response = requests.get(result['url'])
            return content_response.json()
        else:
            return result
    else:
        print(f"Extraction failed: {extract_response.status_code}")
        return extract_response.json()

# Example usage
result = upload_and_extract("large_document.pdf", schema={"title": "string", "content": "string"})
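Because /convert returns a reusable S3 URL, you can upload once and run several extractions against the same file. A minimal sketch of that pattern, reusing the API_KEY and BASE_URL above (the helper name and schemas here are illustrative):

# Sketch: upload once with /convert, then extract twice with different schemas.
def extract_from_uploaded_url(file_url, schema):
    """Run /extract against a previously uploaded S3 URL with a given schema."""
    headers = {"x-api-key": API_KEY}
    payload = {"file-url": file_url, "schema": schema}
    response = requests.post(f"{BASE_URL}/extract", json=payload, headers=headers)
    response.raise_for_status()
    return response.json()

# Hypothetical usage: one upload, two different extraction passes.
# file_url = ...  # the s3_object_url returned by the /convert call shown above
# summary = extract_from_uploaded_url(file_url, {"title": "string", "summary": "string"})
# details = extract_from_uploaded_url(file_url, {"sections": [{"title": "string", "content": "string"}]})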
def extract_async_with_polling(file_path, schema=None, pages=None):
    """
    Extract content asynchronously with automatic polling.

    Recommended for:
    - Documents over 50 pages
    - Multiple concurrent extractions
    - Long-running processes

    Args:
        file_path: Path to the local file
        schema: Optional JSON schema for structured extraction
        pages: Optional page range

    Returns:
        Extracted content
    """
    # Step 1: Start the async extraction
    url = f"{BASE_URL}/extract_async"
    headers = {"x-api-key": API_KEY}

    with open(file_path, 'rb') as f:
        files = {'file': f}
        data = {}
        if schema:
            data['schema'] = json.dumps(schema)
        if pages:
            data['pages'] = pages

        response = requests.post(url, files=files, data=data, headers=headers)

    if response.status_code != 200:
        print(f"Failed to start extraction: {response.status_code}")
        return response.json()

    job_info = response.json()
    job_id = job_info['job_id']
    print(f"Job started: {job_id}")

    # Step 2: Poll for job completion
    poll_url = f"{BASE_URL}/job/{job_id}"
    max_attempts = 60  # Maximum 5 minutes (60 * 5 seconds)
    attempt = 0

    while attempt < max_attempts:
        time.sleep(5)  # Wait 5 seconds between polls

        poll_response = requests.get(poll_url, headers=headers)
        if poll_response.status_code != 200:
            print(f"Failed to poll job: {poll_response.status_code}")
            return None

        job_status = poll_response.json()
        status = job_status['status']
        print(f"Job status: {status}")

        if status == 'completed':
            result = job_status['result']
            # Handle large document response
            if result.get('is_url'):
                print(f"Large document detected. Fetching from: {result['url']}")
                content_response = requests.get(result['url'])
                return content_response.json()
            else:
                return result
        elif status == 'failed':
            print(f"Job failed: {job_status.get('error', 'Unknown error')}")
            return None
        elif status == 'cancelled':
            print("Job was cancelled")
            return None

        attempt += 1

    print("Job timed out")
    return None

# Example usage for a large document
result = extract_async_with_polling(
    "large_report.pdf",
    schema={"sections": [{"title": "string", "content": "string"}]},
    pages="1-100"
)
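The async flow also lends itself to running several extractions in parallel. A brief sketch using a thread pool with the helper above (the file names are placeholders):

from concurrent.futures import ThreadPoolExecutor

# Sketch: run several async extractions concurrently with extract_async_with_polling.
# The document paths below are hypothetical placeholders.
documents = ["report_q1.pdf", "report_q2.pdf", "report_q3.pdf"]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(extract_async_with_polling, path): path for path in documents}
    for future, path in futures.items():
        try:
            print(path, "->", "done" if future.result() else "no result")
        except Exception as exc:
            print(path, "->", f"error: {exc}")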
import requests
import json
import time
from typing import Optional, Dict, Any
from pathlib import Path

class PulseAPIClient:
    """Production-ready client for Pulse API."""

    def __init__(self, api_key: str, base_url: str = "https://dev.api.runpulse.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"x-api-key": api_key}

    def extract(self,
                file_path: Optional[str] = None,
                file_url: Optional[str] = None,
                schema: Optional[Dict] = None,
                pages: Optional[str] = None,
                extract_figure: bool = False,
                async_mode: bool = False) -> Dict[str, Any]:
        """
        Smart extraction that automatically handles different scenarios.

        Args:
            file_path: Local file path
            file_url: Remote file URL
            schema: Extraction schema
            pages: Page range (e.g., "1-5")
            extract_figure: Extract figures from the document
            async_mode: Use async processing

        Returns:
            Extracted content
        """
        # Decide whether to use async based on file size or an explicit request
        if file_path and not async_mode:
            file_size_mb = Path(file_path).stat().st_size / (1024 * 1024)
            if file_size_mb > 10:  # Auto-switch to async for files > 10MB
                print(f"Large file detected ({file_size_mb:.1f}MB), using async mode")
                async_mode = True

        # Choose the endpoint
        endpoint = "/extract_async" if async_mode else "/extract"

        # Prepare the request
        if file_path:
            return self._extract_from_file(endpoint, file_path, schema, pages, extract_figure)
        elif file_url:
            return self._extract_from_url(endpoint, file_url, schema, pages, extract_figure)
        else:
            raise ValueError("Either file_path or file_url must be provided")

    def _extract_from_file(self, endpoint: str, file_path: str,
                           schema: Optional[Dict], pages: Optional[str],
                           extract_figure: bool) -> Dict[str, Any]:
        """Extract from a local file."""
        url = f"{self.base_url}{endpoint}"

        with open(file_path, 'rb') as f:
            files = {'file': f}
            data = {}
            if schema:
                data['schema'] = json.dumps(schema)
            if pages:
                data['pages'] = pages
            if extract_figure:
                data['extract_figure'] = 'true'

            response = requests.post(url, files=files, data=data, headers=self.headers)

        if response.status_code != 200:
            raise Exception(f"Extraction failed: {response.text}")

        result = response.json()

        # Handle async response
        if endpoint == "/extract_async":
            return self._wait_for_job(result['job_id'])

        # Handle large document URL response
        if result.get('is_url'):
            return self._fetch_from_url(result['url'])

        return result

    def _extract_from_url(self, endpoint: str, file_url: str,
                          schema: Optional[Dict], pages: Optional[str],
                          extract_figure: bool) -> Dict[str, Any]:
        """Extract from a URL."""
        url = f"{self.base_url}{endpoint}"
        data = {"file-url": file_url}
        if schema:
            data['schema'] = schema
        if pages:
            data['pages'] = pages
        if extract_figure:
            data['extract_figure'] = extract_figure

        response = requests.post(url, json=data, headers=self.headers)

        if response.status_code != 200:
            raise Exception(f"Extraction failed: {response.text}")

        result = response.json()

        # Handle async response
        if endpoint == "/extract_async":
            return self._wait_for_job(result['job_id'])

        # Handle large document URL response
        if result.get('is_url'):
            return self._fetch_from_url(result['url'])

        return result

    def _wait_for_job(self, job_id: str, timeout: int = 300) -> Dict[str, Any]:
        """Poll for async job completion."""
        poll_url = f"{self.base_url}/job/{job_id}"
        start_time = time.time()

        while time.time() - start_time < timeout:
            time.sleep(5)

            response = requests.get(poll_url, headers=self.headers)
            if response.status_code != 200:
                raise Exception(f"Failed to poll job: {response.text}")

            job_status = response.json()
            status = job_status['status']

            if status == 'completed':
                result = job_status['result']
                if result.get('is_url'):
                    return self._fetch_from_url(result['url'])
                return result
            elif status in ['failed', 'cancelled']:
                raise Exception(f"Job {status}: {job_status.get('error', 'Unknown error')}")

        raise TimeoutError(f"Job {job_id} timed out after {timeout} seconds")

    def _fetch_from_url(self, url: str) -> Dict[str, Any]:
        """Fetch content from an S3 URL."""
        response = requests.get(url)
        if response.status_code != 200:
            raise Exception(f"Failed to fetch content from URL: {response.status_code}")
        return response.json()

    def upload_file(self, file_path: str) -> str:
        """Upload a file to S3 and return its URL."""
        url = f"{self.base_url}/convert"

        with open(file_path, 'rb') as f:
            files = {'file': f}
            response = requests.post(url, files=files, headers=self.headers)

        if response.status_code != 200:
            raise Exception(f"Upload failed: {response.text}")

        return response.json()['s3_object_url']

    def cancel_job(self, job_id: str) -> bool:
        """Cancel a running job."""
        url = f"{self.base_url}/cancel/{job_id}"
        response = requests.post(url, headers=self.headers)
        return response.status_code == 200

# Example usage
if __name__ == "__main__":
    client = PulseAPIClient(api_key="YOUR_API_KEY")

    # Example 1: Simple extraction
    result = client.extract(file_path="document.pdf")

    # Example 2: Extract with a schema
    schema = {
        "invoice_number": "string",
        "total": "float",
        "date": "date"
    }
    result = client.extract(file_path="invoice.pdf", schema=schema)

    # Example 3: Force async mode for a large file
    result = client.extract(file_path="large_report.pdf", async_mode=True)

    # Example 4: Extract from a URL
    result = client.extract(file_url="https://example.com/document.pdf")

    print(json.dumps(result, indent=2))
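For batch workloads, the same client can be reused across many files. A brief sketch, assuming a local folder of PDFs (the directory name and schema are illustrative, not part of the API):

# Sketch: batch-process every PDF in a folder with the client above.
from pathlib import Path

client = PulseAPIClient(api_key="YOUR_API_KEY")
schema = {"title": "string", "summary": "string"}

results = {}
for pdf_path in Path("documents").glob("*.pdf"):
    try:
        results[pdf_path.name] = client.extract(file_path=str(pdf_path), schema=schema)
    except Exception as exc:
        print(f"Skipping {pdf_path.name}: {exc}")

print(f"Processed {len(results)} documents")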
Invoice Processing
schema = {
    "invoice_number": "string",
    "vendor_name": "string",
    "date": "date",
    "due_date": "date",
    "subtotal": "float",
    "tax": "float",
    "total": "float",
    "line_items": [{
        "description": "string",
        "quantity": "integer",
        "unit_price": "float",
        "amount": "float"
    }]
}

result = client.extract(file_path="invoice.pdf", schema=schema)
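Once an invoice has been extracted, you may want to sanity-check the numbers. The exact shape of the response isn't shown in this guide, so the sketch below assumes the structured fields come back as a dict matching the schema above; adjust the key lookups to match the actual payload.

# Sketch: sanity-check an extracted invoice. Assumes a dict shaped like the schema above.
def validate_invoice(invoice):
    line_total = sum(item["amount"] for item in invoice.get("line_items", []))
    expected = invoice.get("subtotal", 0) + invoice.get("tax", 0)
    return {
        "line_items_match_subtotal": abs(line_total - invoice.get("subtotal", 0)) < 0.01,
        "subtotal_plus_tax_matches_total": abs(expected - invoice.get("total", 0)) < 0.01,
    }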
Contract Analysis
schema = {
    "parties": [{
        "name": "string",
        "role": "string"
    }],
    "effective_date": "date",
    "termination_date": "date",
    "payment_terms": "string",
    "key_obligations": ["string"]
}

result = client.extract(file_path="contract.pdf", schema=schema)
Research Paper Processing
schema = {
    "title": "string",
    "authors": ["string"],
    "abstract": "string",
    "keywords": ["string"],
    "sections": [{
        "title": "string",
        "content": "string"
    }],
    "references": ["string"]
}

result = client.extract(file_path="paper.pdf", schema=schema)