diff --git a/configs/config.json b/configs/config.json index 16f5c7c..d33a274 100644 --- a/configs/config.json +++ b/configs/config.json @@ -1,7 +1,8 @@ { "extract": { - "extraction_agent_id": null, - "json_schema": { + "product_type": "extract_v2", + "configuration_id": null, + "data_schema": { "type": "object", "properties": { "document_type": { @@ -20,13 +21,25 @@ }, "required": ["document_type", "summary", "key_points"] }, - "settings": { - "extraction_mode": "PREMIUM", - "system_prompt": null, - "citation_bbox": true, - "use_reasoning": false, - "cite_sources": true, - "confidence_scores": true - } + "tier": "agentic", + "extraction_target": "per_doc", + "cite_sources": true, + "confidence_scores": true + }, + "classify": { + "product_type": "classify_v2", + "configuration_id": null, + "rules": [] + }, + "parse": { + "product_type": "parse_v2", + "configuration_id": null, + "tier": "agentic", + "version": "latest" + }, + "split": { + "product_type": "split_v1", + "configuration_id": null, + "categories": [] } } diff --git a/pyproject.toml b/pyproject.toml index 6e18df4..36ae5e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Extracts data" readme = "README.md" requires-python = ">=3.12" dependencies = [ - "llama-cloud>=1.3.0,<2", + "llama-cloud>=2.3.0,<3", "json-schema-to-pydantic>=0.4.8", "llama-index-workflows>=2.16.0,<3.0.0", "python-dotenv>=1.1.0", diff --git a/src/extraction_review/config.py b/src/extraction_review/config.py index a88ee2f..b169147 100644 --- a/src/extraction_review/config.py +++ b/src/extraction_review/config.py @@ -2,24 +2,19 @@ Configuration for the extraction review application. Configuration is loaded from configs/config.json via ResourceConfig. -The unified config contains both extraction settings and the JSON schema. - -Extraction can run in two modes, controlled by the "extraction_agent_id" field -in configs/config.json: - - - Local (default): extraction_agent_id is null. Uses the json_schema and - settings defined in config.json directly via extraction.run(). - - - Remote agent: extraction_agent_id is set to a LlamaCloud extraction agent - ID. Uses extraction.jobs.extract(extraction_agent_id=...) which delegates - schema and settings to the remote agent. The local json_schema and settings - in config.json are ignored — both extraction and the metadata workflow fetch - the schema directly from the remote agent. +Each top-level key in config.json maps to an SDK product-configuration type: +the discriminated union members returned by `client.configurations.retrieve`. +Each template-side subclass adds an optional `configuration_id` so a key +can either carry an inline snapshot OR point at a saved platform config. """ import logging -from typing import Any, Literal +from llama_cloud.types.beta.split_category import SplitCategory +from llama_cloud.types.classify_v2_parameters import ClassifyV2Parameters, Rule +from llama_cloud.types.extract_v2_parameters import ExtractV2Parameters +from llama_cloud.types.parse_v2_parameters import ParseV2Parameters +from llama_cloud.types.split_v1_parameters import SplitV1Parameters from pydantic import BaseModel from .json_util import get_extraction_schema as get_extraction_schema @@ -31,29 +26,54 @@ logger = logging.getLogger(__name__) EXTRACTED_DATA_COLLECTION: str = "extraction-review" -class ExtractSettings(BaseModel): - extraction_mode: Literal["FAST", "PREMIUM", "MULTIMODAL"] - system_prompt: str | None = None - citation_bbox: bool = False - use_reasoning: bool = False - cite_sources: bool = False - confidence_scores: bool = False +class ExtractConfig(ExtractV2Parameters): + """Extract product configuration. + + Inherits the SDK `ExtractV2Parameters` shape (product_type="extract_v2", + data_schema, tier, cite_sources, confidence_scores, extraction_target, ...). + Set `configuration_id` to a saved LlamaCloud configuration id (cfg-...) + to pull the parameters from the platform instead of using the local values. + """ + + configuration_id: str | None = None -class ExtractConfig(BaseModel): - json_schema: dict[str, Any] - settings: ExtractSettings - # Set this to a LlamaCloud extraction agent ID to use a remote agent's - # schema and settings instead of the local json_schema/settings above. - # When set, extraction uses extraction.jobs.extract(extraction_agent_id=...) - # and the local settings are ignored for extraction. - extraction_agent_id: str | None = None +class ClassifyConfig(ClassifyV2Parameters): + """Classify product configuration. + + Inherits the SDK `ClassifyV2Parameters` shape. Overrides `rules` default + to `[]` so an unused classify slot validates without a rule list. + """ + + rules: list[Rule] = [] + configuration_id: str | None = None -class JsonSchema(BaseModel): - type: str = "object" - properties: dict[str, Any] = {} - required: list[str] = [] +class ParseConfig(ParseV2Parameters): + """Parse product configuration. - def to_dict(self) -> dict[str, Any]: - return self.model_dump(exclude_none=True) + Inherits the SDK `ParseV2Parameters` shape (product_type="parse_v2", + tier, version, plus structured option groups). + """ + + configuration_id: str | None = None + + +class SplitConfig(SplitV1Parameters): + """Split product configuration. + + Inherits the SDK `SplitV1Parameters` shape. Overrides `categories` default + to `[]` so an unused split slot validates without categories. + """ + + categories: list[SplitCategory] = [] + configuration_id: str | None = None + + +class Config(BaseModel): + """Root configuration model for configs/config.json.""" + + extract: ExtractConfig + classify: ClassifyConfig + parse: ParseConfig + split: SplitConfig diff --git a/src/extraction_review/metadata_workflow.py b/src/extraction_review/metadata_workflow.py index 11c8b33..c58a142 100644 --- a/src/extraction_review/metadata_workflow.py +++ b/src/extraction_review/metadata_workflow.py @@ -1,13 +1,13 @@ from typing import Annotated, Any import jsonref -from llama_cloud import AsyncLlamaCloud +from llama_cloud.types.configuration_response import ExtractV2Parameters from workflows import Workflow, step from workflows.events import StartEvent, StopEvent -from workflows.resource import Resource, ResourceConfig +from workflows.resource import ResourceConfig -from .clients import get_llama_cloud_client -from .config import EXTRACTED_DATA_COLLECTION, ExtractConfig, JsonSchema +from .clients import get_llama_cloud_client, project_id +from .config import EXTRACTED_DATA_COLLECTION, ExtractConfig class MetadataResponse(StopEvent): @@ -22,15 +22,6 @@ class MetadataWorkflow(Workflow): async def get_metadata( self, _: StartEvent, - extraction_schema: Annotated[ - JsonSchema, - ResourceConfig( - config_file="configs/config.json", - path_selector="extract.json_schema", - label="Extraction Schema", - description="JSON Schema defining the fields to extract from documents", - ), - ], extract_config: Annotated[ ExtractConfig, ResourceConfig( @@ -40,23 +31,27 @@ class MetadataWorkflow(Workflow): description="Configuration for document extraction quality and features", ), ], - llama_cloud_client: Annotated[ - AsyncLlamaCloud, Resource(get_llama_cloud_client) - ], ) -> MetadataResponse: """Return the data schema and storage settings for the review interface. - When extraction_agent_id is set, fetches the schema from the remote - agent so the UI always reflects what the agent will actually extract. - Otherwise uses the local schema from config.json. + When `configuration_id` is set, fetches the schema from the saved + extract configuration so the UI always reflects what will actually be + extracted. Otherwise uses the local schema from config.json. """ - if extract_config.extraction_agent_id: - agent = await llama_cloud_client.extraction.extraction_agents.get( - extract_config.extraction_agent_id + if extract_config.configuration_id: + client = get_llama_cloud_client() + config_resp = await client.configurations.retrieve( + extract_config.configuration_id, + project_id=project_id, ) - schema_dict = agent.data_schema + params = config_resp.parameters + if not isinstance(params, ExtractV2Parameters): + raise ValueError( + f"Configuration {extract_config.configuration_id} is not extract_v2" + ) + schema_dict = dict(params.data_schema) else: - schema_dict = extraction_schema.to_dict() + schema_dict = dict(extract_config.data_schema) json_schema = jsonref.replace_refs(schema_dict, proxies=False) return MetadataResponse( diff --git a/src/extraction_review/process_file.py b/src/extraction_review/process_file.py index a067d54..a2bbdac 100644 --- a/src/extraction_review/process_file.py +++ b/src/extraction_review/process_file.py @@ -5,6 +5,7 @@ from typing import Annotated, Any, Literal from llama_cloud import AsyncLlamaCloud from llama_cloud.types.beta.extracted_data import ExtractedData, InvalidExtractionData +from llama_cloud.types.configuration_response import ExtractV2Parameters from pydantic import BaseModel from workflows import Context, Workflow, step from workflows.events import Event, StartEvent, StopEvent @@ -70,9 +71,14 @@ class ProcessFileWorkflow(Workflow): file_id = event.file_id logger.info(f"Running file {file_id}") + # Get file metadata (v2: files.list returns AsyncPaginator) try: - files_page = await llama_cloud_client.files.list(file_ids=[file_id]) - file_metadata = files_page.items[0] + file_metadata = None + async for f in llama_cloud_client.files.list(file_ids=[file_id]): + file_metadata = f + break + if file_metadata is None: + raise ValueError(f"File {file_id} not found") filename = file_metadata.name except Exception as e: logger.error(f"Error fetching file metadata {file_id}: {e}", exc_info=True) @@ -89,21 +95,24 @@ class ProcessFileWorkflow(Workflow): Status(level="info", message=f"Extracting data from file {filename}") ) - if extract_config.extraction_agent_id: - # Remote agent mode: delegate schema and settings to the agent - extract_job = await llama_cloud_client.extraction.jobs.extract( - extraction_agent_id=extract_config.extraction_agent_id, - file_id=file_id, + if extract_config.configuration_id: + extract_job = await llama_cloud_client.extract.create( + file_input=file_id, + configuration_id=extract_config.configuration_id, + project_id=project_id, ) else: - # Local mode: use schema and settings from config.json - extract_job = await llama_cloud_client.extraction.run( - config=extract_config.settings.model_dump(), - data_schema=extract_config.json_schema, - file_id=file_id, + extract_job = await llama_cloud_client.extract.create( + file_input=file_id, + configuration=extract_config.model_dump( + exclude={"configuration_id", "product_type"}, + exclude_none=True, + ), project_id=project_id, ) + # Use file_hash from the event (computed by UI from file content) + # or fall back to external_file_id from file metadata for deduplication file_hash = event.file_hash or file_metadata.external_file_id async with ctx.store.edit_state() as state: @@ -137,32 +146,41 @@ class ProcessFileWorkflow(Workflow): if state.extract_job_id is None: raise ValueError("Job ID cannot be null when waiting for its completion") - await llama_cloud_client.extraction.jobs.wait_for_completion( - state.extract_job_id + # Wait for extraction job to complete; v2 returns the completed job + # (with extract_result embedded) directly. + job = await llama_cloud_client.extract.wait_for_completion( + state.extract_job_id, + project_id=project_id, ) - - extracted_result = await llama_cloud_client.extraction.jobs.get_result( - state.extract_job_id - ) - extract_run = await llama_cloud_client.extraction.runs.get( - run_id=extracted_result.run_id + # Re-fetch with extract_metadata expansion for field-level metadata + job = await llama_cloud_client.extract.get( + state.extract_job_id, + expand=["extract_metadata"], + project_id=project_id, ) extracted_event: ExtractedEvent | ExtractedInvalidEvent try: logger.info( - f"Extracted data: {json.dumps(extracted_result.model_dump(), indent=2)}" + f"Extracted data: {json.dumps(job.model_dump(mode='json'), indent=2, default=str)}" ) - if extract_config.extraction_agent_id: - # Remote agent mode: get schema from the agent - agent = await llama_cloud_client.extraction.extraction_agents.get( - extract_config.extraction_agent_id + # Resolve the schema that governs the extracted data. + if extract_config.configuration_id: + config_resp = await llama_cloud_client.configurations.retrieve( + extract_config.configuration_id, + project_id=project_id, ) - schema_class = get_extraction_schema(agent.data_schema) + params = config_resp.parameters + if not isinstance(params, ExtractV2Parameters): + raise ValueError( + f"Configuration {extract_config.configuration_id} is not extract_v2" + ) + schema_class = get_extraction_schema(dict(params.data_schema)) else: - schema_class = get_extraction_schema(extract_config.json_schema) - data = ExtractedData.from_extraction_result( - result=extract_run, + schema_class = get_extraction_schema(dict(extract_config.data_schema)) + + data = ExtractedData.from_extract_job( + job=job, schema=schema_class, file_name=state.filename, file_id=state.file_id, @@ -224,6 +242,7 @@ workflow = ProcessFileWorkflow(timeout=None) if __name__ == "__main__": from pathlib import Path + from dotenv import load_dotenv load_dotenv()