# Chaining Steps

> Use extraction_id, split_id, schema_id, tables_id, and job_id to connect Pulse workflow steps.

Pulse workflows are chains of small, reusable steps. Each step returns an ID that lets the next step reuse work instead of re-uploading or re-processing the same document.

## The Golden Path

```mermaid
sequenceDiagram
    participant App
    participant Pulse
    App->>Pulse: POST /extract
    Pulse-->>App: extraction_id
    App->>Pulse: POST /split with extraction_id
    Pulse-->>App: split_id
    App->>Pulse: POST /schema with split_id
    Pulse-->>App: schema_id + values
```

## ID Handoffs

| You have        | You can call next          | Why                                                                  |
| --------------- | -------------------------- | -------------------------------------------------------------------- |
| `job_id`        | `GET /job/{jobId}`         | Check async status and retrieve the result.                          |
| `extraction_id` | `/schema`                  | Extract structured data from the whole document.                     |
| `extraction_id` | `/tables`                  | Extract table structure from the document.                           |
| `extraction_id` | `/split`                   | Assign pages to topics.                                              |
| `split_id`      | `/schema`                  | Apply different schemas to topic page groups.                        |
| `split_id`      | `/tables`                  | Extract tables scoped to split topics.                               |
| `schema_id`     | `/schema/{schemaId}/excel` | Download a filled Excel template when schema template mode was used. |
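
The table can also be encoded as a small lookup so application code can check a handoff before making a request. The sketch below is a hypothetical helper, not part of the Pulse SDK: `NEXT_CALLS` and `can_call` are names invented for illustration, and the endpoint strings are copied from the table above.

```python
# Hypothetical helper (not part of the Pulse SDK): the handoff table as data.
NEXT_CALLS = {
    "job_id": ["GET /job/{jobId}"],
    "extraction_id": ["/schema", "/tables", "/split"],
    "split_id": ["/schema", "/tables"],
    "schema_id": ["/schema/{schemaId}/excel"],
}


def can_call(id_name: str, endpoint: str) -> bool:
    """Return True if the table lists `endpoint` as a valid next call for `id_name`."""
    return endpoint in NEXT_CALLS.get(id_name, [])


print(can_call("extraction_id", "/split"))  # True
print(can_call("job_id", "/schema"))        # False
```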

## Extract -> Schema

```python
from pulse import Pulse

client = Pulse(api_key="YOUR_API_KEY")

# Upload once; the returned extraction_id is reused by the next step.
with open("invoice.pdf", "rb") as f:
    extract_result = client.extract(file=f)

schema_result = client.schema(
    extraction_id=extract_result.extraction_id,
    schema_config={
        "input_schema": {
            "type": "object",
            "properties": {
                "invoice_number": {"type": "string"},
                "total_amount": {"type": "number"}
            }
        }
    }
)

print(schema_result.schema_output["values"])
```

## Extract -> Tables

```python
# Reuses the `client` created in the first example.
with open("10k.pdf", "rb") as f:
    extract_result = client.extract(file=f)

tables_result = client.tables(
    extraction_id=extract_result.extraction_id,
    tables_config={
        "merge": True,
        "charts_to_tables": True
    }
)

for table in tables_result.tables_output["tables"]:
    print(table["table_content"])
```

## Extract -> Split -> Schema

```python
# Reuses the `client` created in the first example.
with open("annual-report.pdf", "rb") as f:
    extract_result = client.extract(file=f)

split_result = client.split(
    extraction_id=extract_result.extraction_id,
    split_config={
        "split_input": [
            {"name": "Financials", "description": "Financial statements and metrics"},
            {"name": "Leadership", "description": "Executives and board members"}
        ]
    }
)

schema_result = client.schema(
    split_id=split_result.split_id,
    split_schema_config={
        "Financials": {
            "schema": {
                "type": "object",
                "properties": {
                    "revenue": {"type": "number"},
                    "net_income": {"type": "number"}
                }
            }
        },
        "Leadership": {
            "schema": {
                "type": "object",
                "properties": {
                    "ceo": {"type": "string"}
                }
            }
        }
    }
)
```
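
In the example above, each key of `split_schema_config` matches a `name` from `split_input`. Assuming that correspondence is required, a typo in a topic name is easy to catch locally before calling `/schema`. The following is a minimal sketch; `check_topic_keys` is a hypothetical helper, not an SDK function.

```python
def check_topic_keys(split_input, split_schema_config):
    """Raise ValueError if a schema key does not match any split topic name.

    Returns the sorted list of topic names that have no schema, so the
    caller can decide whether that is intentional.
    """
    topic_names = {topic["name"] for topic in split_input}
    unknown = sorted(set(split_schema_config) - topic_names)
    if unknown:
        raise ValueError(f"No split topic named: {', '.join(unknown)}")
    return sorted(topic_names - set(split_schema_config))


topics = [
    {"name": "Financials", "description": "Financial statements and metrics"},
    {"name": "Leadership", "description": "Executives and board members"},
]
# Only "Financials" has a schema here, so "Leadership" is reported back.
print(check_topic_keys(topics, {"Financials": {"schema": {"type": "object"}}}))
```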

## Async Chaining

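When you set `async: true` (`async_=True` in the Python example below), the call returns a `job_id` rather than a finished result. Poll the job until it completes before passing the result to the next step.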

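```python
import time

with open("large-file.pdf", "rb") as f:
    job = client.extract(file=f, async_=True)

# Poll until the job reaches a terminal state.
while True:
    status = client.jobs.get_job(job_id=job.job_id)
    if status.status == "completed":
        extract_result = status.result
        break
    if status.status in ["failed", "canceled"]:
        raise RuntimeError(f"Extraction job ended with status: {status.status}")
    time.sleep(2)  # Pause between polls instead of busy-waiting.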

schema_result = client.schema(
    extraction_id=extract_result["extraction_id"],
    schema_config={"input_schema": {"type": "object", "properties": {}}}
)
```

## Common Mistakes

<AccordionGroup>
  <Accordion title="Passing job_id where extraction_id is required">
    A `job_id` is for polling. After the job completes, read the completed result and pass its `extraction_id` into `/schema`, `/split`, or `/tables`.
  </Accordion>

  <Accordion title="Disabling storage before downstream steps">
    Downstream steps need saved extraction artifacts. Keep storage enabled when you plan to chain.
  </Accordion>

  <Accordion title="Using Split when a page range is enough">
    If the target pages are always known, pass `pages` or a table `page_range`. Use Split when the location of a topic varies from document to document.
  </Accordion>

  <Accordion title="Using Schema for a table-first workflow">
    Schema works well for named fields. Use Tables when the goal is to preserve row and column relationships.
  </Accordion>
</AccordionGroup>

## Related

<CardGroup cols={2}>
  <Card title="Pipeline Overview" icon="diagram-project" href="/api-reference/endpoint/pipeline-overview">
    See supported API pipeline shapes.
  </Card>

  <Card title="Moving from Platform to Production" icon="rocket" href="/platform-reference/platform-to-production">
    Generate chained SDK calls from the Playground.
  </Card>
</CardGroup>
