{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Document Classification + Extraction Workflow with LlamaCloud + LlamaIndex Workflows\n",
"\n",
"<a href=\"https://colab.research.google.com/github/run-llama/llama_cloud_services/blob/main/examples/misc/parse_classify_extract_workflow.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>\n",
"\n",
"This notebook shows a multi-step agentic document workflow that uses the **parsing**, **classification**, and **extraction** modules in LlamaCloud, orchestrated through **LlamaIndex Workflows**. The workflow takes a complex input document, parses it into clean markdown, classifies it by subtype, and extracts data according to the schema specified for that subtype. This lets you run extraction over several document types in a single workflow instead of sorting the documents manually beforehand.\n",
"\n",
"This notebook uses the following modules:\n",
"1. **Parse (LlamaParse)** - Extract and convert documents to markdown\n",
"2. **Classify** - Categorize documents based on their content\n",
"3. **Extract (LlamaExtract)** - Extract structured data using the markdown as input via SourceText\n",
"4. **LlamaIndex Workflows** - Event-driven orchestration of the parse, classify and extract steps\n",
"\n",
"The workflow is implemented as a proper LlamaIndex Workflow with separate steps for parsing, classification, and extraction, connected by typed events. This provides modularity, observability, and type safety."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> **⚠️ DEPRECATION NOTICE**\n",
">\n",
"> This example uses the deprecated `llama-cloud-services` package, which will be maintained until **May 1, 2026**.\n",
">\n",
"> **Please migrate to:**\n",
"> - **Python**: `pip install \"llama-cloud>=1.0\"` ([GitHub](https://github.com/run-llama/llama-cloud-py))\n",
"> - **New Package Documentation**: https://docs.cloud.llamaindex.ai/\n",
">\n",
"> The new package provides the same functionality with improved performance and support."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup and Installation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install required packages\n",
"%pip install llama-cloud-services\n",
"%pip install python-dotenv"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"✅ API key configured\n"
]
}
],
"source": [
"import os\n",
"import nest_asyncio\n",
"from getpass import getpass\n",
"from dotenv import load_dotenv\n",
"\n",
"# Load environment variables\n",
"load_dotenv()\n",
"nest_asyncio.apply()\n",
"\n",
"# Set up API key\n",
"# os.environ[\"LLAMA_CLOUD_API_KEY\"] = \"\"  # edit it\n",
"\n",
"# Set up base URL\n",
"# os.environ[\"LLAMA_CLOUD_BASE_URL\"] = \"https://api.cloud.eu.llamaindex.ai/\"  # update if necessary\n",
"\n",
"print(\"✅ API key configured\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Download Sample Documents\n",
"\n",
"Let's download some sample documents to work with:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Downloading financial_report.pdf...\n",
"✅ Downloaded financial_report.pdf\n",
"📁 technical_spec.pdf already exists\n",
"\n",
"📂 Sample documents ready!\n"
]
}
],
"source": [
"import requests\n",
"import os\n",
"\n",
"# Create directory for sample documents\n",
"os.makedirs(\"sample_docs\", exist_ok=True)\n",
"\n",
"# Download sample documents\n",
"docs_to_download = {\n",
"    \"financial_report.pdf\": \"https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/10k/uber_2021.pdf\",\n",
"    \"technical_spec.pdf\": \"https://www.ti.com/lit/ds/symlink/lm317.pdf\",\n",
"}\n",
"\n",
"for filename, url in docs_to_download.items():\n",
"    filepath = f\"sample_docs/{filename}\"\n",
"    if not os.path.exists(filepath):\n",
"        print(f\"Downloading {filename}...\")\n",
"        response = requests.get(url)\n",
"        if response.status_code == 200:\n",
"            with open(filepath, \"wb\") as f:\n",
"                f.write(response.content)\n",
"            print(f\"✅ Downloaded {filename}\")\n",
"        else:\n",
"            print(f\"❌ Failed to download {filename}\")\n",
"    else:\n",
"        print(f\"📁 {filename} already exists\")\n",
"\n",
"print(\"\\n📂 Sample documents ready!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Phase 1: Document Parsing\n",
"\n",
"First, let's parse our documents using LlamaParse to extract clean markdown content."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔄 Parsing documents...\n",
"Started parsing the file under job_id 530c187a-bd2d-4eea-b38d-9e5738eab465\n",
".✅ Parsed financial report (Job ID: 530c187a-bd2d-4eea-b38d-9e5738eab465)\n",
"Started parsing the file under job_id a6e27710-776b-4445-8b94-8d75959ff5db\n",
"✅ Parsed technical spec (Job ID: a6e27710-776b-4445-8b94-8d75959ff5db)\n",
"\n",
"📄 Parsing complete!\n"
]
}
],
"source": [
"from llama_cloud_services.parse.base import LlamaParse\n",
"from llama_cloud_services.parse.utils import ResultType\n",
"import asyncio\n",
"\n",
"# Initialize the parser\n",
"parser = LlamaParse(\n",
"    result_type=ResultType.MD,  # Get markdown output\n",
"    verbose=True,\n",
"    language=\"en\",\n",
"    # Premium mode for better accuracy\n",
"    premium_mode=True,\n",
"    # Extract tables as HTML for better structure\n",
"    output_tables_as_HTML=True,\n",
"    # No page limit is set, so each document is parsed in full\n",
")\n",
"\n",
"print(\"🔄 Parsing documents...\")\n",
"\n",
"# Parse the financial report\n",
"financial_result = await parser.aparse(\"sample_docs/financial_report.pdf\")\n",
"print(f\"✅ Parsed financial report (Job ID: {financial_result.job_id})\")\n",
"\n",
"# Parse the technical specification\n",
"technical_result = await parser.aparse(\"sample_docs/technical_spec.pdf\")\n",
"print(f\"✅ Parsed technical spec (Job ID: {technical_result.job_id})\")\n",
"\n",
"print(\"\\n📄 Parsing complete!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Extract Markdown Content\n",
"\n",
"Now let's get the markdown content from our parsed documents:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"📋 Financial Report Markdown (first 500 chars):\n",
"\n",
"\n",
"# UNITED STATES\n",
"# SECURITIES AND EXCHANGE COMMISSION\n",
"Washington, D.C. 20549\n",
"\n",
"## FORM 10-K\n",
"\n",
"(Mark One)\n",
"\n",
"☒ ANNUAL REPORT PURSUANT TO SECTION 13 OR 15(d) OF THE SECURITIES EXCHANGE ACT OF 1934\n",
"For the fiscal year ended December 31, 2021\n",
"OR\n",
"☐ TRANSITION REPORT PURSUANT TO SECTION 13 OR 15(d) OF THE SECURITIES EXCHANGE ACT OF 1934\n",
"For the transition period from_____ to _____\n",
"Commission File Number: 001-38902\n",
"\n",
"# UBER TECHNOLOGIES, INC.\n",
"(Exact name of registrant as specified in its charter)\n",
"\n",
"Delaware\n",
"...\n",
"\n",
"📋 Technical Spec Markdown (first 500 chars):\n",
"\n",
"\n",
"LM317\n",
"SLVS044Z SEPTEMBER 1997 REVISED APRIL 2025\n",
"\n",
"# LM317 3-Pin Adjustable Regulator\n",
"\n",
"## 1 Features\n",
"\n",
"- Output voltage range:\n",
" Adjustable: 1.25V to 37V\n",
"- Output current: 1.5A\n",
"- Line regulation: 0.01%/V (typ)\n",
"- Load regulation: 0.1% (typ)\n",
"- Internal short-circuit current limiting\n",
"- Thermal overload protection\n",
"- Output safe-area compensation (new chip)\n",
"- PSRR: 80dB at 120Hz for CADJ = 10μF (new chip)\n",
"- Packages:\n",
" 4-pin, SOT-223 (DCY)\n",
" 3-pin, TO-263 (KTT)\n",
" 3-pin, TO-220 (KCS, KCT),\n",
"...\n",
"\n",
"📏 Financial report markdown length: 1338499 characters\n",
"📏 Technical spec markdown length: 92483 characters\n"
]
}
],
"source": [
"# Get markdown content from parsed documents\n",
"financial_markdown = await financial_result.aget_markdown()\n",
"technical_markdown = await technical_result.aget_markdown()\n",
"\n",
"print(\"📋 Financial Report Markdown (first 500 chars):\")\n",
"print(financial_markdown[:500])\n",
"print(\"...\\n\")\n",
"\n",
"print(\"📋 Technical Spec Markdown (first 500 chars):\")\n",
"print(technical_markdown[:500])\n",
"print(\"...\\n\")\n",
"\n",
"print(f\"📏 Financial report markdown length: {len(financial_markdown)} characters\")\n",
"print(f\"📏 Technical spec markdown length: {len(technical_markdown)} characters\")\n",
"\n",
"document_texts = [financial_markdown, technical_markdown]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Phase 2: Document Classification\n",
"\n",
"Next, let's classify our documents based on their content using `LlamaClassify`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🏷️ Setting up document classification...\n",
"📝 Created 3 classification rules\n"
]
}
],
"source": [
"from llama_cloud_services.beta.classifier.client import LlamaClassify\n",
"from llama_cloud.types import ClassifierRule\n",
"from llama_cloud_services.files.client import FileClient\n",
"from llama_cloud.client import AsyncLlamaCloud\n",
"\n",
"# Initialize the classify client\n",
"api_key = os.environ[\"LLAMA_CLOUD_API_KEY\"]\n",
"classify_client = LlamaClassify.from_api_key(api_key)\n",
"\n",
"print(\"🏷️ Setting up document classification...\")\n",
"\n",
"# Define classification rules\n",
"classification_rules = [\n",
"    ClassifierRule(\n",
"        type=\"financial_document\",\n",
"        description=\"Documents containing financial data, revenue, expenses, SEC filings, or financial statements\",\n",
"    ),\n",
"    ClassifierRule(\n",
"        type=\"technical_specification\",\n",
"        description=\"Technical datasheets, component specifications, engineering documents, or technical manuals\",\n",
"    ),\n",
"    ClassifierRule(\n",
"        type=\"general_document\",\n",
"        description=\"General business documents, contracts, or other unspecified document types\",\n",
"    ),\n",
"]\n",
"\n",
"print(f\"📝 Created {len(classification_rules)} classification rules\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Try Classification Independently\n",
"\n",
"Let's test the classification on one of our parsed documents to see how it works:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔍 Classifying financial document...\n",
" Document length: 1,338,499 characters\n",
"\n",
"✅ Classification Result:\n",
" Type: financial_document\n",
" Confidence: 100.00%\n",
" Reasoning: This document is a Form 10-K, which is an annual report required by the U.S. Securities and Exchange Commission (SEC) for publicly traded companies. It contains financial data, information about the c...\n",
"\n",
"======================================================================\n"
]
}
],
"source": [
"# Let's classify the financial document\n",
"print(\"🔍 Classifying financial document...\")\n",
"print(f\" Document length: {len(financial_markdown):,} characters\\n\")\n",
"\n",
"# Write to temp file for classification\n",
"import tempfile\n",
"from pathlib import Path\n",
"\n",
"with tempfile.NamedTemporaryFile(\n",
"    mode=\"w\", suffix=\".md\", delete=False, encoding=\"utf-8\"\n",
") as tmp:\n",
"    tmp.write(financial_markdown)\n",
"    temp_financial_path = Path(tmp.name)\n",
"\n",
"# Classify the document\n",
"financial_classification = await classify_client.aclassify_file_path(\n",
"    rules=classification_rules, file_input_path=str(temp_financial_path)\n",
")\n",
"\n",
"doc_type = financial_classification.items[0].result.type\n",
"confidence = financial_classification.items[0].result.confidence\n",
"reasoning = financial_classification.items[0].result.reasoning\n",
"\n",
"print(\"✅ Classification Result:\")\n",
"print(f\" Type: {doc_type}\")\n",
"print(f\" Confidence: {confidence:.2%}\")\n",
"print(\n",
"    f\" Reasoning: {reasoning[:200]}...\"\n",
"    if reasoning and len(reasoning) > 200\n",
"    else f\" Reasoning: {reasoning}\"\n",
")\n",
"\n",
"print(\"\\n\" + \"=\" * 70)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Phase 3: Structured Data Extraction using SourceText\n",
"\n",
"Now comes the key part - using the markdown content as input for structured data extraction via SourceText."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"⚙️ LlamaExtract initialized\n"
]
}
],
"source": [
"from llama_cloud_services.extract.extract import LlamaExtract, SourceText\n",
"from llama_cloud.types import ExtractConfig, ExtractMode\n",
"from pydantic import BaseModel, Field\n",
"from typing import List, Optional\n",
"\n",
"# Initialize LlamaExtract\n",
"llama_extract = LlamaExtract(api_key=api_key, verbose=True)\n",
"\n",
"print(\"⚙️ LlamaExtract initialized\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define Extraction Schemas\n",
"\n",
"Let's define different schemas for different document types:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"📋 Extraction schemas defined\n"
]
}
],
"source": [
"# Schema for financial documents\n",
"class FinancialMetrics(BaseModel):\n",
"    company_name: str = Field(description=\"Name of the company\")\n",
"    document_type: str = Field(\n",
"        description=\"Type of financial document (10-K, 10-Q, annual report, etc.)\"\n",
"    )\n",
"    fiscal_year: int = Field(description=\"Fiscal year of the report\")\n",
"    revenue_2021: str = Field(description=\"Total revenue in 2021\")\n",
"    net_income_2021: str = Field(description=\"Net income in 2021\")\n",
"    key_business_segments: List[str] = Field(\n",
"        default=[], description=\"Main business segments or divisions\"\n",
"    )\n",
"    risk_factors: List[str] = Field(\n",
"        default=[], description=\"Key risk factors mentioned\"\n",
"    )\n",
"\n",
"\n",
"# Schema for technical specifications\n",
"class VoltageRange(BaseModel):\n",
"    min_voltage: Optional[float] = Field(description=\"Minimum voltage\")\n",
"    max_voltage: Optional[float] = Field(description=\"Maximum voltage\")\n",
"    unit: str = Field(default=\"V\", description=\"Voltage unit\")\n",
"\n",
"\n",
"class TechnicalSpec(BaseModel):\n",
"    component_name: str = Field(description=\"Name of the technical component\")\n",
"    manufacturer: Optional[str] = Field(description=\"Manufacturer name\")\n",
"    part_number: Optional[str] = Field(description=\"Part or model number\")\n",
"    description: str = Field(description=\"Brief description of the component\")\n",
"    operating_voltage: Optional[VoltageRange] = Field(\n",
"        description=\"Operating voltage range\"\n",
"    )\n",
"    maximum_current: Optional[float] = Field(\n",
"        description=\"Maximum current rating in amperes\"\n",
"    )\n",
"    key_features: List[str] = Field(\n",
"        default=[], description=\"Key features and capabilities\"\n",
"    )\n",
"    applications: List[str] = Field(default=[], description=\"Typical applications\")\n",
"\n",
"\n",
"print(\"📋 Extraction schemas defined\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Building the Complete Workflow\n",
"\n",
"Now that we've seen how parsing and classification work on their own, let's combine them with extraction into a complete Parse → Classify → Extract workflow using LlamaIndex Workflows. We'll define the workflow structure here and then run it on both sample documents further down.\n",
"\n",
"### Install Workflows Package\n",
"\n",
"First, let's install the LlamaIndex workflows package:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install llama-index-workflows llama-index-utils-workflow"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define the Workflow\n",
"\n",
"Let's restructure the document processing into a proper LlamaIndex Workflow with separate parsing, classification, and extraction steps:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔧 Workflow defined!\n"
]
}
],
"source": [
"import tempfile\n",
"from pathlib import Path\n",
"from llama_cloud import ExtractConfig\n",
"from workflows import Workflow, step, Context\n",
"from workflows.events import Event, StartEvent, StopEvent\n",
"\n",
"\n",
"# Define workflow events\n",
"class ParseEvent(Event):\n",
"    \"\"\"Event emitted after parsing\"\"\"\n",
"\n",
"    file_path: str\n",
"    markdown_content: str\n",
"    job_id: str\n",
"\n",
"\n",
"class ClassifyEvent(Event):\n",
"    \"\"\"Event emitted after classification\"\"\"\n",
"\n",
"    markdown_content: str\n",
"    temp_path: str\n",
"    doc_type: str\n",
"    confidence: float\n",
"\n",
"\n",
"class ExtractEvent(Event):\n",
"    \"\"\"Event emitted after extraction\"\"\"\n",
"\n",
"    doc_type: str\n",
"    confidence: float\n",
"    extracted_data: dict\n",
"    markdown_length: int\n",
"    temp_path: str\n",
"    markdown_sample: str\n",
"\n",
"\n",
"class DocumentWorkflow(Workflow):\n",
"    \"\"\"\n",
"    Complete document processing workflow: Parse → Classify → Extract\n",
"    \"\"\"\n",
"\n",
"    def __init__(\n",
"        self,\n",
"        parser,\n",
"        classify_client,\n",
"        classification_rules,\n",
"        llama_extract,\n",
"        financial_schema,\n",
"        technical_schema,\n",
"        **kwargs,\n",
"    ):\n",
"        super().__init__(**kwargs)\n",
"        self.parser = parser\n",
"        self.classify_client = classify_client\n",
"        self.classification_rules = classification_rules\n",
"        self.llama_extract = llama_extract\n",
"        self.financial_schema = financial_schema\n",
"        self.technical_schema = technical_schema\n",
"\n",
"    @step\n",
"    async def parse_document(self, ctx: Context, ev: StartEvent) -> ParseEvent:\n",
"        \"\"\"\n",
"        Step 1: Parse the document to extract markdown\n",
"        \"\"\"\n",
"        file_path = ev.file_path\n",
"        print(f\"📄 Step 1: Parsing document: {file_path}...\")\n",
"\n",
"        # Parse the document\n",
"        parse_result = await self.parser.aparse(file_path)\n",
"        markdown_content = await parse_result.aget_markdown()\n",
"        job_id = parse_result.job_id\n",
"\n",
"        print(f\" ✅ Parsed successfully (Job ID: {job_id})\")\n",
"        print(f\" 📝 Extracted {len(markdown_content):,} characters\")\n",
"\n",
"        # Write event to stream for monitoring\n",
"        parse_event = ParseEvent(\n",
"            file_path=file_path,\n",
"            markdown_content=markdown_content,\n",
"            job_id=job_id,\n",
"        )\n",
"        ctx.write_event_to_stream(parse_event)\n",
"\n",
"        return parse_event\n",
"\n",
"    @step\n",
"    async def classify_document(self, ctx: Context, ev: ParseEvent) -> ClassifyEvent:\n",
"        \"\"\"\n",
"        Step 2: Classify the document based on its content\n",
"        \"\"\"\n",
"        markdown_content = ev.markdown_content\n",
"        print(\"🏷️ Step 2: Classifying document...\")\n",
"\n",
"        # Write markdown to temp file for classification\n",
"        with tempfile.NamedTemporaryFile(\n",
"            mode=\"w\", suffix=\".md\", delete=False, encoding=\"utf-8\"\n",
"        ) as tmp:\n",
"            tmp.write(markdown_content)\n",
"            temp_path = Path(tmp.name)\n",
"\n",
"        # Classify the document\n",
"        classification = await self.classify_client.aclassify_file_path(\n",
"            rules=self.classification_rules, file_input_path=str(temp_path)\n",
"        )\n",
"        doc_type = classification.items[0].result.type\n",
"        confidence = classification.items[0].result.confidence\n",
"\n",
"        print(f\" ✅ Classified as: {doc_type} (confidence: {confidence:.2f})\")\n",
"\n",
"        # Write event to stream for monitoring\n",
"        classify_event = ClassifyEvent(\n",
"            markdown_content=markdown_content,\n",
"            temp_path=str(temp_path),\n",
"            doc_type=doc_type,\n",
"            confidence=confidence,\n",
"        )\n",
"        ctx.write_event_to_stream(classify_event)\n",
"\n",
"        return classify_event\n",
"\n",
"    @step\n",
"    async def extract_data(self, ctx: Context, ev: ClassifyEvent) -> ExtractEvent:\n",
"        \"\"\"\n",
"        Step 3: Extract structured data based on classification\n",
"        \"\"\"\n",
"        print(\"🔍 Step 3: Extracting structured data using SourceText...\")\n",
"\n",
"        # Choose schema based on classification\n",
"        if \"financial\" in ev.doc_type.lower():\n",
"            schema = self.financial_schema\n",
"            print(\" 📊 Using FinancialMetrics schema\")\n",
"        elif \"technical\" in ev.doc_type.lower():\n",
"            schema = self.technical_schema\n",
"            print(\" 🔧 Using TechnicalSpec schema\")\n",
"        else:\n",
"            schema = self.financial_schema  # Default fallback\n",
"            print(\" 📊 Using default FinancialMetrics schema\")\n",
"\n",
"        # Create SourceText from markdown content\n",
"        source_text = SourceText(\n",
"            text_content=ev.markdown_content,\n",
"            filename=f\"{os.path.basename(ev.temp_path)}_markdown.md\",\n",
"        )\n",
"\n",
"        # Configure extraction\n",
"        extract_config = ExtractConfig(\n",
"            extraction_mode=\"BALANCED\",\n",
"        )\n",
"\n",
"        # Perform extraction\n",
"        extraction_result = self.llama_extract.extract(\n",
"            data_schema=schema, config=extract_config, files=source_text\n",
"        )\n",
"\n",
"        print(\" ✅ Extraction complete!\")\n",
"\n",
"        # Create markdown sample\n",
"        markdown_sample = (\n",
"            ev.markdown_content[:200] + \"...\"\n",
"            if len(ev.markdown_content) > 200\n",
"            else ev.markdown_content\n",
"        )\n",
"\n",
"        extract_event = ExtractEvent(\n",
"            doc_type=ev.doc_type,\n",
"            confidence=ev.confidence,\n",
"            extracted_data=extraction_result.data,\n",
"            markdown_length=len(ev.markdown_content),\n",
"            temp_path=ev.temp_path,\n",
"            markdown_sample=markdown_sample,\n",
"        )\n",
"        ctx.write_event_to_stream(extract_event)\n",
"\n",
"        return extract_event\n",
"\n",
"    @step\n",
"    async def finalize_results(self, ctx: Context, ev: ExtractEvent) -> StopEvent:\n",
"        \"\"\"\n",
"        Step 4: Finalize and return results\n",
"        \"\"\"\n",
"        result = {\n",
"            \"file_path\": ev.temp_path,\n",
"            \"markdown_length\": ev.markdown_length,\n",
"            \"classification\": ev.doc_type,\n",
"            \"confidence\": ev.confidence,\n",
"            \"extracted_data\": ev.extracted_data,\n",
"            \"markdown_sample\": ev.markdown_sample,\n",
"        }\n",
"\n",
"        return StopEvent(result=result)\n",
"\n",
"\n",
"print(\"🔧 Workflow defined!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Workflow Structure\n",
"\n",
"The workflow consists of four steps connected by typed events:\n",
"\n",
"```\n",
"┌─────────────┐\n",
"│ StartEvent  │  (file_path)\n",
"└──────┬──────┘\n",
"       │\n",
"       ▼\n",
"┌──────────────────┐\n",
"│  parse_document  │  Step 1: Parse PDF to markdown\n",
"└──────┬───────────┘\n",
"       │\n",
"       ▼\n",
"┌─────────────┐\n",
"│ ParseEvent  │  (markdown_content, job_id)\n",
"└──────┬──────┘\n",
"       │\n",
"       ▼\n",
"┌─────────────────────┐\n",
"│  classify_document  │  Step 2: Classification\n",
"└──────┬──────────────┘\n",
"       │\n",
"       ▼\n",
"┌──────────────┐\n",
"│ ClassifyEvent│  (doc_type, confidence, markdown_content)\n",
"└──────┬───────┘\n",
"       │\n",
"       ▼\n",
"┌──────────────┐\n",
"│ extract_data │  Step 3: Extraction with schema selection\n",
"└──────┬───────┘\n",
"       │\n",
"       ▼\n",
"┌──────────────┐\n",
"│ ExtractEvent │  (extracted_data, doc_type, confidence)\n",
"└──────┬───────┘\n",
"       │\n",
"       ▼\n",
"┌──────────────────┐\n",
"│ finalize_results │  Step 4: Format and return results\n",
"└──────┬───────────┘\n",
"       │\n",
"       ▼\n",
"┌─────────────┐\n",
"│  StopEvent  │  (final result dictionary)\n",
"└─────────────┘\n",
"```\n",
"\n",
"**Key Features:**\n",
"- **Step 1 (parse_document)**: Takes a file path and parses the document into clean markdown\n",
"- **Step 2 (classify_document)**: Takes markdown content and classifies it into document types\n",
"- **Step 3 (extract_data)**: Selects appropriate schema based on classification and extracts structured data\n",
"- **Step 4 (finalize_results)**: Packages all results into final output format\n",
"- Events are written to the stream for real-time monitoring\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Visualize the Workflow\n",
"\n",
"Let's visualize the workflow structure to see the flow of events:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Initialize the workflow\n",
"workflow = DocumentWorkflow(\n",
"    parser=parser,\n",
"    classify_client=classify_client,\n",
"    classification_rules=classification_rules,\n",
"    llama_extract=llama_extract,\n",
"    financial_schema=FinancialMetrics,\n",
"    technical_schema=TechnicalSpec,\n",
"    timeout=300,\n",
"    verbose=True,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"document_workflow.html\n"
]
}
],
"source": [
"# Draw the workflow visualization\n",
"from llama_index.utils.workflow import draw_all_possible_flows\n",
"\n",
"draw_all_possible_flows(\n",
"    workflow,\n",
"    filename=\"document_workflow.html\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The workflow has been visualized and saved to `document_workflow.html`. You can open this file in a browser to see the interactive workflow diagram.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The workflow visualization shows:\n",
"1. **StartEvent** → **parse_document** step\n",
"2. **ParseEvent** → **classify_document** step\n",
"3. **ClassifyEvent** → **extract_data** step \n",
"4. **ExtractEvent** → **finalize_results** step\n",
"5. **StopEvent** (final output)\n",
"\n",
"Each step is connected by typed events, allowing for clean separation of concerns and easy monitoring of the workflow execution.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the Workflow on Both Documents\n",
"\n",
"Now let's run the workflow on both documents and monitor the events:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"======================================================================\n",
"🚀 Processing Document 1: sample_docs/financial_report.pdf\n",
"======================================================================\n",
"\n",
"Running step parse_document\n",
"📄 Step 1: Parsing document: sample_docs/financial_report.pdf...\n",
"Started parsing the file under job_id bb53c6bf-79cc-4f63-9c97-16983d59f29d\n",
". ✅ Parsed successfully (Job ID: bb53c6bf-79cc-4f63-9c97-16983d59f29d)\n",
" 📝 Extracted 1,338,499 characters\n",
"Step parse_document produced event ParseEvent\n",
"📄 Parse Event: Extracted 1,338,499 characters\n",
"Running step classify_document\n",
"🏷️ Step 2: Classifying document...\n",
" ✅ Classified as: financial_document (confidence: 1.00)\n",
"Step classify_document produced event ClassifyEvent\n",
"📊 Classification Event: financial_document (1.00)\n",
"Running step extract_data\n",
"🔍 Step 3: Extracting structured data using SourceText...\n",
" 📊 Using FinancialMetrics schema\n",
".. ✅ Extraction complete!\n",
"Step extract_data produced event ExtractEvent\n",
"Running step finalize_results\n",
"Step finalize_results produced event StopEvent\n",
"✅ Extraction Event: 7 fields extracted\n",
"\n",
"✅ Document 1 processed successfully!\n",
"\n",
"======================================================================\n",
"🚀 Processing Document 2: sample_docs/technical_spec.pdf\n",
"======================================================================\n",
"\n",
"Running step parse_document\n",
"📄 Step 1: Parsing document: sample_docs/technical_spec.pdf...\n",
"Started parsing the file under job_id 944905c1-3c49-431a-ad86-4436d16f3d1c\n",
" ✅ Parsed successfully (Job ID: 944905c1-3c49-431a-ad86-4436d16f3d1c)\n",
" 📝 Extracted 92,483 characters\n",
"Step parse_document produced event ParseEvent\n",
"📄 Parse Event: Extracted 92,483 characters\n",
"Running step classify_document\n",
"🏷️ Step 2: Classifying document...\n",
" ✅ Classified as: technical_specification (confidence: 1.00)\n",
"Step classify_document produced event ClassifyEvent\n",
"📊 Classification Event: technical_specification (1.00)\n",
"Running step extract_data\n",
"🔍 Step 3: Extracting structured data using SourceText...\n",
" 🔧 Using TechnicalSpec schema\n",
" ✅ Extraction complete!\n",
"Step extract_data produced event ExtractEvent\n",
"Running step finalize_results\n",
"Step finalize_results produced event StopEvent\n",
"✅ Extraction Event: 8 fields extracted\n",
"\n",
"✅ Document 2 processed successfully!\n",
"\n",
"\n",
"📋 Processed 2 documents successfully!\n"
]
}
],
"source": [
"# Process both documents through the workflow\n",
"results = []\n",
"\n",
"# Define the document files to process\n",
"document_files = [\n",
"    \"sample_docs/financial_report.pdf\",\n",
"    \"sample_docs/technical_spec.pdf\",\n",
"]\n",
"\n",
"for i, file_path in enumerate(document_files, 1):\n",
"    print(f\"\\n{'='*70}\")\n",
"    print(f\"🚀 Processing Document {i}: {file_path}\")\n",
"    print(f\"{'='*70}\\n\")\n",
"\n",
"    try:\n",
"        # Run the workflow\n",
"        handler = workflow.run(file_path=file_path)\n",
"\n",
"        # Monitor events as they are emitted\n",
"        async for event in handler.stream_events():\n",
"            if isinstance(event, ParseEvent):\n",
"                print(\n",
"                    f\"📄 Parse Event: Extracted {len(event.markdown_content):,} characters\"\n",
"                )\n",
"            elif isinstance(event, ClassifyEvent):\n",
"                print(\n",
"                    f\"📊 Classification Event: {event.doc_type} ({event.confidence:.2f})\"\n",
"                )\n",
"            elif isinstance(event, ExtractEvent):\n",
"                print(\n",
"                    f\"✅ Extraction Event: {len(event.extracted_data)} fields extracted\"\n",
"                )\n",
"\n",
"        # Get final result\n",
"        result = await handler\n",
"        results.append(result)\n",
"\n",
"        print(f\"\\n✅ Document {i} processed successfully!\")\n",
"\n",
"    except Exception as e:\n",
"        print(f\"❌ Error processing document {i}: {str(e)}\")\n",
"        import traceback\n",
"\n",
"        traceback.print_exc()\n",
"\n",
"print(f\"\\n\\n📋 Processed {len(results)} documents successfully!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Final Results Summary\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"📈 COMPLETE WORKFLOW RESULTS SUMMARY\n",
"======================================================================\n",
"\n",
"📄 Document 1: tmpuyxzpd3x.md\n",
" 📊 Classification: financial_document (confidence: 1.00)\n",
" 📝 Markdown length: 1,338,499 characters\n",
" 📋 Markdown sample: \n",
"\n",
"# UNITED STATES\n",
"# SECURITIES AND EXCHANGE COMMISSION\n",
"Washington, D.C. 20549\n",
"\n",
"## FORM 10-K\n",
"\n",
"(Mark O...\n",
" 🎯 Extracted fields: 7 fields\n",
" • company_name: Uber Technologies, Inc.\n",
" • document_type: Annual Report on Form 10-K\n",
" • fiscal_year: 2021\n",
" • revenue_2021: $17,455 and $21,764\n",
" • net_income_2021: $(496) to (700)\n",
" • key_business_segments: ['Borrower and the Restricted Subsidiaries', 'Holdings', 'Guarantors', 'Material Domestic Subsidiaries', 'Material Foreign Subsidiaries']\n",
" • risk_factors: ['Indemnification obligations of the borrower for losses, claims, damages, liabilities, and out-of-pocket expenses incurred by agents, lenders, arrangers, and related parties in connection with the agreement or loans, except in certain cases such as gross negligence, bad faith, willful misconduct, or material breach by the indemnitee.', \"Borrower not required to indemnify any indemnitee for settlements entered into without the borrower's consent.\", 'Limitation of liability for special, indirect, consequential, or punitive damages, and for damages from unauthorized use of information, except for direct damages resulting from gross negligence, bad faith, or willful misconduct.', 'Obligation of the borrower to indemnify the administrative agent for liabilities arising from performance of duties, except in cases of gross negligence, bad faith, or willful misconduct.', 'Limitations and conditions on assignments and participations of lender rights, including restrictions on assignments to disqualified institutions, loan parties, affiliates of loan parties, defaulting lenders, and natural persons.', 'Setoff rights for lenders and issuing banks after an event of default, allowing them to apply borrower deposits toward obligations under the agreement.', 'Potential for increased obligations under the agreement as a result of changes in law affecting payment terms.', 'Requirement for the borrower and guarantors to provide information to comply with anti-money laundering rules and the USA PATRIOT Act.']\n",
"\n",
"📄 Document 2: tmp7ower2xm.md\n",
" 📊 Classification: technical_specification (confidence: 1.00)\n",
" 📝 Markdown length: 92,483 characters\n",
" 📋 Markdown sample: \n",
"\n",
"LM317\n",
"SLVS044Z SEPTEMBER 1997 REVISED APRIL 2025\n",
"\n",
"# LM317 3-Pin Adjustable Regulator\n",
"\n",
"## 1 Fea...\n",
" 🎯 Extracted fields: 8 fields\n",
" • component_name: LM317\n",
" • manufacturer: Texas Instruments\n",
" • part_number: LM317, SLVS044Z\n",
" • description: The LM317 is an adjustable three-pin, positive-voltage regulator capable of supplying more than 1.5A (typically up to 1.5A) over an output voltage range of 1.25V to 37V. The device requires only two external resistors to set the output voltage. It features a typical line regulation of 0.01% and typical load regulation of 0.1%. The LM317 includes current limiting, thermal overload protection, and safe operating area protection. Overload protection remains functional even if the ADJUST pin is disconnected. The regulator is used in applications such as constant-current battery-charger circuits, slow turn-on 15V regulator circuits, AC voltage-regulator circuits, current-limited charger circuits, and high-current and adjustable regulator circuits. It is available in packages including SOT-223 (DCY), TO-220 (KCS), and TO-263 (KTT).\n",
" • operating_voltage: {'min_voltage': 1.25, 'max_voltage': 37.0, 'unit': 'V'}\n",
" • maximum_current: 4.0\n",
" • key_features: ['Adjustable output voltage range: 1.25V to 37V', 'Output current up to 1.5A (up to 4A with external pass elements)', 'Line regulation: typically 0.01%/V', 'Load regulation: typically 0.1%', 'Internal short-circuit current limiting / Current limiting', 'Thermal overload protection / Thermal shutdown', 'Output safe-area compensation / Safe operating area protection', 'PSRR: 80dB at 120Hz for CADJ = 10μF (new chip)', 'NPN Darlington output drive', 'Programmable feedback', 'Multiple package options (SOT-223, TO-220, TO-263)', 'Can be used in constant-current, battery-charging, and regulator applications']\n",
" • applications: ['Multifunction printers, AC drive power stage modules, Electricity meters, Servo drive control modules, Merchant network and server PSU, Adjustable voltage regulator, 0V to 30V regulator circuit, Regulator circuit with improved ripple rejection, Precision current-limiter, Tracking preregulator, 1.25V to 20V regulator, Battery charger circuit, Constant-current battery charger circuits, Slow turn-on regulator, AC voltage-regulator, Current-limited charger circuits, High-current adjustable regulator circuits, General-purpose adjustable power supply']\n",
"\n",
"✨ Workflow completed successfully!\n"
]
}
],
"source": [
"print(\"📈 COMPLETE WORKFLOW RESULTS SUMMARY\")\n",
"print(\"=\" * 70)\n",
"\n",
"for i, result in enumerate(results, 1):\n",
"    print(f\"\\n📄 Document {i}: {os.path.basename(result['file_path'])}\")\n",
"    print(\n",
"        f\" 📊 Classification: {result['classification']} (confidence: {result['confidence']:.2f})\"\n",
"    )\n",
"    print(f\" 📝 Markdown length: {result['markdown_length']:,} characters\")\n",
"    print(f\" 📋 Markdown sample: {result['markdown_sample'][:100]}...\")\n",
"    print(f\" 🎯 Extracted fields: {len(result['extracted_data'])} fields\")\n",
"\n",
"    # Print all key-value pairs\n",
"    extracted = result[\"extracted_data\"]\n",
"    for key, value in extracted.items():\n",
"        print(f\" • {key}: {value}\")\n",
"\n",
"print(\"\\n✨ Workflow completed successfully!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"This notebook shows how to build an end-to-end **Parse → Classify → Extract** document workflow with LlamaCloud, orchestrated through LlamaIndex Workflows. It combines three core building blocks: **parsing**, **classification**, and **document extraction**.\n",
"\n",
"### Main Components:\n",
"\n",
"1. **LlamaParse** (`llama_cloud_services.parse.base.LlamaParse`):\n",
" - Converts documents to clean, structured markdown\n",
" - Preserves document structure and formatting\n",
" - Handles various file types (PDF, DOCX, etc.)\n",
"\n",
"2. **LlamaClassify** (`llama_cloud_services.beta.classifier.client.LlamaClassify`):\n",
" - Automatically categorizes documents based on content\n",
" - Uses customizable rules for classification\n",
" - Provides confidence scores for classifications\n",
"\n",
"3. **LlamaExtract with SourceText** (`llama_cloud_services.extract.extract.LlamaExtract`, `SourceText`):\n",
" - Extracts structured data using custom Pydantic schemas\n",
" - Accepts either the file directly (in which case parsing happens under the hood) or already-parsed text wrapped in a **SourceText** object, which is the approach used in this example\n",
"\n",
"**Benefits of an end-to-end workflow**: Running Classify → Extract, rather than Extract alone, lets you handle documents of different types, each with its own expected schema, within a single workflow. You no longer need to sort the data beforehand and run a separate extraction on each subset."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "llama_parse",
"language": "python",
"name": "llama_parse"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}