
Overview

The batch endpoints let you run any step of the Pulse pipeline across many documents at once. Each batch call is fully asynchronous: it returns a batch_job_id immediately and orchestrates parallel workers behind the scenes. Poll GET /job/{batch_job_id} for real-time progress, including per-item completion status and individual child job IDs.
Batch endpoints mirror the individual pipeline steps. Each child call goes through the exact same code path as calling the individual endpoint directly — batch is orchestration, not a separate implementation.

Pipeline

Batch endpoints can be chained together, just like their single-document counterparts: each step takes the output of a previous step as input, either via a batch_extract_id / batch_split_id that references the parent batch job, or via an explicit list of individual IDs.

Workers

Workers process items in parallel. You can control concurrency with the workers parameter on every batch endpoint.
| Parameter | Type | Default | Max | Description |
| --- | --- | --- | --- | --- |
| workers | integer | 4 | 10 | Number of parallel workers |
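As an illustrative guard (not part of the SDK), you might clamp a requested worker count to the documented bounds before sending it:

```python
DEFAULT_WORKERS = 4  # documented default
MAX_WORKERS = 10     # documented maximum

def clamp_workers(requested=None):
    """Fall back to the default, and keep the value within 1..MAX_WORKERS."""
    if requested is None:
        return DEFAULT_WORKERS
    return max(1, min(requested, MAX_WORKERS))
```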

Batch Extract

Enumerate files from an input source and extract content from each one.
See Extract for details on extract_options (pages, figure processing, extensions, etc.).

Request — POST /batch/extract

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| input | object | Yes | Source of files to process (see Input Sources) |
| output | object | Yes | Where to save extraction results (see Output Destinations) |
| extract_options | object | No | Options forwarded to each /extract call |
| workers | integer | No | Parallel workers (default: 4, max: 10) |

Response (202)

| Field | Type | Description |
| --- | --- | --- |
| batch_job_id | string | Job ID for polling |
| status | string | "processing" |
| total_files | integer | Number of files that will be processed |

Example

from pulse import Pulse
from pulse.types.batch_input_source import BatchInputSource
from pulse.types.batch_output_destination import BatchOutputDestination

client = Pulse(api_key="YOUR_API_KEY")

resp = client.batch.extract(
    input=BatchInputSource(s_3_prefix="s3://my-bucket/documents/"),
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/extractions/"),
    workers=10,
)

print(f"Job: {resp.batch_job_id}, files: {resp.total_files}")

Batch Schema

Apply structured data extraction to multiple items. Supports two modes, inferred from input:
  • Single mode — Provide extraction_ids or batch_extract_id with schema_config
  • Split mode — Provide split_ids or batch_split_id with split_schema_config
See Schema for details on schema_config, split_schema_config, and the difference between single and split modes.
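The mode inference described above can be sketched as a small client-side check (a hypothetical helper, not part of the SDK) mirroring the XOR and Conditional rules in the request table:

```python
def infer_schema_mode(payload):
    """Infer single vs. split mode from which ID fields are present."""
    single = {"batch_extract_id", "extraction_ids"} & payload.keys()
    split = {"batch_split_id", "split_ids"} & payload.keys()
    # Exactly one ID source may be given (the XOR rule).
    if len(single) + len(split) != 1:
        raise ValueError("Provide exactly one of batch_extract_id, extraction_ids, batch_split_id, split_ids")
    if single:
        if "schema_config" not in payload:
            raise ValueError("single mode requires schema_config")
        return "single"
    if "split_schema_config" not in payload:
        raise ValueError("split mode requires split_schema_config")
    return "split"
```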

Request — POST /batch/schema

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| output | object | Yes | Where to save schema results |
| batch_extract_id | string | XOR | ID of a prior batch extract run (single mode) |
| extraction_ids | array | XOR | Explicit list of extraction IDs (single mode) |
| batch_split_id | string | XOR | ID of a prior batch split run (split mode) |
| split_ids | array | XOR | Explicit list of split IDs (split mode) |
| schema_config | object | Conditional | Schema configuration for single mode |
| split_schema_config | object | Conditional | Per-topic schema configurations for split mode |
| workers | integer | No | Parallel workers (default: 4, max: 10) |

Response (202)

| Field | Type | Description |
| --- | --- | --- |
| batch_job_id | string | Job ID for polling |
| status | string | "processing" |
| total_extractions | integer | Number of extractions to process (single mode) |
| total_splits | integer | Number of splits to process (split mode) |

Example — Single Mode

from pulse.types.schema_config import SchemaConfig

resp = client.batch.schema(
    batch_extract_id="<batch_extract_job_id>",
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/schemas/"),
    schema_config=SchemaConfig(
        input_schema={
            "type": "object",
            "properties": {
                "total_amount": {"type": "number"},
                "vendor_name": {"type": "string"},
            },
        },
        schema_prompt="Extract invoice details",
    ),
    workers=10,
)

Example — Split Mode

from pulse.types.topic_schema_config import TopicSchemaConfig

resp = client.batch.schema(
    batch_split_id="<batch_split_job_id>",
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/schema-splits/"),
    split_schema_config={
        "financial_statements": TopicSchemaConfig(
            schema_={"type": "object", "properties": {"revenue": {"type": "number"}}},
            schema_prompt="Extract financial data",
        ),
        "risk_factors": TopicSchemaConfig(
            schema_={"type": "object", "properties": {"risks": {"type": "array", "items": {"type": "string"}}}},
        ),
    },
    workers=10,
)

Batch Tables

Extract tables from multiple existing extractions.
See Tables for details on tables_config (merge, table format, etc.).

Request — POST /batch/tables

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| output | object | Yes | Where to save table results |
| batch_extract_id | string | XOR | ID of a prior batch extract run |
| extraction_ids | array | XOR | Explicit list of extraction IDs |
| tables_config | object | No | Table extraction configuration |
| workers | integer | No | Parallel workers (default: 4, max: 10) |

Response (202)

| Field | Type | Description |
| --- | --- | --- |
| batch_job_id | string | Job ID for polling |
| status | string | "processing" |
| total_extractions | integer | Number of extractions to process |

Example

from pulse.types.tables_config import TablesConfig

resp = client.batch.tables(
    batch_extract_id="<batch_extract_job_id>",
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/tables/"),
    tables_config=TablesConfig(merge=True, table_format="html"),
    workers=10,
)

Batch Split

Split multiple extractions into topics.
See Split for details on split_config (topic definitions with names and descriptions).

Request — POST /batch/split

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| output | object | Yes | Where to save split results |
| split_config | object | Yes | Split configuration with topic definitions |
| batch_extract_id | string | XOR | ID of a prior batch extract run |
| extraction_ids | array | XOR | Explicit list of extraction IDs |
| workers | integer | No | Parallel workers (default: 4, max: 10) |

Response (202)

| Field | Type | Description |
| --- | --- | --- |
| batch_job_id | string | Job ID for polling |
| status | string | "processing" |
| total_extractions | integer | Number of extractions to process |

Example

from pulse.types.split_config import SplitConfig
from pulse.types.topic_definition import TopicDefinition

resp = client.batch.split(
    batch_extract_id="<batch_extract_job_id>",
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/splits/"),
    split_config=SplitConfig(
        split_input=[
            TopicDefinition(name="financial_statements", description="Balance sheets, income statements"),
            TopicDefinition(name="risk_factors", description="Risk factors and forward-looking statements"),
        ],
    ),
    workers=10,
)

Input and Output

Input Sources

Batch Extract accepts one of the following input sources:
| Source | Field | Example |
| --- | --- | --- |
| S3 prefix | s3_prefix | s3://my-bucket/documents/ |
| Local directory | local_path | /data/documents/ |
| URL list | file_urls | ["https://example.com/doc.pdf"] |
All other batch endpoints reference prior results via IDs rather than raw files.
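Expressed as raw input objects for POST /batch/extract, the three sources look like this (values are placeholders):

```python
# Exactly one of these shapes goes in the "input" field of POST /batch/extract.
s3_input = {"s3_prefix": "s3://my-bucket/documents/"}
local_input = {"local_path": "/data/documents/"}
url_input = {"file_urls": ["https://example.com/doc.pdf"]}
```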

Output Destinations

Every batch endpoint writes results to an output destination. You can specify one or both:
| Destination | Field | Example |
| --- | --- | --- |
| S3 prefix | s3_prefix | s3://my-bucket/results/ |
| Local directory | local_path | /data/results/ |
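Because you may specify one or both fields, a minimal client-side check (illustrative, not part of the SDK) could look like:

```python
def validate_output(output):
    """Require at least one destination; both may be present."""
    if not ({"s3_prefix", "local_path"} & set(output)):
        raise ValueError("output needs s3_prefix and/or local_path")
    return output

# Writing to S3 and a local directory at the same time:
both = validate_output({
    "s3_prefix": "s3://my-bucket/results/",
    "local_path": "/data/results/",
})
```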

Monitoring Progress

Poll GET /job/{batch_job_id} to monitor a batch job. The response includes a result object with structured progress:
{
  "status": "processing",
  "result": {
    "progress": {
      "total": 10,
      "completed": 7,
      "failed": 1
    },
    "jobs": {
      "completed": [
        { "job_id": "abc-123", "file": "report.pdf" }
      ],
      "failed": [
        { "job_id": "def-456", "file": "corrupt.pdf", "error": "..." }
      ],
      "processing": ["ghi-789"]
    }
  }
}
Each child job_id can be polled individually for detailed results.
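For example, two small helpers (sketches against the response shape above, not SDK code) can summarize progress and collect failed children for retry:

```python
def is_done(result):
    """True once every item has either completed or failed."""
    p = result["progress"]
    return p["completed"] + p["failed"] >= p["total"]

def failed_jobs(result):
    """Return the failed child-job entries, e.g. to re-queue their files."""
    return result.get("jobs", {}).get("failed", [])
```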

Polling Example

import time

job_id = resp.batch_job_id

while True:
    job = client.jobs.get_job(job_id=job_id)

    if job.status in ("completed", "failed", "canceled"):
        print(f"Final status: {job.status}")
        break

    if job.result and "progress" in job.result:
        p = job.result["progress"]
        print(f"{p['completed']}/{p['total']} completed, {p['failed']} failed")

    time.sleep(5)

Cancellation

Cancel a batch job with DELETE /job/{batch_job_id}. This cascades to all child jobs that are still pending or processing.
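Given the jobs breakdown from a progress poll, the cascade only touches children that are not yet final. A sketch (illustrative helper, not SDK code; the "pending" key is an assumption, since only "processing" appears in the example response above):

```python
def jobs_to_cancel(result):
    """Child job IDs a DELETE /job/{batch_job_id} would cascade to."""
    jobs = result.get("jobs", {})
    # Completed and failed children are final and unaffected.
    return list(jobs.get("processing", [])) + list(jobs.get("pending", []))
```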

Full Pipeline Example

Process a folder of SEC filings: extract all files, apply a schema, extract tables, split by topic, and apply per-topic schemas.
from pulse import Pulse
from pulse.types.batch_input_source import BatchInputSource
from pulse.types.batch_output_destination import BatchOutputDestination
from pulse.types.schema_config import SchemaConfig
from pulse.types.split_config import SplitConfig
from pulse.types.tables_config import TablesConfig
from pulse.types.topic_definition import TopicDefinition
from pulse.types.topic_schema_config import TopicSchemaConfig
import time

client = Pulse(api_key="YOUR_API_KEY")

def wait_for_job(job_id: str) -> dict:
    while True:
        job = client.jobs.get_job(job_id=job_id)
        if job.status in ("completed", "failed", "canceled"):
            return {"status": job.status, "result": job.result}
        time.sleep(5)

# Step 1: Batch Extract
extract = client.batch.extract(
    input=BatchInputSource(s_3_prefix="s3://my-bucket/10-K/"),
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/extractions/"),
    workers=10,
)
extract_result = wait_for_job(extract.batch_job_id)

# Step 2: Batch Schema
schema = client.batch.schema(
    batch_extract_id=extract.batch_job_id,
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/schemas/"),
    schema_config=SchemaConfig(
        input_schema={"type": "object", "properties": {"revenue": {"type": "number"}}},
    ),
    workers=10,
)
wait_for_job(schema.batch_job_id)

# Step 3: Batch Tables
tables = client.batch.tables(
    batch_extract_id=extract.batch_job_id,
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/tables/"),
    tables_config=TablesConfig(merge=True, table_format="html"),
    workers=10,
)
wait_for_job(tables.batch_job_id)

# Step 4: Batch Split
split = client.batch.split(
    batch_extract_id=extract.batch_job_id,
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/splits/"),
    split_config=SplitConfig(split_input=[
        TopicDefinition(name="financials", description="Financial statements"),
        TopicDefinition(name="risk_factors", description="Risk disclosures"),
    ]),
    workers=10,
)
wait_for_job(split.batch_job_id)

# Step 5: Batch Schema (split mode)
split_schema = client.batch.schema(
    batch_split_id=split.batch_job_id,
    output=BatchOutputDestination(s_3_prefix="s3://my-bucket/results/schema-splits/"),
    split_schema_config={
        "financials": TopicSchemaConfig(
            schema_={"type": "object", "properties": {"revenue": {"type": "number"}}},
        ),
        "risk_factors": TopicSchemaConfig(
            schema_={"type": "object", "properties": {"risks": {"type": "array", "items": {"type": "string"}}}},
        ),
    },
    workers=10,
)
wait_for_job(split_schema.batch_job_id)

Related

- Extract: individual file extraction; its config options apply to Batch Extract
- Schema: single/split schema extraction; its config options apply to Batch Schema
- Tables: table extraction; its config options apply to Batch Tables
- Split: topic splitting; its config options apply to Batch Split
- Poll Job: poll batch job progress
- Cancel Job: cancel a batch job and all child jobs