invoice-extraction: migrate workflow to llama-cloud v2 SDK (#252)

* invoice-extraction: migrate workflow to llama-cloud v2 SDK

- Bump llama-cloud pin from >=1.3.0,<2 to >=2.3.0,<3.
- Replace v1 client.extraction.extract(...) with v2 client.extract.create
  (inline ExtractConfigurationParam) + wait_for_completion.
- Translate v1 extraction_mode branches (base/advanced/premium) to v2
  ExtractConfigurationParam: base → tier=cost_effective, advanced →
  tier=agentic, premium → tier=agentic + cite_sources + confidence_scores.
- Drop the v1-only ExtractConfigParam import.
- Read extracted data from job.extract_result (dict or list) in place of
  result.data.
- Use a context manager for the file upload so the file handle is closed.
- Fix "\\n\\n" literal backslash-n in extraction_result and __main__.

Bumps invoice-extraction version 0.1.3 -> 0.2.0.

* drop spurious Any cast on job.extract_result
This commit is contained in:
Adrian Lyjak
2026-04-16 15:37:02 -04:00
committed by GitHub
parent db73a39e25
commit 59d8ad5af8
2 changed files with 27 additions and 42 deletions
+1 -1
View File
@@ -10,7 +10,7 @@ requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-workflows>=2.16.0,<3.0.0",
"llama-cloud>=1.3.0,<2",
"llama-cloud>=2.3.0,<3",
]
[dependency-groups]
+26 -41
View File
@@ -3,7 +3,7 @@ from pathlib import Path
from typing import Annotated
from llama_cloud import AsyncLlamaCloud
from llama_cloud.types.extraction.extract_config_param import ExtractConfigParam
from llama_cloud.types.extract_configuration_param import ExtractConfigurationParam
from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import (
@@ -58,53 +58,38 @@ class InvoiceExtractWorkflow(Workflow):
state.extraction_mode = ev.extraction_mode
state.path = ev.path
config: ExtractConfigParam
configuration: ExtractConfigurationParam = {
"data_schema": InvoiceData.model_json_schema(),
}
if ev.extraction_mode == "base":
config = {
"extraction_mode": "FAST",
"high_resolution_mode": False,
"invalidate_cache": False,
"cite_sources": False,
"use_reasoning": False,
"confidence_scores": False,
}
configuration["tier"] = "cost_effective"
elif ev.extraction_mode == "advanced":
config = {
"extraction_mode": "MULTIMODAL",
"high_resolution_mode": True,
"invalidate_cache": False,
"cite_sources": False,
"use_reasoning": True,
"confidence_scores": False,
}
configuration["tier"] = "agentic"
else:
config = {
"extraction_mode": "PREMIUM",
"high_resolution_mode": True,
"invalidate_cache": False,
"cite_sources": True,
"use_reasoning": True,
"confidence_scores": True,
}
configuration["tier"] = "agentic"
configuration["cite_sources"] = True
configuration["confidence_scores"] = True
uploaded = await client.files.create(
file=Path(ev.path).open("rb"),
purpose="extract",
)
result = await client.extraction.extract(
config=config,
data_schema=InvoiceData.model_json_schema(),
file_id=uploaded.id,
with Path(ev.path).open("rb") as f:
uploaded = await client.files.create(file=f, purpose="extract")
extract_job = await client.extract.create(
file_input=uploaded.id,
configuration=configuration,
)
job = await client.extract.wait_for_completion(extract_job.id)
extracted_data: list[InvoiceData] = []
if isinstance(result.data, list):
for r in result.data:
result = job.extract_result
if isinstance(result, list):
for r in result:
extracted_data.append(InvoiceData.model_validate(r))
elif result.data is not None:
extracted_data.append(InvoiceData.model_validate(result.data))
extraction_result = "\\n\\n---\\n\\n".join(
elif result is not None:
extracted_data.append(InvoiceData.model_validate(result))
extraction_result = "\n\n---\n\n".join(
[
f"Invoice Date: {d.invoice_date}\\nCustomer: {d.customer}\\nAmount Due: {d.amount_due}"
f"Invoice Date: {d.invoice_date}\nCustomer: {d.customer}\nAmount Due: {d.amount_due}"
for d in extracted_data
]
)
@@ -128,7 +113,7 @@ async def main(path: str, extraction_mode: str) -> None:
handler = w.run(path=path, extraction_mode=extraction_mode)
async for ev in handler.stream_events():
if isinstance(ev, FeedbackRequiredEvent):
print("Extraction Result:\\n\\n" + ev.extraction_result + "\\n\\n")
print("Extraction Result:\n\n" + ev.extraction_result + "\n\n")
res = input("Approve? [yes/no]: ")
if res.lower().strip() == "yes":
handler.ctx.send_event(HumanFeedbackEvent(approved=True))