mirror of
https://github.com/run-llama/observability-blog-code.git
synced 2026-06-30 21:57:55 -04:00
first commit
This commit is contained in:
+16
@@ -0,0 +1,16 @@
|
||||
# Python-generated files
|
||||
__pycache__/
|
||||
*.py[oc]
|
||||
build/
|
||||
dist/
|
||||
wheels/
|
||||
*.egg-info
|
||||
|
||||
# Virtual environments
|
||||
.venv
|
||||
|
||||
# env variables
|
||||
.env
|
||||
|
||||
# db
|
||||
workflows.db
|
||||
@@ -0,0 +1 @@
|
||||
3.13
|
||||
@@ -0,0 +1,3 @@
|
||||
# observability-blog-code
|
||||
|
||||
Code for the [blog on observability](https://www.notion.so/llamaindex/Observability-in-LlamaIndex-Agent-Workflows-what-why-and-how-2abdb4b7d41a80b583fcc488e69d81b5?source=copy_link)
|
||||
@@ -0,0 +1,13 @@
|
||||
# compose.yaml
|
||||
name: jaeger-tracing
|
||||
|
||||
services:
|
||||
jaeger:
|
||||
image: jaegertracing/all-in-one:latest
|
||||
ports:
|
||||
- 16686:16686
|
||||
- 4317:4317
|
||||
- 4318:4318
|
||||
- 9411:9411
|
||||
environment:
|
||||
- COLLECTOR_ZIPKIN_HOST_PORT=:9411
|
||||
Binary file not shown.
@@ -0,0 +1,30 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "financial-classifier"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"llama-cloud-services>=0.6.79",
|
||||
"llama-index-observability-otel>=0.2.1",
|
||||
"llama-index-workflows>=2.11.1",
|
||||
"opentelemetry-exporter-otlp-proto-http>=1.38.0",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
only-include = ["src/financial_classifier"]
|
||||
|
||||
[tool.hatch.build.targets.wheel.sources]
|
||||
"src" = ""
|
||||
|
||||
[tool.llamadeploy.workflows]
|
||||
classify-and-extract = "financial_classifier.workflow:workflow"
|
||||
|
||||
[tool.llamadeploy]
|
||||
name = "financial-classifier"
|
||||
env_files = [".env"]
|
||||
llama_cloud = true
|
||||
@@ -0,0 +1,25 @@
|
||||
import asyncio
|
||||
import httpx
|
||||
from financial_classifier.events import InputDocumentEvent, ProgressEvent
|
||||
from workflows.client import WorkflowClient
|
||||
|
||||
async def run_workflow():
|
||||
httpx_client = httpx.AsyncClient(base_url="http://127.0.0.1:8000/deployments/financial-classifier")
|
||||
wf_client = WorkflowClient(httpx_client=httpx_client)
|
||||
data = await wf_client.run_workflow_nowait(workflow_name="classify-and-extract", start_event=InputDocumentEvent(path="financial_document.pdf"))
|
||||
async for event in wf_client.get_workflow_events(data.handler_id):
|
||||
ev = event.load_event()
|
||||
if isinstance(ev, ProgressEvent):
|
||||
print(ev.message)
|
||||
result = None
|
||||
while result is None:
|
||||
handler_data = await wf_client.get_result(data.handler_id)
|
||||
result = handler_data.result
|
||||
await asyncio.sleep(0.1)
|
||||
print(f"Final result:\n{result}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run_workflow())
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
from workflows.events import StartEvent, StopEvent, Event
|
||||
from typing import Literal
|
||||
from pydantic import ConfigDict
|
||||
from .resources import CashflowStatement, IncomeStatement, BalanceSheet
|
||||
|
||||
class ProgressEvent(Event): # used to monitor progress
|
||||
message: str
|
||||
|
||||
class InputDocumentEvent(StartEvent):
|
||||
path: str
|
||||
|
||||
class ClassificationEvent(Event):
|
||||
classification: Literal["income_statement", "cashflow_statement", "balance_sheet"]
|
||||
reasons: str
|
||||
|
||||
class ExtractedDataEvent(StopEvent):
|
||||
extracted_data: CashflowStatement | IncomeStatement | BalanceSheet | None
|
||||
error: str | None = None
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
@@ -0,0 +1,33 @@
|
||||
from llama_index_instrumentation import get_dispatcher
|
||||
from llama_index_instrumentation.base.event import BaseEvent
|
||||
from typing import Any
|
||||
from llama_index.observability.otel import LlamaIndexOpenTelemetry
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
OTLPSpanExporter,
|
||||
)
|
||||
|
||||
dispatcher = get_dispatcher()
|
||||
|
||||
class ClassificationMetadata(BaseEvent):
|
||||
duration: float
|
||||
metadata: dict[str, Any]
|
||||
|
||||
@classmethod
|
||||
def class_name(cls) -> str:
|
||||
return "ClassificationMetadata"
|
||||
|
||||
class ExtractionMetadata(BaseEvent):
|
||||
duration: float
|
||||
metadata: dict[str, Any]
|
||||
|
||||
@classmethod
|
||||
def class_name(cls) -> str:
|
||||
return "ExtractionMetadata"
|
||||
|
||||
|
||||
span_exporter = OTLPSpanExporter("http://0.0.0.0:4318/v1/traces")
|
||||
|
||||
instrumentor = LlamaIndexOpenTelemetry(
|
||||
service_name_or_resource="financial_classifier.custom_traces",
|
||||
span_exporter=span_exporter,
|
||||
)
|
||||
@@ -0,0 +1,35 @@
|
||||
import os
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from llama_cloud_services import LlamaExtract
|
||||
from llama_cloud_services.beta.classifier import ClassifyClient
|
||||
|
||||
class IncomeStatement(BaseModel):
|
||||
"""Financial performance over a period"""
|
||||
period_end: str = Field(description="End date of reporting period")
|
||||
revenue: float = Field(description="Total income from sales/services")
|
||||
expenses: float = Field(description="Total costs incurred")
|
||||
net_income: float = Field(description="Profit or loss (revenue - expenses)")
|
||||
currency: str | None = Field(default=None, description="Currency code")
|
||||
|
||||
class CashflowStatement(BaseModel):
|
||||
"""Cash movement over a period"""
|
||||
period_end: str = Field(description="End date of reporting period")
|
||||
operating_cashflow: float = Field(description="Cash from core business operations")
|
||||
investing_cashflow: float = Field(description="Cash from investments/asset purchases")
|
||||
financing_cashflow: float = Field(description="Cash from debt/equity activities")
|
||||
net_change: float = Field(description="Total change in cash position")
|
||||
|
||||
class BalanceSheet(BaseModel):
|
||||
"""Financial position at a point in time"""
|
||||
report_date: str = Field(description="Snapshot date")
|
||||
total_assets: float = Field(description="Everything the company owns")
|
||||
total_liabilities: float = Field(description="Everything the company owes")
|
||||
equity: float = Field(description="Owner's stake (assets - liabilities)")
|
||||
currency: str | None = Field(default=None, description="Currency code")
|
||||
|
||||
async def get_llama_extract(*args, **kwargs) -> LlamaExtract:
|
||||
return LlamaExtract(api_key=os.getenv("LLAMA_CLOUD_API_KEY"))
|
||||
|
||||
async def get_llama_classify(*args, **kwargs) -> ClassifyClient:
|
||||
return ClassifyClient.from_api_key(api_key=os.getenv("LLAMA_CLOUD_API_KEY", ""))
|
||||
@@ -0,0 +1,77 @@
|
||||
import time
|
||||
from workflows import Workflow, step, Context
|
||||
from workflows.resource import Resource
|
||||
from typing import Annotated, cast
|
||||
from llama_cloud_services.beta.classifier import ClassifyClient
|
||||
from llama_cloud_services.extract import LlamaExtract, ExtractConfig
|
||||
from llama_cloud.types.extract_run import ExtractRun
|
||||
from llama_cloud.types.classifier_rule import ClassifierRule
|
||||
from llama_cloud.types.classification_result import ClassificationResult
|
||||
from .events import InputDocumentEvent, ClassificationEvent, ExtractedDataEvent, ProgressEvent
|
||||
from .resources import get_llama_classify, get_llama_extract, BalanceSheet, IncomeStatement, CashflowStatement
|
||||
from .instrumentation import instrumentor, ExtractionMetadata, ClassificationMetadata, dispatcher
|
||||
|
||||
class FinancialClassifierWorkflow(Workflow):
|
||||
@step
|
||||
async def classify_input_file(self, ev: InputDocumentEvent, classifier: Annotated[ClassifyClient, Resource(get_llama_classify)], ctx: Context) -> ClassificationEvent | ExtractedDataEvent:
|
||||
ctx.write_event_to_stream(ProgressEvent(message=(f"Classifying {ev.path}...")))
|
||||
async with ctx.store.edit_state() as state:
|
||||
state.input_file_path = ev.path
|
||||
rules = [
|
||||
ClassifierRule(
|
||||
type="income_statement",
|
||||
description="Shows revenue, expenses, and profit/loss over a period"
|
||||
),
|
||||
ClassifierRule(
|
||||
type="cashflow_statement",
|
||||
description="Tracks cash movements across operating, investing, and financing activities"
|
||||
),
|
||||
ClassifierRule(
|
||||
type="balance_sheet",
|
||||
description="Lists assets, liabilities, and equity at a specific date"
|
||||
)
|
||||
]
|
||||
start = time.time()
|
||||
result = await classifier.aclassify(rules=rules, files=ev.path)
|
||||
classification_result = result.items[0].result
|
||||
if classification_result is not None and classification_result.type is not None:
|
||||
classification_result = cast(ClassificationResult, classification_result)
|
||||
dispatcher.event(event=ClassificationMetadata(duration=time.time()-start, metadata={"confidence": classification_result.confidence}))
|
||||
return ClassificationEvent(
|
||||
classification=classification_result.type, # type: ignore
|
||||
reasons=classification_result.reasoning,
|
||||
)
|
||||
else:
|
||||
return ExtractedDataEvent(extracted_data=None, error="Failed to produce a classification for the input file")
|
||||
|
||||
@step
|
||||
async def extract_details_from_file(self, ev: ClassificationEvent, extractor: Annotated[LlamaExtract, Resource(get_llama_extract)], ctx: Context) -> ExtractedDataEvent:
|
||||
ctx.write_event_to_stream(ProgressEvent(message=(f"File classified as {ev.classification} because of the following reasons: {ev.reasons}")))
|
||||
ctx.write_event_to_stream(ProgressEvent(message=("Extracting details...")))
|
||||
if ev.classification == "balance_sheet":
|
||||
data_model = BalanceSheet
|
||||
elif ev.classification == "cashflow_statement":
|
||||
data_model = CashflowStatement
|
||||
else:
|
||||
data_model = IncomeStatement
|
||||
state = await ctx.store.get_state()
|
||||
start = time.time()
|
||||
result = cast(ExtractRun, (await extractor.aextract(data_schema=data_model, config=ExtractConfig(), files=state.input_file_path)))
|
||||
if result.data is not None:
|
||||
dispatcher.event(event=ExtractionMetadata(duration=time.time()-start, metadata=result.extraction_metadata or {}))
|
||||
data = data_model.model_validate(result.data)
|
||||
ctx.write_event_to_stream(ProgressEvent(message=(f"Extracted the following data:\n{data.model_dump_json(indent=4)}")))
|
||||
return ExtractedDataEvent(
|
||||
extracted_data=data
|
||||
)
|
||||
else:
|
||||
return ExtractedDataEvent(
|
||||
extracted_data=None,
|
||||
error="It was not possible to extract the data from the provided input file"
|
||||
)
|
||||
|
||||
instrumentor.start_registering()
|
||||
workflow = FinancialClassifierWorkflow(timeout=600)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user