data-extraction: align with llama-cloud v2 SDK (#253)

This commit is contained in:
Adrian Lyjak
2026-04-15 18:43:13 -04:00
committed by GitHub
parent 86ea63a0af
commit 263d8dd0a8
5 changed files with 146 additions and 99 deletions
+23 -10
View File
@@ -1,7 +1,8 @@
{
"extract": {
"extraction_agent_id": null,
"json_schema": {
"product_type": "extract_v2",
"configuration_id": null,
"data_schema": {
"type": "object",
"properties": {
"document_type": {
@@ -20,13 +21,25 @@
},
"required": ["document_type", "summary", "key_points"]
},
"settings": {
"extraction_mode": "PREMIUM",
"system_prompt": null,
"citation_bbox": true,
"use_reasoning": false,
"cite_sources": true,
"confidence_scores": true
}
"tier": "agentic",
"extraction_target": "per_doc",
"cite_sources": true,
"confidence_scores": true
},
"classify": {
"product_type": "classify_v2",
"configuration_id": null,
"rules": []
},
"parse": {
"product_type": "parse_v2",
"configuration_id": null,
"tier": "agentic",
"version": "latest"
},
"split": {
"product_type": "split_v1",
"configuration_id": null,
"categories": []
}
}
+1 -1
View File
@@ -5,7 +5,7 @@ description = "Extracts data"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"llama-cloud>=1.3.0,<2",
"llama-cloud>=2.3.0,<3",
"json-schema-to-pydantic>=0.4.8",
"llama-index-workflows>=2.16.0,<3.0.0",
"python-dotenv>=1.1.0",
+55 -35
View File
@@ -2,24 +2,19 @@
Configuration for the extraction review application.
Configuration is loaded from configs/config.json via ResourceConfig.
The unified config contains both extraction settings and the JSON schema.
Extraction can run in two modes, controlled by the "extraction_agent_id" field
in configs/config.json:
- Local (default): extraction_agent_id is null. Uses the json_schema and
settings defined in config.json directly via extraction.run().
- Remote agent: extraction_agent_id is set to a LlamaCloud extraction agent
ID. Uses extraction.jobs.extract(extraction_agent_id=...) which delegates
schema and settings to the remote agent. The local json_schema and settings
in config.json are ignored — both extraction and the metadata workflow fetch
the schema directly from the remote agent.
Each top-level key in config.json maps to an SDK product-configuration type:
the discriminated union members returned by `client.configurations.retrieve`.
Each template-side subclass adds an optional `configuration_id` so a key
can either carry an inline snapshot OR point at a saved platform config.
"""
import logging
from typing import Any, Literal
from llama_cloud.types.beta.split_category import SplitCategory
from llama_cloud.types.classify_v2_parameters import ClassifyV2Parameters, Rule
from llama_cloud.types.extract_v2_parameters import ExtractV2Parameters
from llama_cloud.types.parse_v2_parameters import ParseV2Parameters
from llama_cloud.types.split_v1_parameters import SplitV1Parameters
from pydantic import BaseModel
from .json_util import get_extraction_schema as get_extraction_schema
@@ -31,29 +26,54 @@ logger = logging.getLogger(__name__)
EXTRACTED_DATA_COLLECTION: str = "extraction-review"
class ExtractSettings(BaseModel):
extraction_mode: Literal["FAST", "PREMIUM", "MULTIMODAL"]
system_prompt: str | None = None
citation_bbox: bool = False
use_reasoning: bool = False
cite_sources: bool = False
confidence_scores: bool = False
class ExtractConfig(ExtractV2Parameters):
"""Extract product configuration.
Inherits the SDK `ExtractV2Parameters` shape (product_type="extract_v2",
data_schema, tier, cite_sources, confidence_scores, extraction_target, ...).
Set `configuration_id` to a saved LlamaCloud configuration id (cfg-...)
to pull the parameters from the platform instead of using the local values.
"""
configuration_id: str | None = None
class ExtractConfig(BaseModel):
json_schema: dict[str, Any]
settings: ExtractSettings
# Set this to a LlamaCloud extraction agent ID to use a remote agent's
# schema and settings instead of the local json_schema/settings above.
# When set, extraction uses extraction.jobs.extract(extraction_agent_id=...)
# and the local settings are ignored for extraction.
extraction_agent_id: str | None = None
class ClassifyConfig(ClassifyV2Parameters):
"""Classify product configuration.
Inherits the SDK `ClassifyV2Parameters` shape. Overrides `rules` default
to `[]` so an unused classify slot validates without a rule list.
"""
rules: list[Rule] = []
configuration_id: str | None = None
class JsonSchema(BaseModel):
type: str = "object"
properties: dict[str, Any] = {}
required: list[str] = []
class ParseConfig(ParseV2Parameters):
"""Parse product configuration.
def to_dict(self) -> dict[str, Any]:
return self.model_dump(exclude_none=True)
Inherits the SDK `ParseV2Parameters` shape (product_type="parse_v2",
tier, version, plus structured option groups).
"""
configuration_id: str | None = None
class SplitConfig(SplitV1Parameters):
"""Split product configuration.
Inherits the SDK `SplitV1Parameters` shape. Overrides `categories` default
to `[]` so an unused split slot validates without categories.
"""
categories: list[SplitCategory] = []
configuration_id: str | None = None
class Config(BaseModel):
"""Root configuration model for configs/config.json."""
extract: ExtractConfig
classify: ClassifyConfig
parse: ParseConfig
split: SplitConfig
+19 -24
View File
@@ -1,13 +1,13 @@
from typing import Annotated, Any
import jsonref
from llama_cloud import AsyncLlamaCloud
from llama_cloud.types.configuration_response import ExtractV2Parameters
from workflows import Workflow, step
from workflows.events import StartEvent, StopEvent
from workflows.resource import Resource, ResourceConfig
from workflows.resource import ResourceConfig
from .clients import get_llama_cloud_client
from .config import EXTRACTED_DATA_COLLECTION, ExtractConfig, JsonSchema
from .clients import get_llama_cloud_client, project_id
from .config import EXTRACTED_DATA_COLLECTION, ExtractConfig
class MetadataResponse(StopEvent):
@@ -22,15 +22,6 @@ class MetadataWorkflow(Workflow):
async def get_metadata(
self,
_: StartEvent,
extraction_schema: Annotated[
JsonSchema,
ResourceConfig(
config_file="configs/config.json",
path_selector="extract.json_schema",
label="Extraction Schema",
description="JSON Schema defining the fields to extract from documents",
),
],
extract_config: Annotated[
ExtractConfig,
ResourceConfig(
@@ -40,23 +31,27 @@ class MetadataWorkflow(Workflow):
description="Configuration for document extraction quality and features",
),
],
llama_cloud_client: Annotated[
AsyncLlamaCloud, Resource(get_llama_cloud_client)
],
) -> MetadataResponse:
"""Return the data schema and storage settings for the review interface.
When extraction_agent_id is set, fetches the schema from the remote
agent so the UI always reflects what the agent will actually extract.
Otherwise uses the local schema from config.json.
When `configuration_id` is set, fetches the schema from the saved
extract configuration so the UI always reflects what will actually be
extracted. Otherwise uses the local schema from config.json.
"""
if extract_config.extraction_agent_id:
agent = await llama_cloud_client.extraction.extraction_agents.get(
extract_config.extraction_agent_id
if extract_config.configuration_id:
client = get_llama_cloud_client()
config_resp = await client.configurations.retrieve(
extract_config.configuration_id,
project_id=project_id,
)
schema_dict = agent.data_schema
params = config_resp.parameters
if not isinstance(params, ExtractV2Parameters):
raise ValueError(
f"Configuration {extract_config.configuration_id} is not extract_v2"
)
schema_dict = dict(params.data_schema)
else:
schema_dict = extraction_schema.to_dict()
schema_dict = dict(extract_config.data_schema)
json_schema = jsonref.replace_refs(schema_dict, proxies=False)
return MetadataResponse(
+48 -29
View File
@@ -5,6 +5,7 @@ from typing import Annotated, Any, Literal
from llama_cloud import AsyncLlamaCloud
from llama_cloud.types.beta.extracted_data import ExtractedData, InvalidExtractionData
from llama_cloud.types.configuration_response import ExtractV2Parameters
from pydantic import BaseModel
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent
@@ -70,9 +71,14 @@ class ProcessFileWorkflow(Workflow):
file_id = event.file_id
logger.info(f"Running file {file_id}")
# Get file metadata (v2: files.list returns AsyncPaginator)
try:
files_page = await llama_cloud_client.files.list(file_ids=[file_id])
file_metadata = files_page.items[0]
file_metadata = None
async for f in llama_cloud_client.files.list(file_ids=[file_id]):
file_metadata = f
break
if file_metadata is None:
raise ValueError(f"File {file_id} not found")
filename = file_metadata.name
except Exception as e:
logger.error(f"Error fetching file metadata {file_id}: {e}", exc_info=True)
@@ -89,21 +95,24 @@ class ProcessFileWorkflow(Workflow):
Status(level="info", message=f"Extracting data from file {filename}")
)
if extract_config.extraction_agent_id:
# Remote agent mode: delegate schema and settings to the agent
extract_job = await llama_cloud_client.extraction.jobs.extract(
extraction_agent_id=extract_config.extraction_agent_id,
file_id=file_id,
if extract_config.configuration_id:
extract_job = await llama_cloud_client.extract.create(
file_input=file_id,
configuration_id=extract_config.configuration_id,
project_id=project_id,
)
else:
# Local mode: use schema and settings from config.json
extract_job = await llama_cloud_client.extraction.run(
config=extract_config.settings.model_dump(),
data_schema=extract_config.json_schema,
file_id=file_id,
extract_job = await llama_cloud_client.extract.create(
file_input=file_id,
configuration=extract_config.model_dump(
exclude={"configuration_id", "product_type"},
exclude_none=True,
),
project_id=project_id,
)
# Use file_hash from the event (computed by UI from file content)
# or fall back to external_file_id from file metadata for deduplication
file_hash = event.file_hash or file_metadata.external_file_id
async with ctx.store.edit_state() as state:
@@ -137,32 +146,41 @@ class ProcessFileWorkflow(Workflow):
if state.extract_job_id is None:
raise ValueError("Job ID cannot be null when waiting for its completion")
await llama_cloud_client.extraction.jobs.wait_for_completion(
state.extract_job_id
# Wait for extraction job to complete; v2 returns the completed job
# (with extract_result embedded) directly.
job = await llama_cloud_client.extract.wait_for_completion(
state.extract_job_id,
project_id=project_id,
)
extracted_result = await llama_cloud_client.extraction.jobs.get_result(
state.extract_job_id
)
extract_run = await llama_cloud_client.extraction.runs.get(
run_id=extracted_result.run_id
# Re-fetch with extract_metadata expansion for field-level metadata
job = await llama_cloud_client.extract.get(
state.extract_job_id,
expand=["extract_metadata"],
project_id=project_id,
)
extracted_event: ExtractedEvent | ExtractedInvalidEvent
try:
logger.info(
f"Extracted data: {json.dumps(extracted_result.model_dump(), indent=2)}"
f"Extracted data: {json.dumps(job.model_dump(mode='json'), indent=2, default=str)}"
)
if extract_config.extraction_agent_id:
# Remote agent mode: get schema from the agent
agent = await llama_cloud_client.extraction.extraction_agents.get(
extract_config.extraction_agent_id
# Resolve the schema that governs the extracted data.
if extract_config.configuration_id:
config_resp = await llama_cloud_client.configurations.retrieve(
extract_config.configuration_id,
project_id=project_id,
)
schema_class = get_extraction_schema(agent.data_schema)
params = config_resp.parameters
if not isinstance(params, ExtractV2Parameters):
raise ValueError(
f"Configuration {extract_config.configuration_id} is not extract_v2"
)
schema_class = get_extraction_schema(dict(params.data_schema))
else:
schema_class = get_extraction_schema(extract_config.json_schema)
data = ExtractedData.from_extraction_result(
result=extract_run,
schema_class = get_extraction_schema(dict(extract_config.data_schema))
data = ExtractedData.from_extract_job(
job=job,
schema=schema_class,
file_name=state.filename,
file_id=state.file_id,
@@ -224,6 +242,7 @@ workflow = ProcessFileWorkflow(timeout=None)
if __name__ == "__main__":
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()