Update workflow templates to use remote repo (#22)

* Refactor: Use remote templates for scaffolding

Co-authored-by: adrian <adrian@runllama.ai>

* Fix formats

* chore: vbump

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Clelia (Astra) Bertelli <clelia@runllama.ai>
This commit is contained in:
Adrian Lyjak
2025-09-29 09:22:03 -04:00
committed by GitHub
parent bc4bf37b4b
commit 4b447f987b
29 changed files with 61 additions and 1314 deletions
+8 -1
View File
@@ -152,7 +152,14 @@ vibe-llama scaffold # launch the terminal interface
> [!NOTE]
>
> _You can find all the examples in the [`templates` folder](./templates/)_
> Starters are pulled from GitHub template repositories via [copier](https://copier.readthedocs.io/en/stable/):\_
>
> - `https://github.com/run-llama/template-workflow-basic`
> - `https://github.com/run-llama/template-workflow-document-parsing`
> - `https://github.com/run-llama/template-workflow-human-in-the-loop`
> - `https://github.com/run-llama/template-workflow-invoice-extraction`
> - `https://github.com/run-llama/template-workflow-rag`
> - `https://github.com/run-llama/template-workflow-web-scraping`
## SDK
+1 -4
View File
@@ -13,7 +13,7 @@ dev = [
[project]
name = "vibe-llama"
version = "0.4.4"
version = "0.4.5"
description = "vibe-llama is a set of tools that are designed to help developers build working and reliable applications with LlamaIndex, LlamaCloud Services and llama-index-workflows."
readme = "README.md"
requires-python = ">=3.10"
@@ -39,9 +39,6 @@ vibe-llama = "vibe_llama.main:main"
[tool.hatch.build.targets.wheel]
only-include = ["src/vibe_llama"]
[tool.hatch.build.targets.wheel.force-include]
"templates" = "vibe_llama/templates"
[tool.hatch.build.targets.wheel.sources]
"src" = ""
+13 -13
View File
@@ -1,8 +1,7 @@
import os
from pathlib import Path
from typing import Optional, Literal, get_args, cast, Tuple
from copier import run_copy
import copier
# enum-like type for project names
ProjectName = Literal[
@@ -17,6 +16,16 @@ ProjectName = Literal[
# Expose a typed tuple of just the allowed names for convenience/choices
PROJECTS: Tuple[ProjectName, ...] = cast(Tuple[ProjectName, ...], get_args(ProjectName))
# Map local template names to remote GitHub copier templates
TEMPLATES: dict[ProjectName, str] = {
"basic": "gh:run-llama/template-workflow-basic",
"document_parsing": "gh:run-llama/template-workflow-document-parsing",
"human_in_the_loop": "gh:run-llama/template-workflow-human-in-the-loop",
"invoice_extraction": "gh:run-llama/template-workflow-invoice-extraction",
"rag": "gh:run-llama/template-workflow-rag",
"web_scraping": "gh:run-llama/template-workflow-web-scraping",
}
async def create_scaffold(
request: ProjectName = "basic",
@@ -34,17 +43,8 @@ async def create_scaffold(
# Ensure destination directory exists
os.makedirs(actual_path, exist_ok=True)
# Copy the selected template directory into destination using Copier
try:
template_src = str(
Path(__file__).resolve().parents[1] / "templates" / request
)
run_copy(template_src, actual_path)
except Exception:
template_src = str(
Path(__file__).resolve().parents[3] / "templates" / request
)
run_copy(template_src, actual_path, exclude=[".venv", "uv.lock"])
template_src = TEMPLATES[request]
copier.run_copy(template_src, actual_path)
return f"[bold green]SUCCESS✅[/]\nYour workflow was written to: {os.path.join(actual_path, 'workflow.py')}.\nFind project details at: {os.path.join(actual_path, 'pyproject.toml')}.\nInstall all necessary dependencies with [on gray]cd {actual_path} && pip install .[/]"
+1 -1
View File
@@ -120,7 +120,7 @@ class VibeLlamaScaffold:
Download a template.
Args:
template_name (ProjectName): Name of the template. You can find all the available templates [here](https://github.com/run-llama/vibe-llama/blob/main/templates/). Defaults to `basic` if not provided
template_name (ProjectName): Name of the template. Defaults to `basic` if not provided
save_path (Optional[str]): Path where to save the downloaded template. Defaults to `.vibe-llama/scaffold` if not provided
"""
result = await create_scaffold(template_name, save_path)
-52
View File
@@ -1,52 +0,0 @@
# Email Workflow Example
This example demonstrates how to build an event-driven, async-first workflow for sending emails using [llama-index-workflows](https://github.com/run-llama/llama-index-workflows). The workflow uses an LLM to generate email content and sends emails to internal recipients only.
## Installation
Install all required dependencies (including llama-index-workflows and OpenAI LLM support):
```bash
pip install -e .
```
## Usage
Run the workflow from the command line:
```bash
python -m basic.workflow \
--sender you@mycompany.com \
--receiver recipient1@mycompany.com \
--receiver recipient2@mycompany.com \
--subject "Quarterly Update" \
--draft "Here's a draft for the quarterly update email."
```
**Note:**
- The sender and all receivers must use `@mycompany.com` emails.
- You must set your `OPENAI_API_KEY` in the environment before running.
## Workflow Overview
- **prepare_email**:
Initializes the email client and uses an LLM to generate a fully-formed email from your draft and subject.
Emits a `PrepareEmail` event for each receiver.
- **send_email**:
Sends the generated email to each receiver using the internal email client.
Updates email statistics.
- **collect_email_stats**:
Collects results from all send attempts and outputs a summary of successes and failures.
## Customization
- Replace the `EmailClient` logic with your own email sending implementation.
- Extend the workflow with additional steps or validation as needed.
## References
- [llama-index-workflows documentation](https://github.com/run-llama/llama-index-workflows)
- [OpenAI LLM integration](https://github.com/run-llama/llama-index-llms-openai)
-17
View File
@@ -1,17 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "basic"
version = "0.1.0"
description = "A base example that showcases usage patterns for workflows"
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-workflows",
"llama-index-llms-openai"
]
[tool.llamadeploy.workflows]
default = "basic.workflow:workflow"
-172
View File
@@ -1,172 +0,0 @@
from pydantic import BaseModel, ConfigDict
from workflows import Workflow, step, Context
from workflows.events import (
Event,
StartEvent,
StopEvent,
)
from typing import Annotated, Optional
from workflows.resource import Resource
from llama_index.llms.openai import OpenAI
# replace with an actual email sending client
class EmailClient:
def __init__(self, sender_email: str):
if not self._internal_email(sender_email):
print("Sorry, you are not allowed to use this email client")
return
self.sender_email = sender_email
def send(self, receiver_email: str, subject: str, content: str) -> bool:
if not self._internal_email(receiver_email):
print(
"Sorry, we cannot send an email to a person outside of the organization"
)
return False
print(
f"Sent an email from {self.sender_email} to {receiver_email} with subject '{subject}' and content:\n{content}"
)
return True
def _internal_email(self, email: str) -> bool:
return email.endswith("@mycompany.com")
class EmailStats:
def __init__(self):
self.success = 0
self.fail = 0
def update(self, result: bool):
if result:
self.success += 1
else:
self.fail += 1
class PrepareEmail(Event):
receiver: str
subject: str
content: str
class SendEmail(Event):
success: bool
class EmailFlowState(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
email_num: int = 0
email_client: Optional[EmailClient] = None
async def get_llm(*args, **kwargs) -> OpenAI:
return OpenAI("gpt-4.1")
async def get_email_stats(*args, **kwargs) -> EmailStats:
return EmailStats()
class EmailFlow(Workflow):
@step
async def prepare_email(
self,
ev: StartEvent,
ctx: Context[EmailFlowState],
llm: Annotated[OpenAI, Resource(get_llm)],
) -> PrepareEmail | StopEvent | None:
async with ctx.store.edit_state() as state:
cl = EmailClient(sender_email=ev.sender)
if hasattr(cl, "sender_email"):
state.email_client = cl
state.email_num = len(ev.receivers)
else:
return StopEvent(
result="It is not possible to send emails from your current address: please use a mycompany.com address and try again."
)
email_content = await llm.acomplete(
f"Given this email draft: {ev.draft} and subject: {ev.subject}, can you please create an fully-formed email message to send?"
)
for receiver in ev.receivers:
ctx.send_event(
PrepareEmail(
receiver=receiver, subject=ev.subject, content=email_content.text
)
)
@step
async def send_email(
self,
ev: PrepareEmail,
ctx: Context[EmailFlowState],
stats: Annotated[EmailStats, Resource(get_email_stats)],
) -> SendEmail:
state = await ctx.store.get_state()
succ = state.email_client.send(ev.receiver, ev.subject, ev.content) # type: ignore
stats.update(succ)
return SendEmail(success=succ)
@step
async def collect_email_stats(
self,
ev: SendEmail,
ctx: Context[EmailFlowState],
stats: Annotated[EmailStats, Resource(get_email_stats)],
) -> StopEvent | None:
state = await ctx.store.get_state()
evs = ctx.collect_events(ev, [SendEmail] * state.email_num)
if evs:
return StopEvent(
result=f"Sent {stats.success} emails, failed to send {stats.fail} emails"
)
async def main(sender: str, receivers: list[str], subject: str, draft: str) -> None:
w = EmailFlow(timeout=60, verbose=False)
result = await w.run(
sender=sender, receivers=receivers, subject=subject, draft=draft
)
print(str(result))
workflow = EmailFlow(timeout=None)
if __name__ == "__main__":
import asyncio
import os
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"-s",
"--sender",
required=True,
help="Sender email (must end with @mycompany.com)",
)
parser.add_argument(
"-r",
"--receiver",
required=True,
action="append",
help="Email for the receiver (must end with @mycompany.com). Can be repeated",
)
parser.add_argument("-t", "--subject", required=True, help="Subject of the email")
parser.add_argument("-d", "--draft", required=True, help="Draft for the email")
args = parser.parse_args()
if not os.getenv("OPENAI_API_KEY", None):
raise ValueError(
"You need to set OPENAI_API_KEY in your environment before using this workflow"
)
asyncio.run(
main(
sender=args.sender,
receivers=args.receiver,
subject=args.subject,
draft=args.draft,
)
)
-49
View File
@@ -1,49 +0,0 @@
# Document Parsing Workflow Example
This workflow demonstrates how to use [llama-index-workflows](https://github.com/run-llama/llama-index-workflows) to parse documents with LlamaParse in three modes: cost-effective, agentic, and agentic plus. The workflow is event-driven and async-first, making it suitable for intelligent automation and scalable document processing.
## Installation
Install all required dependencies (including llama-index-workflows and LlamaCloud services):
```bash
pip install -e .
```
## Usage
Run the workflow from the command line:
```bash
python -m document_parsing.workflow \
--path /path/to/document.pdf \
--mode agentic
```
**Modes:**
- `cost_effective`: Uses LLM-based parsing for lower cost.
- `agentic`: Uses agentic parsing with OpenAI GPT-4.
- `agentic_plus`: Uses agentic parsing with Anthropic Sonnet.
**Note:**
- You must set your `LLAMA_CLOUD_API_KEY` in the environment before running.
## Workflow Overview
- **choose_document_parsing_mode**:
Selects the parsing mode based on user input and emits the corresponding event.
- **parse_document_cost_effective / agentic / agentic_plus**:
Parses the document using the selected LlamaParse mode and outputs the result as markdown.
## Customization
- Extend the workflow to add post-processing, validation, or data extraction steps.
- Integrate with other LlamaIndex components for advanced document analytics.
## References
- [llama-index-workflows documentation](https://github.com/run-llama/llama-index-workflows)
- [LlamaParse documentation](https://docs.cloud.llamaindex.ai/llamaparse/getting_started)
-17
View File
@@ -1,17 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "document-parsing"
version = "0.1.0"
description = "A workflow that, using LlamaParse, parses unstructured documents and returns their raw text content."
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-workflows",
"llama-cloud-services"
]
[tool.llamadeploy.workflows]
default = "document_parsing.workflow:workflow"
@@ -1,165 +0,0 @@
from pydantic import BaseModel
from typing import Annotated
from llama_cloud_services import LlamaParse
from workflows import Workflow, step, Context
from workflows.events import StartEvent, Event, StopEvent
from workflows.resource import Resource
class ParseDocumentCostEffectiveEvent(Event):
pass
class ParseDocumentAgenticEvent(Event):
pass
class ParseDocumentAgenticPlusEvent(Event):
pass
async def get_llama_parse_cost_effective(*args, **kwargs) -> LlamaParse:
return LlamaParse(
parse_mode="parse_page_with_llm",
high_res_ocr=True,
adaptive_long_table=True,
outlined_table_extraction=True,
output_tables_as_HTML=True,
result_type="markdown",
)
async def get_llama_parse_agentic(*args, **kwargs) -> LlamaParse:
return LlamaParse(
parse_mode="parse_page_with_agent",
model="openai-gpt-4-1-mini",
high_res_ocr=True,
adaptive_long_table=True,
outlined_table_extraction=True,
output_tables_as_HTML=True,
result_type="markdown",
)
async def get_llama_parse_agentic_plus(*args, **kwargs) -> LlamaParse:
return LlamaParse(
parse_mode="parse_page_with_agent",
model="anthropic-sonnet-4.0",
high_res_ocr=True,
adaptive_long_table=True,
outlined_table_extraction=True,
output_tables_as_HTML=True,
result_type="markdown",
)
class DocumentProcessingState(BaseModel):
document_path: str = ""
class DocumentProcessingWorkflow(Workflow):
@step
async def choose_document_parsing_mode(
self, ev: StartEvent, ctx: Context[DocumentProcessingState]
) -> (
ParseDocumentCostEffectiveEvent
| ParseDocumentAgenticEvent
| ParseDocumentAgenticPlusEvent
):
async with ctx.store.edit_state() as state:
state.document_path = ev.document_path
if ev.parsing_mode == "cost_effective":
return ParseDocumentCostEffectiveEvent()
elif ev.parsing_mode == "agentic":
return ParseDocumentAgenticEvent()
else:
return ParseDocumentAgenticPlusEvent()
@step
async def parse_document_cost_effective(
self,
ev: ParseDocumentCostEffectiveEvent,
ctx: Context[DocumentProcessingState],
parser: Annotated[LlamaParse, Resource(get_llama_parse_cost_effective)],
) -> StopEvent:
state = await ctx.store.get_state()
result = await parser.aparse(state.document_path)
if isinstance(result, list):
documents = []
for r in result:
documents.extend(await r.aget_markdown_documents())
else:
documents = await result.aget_markdown_documents()
text = "\\n\\n---\\n\\n".join([document.text for document in documents])
return StopEvent(result=text)
@step
async def parse_document_agentic(
self,
ev: ParseDocumentAgenticEvent,
ctx: Context[DocumentProcessingState],
parser: Annotated[LlamaParse, Resource(get_llama_parse_agentic)],
) -> StopEvent:
state = await ctx.store.get_state()
result = await parser.aparse(state.document_path)
if isinstance(result, list):
documents = []
for r in result:
documents.extend(await r.aget_markdown_documents())
else:
documents = await result.aget_markdown_documents()
text = "\\n\\n---\\n\\n".join([document.text for document in documents])
return StopEvent(result=text)
@step
async def parse_document_agentic_plus(
self,
ev: ParseDocumentAgenticPlusEvent,
ctx: Context[DocumentProcessingState],
parser: Annotated[LlamaParse, Resource(get_llama_parse_agentic_plus)],
) -> StopEvent:
state = await ctx.store.get_state()
result = await parser.aparse(state.document_path)
if isinstance(result, list):
documents = []
for r in result:
documents.extend(await r.aget_markdown_documents())
else:
documents = await result.aget_markdown_documents()
text = "\\n\\n---\\n\\n".join([document.text for document in documents])
return StopEvent(result=text)
async def main(document_path: str, parsing_mode: str) -> None:
wf = DocumentProcessingWorkflow(
timeout=1800
) # allow processing jobs up to 30 minutes
result = await wf.run(document_path=document_path, parsing_mode=parsing_mode)
print(str(result))
workflow = DocumentProcessingWorkflow(timeout=None)
if __name__ == "__main__":
import os
import asyncio
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-p", "--path", help="Document path", required=True)
parser.add_argument(
"-m",
"--mode",
help="Parsing Mode",
choices=["cost_effective", "agentic", "agentic_plus"],
required=False,
default="agentic",
)
args = parser.parse_args()
if not os.getenv("LLAMA_CLOUD_API_KEY", None):
raise ValueError(
"You need to set LLAMA_CLOUD_API_KEY in your environment before using this workflow"
)
asyncio.run(main(args.path, args.mode))
-48
View File
@@ -1,48 +0,0 @@
# Human-in-the-Loop Flight Booking Workflow Example
This workflow demonstrates how to build a human-in-the-loop, event-driven workflow for flight search and booking using [llama-index-workflows](https://github.com/run-llama/llama-index-workflows). The workflow uses an LLM to extract flight details from user input, presents flight options, and requires human approval before booking.
## Installation
Install all required dependencies (including llama-index-workflows and OpenAI LLM support):
```bash
pip install -e .
```
## Usage
Run the workflow from the command line:
```bash
python -m human_in_the_loop.workflow \
--message "I want to fly from San Francisco to Paris on July 10th"
```
**Note:**
- You must set your `OPENAI_API_KEY` in the environment before running.
## Workflow Overview
- **search_for_flight**:
Uses an LLM to extract flight details from the user's message and searches for available flights.
Emits a `FlightSearchEvent` with candidate flights.
- **chosen_flight**:
Waits for human input to select a flight and confirm booking.
Books the flight if approved, or exits if declined.
## Human-in-the-Loop Pattern
- The workflow pauses to request human input for flight selection and booking approval.
- Input is provided interactively via the command line.
## Customization
- Replace the `FlightsAPI` logic with your own flight search and booking implementation.
- Extend the workflow to support additional validation or multi-step approval.
## References
- [llama-index-workflows documentation](https://github.com/run-llama/llama-index-workflows)
@@ -1,17 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "human-in-the-loop"
version = "0.1.0"
description = "A workflow showcasing how to use human in the loop"
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-workflows",
"llama-index-llms-openai"
]
[tool.llamadeploy.workflows]
default = "human_in_the_loop.workflow:workflow"
@@ -1,171 +0,0 @@
import random
from pydantic import BaseModel, Field
from workflows import Workflow, step, Context
from workflows.resource import Resource
from typing import Annotated
from workflows.events import (
StartEvent,
StopEvent,
InputRequiredEvent,
HumanResponseEvent,
)
from llama_index.llms.openai import OpenAIResponses
from llama_index.core.llms import ChatMessage
from llama_index.core.llms.structured_llm import StructuredLLM
# replace with an actual flight searcher
class FlightsAPI:
def __init__(self) -> None:
self.allowed_departure = [
"San Francisco",
"San Jose",
"Los Angeles",
"New York",
]
self.allowed_arrival = ["Paris", "London", "Berlin", "Rome"]
self.allowed_hours = ["7.00 AM", "12.00 AM", "5.00 PM", "10.00 PM"]
def search_flights(
self, departure: str, arrival: str, date: str
) -> str | list[str]:
if arrival not in self.allowed_arrival:
return "Sorry, we do not have planes that go to " + arrival
if departure not in self.allowed_departure:
return "Sorry, we do not have planes departing from " + departure
allowed_hours = self.allowed_hours[self.allowed_departure.index(departure) :]
flights = []
for hour in allowed_hours:
flights.append(
f"Flight from {departure} to {arrival} at {hour} on {date} for {random.randint(200, 400)}$"
)
return flights
def book_flight(self, flight: str) -> str:
n = random.randint(0, 1)
if n == 0:
return f"Successfully booked: {flight}"
return "Sorry, something went wrong while booking your flight"
class FlightSearchEvent(InputRequiredEvent):
candidate_flights: list[str]
class FlightChoiceEvent(HumanResponseEvent):
chosen_flight: str
continue_booking: bool
async def get_flights_api(*args, **kwargs) -> FlightsAPI:
return FlightsAPI()
class FlightSearchDetails(BaseModel):
departure_location: str = Field(description="Departure location")
arrival_location: str = Field(description="Arrival location")
date: str = Field(description="Flight date")
async def get_llm(*args, **kwargs) -> StructuredLLM:
return OpenAIResponses("gpt-4.1").as_structured_llm(FlightSearchDetails)
class FlightSearchWorkflow(Workflow):
@step
async def search_for_flight(
self,
ev: StartEvent,
ctx: Context,
llm: Annotated[StructuredLLM, Resource(get_llm)],
flight_api: Annotated[FlightsAPI, Resource(get_flights_api)],
) -> StopEvent | FlightSearchEvent:
response = await llm.achat(
[
ChatMessage(
content=f"Extract flight details from this request: {ev.message}"
)
]
)
if response.message.content:
flight_details = FlightSearchDetails.model_validate_json(
response.message.content
)
else:
return StopEvent(result="Unable to get details for your flight")
flights = flight_api.search_flights(
departure=flight_details.departure_location,
arrival=flight_details.arrival_location,
date=flight_details.date,
)
if isinstance(flights, str):
return StopEvent(result=flights)
else:
return FlightSearchEvent(candidate_flights=flights)
@step
async def chosen_flight(
self,
ev: FlightChoiceEvent,
flight_api: Annotated[FlightsAPI, Resource(get_flights_api)],
ctx: Context,
) -> StopEvent:
if ev.continue_booking:
booking = flight_api.book_flight(ev.chosen_flight)
return StopEvent(result=booking)
else:
return StopEvent(result="No permission to book, exiting...")
async def main(message: str) -> None:
w = FlightSearchWorkflow(timeout=100, verbose=False)
handler = w.run(message=message)
async for ev in handler.stream_events():
if isinstance(ev, FlightSearchEvent):
print("Flights:\n" + "\n- ".join(ev.candidate_flights) + "\n\n")
are_ok = input("Are the flights ok for you? [yes/no] ")
if are_ok.lower().strip() != "yes":
handler.ctx.send_event(
FlightChoiceEvent(chosen_flight="", continue_booking=False)
) # type: ignore
break
res = input("Choose a flight: ")
while res not in ev.candidate_flights:
res = input(
"Sorry, that flight is not available, can you choose one flight from the above, please? Your choice: "
)
appr = input(f"Do you wish to continue with booking for {res}? [yes/no] ")
if appr.lower().strip() == "yes":
handler.ctx.send_event(
FlightChoiceEvent(chosen_flight=res, continue_booking=True)
) # type: ignore
else:
handler.ctx.send_event(
FlightChoiceEvent(chosen_flight=res, continue_booking=False)
) # type: ignore
result = await handler
print(str(result))
workflow = FlightSearchWorkflow(timeout=None)
if __name__ == "__main__":
import asyncio
import os
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"-m", "--message", required=True, help="Flight you would like to take"
)
args = parser.parse_args()
if not os.getenv("OPENAI_API_KEY", None):
raise ValueError(
"You need to set OPENAI_API_KEY in your environment before using this workflow"
)
asyncio.run(main(message=args.message))
-56
View File
@@ -1,56 +0,0 @@
# Invoice Extraction Workflow Example
This workflow demonstrates how to use [llama-index-workflows](https://github.com/run-llama/llama-index-workflows) and LlamaExtract to extract invoice data from documents in a human-in-the-loop, event-driven fashion. The workflow supports three extraction modes: base, advanced, and premium, and allows human approval before finalizing results.
## Installation
Install all required dependencies (including llama-index-workflows and LlamaCloud services):
```bash
pip install -e .
```
## Usage
Run the workflow from the command line:
```bash
python -m invoice_extraction.workflow \
--path /path/to/invoice.pdf \
--mode advanced
```
**Modes:**
- `base`: Fast extraction, minimal reasoning.
- `advanced`: Multimodal extraction with improved OCR and reasoning.
- `premium`: Highest accuracy, citations, and confidence scores.
**Note:**
- You must set your `LLAMA_CLOUD_API_KEY` in the environment before running.
## Workflow Overview
- **invoice_extraction**:
Extracts invoice data using LlamaExtract and the selected extraction mode.
Emits a `FeedbackRequiredEvent` with the extracted results.
- **human_feedback**:
Waits for human approval of the extracted data.
If approved, outputs the result; if declined, restarts extraction.
## Human-in-the-Loop Pattern
- The workflow pauses for human feedback after extraction.
- Input is provided interactively via the command line.
## Customization
- Extend the `InvoiceData` schema for additional invoice fields.
- Integrate with other LlamaIndex components for downstream analytics.
## References
- [llama-index-workflows documentation](https://github.com/run-llama/llama-index-workflows)
- [LlamaExtract documentation](https://docs.cloud.llamaindex.ai/llamaextract/getting_started)
@@ -1,17 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "invoice-extraction"
version = "0.1.0"
description = "A workflow that, given an invoice, extracts several key details using LlamaExtract"
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-workflows",
"llama-cloud-services"
]
[tool.llamadeploy.workflows]
default = "invoice_extraction.workflow:workflow"
@@ -1,149 +0,0 @@
from pydantic import BaseModel, Field
from workflows import Workflow, step, Context
from workflows.events import (
StartEvent,
StopEvent,
InputRequiredEvent,
HumanResponseEvent,
)
from workflows.resource import Resource
from typing import Annotated
from llama_cloud_services import LlamaExtract
from llama_cloud_services.extract import ExtractConfig, ExtractMode
class InvoiceData(BaseModel):
invoice_date: str = Field(description="Date on the invoice")
customer: str = Field(description="Customer reported on the invoice")
amount_due: float = Field(description="Amount due")
class FeedbackRequiredEvent(InputRequiredEvent):
extraction_result: str
class HumanFeedbackEvent(HumanResponseEvent):
approved: bool
async def get_invoice_extractor(*args, **kwargs):
return LlamaExtract()
class InvoiceExtractWorkflow(Workflow):
@step
async def invoice_extraction(
self,
ev: StartEvent,
ctx: Context,
extractor: Annotated[LlamaExtract, Resource(get_invoice_extractor)],
) -> FeedbackRequiredEvent:
async with ctx.store.edit_state() as state:
state.extraction_mode = ev.extraction_mode
state.path = ev.path
if ev.extraction_mode == "base":
config = ExtractConfig(
extraction_mode=ExtractMode.FAST,
high_resolution_mode=False, # Better OCR accuracy
invalidate_cache=False, # Bypass cached results
cite_sources=False, # Enable source citations
use_reasoning=False, # Enable reasoning (not in FAST mode)
confidence_scores=False, # MULTIMODAL/PREMIUM only
)
elif ev.extraction_mode == "advanced":
config = ExtractConfig(
extraction_mode=ExtractMode.MULTIMODAL,
high_resolution_mode=True, # Better OCR accuracy
invalidate_cache=False, # Bypass cached results
cite_sources=False, # Enable source citations
use_reasoning=True, # Enable reasoning (not in FAST mode)
confidence_scores=False, # MULTIMODAL/PREMIUM only
)
else:
config = ExtractConfig(
extraction_mode=ExtractMode.PREMIUM,
high_resolution_mode=True, # Better OCR accuracy
invalidate_cache=False, # Bypass cached results
cite_sources=True, # Enable source citations
use_reasoning=True, # Enable reasoning (not in FAST mode)
confidence_scores=True, # MULTIMODAL/PREMIUM only
)
result = await extractor.aextract(
data_schema=InvoiceData, config=config, files=[ev.path]
)
extracted_data: list[InvoiceData] = []
if isinstance(result, list):
for r in result:
extracted_data.append(InvoiceData.model_validate(r.data))
else:
extracted_data.append(InvoiceData.model_validate(result.data))
async with ctx.store.edit_state() as state:
state.extraction_result = "\\n\\n---\\n\\n".join(
[
f"Invoice Date: {d.invoice_date}\\nCustomer: {d.customer}\\nAmount Due: {d.amount_due}"
for d in extracted_data
]
)
return FeedbackRequiredEvent(
extraction_result="\\n\\n---\\n\\n".join(
[
f"Invoice Date: {d.invoice_date}\\nCustomer: {d.customer}\\nAmount Due: {d.amount_due}"
for d in extracted_data
]
)
)
@step
async def human_feedback(
self, ev: HumanFeedbackEvent, ctx: Context
) -> StopEvent | StartEvent:
state = await ctx.store.get_state()
if ev.approved:
return StopEvent(result=state.extraction_result)
else:
return StartEvent(path=state.path, extraction_mode=state.extraction_mode) # type: ignore
async def main(path: str, extraction_mode: str) -> None:
w = InvoiceExtractWorkflow(timeout=1800, verbose=False)
handler = w.run(path=path, extraction_mode=extraction_mode)
async for ev in handler.stream_events():
if isinstance(ev, FeedbackRequiredEvent):
print("Extraction Result:\\n\\n" + ev.extraction_result + "\\n\\n")
res = input("Approve? [yes/no]: ")
if res.lower().strip() == "yes":
handler.ctx.send_event(HumanFeedbackEvent(approved=True)) # type: ignore
else:
handler.ctx.send_event(HumanFeedbackEvent(approved=False)) # type: ignore
result = await handler
print(str(result))
workflow = InvoiceExtractWorkflow(timeout=None)
if __name__ == "__main__":
import asyncio
import os
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"-p", "--path", required=True, help="Path to the invoice to extract"
)
parser.add_argument(
"-m",
"--mode",
required=True,
help="Extraction mode",
choices=["base", "advanced", "premium"],
)
args = parser.parse_args()
if not os.getenv("LLAMA_CLOUD_API_KEY", None):
raise ValueError(
"You need to set LLAMA_CLOUD_API_KEY in your environment before using this workflow"
)
asyncio.run(main(path=args.path, extraction_mode=args.mode))
-45
View File
@@ -1,45 +0,0 @@
# Retrieval-Augmented Generation (RAG) Workflow Example
This workflow demonstrates how to use [llama-index-workflows](https://github.com/run-llama/llama-index-workflows) to build a retrieval-augmented generation pipeline. Documents are ingested, indexed, retrieved, and used to answer queries with an LLM.
## Installation
Install all required dependencies (including llama-index-workflows and OpenAI LLM support):
```bash
pip install -e .
```
## Usage
Run the workflow from the command line:
```bash
python -m rag.workflow \
--path /path/to/documents/ \
--query "What are the main findings in these reports?"
```
**Note:**
- You must set your `OPENAI_API_KEY` in the environment before running.
## Workflow Overview
- **document_processing_step**:
Loads documents from the specified directory and creates a vector index.
- **retrieve_step**:
Retrieves the top-k relevant documents for the input query.
- **generate_step**:
Uses an LLM to answer the query based on the retrieved documents.
## Customization
- Adjust the retrieval parameters (e.g., `top_k`) for different use cases.
- Extend the workflow to support other document formats or post-processing.
## References
- [llama-index-workflows documentation](https://github.com/run-llama/llama-index-workflows)
-21
View File
@@ -1,21 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "rag"
version = "0.1.0"
description = "A workflow that embeds, indexes and queries your documents on the fly, providing you with a simple RAG pipeline."
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-llms-openai",
"llama-index-embeddings-openai",
"llama-index-workflows"
]
[tool.hatch.build.targets.wheel]
packages = ["src/rag"]
[tool.llamadeploy.workflows]
default = "rag.workflow:workflow"
View File
-91
View File
@@ -1,91 +0,0 @@
from pydantic import ConfigDict
from typing import Annotated
from llama_index.llms.openai import OpenAI
from workflows import Workflow, step, Context
from workflows.events import StartEvent, Event, StopEvent
from workflows.resource import Resource
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.schema import NodeWithScore
from llama_index.core.llms import LLM
class IndexCreatedEvent(Event):
model_config = ConfigDict(arbitrary_types_allowed=True)
index: VectorStoreIndex
class RetrievalEvent(Event):
documents: list[NodeWithScore]
async def get_llm(*args, **kwargs) -> LLM:
return OpenAI(model="gpt-4.1")
class RAGWorkflow(Workflow):
@step
async def document_processing_step(
self, ev: StartEvent, ctx: Context
) -> IndexCreatedEvent:
async with ctx.store.edit_state() as state:
state.query = ev.query
docs = await SimpleDirectoryReader(ev.path).aload_data()
index = VectorStoreIndex.from_documents(documents=docs)
return IndexCreatedEvent(index=index)
@step
async def retrieve_step(
self, ev: IndexCreatedEvent, ctx: Context
) -> RetrievalEvent:
state = await ctx.store.get_state()
retrieved_documents = await ev.index.as_retriever(top_k=5).aretrieve(
state.query
)
return RetrievalEvent(documents=retrieved_documents)
@step
async def generate_step(
self, ev: RetrievalEvent, llm: Annotated[LLM, Resource(get_llm)], ctx: Context
) -> StopEvent:
state = await ctx.store.get_state()
docs = "\\n\\n---\\n\\n".join(
[
f"Content: {node.text}\\nScore: {node.score if node.score else -1}"
for node in ev.documents
]
)
prompt = f"Based on these documents:\\n\\n```md\\n{docs}\\n```\\n\\nAnswer this query: {state.query}"
response = await llm.acomplete(prompt)
return StopEvent(result=response.text)
workflow = RAGWorkflow(timeout=None)
async def main(path: str, query: str):
w = RAGWorkflow(timeout=300)
result = await w.run(path=path, query=query)
print(str(result))
if __name__ == "__main__":
import os
import asyncio
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"-p",
"--path",
help="Path to the directory with the files to ingest",
required=True,
)
parser.add_argument("-q", "--query", help="Retrieval query", required=True)
args = parser.parse_args()
if not os.getenv("OPENAI_API_KEY", None):
raise ValueError(
"You need to set OPENAI_API_KEY in your environment before using this workflow"
)
asyncio.run(main(args.path, args.query))
-45
View File
@@ -1,45 +0,0 @@
# Web Scraping Workflow Example
This workflow demonstrates how to use [llama-index-workflows](https://github.com/run-llama/llama-index-workflows) and Google Gemini to summarize the content of multiple URLs in an event-driven, async-first fashion.
## Installation
Install all required dependencies (including llama-index-workflows and Google GenAI support):
```bash
pip install -e .
```
## Usage
Run the workflow from the command line:
```bash
python -m web_scraping.workflow \
--url https://example.com/page1 \
--url https://example.com/page2
```
**Note:**
- You must set your `GOOGLE_API_KEY` in the environment before running.
## Workflow Overview
- **process_urls**:
Receives a list of URLs and emits a `URLReadEvent` for each.
- **get_url_content**:
Uses Google Gemini to summarize the content of each URL.
- **finalize**:
Collects all summaries and outputs the combined result.
## Customization
- Extend the workflow to extract additional metadata or perform further analysis.
- Integrate with other LlamaIndex components for downstream tasks.
## References
- [llama-index-workflows documentation](https://github.com/run-llama/llama-index-workflows)
-17
View File
@@ -1,17 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "web-scraping"
version = "0.1.0"
description = "A workflow that, given several urls, scrapes and summarizes their content."
requires-python = ">=3.10"
readme = "README.md"
dependencies = [
"llama-index-workflows",
"llama-index-llms-google-genai"
]
[tool.llamadeploy.workflows]
default = "web_scraping.workflow:workflow"
@@ -1,108 +0,0 @@
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core.llms import ChatMessage
from google.genai.types import Tool, GenerateContentConfig, UrlContext
from typing import Annotated
from pydantic import BaseModel
from workflows import Workflow, step, Context
from workflows.events import Event, StartEvent, StopEvent
from workflows.resource import Resource
model_id = "gemini-2.5-flash"
url_context_tool = Tool(url_context=UrlContext())
config = GenerateContentConfig(
tools=[url_context_tool],
response_modalities=["TEXT"],
)
class URLState(BaseModel):
processed_urls: int = 0
final_content: str = ""
async def get_llm(*args, **kwargs) -> GoogleGenAI:
return GoogleGenAI(model=model_id, generation_config=config)
class URLReadEvent(Event):
url: str
class URLContentEvent(Event):
content: str
class WebScrapeWorkflow(Workflow):
@step
async def process_urls(
self, ev: StartEvent, ctx: Context[URLState]
) -> URLReadEvent | None:
async with ctx.store.edit_state() as state:
state.processed_urls = len(ev.urls)
for url in ev.urls:
ctx.send_event(URLReadEvent(url=url))
@step
async def get_url_content(
self,
ev: URLReadEvent,
llm: Annotated[GoogleGenAI, Resource(get_llm)],
ctx: Context[URLState],
) -> URLContentEvent:
response = llm.chat(
[
ChatMessage(
role="user",
content=f"Can you please summarize the context of this URL: {ev.url}",
)
]
)
async with ctx.store.edit_state() as state:
state.final_content += (
f"### Summary for {ev.url}\\n\\n{response.message.content}\\n\\n"
)
return URLContentEvent(content=response.message.content or "")
@step
async def finalize(
self, ev: URLContentEvent, ctx: Context[URLState]
) -> StopEvent | None:
state = await ctx.store.get_state()
events = ctx.collect_events(ev, [URLContentEvent] * state.processed_urls)
if events:
return StopEvent(result=state.final_content)
workflow = WebScrapeWorkflow(timeout=None)
async def main(urls: list[str]):
w = WebScrapeWorkflow(timeout=300)
result = await w.run(urls=urls)
print(str(result))
if __name__ == "__main__":
import os
import asyncio
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"--url",
help="URLs whose content needs to be summarised",
required=True,
action="append",
)
args = parser.parse_args()
if not os.getenv("GOOGLE_API_KEY", None):
raise ValueError(
"You need to set GOOGLE_API_KEY in your environment before using this workflow"
)
asyncio.run(main(args.url))
+38 -38
View File
@@ -1,51 +1,51 @@
from pathlib import Path
import pytest
import subprocess
import toml
import os
from src.vibe_llama.scaffold import PROJECTS, create_scaffold
from src.vibe_llama.scaffold import create_scaffold
from src.vibe_llama.scaffold.terminal import app1, app2
from prompt_toolkit.application import Application
def test_template_pyprojects_sync_with_catalog() -> None:
"""Ensure static template pyprojects match the PROJECTS catalog."""
@pytest.fixture(autouse=True)
def stub_copier_run_copy(monkeypatch, tmp_path):
"""Stub out copier.run_copy used by scaffold to avoid network access.
Writes minimal project files into the destination directory so tests
can assert their presence without cloning remote templates.
"""
import copier
def _stub_run_copy(template_src, dst_path, *args, **kwargs):
# Infer module name from the template source string
src = str(template_src)
src = src.split("/")[-1]
src = src.removeprefix("template-workflow-")
module_name = (src or "basic").replace("-", "_")
dst = Path(dst_path)
(dst / "src" / module_name).mkdir(parents=True, exist_ok=True)
(dst / "pyproject.toml").write_text(
"""[project]
name = "{name}"
version = "0.1.0"
description = "Test template"
readme = "README.md"
dependencies = []
""".format(name=module_name.replace("_", "-"))
)
(dst / "README.md").write_text(f"# {module_name}\n")
(dst / "src" / module_name / "workflow.py").write_text("workflow = object()\n")
monkeypatch.setattr(copier, "run_copy", _stub_run_copy)
yield
def test_dummy_templates_removed() -> None:
"""Ensure local templates directory is not used anymore."""
templates_root = Path(__file__).resolve().parents[2] / "templates"
for name in PROJECTS:
p = templates_root / name / "pyproject.toml"
assert p.is_file(), f"Missing pyproject for template {name}"
data = toml.loads(p.read_text())
assert data["project"]["name"] == name.replace("_", "-")
assert data["project"]["version"] == "0.1.0"
assert data["project"]["description"] is not None
assert data["project"]["readme"] == "README.md"
assert data["project"]["dependencies"] is not None
# Test that the workflow can be imported and validated
# Change to the template directory to run the validation
template_dir = templates_root / name
module_name = name.replace("-", "_")
# Run validation in a subprocess
result = subprocess.run(
[
"uv",
"run",
"python",
"-c",
f"from {module_name}.workflow import workflow; workflow._validate()",
],
cwd=template_dir,
capture_output=False,
check=True,
text=True,
)
assert result.returncode == 0, (
f"Workflow validation failed for {name}: {result.stderr}"
)
assert templates_root.exists()
@pytest.mark.asyncio