llama-cloud v2: classify-extract-sec + extract-reconcile-invoice (#257)

This commit is contained in:
Adrian Lyjak
2026-04-16 19:37:26 -04:00
committed by GitHub
parent 0cdd4e8913
commit 1bdc5e2951
6 changed files with 248 additions and 119 deletions
+131 -24
View File
@@ -1,37 +1,80 @@
{
"extract": {
"extraction_agent_id": null,
"json_schema": {
"product_type": "extract_v2",
"configuration_id": null,
"data_schema": {
"type": "object",
"description": "Schema for extracting invoice data",
"properties": {
"invoice_number": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Invoice number or identifier"
},
"invoice_date": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Date of the invoice (YYYY-MM-DD format if possible)"
},
"vendor_name": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Name of the vendor or supplier"
},
"vendor_address": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Address of the vendor"
},
"purchase_order_number": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Purchase order (PO) number if present"
},
"payment_terms": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Payment terms (e.g., Net 30, Net 60, Due on receipt)"
},
@@ -41,22 +84,50 @@
"items": {
"properties": {
"description": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Description of the line item"
},
"quantity": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Quantity of the item"
},
"unit_price": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Price per unit of the item"
},
"total": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Total price for this line item"
}
@@ -65,35 +136,71 @@
},
"type": "array"
},
{ "type": "null" }
{
"type": "null"
}
],
"default": null,
"description": "List of line items on the invoice"
},
"subtotal": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Subtotal before tax and other charges"
},
"tax": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Tax amount"
},
"total": {
"anyOf": [{ "type": "number" }, { "type": "null" }],
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Total amount due on the invoice"
}
}
},
"settings": {
"extraction_mode": "PREMIUM",
"system_prompt": null,
"citation_bbox": true,
"use_reasoning": false,
"cite_sources": true,
"confidence_scores": true
}
"tier": "agentic",
"extraction_target": "per_doc",
"system_prompt": null,
"cite_sources": true,
"confidence_scores": true
},
"classify": {
"product_type": "classify_v2",
"configuration_id": null,
"rules": []
},
"parse": {
"product_type": "parse_v2",
"configuration_id": null,
"tier": "agentic",
"version": "latest"
},
"split": {
"product_type": "split_v1",
"configuration_id": null,
"categories": []
}
}
+1 -1
View File
@@ -5,7 +5,7 @@ description = "Extracts data"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"llama-cloud>=1.3.0,<2",
"llama-cloud>=2.3.0,<3",
"json-schema-to-pydantic>=0.4.8",
"llama-index-workflows>=2.16.0,<3.0.0",
"python-dotenv>=1.1.0",
+10 -13
View File
@@ -1,7 +1,7 @@
import logging
import os
from llama_cloud import AsyncLlamaCloud, ConflictError
from llama_cloud import AsyncLlamaCloud
from llama_index.llms.openai import OpenAI
from extraction_review.config import CONTRACTS_INDEX_NAME
@@ -34,18 +34,15 @@ async def get_contracts_pipeline_id() -> str:
global _contracts_pipeline_id
if _contracts_pipeline_id is None:
client = get_llama_cloud_client()
try:
pipeline = await client.pipelines.upsert(
name=CONTRACTS_INDEX_NAME,
project_id=project_id,
)
except ConflictError:
# Pipeline already exists — look it up by name
pipelines = await client.pipelines.list(
pipeline_name=CONTRACTS_INDEX_NAME,
project_id=project_id,
)
pipeline = pipelines[0]
# data_sink_id=None + transform_config are both required; omitting
# either trips a misleading 409 "integrity constraint" error. Leaving
# embedding_config unset lets the platform use managed OpenAI embeddings.
pipeline = await client.pipelines.upsert(
name=CONTRACTS_INDEX_NAME,
project_id=project_id,
data_sink_id=None,
transform_config={"mode": "auto"},
)
_contracts_pipeline_id = pipeline.id
return _contracts_pipeline_id
+46 -36
View File
@@ -2,24 +2,20 @@
Configuration for the extraction review application.
Configuration is loaded from configs/config.json via ResourceConfig.
The unified config contains both extraction settings and the JSON schema.
Extraction can run in two modes, controlled by the "extraction_agent_id" field
in configs/config.json:
- Local (default): extraction_agent_id is null. Uses the json_schema and
settings defined in config.json directly via extraction.run().
- Remote agent: extraction_agent_id is set to a LlamaCloud extraction agent
ID. Uses extraction.jobs.extract(extraction_agent_id=...) which delegates
schema and settings to the remote agent. The local json_schema and settings
in config.json are ignored — both extraction and the metadata workflow fetch
the schema directly from the remote agent.
Each top-level key in config.json maps to an SDK product-configuration type:
the discriminated union members returned by `client.configurations.retrieve`.
Each template-side subclass adds an optional `configuration_id` so a key
can either carry an inline snapshot OR point at a saved platform config.
"""
import logging
from typing import Any, Literal
import os
from llama_cloud.types.beta.split_category import SplitCategory
from llama_cloud.types.classify_v2_parameters import ClassifyV2Parameters, Rule
from llama_cloud.types.extract_v2_parameters import ExtractV2Parameters
from llama_cloud.types.parse_v2_parameters import ParseV2Parameters
from llama_cloud.types.split_v1_parameters import SplitV1Parameters
from pydantic import BaseModel, Field
from .json_util import get_extraction_schema as get_extraction_schema
@@ -31,35 +27,49 @@ logger = logging.getLogger(__name__)
EXTRACTED_DATA_COLLECTION: str = "invoices"
# The name of the LlamaCloud index for storing contracts
CONTRACTS_INDEX_NAME: str = "contracts"
# Override with CONTRACTS_INDEX_NAME env var (useful when running against a
# shared staging project where the default name may collide with other data).
CONTRACTS_INDEX_NAME: str = os.getenv("CONTRACTS_INDEX_NAME", "contracts")
class ExtractSettings(BaseModel):
extraction_mode: Literal["FAST", "PREMIUM", "MULTIMODAL"]
system_prompt: str | None = None
citation_bbox: bool = False
use_reasoning: bool = False
cite_sources: bool = False
confidence_scores: bool = False
class ExtractConfig(ExtractV2Parameters):
"""Extract product configuration.
Inherits the SDK `ExtractV2Parameters` shape. Set `configuration_id`
to a saved LlamaCloud configuration id (cfg-...) to pull parameters
from the platform instead of using the local values.
"""
configuration_id: str | None = None
class ExtractConfig(BaseModel):
json_schema: dict[str, Any]
settings: ExtractSettings
# Set this to a LlamaCloud extraction agent ID to use a remote agent's
# schema and settings instead of the local json_schema/settings above.
# When set, extraction uses extraction.jobs.extract(extraction_agent_id=...)
# and the local settings are ignored for extraction.
extraction_agent_id: str | None = None
class ClassifyConfig(ClassifyV2Parameters):
"""Classify product configuration (extension slot, unused by the workflow)."""
rules: list[Rule] = []
configuration_id: str | None = None
class JsonSchema(BaseModel):
type: str = "object"
properties: dict[str, Any] = {}
required: list[str] = []
class ParseConfig(ParseV2Parameters):
"""Parse product configuration (extension slot)."""
def to_dict(self) -> dict[str, Any]:
return self.model_dump(exclude_none=True)
configuration_id: str | None = None
class SplitConfig(SplitV1Parameters):
"""Split product configuration (extension slot)."""
categories: list[SplitCategory] = []
configuration_id: str | None = None
class Config(BaseModel):
"""Root configuration model for configs/config.json."""
extract: ExtractConfig
classify: ClassifyConfig
parse: ParseConfig
split: SplitConfig
# Invoice extraction schema - extracted from invoice documents
+17 -19
View File
@@ -1,12 +1,13 @@
from typing import Annotated, Any
import jsonref
from llama_cloud.types.configuration_response import ExtractV2Parameters
from workflows import Workflow, step
from workflows.events import StartEvent, StopEvent
from workflows.resource import ResourceConfig
from .clients import get_contracts_pipeline_id, get_llama_cloud_client
from .config import EXTRACTED_DATA_COLLECTION, ExtractConfig, JsonSchema
from .clients import get_contracts_pipeline_id, get_llama_cloud_client, project_id
from .config import EXTRACTED_DATA_COLLECTION, ExtractConfig
class MetadataResponse(StopEvent):
@@ -22,15 +23,6 @@ class MetadataWorkflow(Workflow):
async def get_metadata(
self,
_: StartEvent,
extraction_schema: Annotated[
JsonSchema,
ResourceConfig(
config_file="configs/config.json",
path_selector="extract.json_schema",
label="Extraction Schema",
description="JSON Schema defining the fields to extract from documents",
),
],
extract_config: Annotated[
ExtractConfig,
ResourceConfig(
@@ -43,18 +35,24 @@ class MetadataWorkflow(Workflow):
) -> MetadataResponse:
"""Return the data schema and storage settings for the review interface.
When extraction_agent_id is set, fetches the schema from the remote
agent so the UI always reflects what the agent will actually extract.
Otherwise uses the local schema from config.json.
When `configuration_id` is set, fetches the schema from the saved
extract configuration so the UI always reflects what will actually be
extracted. Otherwise uses the local schema from config.json.
"""
if extract_config.extraction_agent_id:
if extract_config.configuration_id:
client = get_llama_cloud_client()
agent = await client.extraction.extraction_agents.get(
extract_config.extraction_agent_id
config_resp = await client.configurations.retrieve(
extract_config.configuration_id,
project_id=project_id,
)
schema_dict = agent.data_schema
params = config_resp.parameters
if not isinstance(params, ExtractV2Parameters):
raise ValueError(
f"Configuration {extract_config.configuration_id} is not extract_v2"
)
schema_dict = dict(params.data_schema)
else:
schema_dict = extraction_schema.to_dict()
schema_dict = dict(extract_config.data_schema)
json_schema = jsonref.replace_refs(schema_dict, proxies=False)
contracts_pipeline_id = await get_contracts_pipeline_id()
+43 -26
View File
@@ -26,6 +26,19 @@ from .config import (
InvoiceWithReconciliation,
)
def _field_metadata_dict(job: Any) -> dict[str, Any]:
"""Pull document-level per-field metadata from a v2 extract job.
ExtractedData.create() only accepts dict/list/ExtractedFieldMetadata values,
so None entries for unextracted fields must be dropped.
"""
if job.extract_metadata is None or job.extract_metadata.field_metadata is None:
return {}
doc_metadata = job.extract_metadata.field_metadata.document_metadata or {}
return {k: v for k, v in doc_metadata.items() if v is not None}
logger = logging.getLogger(__name__)
@@ -95,10 +108,14 @@ class ProcessFileWorkflow(Workflow):
logger.info(f"Running file {file_id}")
try:
files_page = await llama_cloud_client.files.list(
file_metadata = None
async for f in llama_cloud_client.files.list(
file_ids=[file_id], project_id=project_id
)
file_metadata = files_page.items[0]
):
file_metadata = f
break
if file_metadata is None:
raise ValueError(f"File {file_id} not found")
filename = file_metadata.name
except Exception as e:
logger.error(f"Error fetching file metadata {file_id}: {e}", exc_info=True)
@@ -115,18 +132,19 @@ class ProcessFileWorkflow(Workflow):
Status(level="info", message=f"Extracting data from file {filename}")
)
if extract_config.extraction_agent_id:
# Remote agent mode: delegate schema and settings to the agent
extract_job = await llama_cloud_client.extraction.jobs.extract(
extraction_agent_id=extract_config.extraction_agent_id,
file_id=file_id,
if extract_config.configuration_id:
extract_job = await llama_cloud_client.extract.create(
file_input=file_id,
configuration_id=extract_config.configuration_id,
project_id=project_id,
)
else:
# Local mode: use schema and settings from config.json
extract_job = await llama_cloud_client.extraction.run(
config=extract_config.settings.model_dump(),
data_schema=extract_config.json_schema,
file_id=file_id,
extract_job = await llama_cloud_client.extract.create(
file_input=file_id,
configuration=extract_config.model_dump(
exclude={"configuration_id", "product_type"},
exclude_none=True,
),
project_id=project_id,
)
@@ -163,28 +181,27 @@ class ProcessFileWorkflow(Workflow):
if state.extract_job_id is None:
raise ValueError("Job ID cannot be null when waiting for its completion")
await llama_cloud_client.extraction.jobs.wait_for_completion(
state.extract_job_id
await llama_cloud_client.extract.wait_for_completion(
state.extract_job_id,
project_id=project_id,
)
job = await llama_cloud_client.extract.get(
state.extract_job_id,
expand=["extract_metadata"],
project_id=project_id,
)
extracted_result = await llama_cloud_client.extraction.jobs.get_result(
state.extract_job_id
)
try:
logger.info(
f"Extracted data: {json.dumps(extracted_result.model_dump(), indent=2)}"
f"Extracted data: {json.dumps(job.model_dump(mode='json'), indent=2, default=str)}"
)
# Validate the extracted data as invoice
if not extracted_result.data:
if not job.extract_result:
raise ValueError("No data extracted from invoice")
invoice_data = InvoiceExtractionSchema.model_validate(extracted_result.data)
invoice_data = InvoiceExtractionSchema.model_validate(job.extract_result)
logger.info(f"Extracted invoice data: {invoice_data}")
# Extract only the field_metadata we need, not the entire ExtractRun object
field_metadata = extracted_result.extraction_metadata.get(
"field_metadata", {}
)
field_metadata = _field_metadata_dict(job)
return ExtractedEvent(
invoice_data=invoice_data, field_metadata=field_metadata
)