Complex intake forms contain rich structured data, but that structure is implicit rather than explicit: nested sections, conditional fields, repeated entities, and layout-dependent semantics. Conventional OCR pipelines flatten these documents into plain text, after which regexes or heuristic parsers attempt to reconstruct structure that was already lost. This approach is inherently brittle and fails under schema variation, layout drift, or partial updates.

In this article, we present an end-to-end pipeline for direct structured extraction from PDFs that emits typed, Pydantic-validated models, without intermediate text normalization, markdown conversion, or layout-specific rules.

The system combines schema-constrained LLM computation (DSPy) with an incremental dataflow engine (CocoIndex).

Rather than treating PDFs as unstructured text, we treat them as structured visual inputs and extract data directly into application-level schemas. This enables deterministic validation, incremental reprocessing, schema evolution, and production-grade reliability across heterogeneous document formats.

The entire code is open sourced under the Apache 2.0 license. To see more examples built with CocoIndex, you can refer to the examples page. ⭐ Star the project if you find it helpful!

Why DSPy + CocoIndex?

This pipeline combines two complementary abstractions: schema-constrained LLM computation and incremental dataflow execution. Each addresses a different failure mode in production document extraction systems.

DSPy: A programming model for LLM pipelines

🔗 https://github.com/stanfordnlp/dspy

Most LLM-based extraction systems are built around prompt engineering: hand-written prompts with instructions, few-shot examples, and ad-hoc output parsing. This approach does not scale operationally.

DSPy replaces prompts-as-strings with a declarative programming model. Instead of specifying how to prompt the model, you specify what each step must produce: typed inputs, structured outputs, constraints, and validation rules. DSPy then automatically synthesizes and optimizes the underlying prompts to satisfy that specification.
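As a minimal sketch of this style (illustrative only; the real signature used in this pipeline appears later), a declarative extraction step is a typed contract rather than a prompt string:

import dspy
from pydantic import BaseModel

class MedicationEntry(BaseModel):  # hypothetical schema, for illustration only
    name: str
    dosage: str

class ExtractMedications(dspy.Signature):
    """Extract medication entries from a passage of intake-form text."""
    passage: str = dspy.InputField(desc="Raw form text")
    medications: list[MedicationEntry] = dspy.OutputField(desc="Structured medication entries")

# DSPy synthesizes and optimizes the underlying prompt; we only declare the contract.
extract_medications = dspy.Predict(ExtractMedications)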

In practice, this makes multimodal extraction pipelines substantially more robust to change: the typed contract stays fixed while DSPy regenerates and optimizes the underlying prompts.

CocoIndex: Incremental execution engine for AI data pipelines

🔗 https://github.com/cocoindex-io/cocoindex

CocoIndex is a high-performance data processing engine designed for AI workloads with incremental recomputation as a first-class primitive. Developers write straightforward Python transformations over in-memory objects; CocoIndex executes them as a resilient, scalable pipeline backed by a Rust engine.

Key properties:

In production, CocoIndex runs in “live” mode using polling or change data capture. Derived targets—databases, vector stores, or APIs—are continuously kept in sync with evolving unstructured sources such as PDFs, code repositories, and multi-hop API graphs.

Because every transformation is tracked with lineage and observability, the system provides built-in auditability and explainability—critical for regulated or compliance-sensitive workflows, but equally valuable for debugging and long-term maintenance.

Why the combination works

DSPy guarantees correctness at the model boundary.
CocoIndex guarantees correctness over time.

Together, they form a pipeline where structured extraction is typed and validated at every step, and kept continuously up to date as the source documents change.

Flow Overview

Prerequisites

Before getting started, make sure you have the following set up:

  1. Install Postgres if you don't have it, and make sure you can connect to it from your development environment.
  2. Install the Python dependencies:
pip install -U cocoindex dspy-ai pydantic pymupdf
  3. Create a .env file:
# Postgres database address for cocoindex
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# Gemini API key
GEMINI_API_KEY=YOUR_GEMINI_API_KEY
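At startup, these variables need to be present in the process environment. A minimal sketch, assuming you use python-dotenv (the main.py file name is just an example), loads them before any CocoIndex or DSPy configuration is read:

# main.py (example entry point)
from dotenv import load_dotenv

# Load COCOINDEX_DATABASE_URL and GEMINI_API_KEY from .env
load_dotenv()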

Pydantic Models: Define the structured schema

We define Pydantic models (Contact, Address, Insurance, etc.) that match a FHIR-inspired patient schema, enabling structured and validated representations of patient data. Each model corresponds to a key aspect of a patient's record, ensuring both type safety and well-defined nested relationships.

1. Contact Model

# Shared imports for all models below
import datetime

from pydantic import BaseModel, Field

class Contact(BaseModel):
    name: str
    phone: str
    relationship: str

2. Address Model

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str

3. Pharmacy Model

class Pharmacy(BaseModel):
    name: str
    phone: str
    address: Address

4. Insurance Model

class Insurance(BaseModel):
    provider: str
    policy_number: str
    group_number: str | None = None
    policyholder_name: str
    relationship_to_patient: str

5. Condition Model

class Condition(BaseModel):
    name: str
    diagnosed: bool

6. Medication Model

class Medication(BaseModel):
    name: str
    dosage: str

7. Allergy Model

class Allergy(BaseModel):
    name: str

8. Surgery Model

class Surgery(BaseModel):
    name: str
    date: str

9. Patient Model

class Patient(BaseModel):
    name: str
    dob: datetime.date
    gender: str
    address: Address
    phone: str
    email: str
    preferred_contact_method: str
    emergency_contact: Contact
    insurance: Insurance | None = None
    reason_for_visit: str
    symptoms_duration: str
    past_conditions: list[Condition] = Field(default_factory=list)
    current_medications: list[Medication] = Field(default_factory=list)
    allergies: list[Allergy] = Field(default_factory=list)
    surgeries: list[Surgery] = Field(default_factory=list)
    occupation: str | None = None
    pharmacy: Pharmacy | None = None
    consent_given: bool
    consent_date: str | None = None

Why Use Pydantic Here?

  1. Validation: Ensures all fields are the correct type (e.g., dob is a date).
  2. Structured Nested Models: Patient has nested objects like Address, Contact, and Insurance.
  3. Default Values & Optional Fields: Handles optional fields and defaults (Field(default_factory=list) ensures empty lists if no data).
  4. Serialization: Easily convert models to JSON for APIs or databases.
  5. Error Checking: Automatically raises errors if invalid data is provided (see the sketch after this list).
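A minimal sketch of points 1, 4, and 5, assuming Pydantic v2 (the sample values are illustrative, not taken from a real form):

from pydantic import ValidationError

# Valid input: plain strings are validated into typed, nested models.
pharmacy = Pharmacy(
    name="Main Street Pharmacy",
    phone="555-0100",
    address=Address(street="1 Main St", city="Springfield", state="IL", zip_code="62704"),
)
print(pharmacy.model_dump_json())  # serialize straight to JSON for APIs or databases

# Invalid input raises immediately instead of silently propagating bad data.
try:
    Contact(name="Jane Roe", phone="555-0100", relationship=None)  # relationship must be a str
except ValidationError as e:
    print(e)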

DSPy Vision Extractor

DSPy Signature

Let’s define PatientExtractionSignature. A Signature describes what data your module expects and what it will produce. Think of it as a schema for an AI task.

PatientExtractionSignature is a dspy.Signature, which is DSPy's way of declaring what the model should do, not how it does it.

# DSPy Signature for patient information extraction from images
class PatientExtractionSignature(dspy.Signature):
    """Extract structured patient information from a medical intake form image."""

    form_images: list[dspy.Image] = dspy.InputField(
        desc="Images of the patient intake form pages"
    )
    patient: Patient = dspy.OutputField(
        desc="Extracted patient information with all available fields filled"
    )

This signature defines the task contract for patient information extraction: a list of page images goes in, and a validated Patient object comes out.

From DSPy's point of view, this Signature is a "spec": a mapping from an image-based context to a structured, Pydantic-backed semantic object that can later be optimized, trained, and composed with other modules.

PatientExtractor Module

PatientExtractor is a dspy.Module, which in DSPy is a composable, potentially trainable building block that implements the Signature.

class PatientExtractor(dspy.Module):
    """DSPy module for extracting patient information from intake form images."""

    def __init__(self) -> None:
        super().__init__()
        self.extract = dspy.ChainOfThought(PatientExtractionSignature)

    def forward(self, form_images: list[dspy.Image]) -> Patient:
        """Extract patient information from form images and return as a Pydantic model."""
        result = self.extract(form_images=form_images)
        return result.patient  # type: ignore

Conceptually, PatientExtractor is an ETL operator: the Signature describes the input/output types, and the internal ChainOfThought module is the function that fills that contract.
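Before wiring it into the pipeline, the module can be exercised on its own. A rough sketch (the model name matches the one configured later in this article; page1.png is just a placeholder path):

import dspy

# Configure the multimodal LM once per process (requires GEMINI_API_KEY in the environment).
dspy.configure(lm=dspy.LM("gemini/gemini-2.5-flash"))

# Wrap a pre-rendered page image the same way the pipeline does.
with open("page1.png", "rb") as f:
    form_images = [dspy.Image(f.read())]

extractor = PatientExtractor()
patient = extractor(form_images=form_images)  # returns a validated Patient model
print(patient.name, patient.dob)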

Single-Step Extraction

Now let’s wire the DSPy module into a CocoIndex function that extracts from a single PDF. At a high level:

@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient:
    """Extract patient information from PDF content."""

    # Convert PDF pages to DSPy Image objects
    pdf_doc = pymupdf.open(stream=pdf_content, filetype="pdf")

    form_images = []
    for page in pdf_doc:
        # Render page to pixmap (image) at 2x resolution for better quality
        pix = page.get_pixmap(matrix=pymupdf.Matrix(2, 2))
        # Convert to PNG bytes
        img_bytes = pix.tobytes("png")
        # Create DSPy Image from bytes
        form_images.append(dspy.Image(img_bytes))

    pdf_doc.close()

    # Extract patient information using DSPy with vision
    extractor = PatientExtractor()
    patient = extractor(form_images=form_images)

    return patient  # type: ignore

This function is a CocoIndex function (decorated with @cocoindex.op.function) that takes PDF bytes as input and returns a fully structured Patient Pydantic object.

Create DSPy Image objects

We open the PDF from bytes using PyMuPDF (pymupdf), then iterate over each page, rendering it to a PNG at 2x resolution and wrapping the bytes in a dspy.Image.

DSPy Extraction

The list of form_images is passed to the DSPy module:

  1. ChainOfThought reasoning interprets each page image.
  2. Vision + NLP extract the relevant text fields.
  3. The extracted values populate a structured Pydantic Patient object.

CocoIndex Flow

Declare Flow

Declare a CocoIndex flow, connect it to the source, and add a data collector for the processed data.

@cocoindex.flow_def(name="PatientIntakeExtractionDSPy")
def patient_intake_extraction_dspy_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="data/patient_forms", binary=True)
    )

    patients_index = data_scope.add_collector()

You can connect to various sources, or even a custom source, if a native connector is not available. CocoIndex is designed to keep your indexes synchronized with your data sources through a feature called live updates, which automatically detects changes in your sources and updates your indexes accordingly. This ensures that your search results and data analysis are always based on the most current information. You can read more at https://cocoindex.io/docs/tutorials/live_updates.
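As a rough sketch of what that can look like (the refresh_interval keyword is an assumption on our part; check the live updates documentation linked above for the exact API), a source can be declared so that it is periodically re-scanned for changes:

import datetime

# Assumed keyword argument; see the live updates docs for the exact signature.
data_scope["documents"] = flow_builder.add_source(
    cocoindex.sources.LocalFile(path="data/patient_forms", binary=True),
    refresh_interval=datetime.timedelta(minutes=1),
)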

Process documents

with data_scope["documents"].row() as doc:
    # Extract patient information directly from PDF using DSPy with vision
    # (PDF->Image conversion happens inside the extractor)
    doc["patient_info"] = doc["content"].transform(extract_patient)

    # Collect the extracted patient information
    patients_index.collect(
        filename=doc["filename"],
        patient_info=doc["patient_info"],
    )

This iterates over each document. We transform doc["content"] (the bytes) by our extract_patient function. The result is stored in a new field patient_info.

Then we collect a row with the filename and extracted patient_info.

Export to Postgres

patients_index.export(
    "patients",
    cocoindex.storages.Postgres(table_name="patients_info_dspy"),
    primary_key_fields=["filename"],
)

We export the collected rows to Postgres. This will create and maintain a table patients_info_dspy keyed by filename, automatically deleting or updating rows if inputs change.

Because CocoIndex tracks data lineage, it will handle updates/deletions of source files incrementally.
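Once the flow has run, the exported table can be consumed like any other Postgres table. A minimal sketch, assuming the psycopg driver and the connection string from the .env file (the exact column layout CocoIndex generates is an assumption here, so adjust the query to your schema):

import os
import psycopg

# Reuse the same connection string the pipeline uses.
with psycopg.connect(os.environ["COCOINDEX_DATABASE_URL"]) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT filename, patient_info FROM patients_info_dspy LIMIT 5")
        for filename, patient_info in cur.fetchall():
            print(filename, patient_info)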

Configure CocoIndex settings

Define a CocoIndex settings function that configures the AI model for DSPy:

@cocoindex.settings
def cocoindex_settings() -> cocoindex.Settings:
    # Configure the model used in DSPy
    lm = dspy.LM("gemini/gemini-2.5-flash")
    dspy.configure(lm=lm)

    return cocoindex.Settings.from_env()

It returns a cocoindex.Settings object initialized from environment variables, enabling the system to use the configured model and environment settings for all DSPy operations.

Running the Pipeline

Update the index:

cocoindex update main

CocoInsight

I used CocoInsight (currently in free beta) to troubleshoot the index generation and understand the data lineage of the pipeline. It simply connects to your local CocoIndex server, with zero pipeline data retention.

cocoindex server -ci main

Scalable open ecosystem (not a closed stack)

CocoIndex is designed to be composable by default. It provides a high-performance, incremental execution engine and a minimal flow abstraction, without coupling users to a specific model provider, vector database, transformation framework, or orchestration system.

All core concepts—sources, operators, and storage targets—are expressed as pluggable interfaces rather than proprietary primitives. A flow can ingest data from local files, object storage (e.g. S3), APIs, or custom sources; apply arbitrary transformation logic (SQL, DSPy modules, generated parsers, or general Python); and materialize results into relational databases, vector stores, search indexes, or custom sinks via the storage layer.

This design keeps CocoIndex opinionated about execution semantics (incrementality, caching, lineage), but intentionally unopinionated about modeling choices.
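For instance, any plain Python function can become another operator in the same flow using the decorator already shown above (redact_phone is a hypothetical post-processing step, not part of this pipeline):

@cocoindex.op.function(cache=True, behavior_version=1)
def redact_phone(patient: Patient) -> Patient:
    """Hypothetical step: strip the phone number before export (assumes Pydantic v2)."""
    return patient.model_copy(update={"phone": ""})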


Why DSPy + CocoIndex aligns with this model

DSPy is itself a compositional programming framework for LLMs. Developers define typed Signatures and reusable Modules, and DSPy learns how to implement those contracts using the underlying model. This makes the LLM layer explicit, testable, and optimizable, rather than embedded in opaque prompt strings.

CocoIndex treats DSPy modules as first-class operators in a dataflow graph. This creates a clean separation of responsibilities: DSPy owns correctness at the model boundary (typed signatures and structured outputs), while CocoIndex owns correctness over time (execution, caching, lineage, and incremental recomputation).

Neither system attempts to own the full stack. CocoIndex does not prescribe a prompt or LLM framework, and DSPy does not provide a pipeline engine. Instead, they interlock: DSPy modules become composable, typed transformation nodes inside CocoIndex flows, while CocoIndex supplies production guarantees such as batching, retries, incremental recomputation, and live updates over evolving PDFs, codebases, or APIs.

Support CocoIndex ❤️

If this example was helpful, the easiest way to support CocoIndex is to give the project a ⭐ on GitHub.

Your stars help us grow the community, stay motivated, and keep shipping better tools for real-time data ingestion and transformation.