Compare commits

...

6 Commits

Author SHA1 Message Date
github-actions[bot] 8105aa70b6 Release 0.5.12 (#589)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-04-29 15:48:08 +07:00
Marcus Schiesser 23a90625d1 chore: add ruff check 2025-04-29 15:47:13 +07:00
Marcus Schiesser ac789bcb8d chore: check python format 2025-04-29 15:42:10 +07:00
Huu Le 241d82a87d feat: add create-llama artifacts template (python) (#586)
* add artifact template for python

* Add artifact workflows for code and document generation

- Introduced `CodeArtifactWorkflow` and `DocumentArtifactWorkflow` classes to handle code and document artifacts respectively.
- Updated README to include instructions for modifying the factory method to select the appropriate workflow.
- Enhanced clarity in class documentation and improved naming conventions for better understanding.

* bump packages

* fix wrong name

* add ts workflows

* revert change for TS

* docs: fix docs

* add metadata fields

---------

Co-authored-by: Marcus Schiesser <mail@marcusschiesser.de>
2025-04-29 14:22:16 +07:00
github-actions[bot] b16cfd873b chore(release): bump llama-index-server version to 0.1.15 (#576)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2025-04-28 15:55:05 +07:00
Huu Le 3130cdf18d Add support for artifact in llama-index-server (#580)
* support artifact

* migrate poetry to uv

* fix ci

* update ci

* Refactor artifact generation tools by introducing separate CodeGenerator and DocumentGenerator classes. Update app_writer to utilize FunctionAgent for code and document generation workflows. Remove deprecated ArtifactGenerator class. Enhance artifact transformation logic in callbacks. Improve system prompts for clarity and instruction adherence.

* enhance code

* remove previous content from tool input

* fix test

* bump chat ui

* revert changes

* remove dead code

* Add artifact workflows for code and document generation

- Introduced `code_workflow.py` for generating and updating code artifacts based on user requests.
- Introduced `document_workflow.py` for generating and updating document artifacts (Markdown/HTML).
- Created `main.py` to set up FastAPI server with artifact workflows.
- Added a README for setup instructions and usage.
- Implemented UI components for displaying artifact status and progress.
- Updated chat router to remove unused event callbacks.

* remove app_writer workflow

* Refactor artifact workflow classes and UI event handling

- Renamed `ArtifactUIEvents` to `UIEventData` for clarity.
- Introduced `last_artifact` attribute in `ArtifactWorkflow` to streamline artifact retrieval.
- Updated chat history handling to utilize the new `last_artifact` attribute.
- Modified event streaming to use `UIEventData` for consistent event structure.
- Added a new UI component for displaying artifact workflow status and progress.

* Use uv to release package

* Refactor artifact workflows and UI components

- Updated `code_workflow.py` and `document_workflow.py` to improve chat history handling and user message storage.
- Enhanced `ArtifactWorkflow` to utilize optional fields in the `Requirement` model.
- Revised prompt instructions for clarity and conciseness in generating requirements.
- Modified UI event components to reflect changes in workflow stages and improve user feedback.
- Improved error handling for JSON parsing in artifact annotations.

* move code

* Merge remote-tracking branch 'origin/main' into lee/add-artifact

* sort artifact

* fix mypy

* fix adding custom route does not work

* fix mypy

* revert create-llama change

* disable e2e test for python package change

* fix missing set memory

* remove include last artifact in the code

* Add ArtifactEvent model and update workflows to use it
2025-04-28 15:49:20 +07:00
31 changed files with 7291 additions and 6348 deletions
+2
View File
@@ -4,10 +4,12 @@ on:
branches: [main]
paths-ignore:
- "python/llama-index-server/**"
- ".github/workflows/*llama_index_server.yml"
pull_request:
branches: [main]
paths-ignore:
- "python/llama-index-server/**"
- ".github/workflows/*llama_index_server.yml"
jobs:
e2e-python:
@@ -22,7 +22,8 @@ jobs:
working-directory: ./python/llama-index-server
if: |
github.event_name == 'push' &&
!startsWith(github.ref, 'refs/heads/release/llama-index-server-v')
!startsWith(github.ref, 'refs/heads/release/llama-index-server-v') &&
!contains(github.event.head_commit.message, 'Release: llama-index-server v')
steps:
- name: Checkout Repository
@@ -30,17 +31,19 @@ jobs:
with:
fetch-depth: 0
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install Poetry
run: |
curl -sSL https://install.python-poetry.org | python3 -
- name: Install dependencies
run: poetry install
shell: bash
run: uv sync --all-extras --dev
- name: Setup Git
run: |
@@ -48,15 +51,17 @@ jobs:
git config --global user.name "github-actions[bot]"
- name: Bump patch version
shell: bash
run: |
poetry version patch
uvx --from=toml-cli toml set --toml-path=pyproject.toml project.version $(uvx --from=toml-cli toml get --toml-path=pyproject.toml project.version | awk -F. '{$NF = $NF + 1;}1' OFS=.)
git add pyproject.toml
git commit -m "chore(release): bump version to $(poetry version -s)"
git commit -m "chore(release): bump llama-index-server version to $(uvx --from=toml-cli toml get --toml-path=pyproject.toml project.version)"
- name: Get current version
id: get_version
shell: bash
run: |
version=$(poetry version -s)
version=$(uvx --from=toml-cli toml get --toml-path=pyproject.toml project.version)
echo "current_version=${version}" >> "$GITHUB_OUTPUT"
- name: Create Release PR
@@ -91,31 +96,34 @@ jobs:
- name: Checkout Repository
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install Poetry
run: |
curl -sSL https://install.python-poetry.org | python3 -
- name: Install dependencies
run: poetry install
shell: bash
run: uv sync --all-extras
- name: Get current version
id: get_version
shell: bash
run: |
version=$(poetry version -s)
version=$(uvx --from=toml-cli toml get --toml-path=pyproject.toml project.version)
echo "current_version=${version}" >> "$GITHUB_OUTPUT"
- name: Build and publish to PyPI
uses: JRubics/poetry-publish@v2.1
with:
python_version: "3.11"
pypi_token: ${{ secrets.PYPI_TOKEN }}
package_directory: "python/llama-index-server"
poetry_install_options: "--without dev"
- name: Build package
shell: bash
run: uv build --no-sources
- name: Publish to PyPI
shell: bash
run: uv publish --token ${{ secrets.PYPI_TOKEN }}
- name: Create GitHub Release
uses: softprops/action-gh-release@v2
+25 -37
View File
@@ -4,7 +4,6 @@ on:
pull_request:
env:
POETRY_VERSION: "1.8.3"
PYTHON_VERSION: "3.9"
jobs:
@@ -21,29 +20,23 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Poetry
run: pipx install poetry==${{ env.POETRY_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- name: Set up python ${{ matrix.python-version }}
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: "poetry"
- name: Configure Poetry
run: |
poetry config virtualenvs.create true
poetry config virtualenvs.in-project true
poetry env use python
- name: Install dependencies
shell: bash
run: poetry install --with dev
run: uv sync --all-extras --dev
- name: Run unit tests
shell: bash
run: |
poetry run pytest tests
run: uv run pytest tests
type-check:
name: Type Check
@@ -54,28 +47,23 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Poetry
run: pipx install poetry==${{ env.POETRY_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: "poetry"
- name: Configure Poetry
run: |
poetry config virtualenvs.create true
poetry config virtualenvs.in-project true
poetry env use python
- name: Install dependencies
shell: bash
run: poetry install --with dev
run: uv sync --all-extras --dev
- name: Run mypy
shell: bash
run: poetry run mypy llama_index
run: uv run mypy llama_index
build:
needs: [unit-test, type-check]
@@ -85,25 +73,25 @@ jobs:
working-directory: python/llama-index-server
steps:
- uses: actions/checkout@v4
- name: Install Poetry
run: pipx install poetry==${{ env.POETRY_VERSION }}
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Clear python cache
- name: Install build package
shell: bash
run: poetry cache clear --all pypi
- name: Build package
shell: bash
run: poetry build
- name: Test installing built package
shell: bash
run: python -m pip install .
run: uv sync --all-extras
- name: Test import
shell: bash
working-directory: ${{ vars.RUNNER_TEMP }}
run: python -c "from llama_index.server import LlamaIndexServer"
run: uv run python -c "from llama_index.server import LlamaIndexServer"
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
+2 -1
View File
@@ -1,3 +1,4 @@
pnpm format
pnpm lint
uvx ruff format --check packages/create-llama/templates/
uvx ruff check .
uvx ruff format . --check
+6
View File
@@ -1,5 +1,11 @@
# create-llama
## 0.5.12
### Patch Changes
- 241d82a: Add artifacts use case (python)
## 0.5.11
### Patch Changes
+1 -1
View File
@@ -562,7 +562,7 @@ const installLlamaIndexServerTemplate = async ({
process.exit(1);
}
await copy("workflow.py", path.join(root, "app"), {
await copy("*.py", path.join(root, "app"), {
parents: true,
cwd: path.join(templatesDir, "components", "workflows", "python", useCase),
});
+2 -1
View File
@@ -57,7 +57,8 @@ export type TemplateUseCase =
| "form_filling"
| "extractor"
| "contract_review"
| "agentic_rag";
| "agentic_rag"
| "artifacts";
// Config for both file and folder
export type FileSourceConfig =
| {
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "create-llama",
"version": "0.5.11",
"version": "0.5.12",
"description": "Create LlamaIndex-powered apps with one command",
"keywords": [
"rag",
+22 -6
View File
@@ -6,7 +6,11 @@ import { ModelConfig, TemplateFramework } from "../helpers/types";
import { PureQuestionArgs, QuestionResults } from "./types";
import { askPostInstallAction, questionHandlers } from "./utils";
type AppType = "agentic_rag" | "financial_report" | "deep_research";
type AppType =
| "agentic_rag"
| "financial_report"
| "deep_research"
| "artifacts";
type SimpleAnswers = {
appType: AppType;
@@ -42,6 +46,12 @@ export const askSimpleQuestions = async (
description:
"Researches and analyzes provided documents from multiple perspectives, generating a comprehensive report with citations to support key findings and insights.",
},
{
title: "Artifacts",
value: "artifacts",
description:
"Build your own Vercel's v0 or OpenAI's canvas-styled UI.",
},
],
},
questionHandlers,
@@ -52,7 +62,7 @@ export const askSimpleQuestions = async (
let useLlamaCloud = false;
if (appType !== "extractor" && appType !== "contract_review") {
if (appType !== "artifacts") {
const { language: newLanguage } = await prompts(
{
type: "select",
@@ -111,10 +121,10 @@ const convertAnswers = async (
args: PureQuestionArgs,
answers: SimpleAnswers,
): Promise<QuestionResults> => {
const MODEL_GPT4o: ModelConfig = {
const MODEL_GPT41: ModelConfig = {
provider: "openai",
apiKey: args.openAiKey,
model: "gpt-4o",
model: "gpt-4.1",
embeddingModel: "text-embedding-3-large",
dimensions: 1536,
isConfigured(): boolean {
@@ -135,13 +145,19 @@ const convertAnswers = async (
template: "llamaindexserver",
dataSources: EXAMPLE_10K_SEC_FILES,
tools: getTools(["interpreter", "document_generator"]),
modelConfig: MODEL_GPT4o,
modelConfig: MODEL_GPT41,
},
deep_research: {
template: "llamaindexserver",
dataSources: EXAMPLE_10K_SEC_FILES,
tools: [],
modelConfig: MODEL_GPT4o,
modelConfig: MODEL_GPT41,
},
artifacts: {
template: "llamaindexserver",
dataSources: [],
tools: [],
modelConfig: MODEL_GPT41,
},
};
@@ -0,0 +1,137 @@
import { Badge } from "@/components/ui/badge";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Progress } from "@/components/ui/progress";
import { Skeleton } from "@/components/ui/skeleton";
import { cn } from "@/lib/utils";
import { Markdown } from "@llamaindex/chat-ui/widgets";
import { ListChecks, Loader2, Wand2 } from "lucide-react";
import { useEffect, useState } from "react";
// Presentation settings for each in-progress workflow state.
// Keys must match the `state` value carried by incoming UI events; a state
// without an entry here (e.g. "completed") renders nothing.
const STAGE_META = {
  // First stage: the request is being analysed.
  plan: {
    icon: ListChecks,
    badgeText: "Step 1/2: Planning",
    gradient: "from-blue-100 via-blue-50 to-white",
    progress: 33,
    iconBg: "bg-blue-100 text-blue-600",
    badge: "bg-blue-100 text-blue-700",
  },
  // Second stage: the artifact is being produced from the requirement.
  generate: {
    icon: Wand2,
    badgeText: "Step 2/2: Generating",
    gradient: "from-violet-100 via-violet-50 to-white",
    progress: 66,
    iconBg: "bg-violet-100 text-violet-600",
    badge: "bg-violet-100 text-violet-700",
  },
};
function ArtifactWorkflowCard({ event }) {
const [visible, setVisible] = useState(event?.state !== "completed");
const [fade, setFade] = useState(false);
useEffect(() => {
if (event?.state === "completed") {
setVisible(false);
} else {
setVisible(true);
setFade(false);
}
}, [event?.state]);
if (!event || !visible) return null;
const { state, requirement } = event;
const meta = STAGE_META[state];
if (!meta) return null;
return (
<div className="flex justify-center items-center w-full min-h-[180px] py-2">
<Card
className={cn(
"w-full shadow-md rounded-xl transition-all duration-500",
"border-0",
fade && "opacity-0 pointer-events-none",
`bg-gradient-to-br ${meta.gradient}`,
)}
style={{
boxShadow:
"0 2px 12px 0 rgba(80, 80, 120, 0.08), 0 1px 3px 0 rgba(80, 80, 120, 0.04)",
}}
>
<CardHeader className="flex flex-row items-center gap-2 pb-1 pt-2 px-3">
<div
className={cn(
"rounded-full p-1 flex items-center justify-center",
meta.iconBg,
)}
>
<meta.icon className="w-5 h-5" />
</div>
<CardTitle className="text-base font-semibold flex items-center gap-2">
<Badge className={cn("ml-1", meta.badge, "text-xs px-2 py-0.5")}>
{meta.badgeText}
</Badge>
</CardTitle>
</CardHeader>
<CardContent className="px-3 py-1">
{state === "plan" && (
<div className="flex flex-col items-center gap-2 py-2">
<Loader2 className="animate-spin text-blue-400 w-6 h-6 mb-1" />
<div className="text-sm text-blue-900 font-medium text-center">
Analyzing your request...
</div>
<Skeleton className="w-1/2 h-3 rounded-full mt-1" />
</div>
)}
{state === "generate" && (
<div className="flex flex-col gap-2 py-2">
<div className="flex items-center gap-1">
<Loader2 className="animate-spin text-violet-400 w-4 h-4" />
<span className="text-violet-900 font-medium text-sm">
Working on the requirement:
</span>
</div>
<div className="rounded-lg border border-violet-200 bg-violet-50 px-2 py-1 max-h-24 overflow-auto text-xs">
{requirement ? (
<Markdown content={requirement} />
) : (
<span className="text-violet-400 italic">
No requirements available yet.
</span>
)}
</div>
</div>
)}
</CardContent>
<div className="px-3 pb-2 pt-1">
<Progress
value={meta.progress}
className={cn(
"h-1 rounded-full bg-gray-200",
state === "plan" && "bg-blue-200",
state === "generate" && "bg-violet-200",
)}
indicatorClassName={cn(
"transition-all duration-500",
state === "plan" && "bg-blue-500",
state === "generate" && "bg-violet-500",
)}
/>
</div>
</Card>
</div>
);
}
export default function Component({ events }) {
const aggregateEvents = () => {
if (!events || events.length === 0) return null;
return events[events.length - 1];
};
const event = aggregateEvents();
return <ArtifactWorkflowCard event={event} />;
}
@@ -0,0 +1,69 @@
This is a [LlamaIndex](https://www.llamaindex.ai/) project using [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/).
## Getting Started
First, set up the environment with uv:
> **_Note:_** This step is not needed if you are using the dev-container.
```shell
uv sync
```
Then check the parameters that have been pre-configured in the `.env` file in this directory.
Make sure you have set the `OPENAI_API_KEY` for the LLM.
Then, run the development server:
```shell
uv run fastapi dev
```
Then open [http://localhost:8000](http://localhost:8000) with your browser to start the chat UI.
To start the app optimized for **production**, run:
```
uv run fastapi run
```
## Configure LLM and Embedding Model
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [settings.py](app/settings.py).
## Use Case
We have prepared two artifact workflows:
- [Code Workflow](app/code_workflow.py): To generate code and display it in the UI like Vercel's v0.
- [Document Workflow](app/document_workflow.py): Generate and update a document like OpenAI's canvas.
Modify the factory method in [`workflow.py`](app/workflow.py) to decide which artifact workflow to use. Without any changes the Code Workflow is used.
You can start by sending a request on the [chat UI](http://localhost:8000) or you can test the `/api/chat` endpoint with the following curl request:
```
curl --location 'localhost:8000/api/chat' \
--header 'Content-Type: application/json' \
--data '{ "messages": [{ "role": "user", "content": "Create a report comparing the finances of Apple and Tesla" }] }'
```
## Customize the UI
To customize the UI, you can start by modifying the [./components/ui_event.jsx](./components/ui_event.jsx) file.
You can also use an LLM to generate new UI code for the workflow by running the following command:
```
uv run generate_ui
```
## Learn More
To learn more about LlamaIndex, take a look at the following resources:
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows.
- [LlamaIndex Server](https://pypi.org/project/llama-index-server/)
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
@@ -0,0 +1,365 @@
import re
import time
from typing import Any, Literal, Optional, Union
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.llms import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.prompts import PromptTemplate
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.server.api.models import (
Artifact,
ArtifactEvent,
ArtifactType,
ChatRequest,
CodeArtifactData,
UIEvent,
)
from llama_index.server.api.utils import get_last_artifact
from pydantic import BaseModel, Field
class Requirement(BaseModel):
    """Plan produced by the planning step, parsed from the LLM's JSON reply."""

    # Which branch the workflow takes next: answer directly or generate code.
    next_step: Literal["answering", "coding"]
    # Optional hints about the code artifact; may be left unset by the planner.
    language: Optional[str] = None
    file_name: Optional[str] = None
    # Free-text description of what has to be done in the next step.
    requirement: str
class PlanEvent(Event):
    """Triggers the planning step."""

    # The latest user message.
    user_msg: str
    # Serialized previous artifact, when one exists.
    context: Optional[str] = None
class GenerateArtifactEvent(Event):
    """Triggers the artifact generation step with the planned requirement."""

    requirement: Requirement
class SynthesizeAnswerEvent(Event):
    """Triggers the final answer step; carries no payload."""
class UIEventData(BaseModel):
    """
    Event data for updating workflow status to the UI.
    """

    # Current stage of the workflow; the description text is split per state.
    state: Literal["plan", "generate", "completed"] = Field(
        description=(
            "The current state of the workflow. "
            "plan: analyze and create a plan for the next step. "
            "generate: generate the artifact based on the requirement from the previous step. "
            "completed: the workflow is completed. "
        ),
    )
    # Only populated once a requirement has been planned.
    requirement: Optional[str] = Field(
        default=None,
        description="The requirement for generating the artifact. ",
    )
class CodeArtifactWorkflow(Workflow):
    """
    A simple workflow that helps generate or update a code artifact for the chat.

    Steps:
        1. ``prepare_chat_history``: store the user message and chat memory
           in the workflow context.
        2. ``planning``: ask the LLM for a ``Requirement`` whose ``next_step``
           is either "coding" or "answering".
        3. ``generate_artifact`` (coding only): ask the LLM for the code and
           stream it to the UI as an ``ArtifactEvent``.
        4. ``synthesize_answer``: stream a final explanation to the user.

    e.g: Help create a NextJS app.
         Update the generated code with the user's feedback.
    """

    def __init__(
        self,
        llm: LLM,
        chat_request: ChatRequest,
        **kwargs: Any,
    ):
        """
        Args:
            llm: The LLM to use.
            chat_request: The chat request from the chat app to use.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(**kwargs)
        self.llm = llm
        self.chat_request = chat_request
        # Most recent artifact found in the chat request, if any; used as
        # context for planning and as the "previous code" for generation.
        self.last_artifact = get_last_artifact(chat_request)

    @step
    async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> PlanEvent:
        """
        Store the user message and a chat memory in the context.

        Raises:
            ValueError: If the start event carries no ``user_msg``.
        """
        user_msg = ev.user_msg
        if user_msg is None:
            raise ValueError("user_msg is required to run the workflow")
        await ctx.set("user_msg", user_msg)

        # Work on a copy so the caller's chat_history list is not mutated.
        chat_history = list(ev.chat_history or [])
        chat_history.append(
            ChatMessage(
                role="user",
                content=user_msg,
            )
        )
        memory = ChatMemoryBuffer.from_defaults(
            chat_history=chat_history,
            llm=self.llm,
        )
        await ctx.set("memory", memory)

        # Pass None (not "") when there is no artifact so the planning prompt
        # does not get an empty context section.
        return PlanEvent(
            user_msg=user_msg,
            context=self.last_artifact.model_dump_json()
            if self.last_artifact
            else None,
        )

    @step
    async def planning(
        self, ctx: Context, event: PlanEvent
    ) -> Union[GenerateArtifactEvent, SynthesizeAnswerEvent]:
        """
        Based on the conversation history and the user's request
        this step will help to provide a good next step for the code or document generation.

        Raises:
            ValueError: If the LLM response contains no fenced JSON block.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="plan",
                    requirement=None,
                ),
            )
        )

        # Skip the context header entirely when the context is missing or empty.
        context_section = (
            f"## The context is: \n{event.context}\n" if event.context else ""
        )
        prompt = PromptTemplate("""
You are a product analyst responsible for analyzing the user's request and providing the next step for code or document generation.
You are helping user with their code artifact. To update the code, you need to plan a coding step.
Follow these instructions:
1. Carefully analyze the conversation history and the user's request to determine what has been done and what the next step should be.
2. The next step must be one of the following two options:
- "coding": To make the changes to the current code.
- "answering": If you don't need to update the current code or need clarification from the user.
Important: Avoid telling the user to update the code themselves, you are the one who will update the code (by planning a coding step).
3. If the next step is "coding", you may specify the language ("typescript" or "python") and file_name if known, otherwise set them to null.
4. The requirement must be provided clearly what is the user request and what need to be done for the next step in details
as precise and specific as possible, don't be stingy with in the requirement.
5. If the next step is "answering", set language and file_name to null, and the requirement should describe what to answer or explain to the user.
6. Be concise; only return the requirements for the next step.
7. The requirements must be in the following format:
```json
{
"next_step": "answering" | "coding",
"language": "typescript" | "python" | null,
"file_name": string | null,
"requirement": string
}
```
## Example 1:
User request: Create a calculator app.
You should return:
```json
{
"next_step": "coding",
"language": "typescript",
"file_name": "calculator.tsx",
"requirement": "Generate code for a calculator app that has a simple UI with a display and button layout. The display should show the current input and the result. The buttons should include basic operators, numbers, clear, and equals. The calculation should work correctly."
}
```
## Example 2:
User request: Explain how the game loop works.
Context: You have already generated the code for a snake game.
You should return:
```json
{
"next_step": "answering",
"language": null,
"file_name": null,
"requirement": "The user is asking about the game loop. Explain how the game loop works."
}
```
{context}
Now, plan the user's next step for this request:
{user_msg}
""").format(
            context=context_section,
            user_msg=event.user_msg,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )

        # Parse the response into a Requirement:
        # 1. locate the fenced block (the "json" tag is optional)
        json_block = re.search(
            r"```(?:json)?\s*([\s\S]*?)\s*```", response.text, re.IGNORECASE
        )
        if json_block is None:
            raise ValueError("No JSON block found in the response.")
        # 2. validate its content against the Requirement model
        requirement = Requirement.model_validate_json(json_block.group(1).strip())

        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=requirement.requirement,
                ),
            )
        )

        # Put the planning result into the memory; useful for the answering step
        memory: ChatMemoryBuffer = await ctx.get("memory")
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"The plan for next step: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)

        if requirement.next_step == "coding":
            return GenerateArtifactEvent(
                requirement=requirement,
            )
        return SynthesizeAnswerEvent()

    @step
    async def generate_artifact(
        self, ctx: Context, event: GenerateArtifactEvent
    ) -> SynthesizeAnswerEvent:
        """
        Generate the code based on the user's request.

        When the LLM response contains a fenced code block, the code is streamed
        to the UI as an ``ArtifactEvent``. In every case the raw response is
        recorded in the chat memory so the answering step can refer to it.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=event.requirement.requirement,
                ),
            )
        )

        # NOTE: the whole Requirement model (not only its `requirement` text) is
        # interpolated below, so language and file_name presumably reach the LLM
        # through the model's string form.
        prompt = PromptTemplate("""
You are a skilled developer who can help user with coding.
You are given a task to generate or update a code for a given requirement.
## Follow these instructions:
**1. Carefully read the user's requirements.**
If any details are ambiguous or missing, make reasonable assumptions and clearly reflect those in your output.
If the previous code is provided:
+ Carefully analyze the code with the request to make the right changes.
+ Avoid making a lot of changes from the previous code if the request is not to write the code from scratch again.
**2. For code requests:**
- If the user does not specify a framework or language, default to a React component using the Next.js framework.
- For Next.js, use Shadcn UI components, Typescript, @types/node, @types/react, @types/react-dom, PostCSS, and TailwindCSS.
The import pattern should be:
```
import { ComponentName } from "@/components/ui/component-name"
import { Markdown } from "@llamaindex/chat-ui"
import { cn } from "@/lib/utils"
```
- Ensure the code is idiomatic, production-ready, and includes necessary imports.
- Only generate code relevant to the user's request—do not add extra boilerplate.
**3. Don't be verbose on response**
- No other text or comments only return the code which wrapped by ```language``` block.
- If the user's request is to update the code, only return the updated code.
**4. Only the following languages are allowed: "typescript", "python".**
**5. If there is no code to update, return the reason without any code block.**
## Example:
```typescript
import React from "react";
import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils";
export default function MyComponent() {
return (
<div className="flex flex-col items-center justify-center h-screen">
<Button>Click me</Button>
</div>
);
}
```
The previous code is:
{previous_artifact}
Now, i have to generate the code for the following requirement:
{requirement}
""").format(
            previous_artifact=self.last_artifact.model_dump_json()
            if self.last_artifact
            else "",
            requirement=event.requirement,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )

        memory: ChatMemoryBuffer = await ctx.get("memory")

        # Extract the code from the response. The match is greedy: it spans from
        # the first opening fence (which must carry a language tag) to the last
        # closing fence.
        language_pattern = r"```(\w+)([\s\S]*)```"
        code_match = re.search(language_pattern, response.text)
        if code_match is None:
            # The prompt tells the LLM to return a reason when there is nothing
            # to change; keep it so the answering step can relay it to the user.
            memory.put(
                ChatMessage(
                    role="assistant",
                    content=f"No code was generated: \n{response.text}",
                )
            )
            await ctx.set("memory", memory)
            return SynthesizeAnswerEvent()

        code = code_match.group(2).strip()

        # Put the generated code into the memory and persist it in the context
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"Updated the code: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)

        # To show the Canvas panel for the artifact
        ctx.write_event_to_stream(
            ArtifactEvent(
                data=Artifact(
                    type=ArtifactType.CODE,
                    created_at=int(time.time()),
                    data=CodeArtifactData(
                        language=event.requirement.language or "",
                        file_name=event.requirement.file_name or "",
                        code=code,
                    ),
                ),
            )
        )
        return SynthesizeAnswerEvent()

    @step
    async def synthesize_answer(
        self, ctx: Context, event: SynthesizeAnswerEvent
    ) -> StopEvent:
        """
        Synthesize the answer.

        Streams a chat completion over the stored conversation plus a final
        system instruction, and returns the stream as the workflow result.
        """
        memory: ChatMemoryBuffer = await ctx.get("memory")
        chat_history = memory.get()
        chat_history.append(
            ChatMessage(
                role="system",
                content="""
You are a helpful assistant who is responsible for explaining the work to the user.
Based on the conversation history, provide an answer to the user's question.
The user has access to the code so avoid mentioning the whole code again in your response.
""",
            )
        )
        response_stream = await self.llm.astream_chat(
            messages=chat_history,
        )
        # Tell the UI the workflow is done so the status card can be hidden.
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="completed",
                ),
            )
        )
        return StopEvent(result=response_stream)
@@ -0,0 +1,337 @@
import re
import time
from typing import Any, Literal, Optional
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.llms import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.prompts import PromptTemplate
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.server.api.models import (
Artifact,
ArtifactEvent,
ArtifactType,
ChatRequest,
DocumentArtifactData,
UIEvent,
)
from llama_index.server.api.utils import get_last_artifact
from pydantic import BaseModel, Field
class DocumentRequirement(BaseModel):
    """Plan for the next document generation or update."""

    # Output format of the document.
    type: Literal["markdown", "html"]
    title: str
    # Free-text description of what to generate or change.
    requirement: str
class PlanEvent(Event):
    """Triggers the planning step."""

    # The latest user message.
    user_msg: str
    # Serialized previous artifact, when one exists.
    context: Optional[str] = None
class GenerateArtifactEvent(Event):
    """Triggers the document generation step with the planned requirement."""

    requirement: DocumentRequirement
class SynthesizeAnswerEvent(Event):
    """Triggers the final answer step."""

    # The requirement the artifact was generated for.
    requirement: DocumentRequirement
    # The generated artifact as text.
    generated_artifact: str
class UIEventData(BaseModel):
    """
    Event data for updating workflow status to the UI.
    """

    # Current stage of the workflow; the description text is split per state.
    state: Literal["plan", "generate", "completed"] = Field(
        description=(
            "The current state of the workflow. "
            "plan: analyze and create a plan for the next step. "
            "generate: generate the artifact based on the requirement from the previous step. "
            "completed: the workflow is completed. "
        ),
    )
    # Only populated once a requirement has been planned.
    requirement: Optional[str] = Field(
        default=None,
        description="The requirement for generating the artifact. ",
    )
class DocumentArtifactWorkflow(Workflow):
    """
    A workflow to help generate or update document artifacts (e.g., Markdown or HTML documents).
    Example use cases: Generate a project guideline, update documentation with user feedback, etc.

    The steps are chained through their event types:
    ``prepare_chat_history`` -> ``planning`` -> ``generate_artifact`` -> ``synthesize_answer``.
    """

    def __init__(
        self,
        llm: LLM,
        chat_request: ChatRequest,
        **kwargs: Any,
    ):
        """
        Args:
            llm: The LLM to use.
            chat_request: The chat request from the chat app to use.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(**kwargs)
        self.llm = llm
        self.chat_request = chat_request
        # Result of get_last_artifact for this request; the steps treat a
        # falsy value as "no previous artifact".
        self.last_artifact = get_last_artifact(chat_request)

    @step
    async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> PlanEvent:
        """
        Store the user message and a chat memory in the context.

        Raises:
            ValueError: If the start event carries no ``user_msg``.
        """
        user_msg = ev.user_msg
        if user_msg is None:
            raise ValueError("user_msg is required to run the workflow")
        await ctx.set("user_msg", user_msg)
        # Copy the history so the list owned by the caller is not mutated.
        chat_history = list(ev.chat_history or [])
        chat_history.append(
            ChatMessage(
                role="user",
                content=user_msg,
            )
        )
        memory = ChatMemoryBuffer.from_defaults(
            chat_history=chat_history,
            llm=self.llm,
        )
        await ctx.set("memory", memory)
        # Use None (not "") when there is no artifact so the planner can
        # tell "no context" apart from real context.
        return PlanEvent(
            user_msg=user_msg,
            context=self.last_artifact.model_dump_json()
            if self.last_artifact
            else None,
        )

    @step
    async def planning(self, ctx: Context, event: PlanEvent) -> GenerateArtifactEvent:
        """
        Based on the conversation history and the user's request,
        this step will provide a clear requirement for the next document generation or update.

        Raises:
            ValueError: If the LLM response contains no fenced JSON block.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="plan",
                    requirement=None,
                ),
            )
        )
        prompt = PromptTemplate("""
You are a documentation analyst responsible for analyzing the user's request and providing requirements for document generation or update.
Follow these instructions:
1. Carefully analyze the conversation history and the user's request to determine what has been done and what the next step should be.
2. From the user's request, provide requirements for the next step of the document generation or update.
3. Do not be verbose; only return the requirements for the next step of the document generation or update.
4. Only the following document types are allowed: "markdown", "html".
5. The requirement should be in the following format:
```json
{
"type": "markdown" | "html",
"title": string,
"requirement": string
}
```
## Example:
User request: Create a project guideline document.
You should return:
```json
{
"type": "markdown",
"title": "Project Guideline",
"requirement": "Generate a Markdown document that outlines the project goals, deliverables, and timeline. Include sections for introduction, objectives, deliverables, and timeline."
}
```
User request: Add a troubleshooting section to the guideline.
You should return:
```json
{
"type": "markdown",
"title": "Project Guideline",
"requirement": "Add a 'Troubleshooting' section at the end of the document with common issues and solutions."
}
```
{context}
Now, please plan for the user's request:
{user_msg}
""").format(
            # Skip the header entirely when the context is missing or empty.
            context=""
            if not event.context
            else f"## The context is: \n{event.context}\n",
            user_msg=event.user_msg,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )
        # parse the response to DocumentRequirement
        # Non-greedy so only the first fenced block is captured even when
        # the model emits more than one fence.
        json_block = re.search(
            r"```(?:json)?\s*([\s\S]*?)\s*```", response.text, re.IGNORECASE
        )
        if json_block is None:
            raise ValueError("No json block found in the response")
        requirement = DocumentRequirement.model_validate_json(
            json_block.group(1).strip()
        )
        # Put the planning result to the memory
        memory: ChatMemoryBuffer = await ctx.get("memory")
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"Planning for the document generation: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=requirement.requirement,
                ),
            )
        )
        return GenerateArtifactEvent(
            requirement=requirement,
        )

    @step
    async def generate_artifact(
        self, ctx: Context, event: GenerateArtifactEvent
    ) -> SynthesizeAnswerEvent:
        """
        Generate or update the document based on the user's request.

        When the LLM response has no ```markdown / ```html block, no artifact
        event is emitted; the model's explanation is stored in memory instead.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=event.requirement.requirement,
                ),
            )
        )
        prompt = PromptTemplate("""
You are a skilled technical writer who can help users with documentation.
You are given a task to generate or update a document for a given requirement.
## Follow these instructions:
**1. Carefully read the user's requirements.**
If any details are ambiguous or missing, make reasonable assumptions and clearly reflect those in your output.
If the previous document is provided:
+ Carefully analyze the document with the request to make the right changes.
+ Avoid making unnecessary changes from the previous document if the request is not to rewrite it from scratch.
**2. For document requests:**
- If the user does not specify a type, default to Markdown.
- Ensure the document is clear, well-structured, and grammatically correct.
- Only generate content relevant to the user's request—do not add extra boilerplate.
**3. Do not be verbose in your response.**
- No other text or comments; only return the document content wrapped by the appropriate code block (```markdown or ```html).
- If the user's request is to update the document, only return the updated document.
**4. Only the following types are allowed: "markdown", "html".**
**5. If there is no change to the document, return the reason without any code block.**
## Example:
```markdown
# Project Guideline
## Introduction
...
```
The previous content is:
{previous_artifact}
Now, please generate the document for the following requirement:
{requirement}
""").format(
            previous_artifact=self.last_artifact.model_dump_json()
            if self.last_artifact
            else "",
            requirement=event.requirement,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )
        memory: ChatMemoryBuffer = await ctx.get("memory")
        # Extract the document from the response.
        # Greedy on purpose: the document itself may contain nested fences.
        language_pattern = r"```(markdown|html)([\s\S]*)```"
        doc_match = re.search(language_pattern, response.text)
        if doc_match is None:
            no_change_reason = (
                "There is no change to the document. " + response.text.strip()
            )
            # Keep the reason in memory so the answer step can explain it.
            memory.put(
                ChatMessage(
                    role="assistant",
                    content=no_change_reason,
                )
            )
            await ctx.set("memory", memory)
            return SynthesizeAnswerEvent(
                requirement=event.requirement,
                generated_artifact=no_change_reason,
            )
        content = doc_match.group(2).strip()
        doc_type = doc_match.group(1)
        # Put the generated document to the memory
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"Generated document: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)
        # To show the Canvas panel for the artifact
        ctx.write_event_to_stream(
            ArtifactEvent(
                data=Artifact(
                    type=ArtifactType.DOCUMENT,
                    created_at=int(time.time()),
                    data=DocumentArtifactData(
                        title=event.requirement.title,
                        content=content,
                        type=doc_type,  # type: ignore
                    ),
                ),
            )
        )
        return SynthesizeAnswerEvent(
            requirement=event.requirement,
            generated_artifact=response.text,
        )

    @step
    async def synthesize_answer(
        self, ctx: Context, event: SynthesizeAnswerEvent
    ) -> StopEvent:
        """
        Synthesize the answer for the user.

        Returns:
            A StopEvent whose result is the LLM's streaming chat response.
        """
        memory: ChatMemoryBuffer = await ctx.get("memory")
        chat_history = memory.get()
        chat_history.append(
            ChatMessage(
                role="system",
                content="""
Your responsibility is to explain the work to the user.
If there is no document to update, explain the reason.
If the document is updated, just summarize what changed. Don't need to include the whole document again in the response.
""",
            )
        )
        response_stream = await self.llm.astream_chat(
            messages=chat_history,
        )
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="completed",
                    requirement=event.requirement.requirement,
                ),
            )
        )
        return StopEvent(result=response_stream)
@@ -0,0 +1,15 @@
from app.code_workflow import CodeArtifactWorkflow
# To generate documents instead, import DocumentArtifactWorkflow and use it in create_workflow:
# from app.document_workflow import DocumentArtifactWorkflow
from llama_index.core.workflow import Workflow
from llama_index.llms.openai import OpenAI
from llama_index.server.api.models import ChatRequest
def create_workflow(chat_request: ChatRequest) -> Workflow:
    """Build a new code-artifact workflow for the given chat request."""
    return CodeArtifactWorkflow(
        chat_request=chat_request,
        llm=OpenAI(model="gpt-4.1"),
        timeout=120.0,
    )
@@ -12,7 +12,7 @@ dependencies = [
"pydantic<2.10",
"aiostream>=0.5.2,<0.6.0",
"llama-index-core>=0.12.28,<0.13.0",
"llama-index-server>=0.1.14,<0.2.0",
"llama-index-server>=0.1.15,<0.2.0",
]
[project.optional-dependencies]
+24
View File
@@ -0,0 +1,24 @@
# Python files
**/__pycache__
**/build
**/dist
**/venv
**/env
**/llama-index-server.egg-info
# Jupyter files
**/*.ipynb
# Pytest files
**/pytest.ini
**/pytest.ini
# Pytest cache
**/pytest_cache
# Tools
**/.ruff_cache
**/.mypy_cache
**/.pylint.d
**/.pyrightconfig.json
**/.ui
@@ -0,0 +1,52 @@
# Artifacts App
This guide explains how to set up and use the LlamaIndex server with the artifact workflow to write code or documents.
## Prerequisites
- [uv](https://github.com/astral-sh/uv) installed (a fast Python package manager and runner)
- An OpenAI API key
## Steps
1. **Set the OpenAI API Key**
Export your OpenAI API key as an environment variable:
```sh
export OPENAI_API_KEY=your_openai_api_key_here
```
2. **Run the Server Using uv**
Start the server with the following command:
```sh
uv run main.py
```
This will launch the FastAPI server using the workflow defined in `main.py`.
3. **Access the Application**
Open your browser and go to:
```
http://localhost:8000
```
You will see the LlamaIndex Artifact app UI, where you can interact with the workflow.
## Notes
- By default, the server uses the code artifact workflow. If you want to use the document artifact workflow, edit `main.py` and uncomment the following line:
```python
# from examples.artifact.document_workflow import ArtifactWorkflow
```
and comment out the code workflow import.
- The UI provides starter questions to help you get started, or you can enter your own requests.
- The workflow will guide you through planning and generating code or documents based on your input.
@@ -0,0 +1,354 @@
import re
import time
from typing import Any, Literal, Optional, Union
from pydantic import BaseModel
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.llms import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.prompts import PromptTemplate
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.server.api.models import (
Artifact,
ArtifactEvent,
ArtifactType,
ChatRequest,
CodeArtifactData,
UIEvent,
)
from llama_index.server.api.utils import get_last_artifact
class Requirement(BaseModel):
    """Structured plan for the next workflow step."""

    # "coding" routes to code generation; "answering" goes straight to the answer step.
    next_step: Literal["answering", "coding"]
    # Target language and file name; both optional and default to None.
    language: Optional[str] = None
    file_name: Optional[str] = None
    # Free-text description of what the next step has to do.
    requirement: str
class PlanEvent(Event):
    """Event carrying the user's message, plus optional context, to the planning step."""

    # The raw user request.
    user_msg: str
    # Optional extra context for the planner (e.g. a serialized previous artifact).
    context: Optional[str] = None
class GenerateArtifactEvent(Event):
    """Event carrying the planned requirement to the code generation step."""

    requirement: Requirement
class SynthesizeAnswerEvent(Event):
    """Payload-free event that triggers the final answer step."""

    pass
class UIEventData(BaseModel):
    """Workflow status payload sent to the UI."""

    # Which phase the workflow is currently in.
    state: Literal["plan", "generate", "completed"]
    # Requirement text to show; optional, defaults to None.
    requirement: Optional[str] = None
class ArtifactWorkflow(Workflow):
    """
    A simple workflow that help generate/update the chat artifact (code, document)
    e.g: Help create a NextJS app.
    Update the generated code with the user's feedback.
    Generate a guideline for the app,...

    The steps are chained through their event types:
    ``prepare_chat_history`` -> ``planning`` -> (``generate_artifact`` ->)
    ``synthesize_answer``. The generation step runs only when the plan's
    ``next_step`` is "coding".
    """

    def __init__(
        self,
        llm: LLM,
        chat_request: ChatRequest,
        **kwargs: Any,
    ):
        """
        Args:
            llm: The LLM to use.
            chat_request: The chat request from the chat app to use.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(**kwargs)
        self.llm = llm
        self.chat_request = chat_request
        # Result of get_last_artifact for this request; the steps treat a
        # falsy value as "no previous artifact".
        self.last_artifact = get_last_artifact(chat_request)

    @step
    async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> PlanEvent:
        """
        Store the user message and a chat memory in the context.

        Raises:
            ValueError: If the start event carries no ``user_msg``.
        """
        user_msg = ev.user_msg
        if user_msg is None:
            raise ValueError("user_msg is required to run the workflow")
        await ctx.set("user_msg", user_msg)
        # Copy the history so the list owned by the caller is not mutated.
        chat_history = list(ev.chat_history or [])
        chat_history.append(
            ChatMessage(
                role="user",
                content=user_msg,
            )
        )
        memory = ChatMemoryBuffer.from_defaults(
            chat_history=chat_history,
            llm=self.llm,
        )
        await ctx.set("memory", memory)
        # Use None (not "") when there is no artifact so the planner can
        # tell "no context" apart from real context.
        return PlanEvent(
            user_msg=user_msg,
            context=self.last_artifact.model_dump_json()
            if self.last_artifact
            else None,
        )

    @step
    async def planning(
        self, ctx: Context, event: PlanEvent
    ) -> Union[GenerateArtifactEvent, SynthesizeAnswerEvent]:
        """
        Based on the conversation history and the user's request
        this step will help to provide a good next step for the code or document generation.

        Returns:
            GenerateArtifactEvent when the plan's next step is "coding",
            otherwise SynthesizeAnswerEvent.

        Raises:
            ValueError: If the LLM response contains no fenced block.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="plan",
                    requirement=None,
                ),
            )
        )
        prompt = PromptTemplate("""
You are a product analyst responsible for analyzing the user's request and providing the next step for code or document generation.
You are helping user with their code artifact. To update the code, you need to plan a coding step.
Follow these instructions:
1. Carefully analyze the conversation history and the user's request to determine what has been done and what the next step should be.
2. The next step must be one of the following two options:
- "coding": To make the changes to the current code.
- "answering": If you don't need to update the current code or need clarification from the user.
Important: Avoid telling the user to update the code themselves, you are the one who will update the code (by planning a coding step).
3. If the next step is "coding", you may specify the language ("typescript" or "python") and file_name if known, otherwise set them to null.
4. The requirement must be provided clearly what is the user request and what need to be done for the next step in details
as precise and specific as possible, don't be stingy with in the requirement.
5. If the next step is "answering", set language and file_name to null, and the requirement should describe what to answer or explain to the user.
6. Be concise; only return the requirements for the next step.
7. The requirements must be in the following format:
```json
{
"next_step": "answering" | "coding",
"language": "typescript" | "python" | null,
"file_name": string | null,
"requirement": string
}
```
## Example 1:
User request: Create a calculator app.
You should return:
```json
{
"next_step": "coding",
"language": "typescript",
"file_name": "calculator.tsx",
"requirement": "Generate code for a calculator app that has a simple UI with a display and button layout. The display should show the current input and the result. The buttons should include basic operators, numbers, clear, and equals. The calculation should work correctly."
}
```
## Example 2:
User request: Explain how the game loop works.
Context: You have already generated the code for a snake game.
You should return:
```json
{
"next_step": "answering",
"language": null,
"file_name": null,
"requirement": "The user is asking about the game loop. Explain how the game loop works."
}
```
{context}
Now, plan the user's next step for this request:
{user_msg}
""").format(
            # Skip the header entirely when the context is missing or empty.
            context=""
            if not event.context
            else f"## The context is: \n{event.context}\n",
            user_msg=event.user_msg,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )
        # parse the response to Requirement
        # 1. use regex to find the json block
        json_block = re.search(
            r"```(?:json)?\s*([\s\S]*?)\s*```", response.text, re.IGNORECASE
        )
        if json_block is None:
            raise ValueError("No JSON block found in the response.")
        # 2. parse the json block to Requirement
        requirement = Requirement.model_validate_json(json_block.group(1).strip())
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=requirement.requirement,
                ),
            )
        )
        # Put the planning result to the memory
        # useful for answering step
        memory: ChatMemoryBuffer = await ctx.get("memory")
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"The plan for next step: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)
        if requirement.next_step == "coding":
            return GenerateArtifactEvent(
                requirement=requirement,
            )
        return SynthesizeAnswerEvent()

    @step
    async def generate_artifact(
        self, ctx: Context, event: GenerateArtifactEvent
    ) -> SynthesizeAnswerEvent:
        """
        Generate the code based on the user's request.

        When the LLM response has no fenced code block, no artifact event is
        emitted; the model's explanation is stored in memory instead.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=event.requirement.requirement,
                ),
            )
        )
        # The example fence is closed right after the sample component so the
        # previous code and the requirement are not read as part of the example.
        prompt = PromptTemplate("""
You are a skilled developer who can help user with coding.
You are given a task to generate or update a code for a given requirement.
## Follow these instructions:
**1. Carefully read the user's requirements.**
If any details are ambiguous or missing, make reasonable assumptions and clearly reflect those in your output.
If the previous code is provided:
+ Carefully analyze the code with the request to make the right changes.
+ Avoid making a lot of changes from the previous code if the request is not to write the code from scratch again.
**2. For code requests:**
- If the user does not specify a framework or language, default to a React component using the Next.js framework.
- For Next.js, use Shadcn UI components, Typescript, @types/node, @types/react, @types/react-dom, PostCSS, and TailwindCSS.
The import pattern should be:
```
import { ComponentName } from "@/components/ui/component-name"
import { Markdown } from "@llamaindex/chat-ui"
import { cn } from "@/lib/utils"
```
- Ensure the code is idiomatic, production-ready, and includes necessary imports.
- Only generate code relevant to the user's request—do not add extra boilerplate.
**3. Don't be verbose on response**
- No other text or comments only return the code which wrapped by ```language``` block.
- If the user's request is to update the code, only return the updated code.
**4. Only the following languages are allowed: "typescript", "python".**
**5. If there is no code to update, return the reason without any code block.**
## Example:
```typescript
import React from "react";
import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils";
export default function MyComponent() {
return (
<div className="flex flex-col items-center justify-center h-screen">
<Button>Click me</Button>
</div>
);
}
```
The previous code is:
{previous_artifact}
Now, i have to generate the code for the following requirement:
{requirement}
""").format(
            previous_artifact=self.last_artifact.model_dump_json()
            if self.last_artifact
            else "",
            requirement=event.requirement,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )
        memory: ChatMemoryBuffer = await ctx.get("memory")
        # Extract the code from the response.
        # Greedy on purpose: take everything up to the last closing fence.
        language_pattern = r"```(\w+)([\s\S]*)```"
        code_match = re.search(language_pattern, response.text)
        if code_match is None:
            # Keep the model's explanation so the answer step can relay it.
            memory.put(
                ChatMessage(
                    role="assistant",
                    content=f"There is no code to update. {response.text.strip()}",
                )
            )
            await ctx.set("memory", memory)
            return SynthesizeAnswerEvent()
        code = code_match.group(2).strip()
        # Put the generated code to the memory
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"Updated the code: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)
        # To show the Canvas panel for the artifact
        ctx.write_event_to_stream(
            ArtifactEvent(
                data=Artifact(
                    type=ArtifactType.CODE,
                    created_at=int(time.time()),
                    data=CodeArtifactData(
                        # Fall back to the fence's language tag when the
                        # plan did not name a language.
                        language=event.requirement.language
                        or code_match.group(1),
                        file_name=event.requirement.file_name or "",
                        code=code,
                    ),
                ),
            )
        )
        return SynthesizeAnswerEvent()

    @step
    async def synthesize_answer(
        self, ctx: Context, event: SynthesizeAnswerEvent
    ) -> StopEvent:
        """
        Synthesize the answer.

        Returns:
            A StopEvent whose result is the LLM's streaming chat response.
        """
        memory: ChatMemoryBuffer = await ctx.get("memory")
        chat_history = memory.get()
        chat_history.append(
            ChatMessage(
                role="system",
                content="""
You are a helpful assistant who is responsible for explaining the work to the user.
Based on the conversation history, provide an answer to the user's question.
The user has access to the code so avoid mentioning the whole code again in your response.
""",
            )
        )
        response_stream = await self.llm.astream_chat(
            messages=chat_history,
        )
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="completed",
                ),
            )
        )
        return StopEvent(result=response_stream)
@@ -0,0 +1,137 @@
import { Badge } from "@/components/ui/badge";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Progress } from "@/components/ui/progress";
import { Skeleton } from "@/components/ui/skeleton";
import { cn } from "@/lib/utils";
import { Markdown } from "@llamaindex/chat-ui/widgets";
import { ListChecks, Loader2, Wand2 } from "lucide-react";
import { useEffect, useState } from "react";
// Presentation metadata per workflow state. Only "plan" and "generate" have
// entries; any other state (e.g. "completed") has none and renders nothing.
const STAGE_META = {
  plan: {
    icon: ListChecks,
    badgeText: "Step 1/2: Planning",
    gradient: "from-blue-100 via-blue-50 to-white",
    progress: 33, // value passed to the <Progress> bar
    iconBg: "bg-blue-100 text-blue-600",
    badge: "bg-blue-100 text-blue-700",
  },
  generate: {
    icon: Wand2,
    badgeText: "Step 2/2: Generating",
    gradient: "from-violet-100 via-violet-50 to-white",
    progress: 66, // value passed to the <Progress> bar
    iconBg: "bg-violet-100 text-violet-600",
    badge: "bg-violet-100 text-violet-700",
  },
};
function ArtifactWorkflowCard({ event }) {
const [visible, setVisible] = useState(event?.state !== "completed");
const [fade, setFade] = useState(false);
useEffect(() => {
if (event?.state === "completed") {
setVisible(false);
} else {
setVisible(true);
setFade(false);
}
}, [event?.state]);
if (!event || !visible) return null;
const { state, requirement } = event;
const meta = STAGE_META[state];
if (!meta) return null;
return (
<div className="flex justify-center items-center w-full min-h-[180px] py-2">
<Card
className={cn(
"w-full shadow-md rounded-xl transition-all duration-500",
"border-0",
fade && "opacity-0 pointer-events-none",
`bg-gradient-to-br ${meta.gradient}`,
)}
style={{
boxShadow:
"0 2px 12px 0 rgba(80, 80, 120, 0.08), 0 1px 3px 0 rgba(80, 80, 120, 0.04)",
}}
>
<CardHeader className="flex flex-row items-center gap-2 pb-1 pt-2 px-3">
<div
className={cn(
"rounded-full p-1 flex items-center justify-center",
meta.iconBg,
)}
>
<meta.icon className="w-5 h-5" />
</div>
<CardTitle className="text-base font-semibold flex items-center gap-2">
<Badge className={cn("ml-1", meta.badge, "text-xs px-2 py-0.5")}>
{meta.badgeText}
</Badge>
</CardTitle>
</CardHeader>
<CardContent className="px-3 py-1">
{state === "plan" && (
<div className="flex flex-col items-center gap-2 py-2">
<Loader2 className="animate-spin text-blue-400 w-6 h-6 mb-1" />
<div className="text-sm text-blue-900 font-medium text-center">
Analyzing your request...
</div>
<Skeleton className="w-1/2 h-3 rounded-full mt-1" />
</div>
)}
{state === "generate" && (
<div className="flex flex-col gap-2 py-2">
<div className="flex items-center gap-1">
<Loader2 className="animate-spin text-violet-400 w-4 h-4" />
<span className="text-violet-900 font-medium text-sm">
Working on the requirement:
</span>
</div>
<div className="rounded-lg border border-violet-200 bg-violet-50 px-2 py-1 max-h-24 overflow-auto text-xs">
{requirement ? (
<Markdown content={requirement} />
) : (
<span className="text-violet-400 italic">
No requirements available yet.
</span>
)}
</div>
</div>
)}
</CardContent>
<div className="px-3 pb-2 pt-1">
<Progress
value={meta.progress}
className={cn(
"h-1 rounded-full bg-gray-200",
state === "plan" && "bg-blue-200",
state === "generate" && "bg-violet-200",
)}
indicatorClassName={cn(
"transition-all duration-500",
state === "plan" && "bg-blue-500",
state === "generate" && "bg-violet-500",
)}
/>
</div>
</Card>
</div>
);
}
export default function Component({ events }) {
const aggregateEvents = () => {
if (!events || events.length === 0) return null;
return events[events.length - 1];
};
const event = aggregateEvents();
return <ArtifactWorkflowCard event={event} />;
}
@@ -0,0 +1,326 @@
import re
import time
from typing import Any, Literal, Optional
from pydantic import BaseModel
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.llms import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.prompts import PromptTemplate
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.server.api.models import (
Artifact,
ArtifactEvent,
ArtifactType,
ChatRequest,
DocumentArtifactData,
UIEvent,
)
from llama_index.server.api.utils import get_last_artifact
class DocumentRequirement(BaseModel):
    """Structured plan for one document generation or update."""

    # Output format of the document; restricted to the two supported kinds.
    type: Literal["markdown", "html"]
    # Title of the document.
    title: str
    # Free-text description of what to generate or change.
    requirement: str
class PlanEvent(Event):
    """Event carrying the user's message, plus optional context, to the planning step."""

    # The raw user request.
    user_msg: str
    # Optional extra context for the planner (e.g. a serialized previous artifact).
    context: Optional[str] = None
class GenerateArtifactEvent(Event):
    """Event carrying the planned requirement to the generation step."""

    requirement: DocumentRequirement
class SynthesizeAnswerEvent(Event):
    """Event carrying the requirement and the generation output to the answer step."""

    requirement: DocumentRequirement
    # Text produced by the generation step.
    generated_artifact: str
class UIEventData(BaseModel):
    """Workflow status payload sent to the UI."""

    # Which phase the workflow is currently in.
    state: Literal["plan", "generate", "completed"]
    # Requirement text to show. Defaults to None so the field may be omitted,
    # as in the code workflow's UIEventData; without a default pydantic
    # treats an Optional field as required.
    requirement: Optional[str] = None
class ArtifactWorkflow(Workflow):
    """
    A workflow to help generate or update document artifacts (e.g., Markdown or HTML documents).
    Example use cases: Generate a project guideline, update documentation with user feedback, etc.

    The steps are chained through their event types:
    ``prepare_chat_history`` -> ``planning`` -> ``generate_artifact`` -> ``synthesize_answer``.
    """

    def __init__(
        self,
        llm: LLM,
        chat_request: ChatRequest,
        **kwargs: Any,
    ):
        """
        Args:
            llm: The LLM to use.
            chat_request: The chat request from the chat app to use.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(**kwargs)
        self.llm = llm
        self.chat_request = chat_request
        # Result of get_last_artifact for this request; the steps treat a
        # falsy value as "no previous artifact".
        self.last_artifact = get_last_artifact(chat_request)

    @step
    async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> PlanEvent:
        """
        Store the user message and a chat memory in the context.

        Raises:
            ValueError: If the start event carries no ``user_msg``.
        """
        user_msg = ev.user_msg
        if user_msg is None:
            raise ValueError("user_msg is required to run the workflow")
        await ctx.set("user_msg", user_msg)
        # Copy the history so the list owned by the caller is not mutated.
        chat_history = list(ev.chat_history or [])
        chat_history.append(
            ChatMessage(
                role="user",
                content=user_msg,
            )
        )
        memory = ChatMemoryBuffer.from_defaults(
            chat_history=chat_history,
            llm=self.llm,
        )
        await ctx.set("memory", memory)
        # Use None (not "") when there is no artifact so the planner can
        # tell "no context" apart from real context.
        return PlanEvent(
            user_msg=user_msg,
            context=self.last_artifact.model_dump_json()
            if self.last_artifact
            else None,
        )

    @step
    async def planning(self, ctx: Context, event: PlanEvent) -> GenerateArtifactEvent:
        """
        Based on the conversation history and the user's request,
        this step will provide a clear requirement for the next document generation or update.

        Raises:
            ValueError: If the LLM response contains no fenced JSON block.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="plan",
                    requirement=None,
                ),
            )
        )
        prompt = PromptTemplate("""
You are a documentation analyst responsible for analyzing the user's request and providing requirements for document generation or update.
Follow these instructions:
1. Carefully analyze the conversation history and the user's request to determine what has been done and what the next step should be.
2. From the user's request, provide requirements for the next step of the document generation or update.
3. Do not be verbose; only return the requirements for the next step of the document generation or update.
4. Only the following document types are allowed: "markdown", "html".
5. The requirement should be in the following format:
```json
{
"type": "markdown" | "html",
"title": string,
"requirement": string
}
```
## Example:
User request: Create a project guideline document.
You should return:
```json
{
"type": "markdown",
"title": "Project Guideline",
"requirement": "Generate a Markdown document that outlines the project goals, deliverables, and timeline. Include sections for introduction, objectives, deliverables, and timeline."
}
```
User request: Add a troubleshooting section to the guideline.
You should return:
```json
{
"type": "markdown",
"title": "Project Guideline",
"requirement": "Add a 'Troubleshooting' section at the end of the document with common issues and solutions."
}
```
{context}
Now, please plan for the user's request:
{user_msg}
""").format(
            # Skip the header entirely when the context is missing or empty.
            context=""
            if not event.context
            else f"## The context is: \n{event.context}\n",
            user_msg=event.user_msg,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )
        # parse the response to DocumentRequirement
        # Non-greedy so only the first fenced block is captured even when
        # the model emits more than one fence.
        json_block = re.search(
            r"```(?:json)?\s*([\s\S]*?)\s*```", response.text, re.IGNORECASE
        )
        if json_block is None:
            raise ValueError("No json block found in the response")
        requirement = DocumentRequirement.model_validate_json(
            json_block.group(1).strip()
        )
        # Put the planning result to the memory
        memory: ChatMemoryBuffer = await ctx.get("memory")
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"Planning for the document generation: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=requirement.requirement,
                ),
            )
        )
        return GenerateArtifactEvent(
            requirement=requirement,
        )

    @step
    async def generate_artifact(
        self, ctx: Context, event: GenerateArtifactEvent
    ) -> SynthesizeAnswerEvent:
        """
        Generate or update the document based on the user's request.

        When the LLM response has no ```markdown / ```html block, no artifact
        event is emitted; the model's explanation is stored in memory instead.
        """
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="generate",
                    requirement=event.requirement.requirement,
                ),
            )
        )
        prompt = PromptTemplate("""
You are a skilled technical writer who can help users with documentation.
You are given a task to generate or update a document for a given requirement.
## Follow these instructions:
**1. Carefully read the user's requirements.**
If any details are ambiguous or missing, make reasonable assumptions and clearly reflect those in your output.
If the previous document is provided:
+ Carefully analyze the document with the request to make the right changes.
+ Avoid making unnecessary changes from the previous document if the request is not to rewrite it from scratch.
**2. For document requests:**
- If the user does not specify a type, default to Markdown.
- Ensure the document is clear, well-structured, and grammatically correct.
- Only generate content relevant to the user's request—do not add extra boilerplate.
**3. Do not be verbose in your response.**
- No other text or comments; only return the document content wrapped by the appropriate code block (```markdown or ```html).
- If the user's request is to update the document, only return the updated document.
**4. Only the following types are allowed: "markdown", "html".**
**5. If there is no change to the document, return the reason without any code block.**
## Example:
```markdown
# Project Guideline
## Introduction
...
```
The previous content is:
{previous_artifact}
Now, please generate the document for the following requirement:
{requirement}
""").format(
            previous_artifact=self.last_artifact.model_dump_json()
            if self.last_artifact
            else "",
            requirement=event.requirement,
        )
        response = await self.llm.acomplete(
            prompt=prompt,
            formatted=True,
        )
        memory: ChatMemoryBuffer = await ctx.get("memory")
        # Extract the document from the response.
        # Greedy on purpose: the document itself may contain nested fences.
        language_pattern = r"```(markdown|html)([\s\S]*)```"
        doc_match = re.search(language_pattern, response.text)
        if doc_match is None:
            no_change_reason = (
                "There is no change to the document. " + response.text.strip()
            )
            # Keep the reason in memory so the answer step can explain it.
            memory.put(
                ChatMessage(
                    role="assistant",
                    content=no_change_reason,
                )
            )
            await ctx.set("memory", memory)
            return SynthesizeAnswerEvent(
                requirement=event.requirement,
                generated_artifact=no_change_reason,
            )
        content = doc_match.group(2).strip()
        doc_type = doc_match.group(1)
        # Put the generated document to the memory
        memory.put(
            ChatMessage(
                role="assistant",
                content=f"Generated document: \n{response.text}",
            )
        )
        await ctx.set("memory", memory)
        # To show the Canvas panel for the artifact
        ctx.write_event_to_stream(
            ArtifactEvent(
                data=Artifact(
                    type=ArtifactType.DOCUMENT,
                    created_at=int(time.time()),
                    data=DocumentArtifactData(
                        title=event.requirement.title,
                        content=content,
                        type=doc_type,  # type: ignore
                    ),
                ),
            )
        )
        return SynthesizeAnswerEvent(
            requirement=event.requirement,
            generated_artifact=response.text,
        )

    @step
    async def synthesize_answer(
        self, ctx: Context, event: SynthesizeAnswerEvent
    ) -> StopEvent:
        """
        Synthesize the answer for the user.

        Returns:
            A StopEvent whose result is the LLM's streaming chat response.
        """
        memory: ChatMemoryBuffer = await ctx.get("memory")
        chat_history = memory.get()
        chat_history.append(
            ChatMessage(
                role="system",
                content="""
Your responsibility is to explain the work to the user.
If there is no document to update, explain the reason.
If the document is updated, just summarize what changed. Don't need to include the whole document again in the response.
""",
            )
        )
        response_stream = await self.llm.astream_chat(
            messages=chat_history,
        )
        ctx.write_event_to_stream(
            UIEvent(
                type="ui_event",
                data=UIEventData(
                    state="completed",
                    requirement=event.requirement.requirement,
                ),
            )
        )
        return StopEvent(result=response_stream)
@@ -0,0 +1,43 @@
from fastapi import FastAPI
from examples.artifact.code_workflow import ArtifactWorkflow
# To use document artifact workflow, uncomment the following line
# from examples.artifact.document_workflow import ArtifactWorkflow
from llama_index.core.workflow import Workflow
from llama_index.llms.openai import OpenAI
from llama_index.server import LlamaIndexServer, UIConfig
from llama_index.server.api.models import ChatRequest
def create_workflow(chat_request: ChatRequest) -> Workflow:
    """Build a new artifact workflow for the given chat request."""
    return ArtifactWorkflow(
        chat_request=chat_request,
        llm=OpenAI(model="gpt-4.1"),
        timeout=120.0,
    )
def create_app() -> FastAPI:
    """Assemble the LlamaIndexServer app with the artifact UI configuration."""
    starter_questions = [
        "Write a simple calculator app",
        "Write a guideline on how to use LLM effectively",
    ]
    ui_config = UIConfig(
        app_title="Artifact",
        starter_questions=starter_questions,
        component_dir="components",
    )
    return LlamaIndexServer(
        workflow_factory=create_workflow,
        ui_config=ui_config,
    )
# Module-level app instance; the "main:app" import string below refers to it.
app = create_app()
if __name__ == "__main__":
    import uvicorn

    # Run with auto-reload on all interfaces, port 8000.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
@@ -1,13 +1,14 @@
import logging
import os
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional, Union
from pydantic import BaseModel, Field, field_validator
from llama_index.core.schema import NodeWithScore
from llama_index.core.types import ChatMessage, MessageRole
from llama_index.core.workflow import Event
from llama_index.server.settings import server_settings
from pydantic import BaseModel, Field, field_validator
logger = logging.getLogger("uvicorn")
@@ -22,6 +23,7 @@ class ChatConfig(BaseModel):
class ChatAPIMessage(BaseModel):
role: MessageRole
content: str
annotations: Optional[List[Any]] = None
def to_llamaindex_message(self) -> ChatMessage:
return ChatMessage(role=self.role, content=self.content)
@@ -151,3 +153,54 @@ class UIEvent(Event):
"type": self.type,
"data": self.data.model_dump(),
}
# String-valued enumeration of the artifact kinds; used as the type of
# `Artifact.type`. Members compare equal to their plain string values
# because the enum also derives from `str`.
class ArtifactType(str, Enum):
    CODE = "code"
    DOCUMENT = "document"
# One of the two payload shapes accepted by `Artifact.data`.
# All three fields are required strings; no further validation is done here.
class CodeArtifactData(BaseModel):
    file_name: str
    code: str
    # presumably the programming language of `code` -- verify against producers
    language: str
# One of the two payload shapes accepted by `Artifact.data`.
class DocumentArtifactData(BaseModel):
    title: str
    content: str
    # Restricted to exactly "markdown" or "html" by the Literal annotation.
    type: Literal["markdown", "html"]
# An artifact attached to a chat message: an optional creation time, the
# artifact kind, and a code or document payload.
class Artifact(BaseModel):
    # Optional integer creation time; the unit is not specified here.
    created_at: Optional[int] = None
    type: ArtifactType
    data: Union[CodeArtifactData, DocumentArtifactData]

    @classmethod
    def from_message(cls, message: ChatAPIMessage) -> Optional["Artifact"]:
        """
        Extract the first parsable artifact from a message's annotations.

        Only dict annotations whose "type" is "artifact" are considered; the
        annotation's "data" entry is validated against this model. An
        annotation that fails validation is logged as a warning and skipped.

        Args:
            message: The chat message whose annotations are inspected.

        Returns:
            The first artifact that validates, or None when the message has
            no annotations or none of them yields a valid artifact.
        """
        annotations = message.annotations
        if not annotations or not isinstance(annotations, list):
            return None

        for annotation in annotations:
            if not isinstance(annotation, dict):
                continue
            if annotation.get("type") != "artifact":
                continue
            try:
                return cls.model_validate(annotation.get("data"))
            except Exception as e:
                # Best effort: keep scanning the remaining annotations.
                logger.warning(
                    f"Failed to parse artifact from annotation: {annotation}. Error: {e}"
                )

        return None
# Workflow event that carries an `Artifact`; `type` defaults to "artifact".
class ArtifactEvent(Event):
    type: str = "artifact"
    data: Artifact

    def to_response(self) -> dict:
        """Return a dict with the event type and the dumped artifact data."""
        dumped_artifact = self.data.model_dump()
        return {"type": self.type, "data": dumped_artifact}
@@ -7,14 +7,18 @@ from typing import AsyncGenerator, Callable, Union
from fastapi import APIRouter, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from llama_index.core.agent.workflow.workflow_events import AgentStream
from llama_index.core.agent.workflow.workflow_events import (
AgentInput,
AgentSetup,
AgentStream,
)
from llama_index.core.workflow import StopEvent, Workflow
from llama_index.server.api.callbacks import (
EventCallback,
LlamaCloudFileDownload,
SourceNodesFromToolCall,
SuggestNextQuestions,
)
from llama_index.server.api.callbacks.base import EventCallback
from llama_index.server.api.callbacks.llamacloud import LlamaCloudFileDownload
from llama_index.server.api.callbacks.stream_handler import StreamHandler
from llama_index.server.api.models import ChatRequest
from llama_index.server.api.utils.vercel_stream import VercelStreamResponse
@@ -114,15 +118,8 @@ async def _stream_content(
elif hasattr(chunk, "delta") and chunk.delta:
yield chunk.delta
stream_started = False
try:
async for event in handler.stream_events():
if not stream_started:
# Start the stream with an empty message
stream_started = True
yield VercelStreamResponse.convert_text("")
# Handle different types of events
if isinstance(event, (AgentStream, StopEvent)):
async for chunk in _text_stream(event):
handler.accumulate_text(chunk)
@@ -133,12 +130,14 @@ async def _stream_content(
event_response = event.to_response()
yield VercelStreamResponse.convert_data(event_response)
else:
yield VercelStreamResponse.convert_data(event.model_dump())
# Ignore unnecessary agent workflow events
if not isinstance(event, (AgentInput, AgentSetup)):
yield VercelStreamResponse.convert_data(event.model_dump())
except asyncio.CancelledError:
logger.warning("Client cancelled the request!")
await handler.cancel_run()
except Exception as e:
logger.error(f"Error in stream response: {e}")
logger.error(f"Error in stream response: {e}", exc_info=True)
yield VercelStreamResponse.convert_error(str(e))
await handler.cancel_run()
@@ -0,0 +1,3 @@
from .chat_request import get_artifacts, get_last_artifact
__all__ = ["get_artifacts", "get_last_artifact"]
@@ -0,0 +1,23 @@
from typing import List, Optional
from llama_index.server.api.models import Artifact, ChatRequest
def get_artifacts(chat_request: ChatRequest) -> List[Artifact]:
    """
    Collect the artifacts found in a chat request, ordered by creation time.

    Each message contributes at most one artifact (whatever
    ``Artifact.from_message`` returns for it). Artifacts with a creation
    time come first in ascending order; those without one are placed at
    the end.

    Args:
        chat_request: The request whose messages are scanned.

    Returns:
        The sorted list of artifacts; empty when no message carries one.
    """
    collected: List[Artifact] = []
    for message in chat_request.messages:
        parsed = Artifact.from_message(message)
        if parsed is not None:
            collected.append(parsed)

    # The leading boolean pushes artifacts without a timestamp to the end and
    # keeps None from ever being compared against an int.
    collected.sort(key=lambda a: (a.created_at is None, a.created_at))
    return collected
def get_last_artifact(chat_request: ChatRequest) -> Optional[Artifact]:
    """
    Return the final artifact in ``get_artifacts`` order, if any.

    Because ``get_artifacts`` places artifacts without a creation time at
    the end, such an artifact is returned in preference to timestamped ones.

    Args:
        chat_request: The request whose messages are scanned.

    Returns:
        The last artifact of the sorted list, or None when there is none.
    """
    ordered = get_artifacts(chat_request)
    if not ordered:
        return None
    return ordered[-1]
@@ -5,7 +5,7 @@ from typing import Optional
import requests
CHAT_UI_VERSION = "0.1.5"
CHAT_UI_VERSION = "0.1.6"
def download_chat_ui(
@@ -5,12 +5,14 @@ from typing import Any, Callable, Optional, Union
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.routing import Mount
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from llama_index.core.workflow import Workflow
from llama_index.server.api.routers import chat_router, custom_components_router
from llama_index.server.chat_ui import download_chat_ui
from llama_index.server.settings import server_settings
from pydantic import BaseModel, Field
class UIConfig(BaseModel):
@@ -162,7 +164,10 @@ class LlamaIndexServer(FastAPI):
)
download_chat_ui(logger=self.logger, target_path=self.ui_config.ui_path)
self._mount_static_files(
directory=self.ui_config.ui_path, path="/", html=True
directory=self.ui_config.ui_path,
path="/",
html=True,
name=self.ui_config.ui_path,
)
self._override_ui_config()
@@ -204,7 +209,11 @@ class LlamaIndexServer(FastAPI):
)
def _mount_static_files(
self, directory: str, path: str, html: bool = False
self,
directory: str,
path: str,
html: bool = False,
name: Optional[str] = None,
) -> None:
"""
Mount static files from a directory if it exists.
@@ -214,7 +223,7 @@ class LlamaIndexServer(FastAPI):
self.mount(
path,
StaticFiles(directory=directory, check_dir=False, html=html),
name=f"{directory}-static",
name=name or f"{directory}-static",
)
def allow_cors(self, origin: str = "*") -> None:
@@ -228,3 +237,19 @@ class LlamaIndexServer(FastAPI):
allow_methods=["*"],
allow_headers=["*"],
)
def add_api_route(self, *args: Any, **kwargs: Any) -> None:
    """
    Add an API route to the server, keeping the UI mount last.

    Any mounted route whose name equals ``self.ui_config.ui_path`` is taken
    out of the routes list before the new API route is registered and is
    mounted again afterwards, so it ends up after the new route.

    Args:
        *args: Positional arguments forwarded to the parent ``add_api_route``.
        **kwargs: Keyword arguments forwarded to the parent ``add_api_route``.

    Raises:
        Whatever the parent ``add_api_route`` raises; the UI mount is
        restored before the exception propagates.
    """
    # Because static files are mounted at the root path by default,
    # we need to place them at the end of the routes list.
    ui_routes = [
        route
        for route in self.routes
        if isinstance(route, Mount) and route.name == self.ui_config.ui_path
    ]
    # Remove only after the scan is complete: removing from the list while
    # iterating over it would skip the element that follows each removal.
    for route in ui_routes:
        self.routes.remove(route)

    try:
        super().add_api_route(*args, **kwargs)
    finally:
        # Re-mount even when registration fails so the UI is never lost.
        for route in ui_routes:
            self.mount(route.path, route.app, name=route.name)
File diff suppressed because it is too large Load Diff
+50 -48
View File
@@ -1,65 +1,67 @@
[build-system]
build-backend = "poetry.core.masonry.api"
requires = ["poetry-core"]
[project]
name = "llama-index-server"
version = "0.1.15"
description = "llama-index fastapi server"
readme = "README.md"
license = "MIT"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.9,<4.0"
dependencies = [
"fastapi[standard]>=0.115.11,<1.0.0",
"cachetools>=5.5.2,<6.0.0",
"requests>=2.32.3,<3.0.0",
"pydantic-settings>=2.8.1,<3.0.0",
"llama-index-core>=0.12.28,<1.0.0",
"llama-index-readers-file>=0.4.6,<1.0.0",
"llama-index-indices-managed-llama-cloud>=0.6.3,<1.0.0",
]
[tool.codespell]
check-filenames = true
check-hidden = true
# Feel free to un-skip examples, and experimental, you will just need to
# work through many typos (--write-changes and --interactive will help)
skip = "*.csv,*.html,*.json,*.jsonl,*.pdf,*.txt,*.ipynb"
[tool.mypy]
disallow_untyped_defs = true
# Remove venv skip when integrated with pre-commit
exclude = ["_static", "build", "examples", "notebooks", "venv"]
ignore_missing_imports = true
namespace_packages = true
explicit_package_bases = true
python_version = "3.10"
[tool.poetry]
authors = ["Your Name <you@example.com>"]
description = "llama-index fastapi server"
exclude = ["**/BUILD"]
license = "MIT"
name = "llama-index-server"
packages = [{include = "llama_index/"}]
readme = "README.md"
version = "0.1.14"
[tool.poetry.dependencies]
python = ">=3.9,<4.0"
fastapi = {extras = ["standard"], version = "^0.115.11"}
cachetools = "^5.5.2"
requests = "^2.32.3"
pydantic-settings = "^2.8.1"
llama-index-core = "^0.12.28"
llama-index-readers-file = "^0.4.6"
llama-index-indices-managed-llama-cloud = "0.6.3"
[build-system]
requires = [ "hatchling>=1.24" ]
build-backend = "hatchling.build"
[tool.poetry.group.dev.dependencies]
black = {extras = ["jupyter"], version = "<=23.9.1,>=23.7.0"}
codespell = {extras = ["toml"], version = ">=v2.2.6"}
e2b-code-interpreter = "^1.1.1"
ipython = "8.10.0"
jupyter = "^1.0.0"
markdown = "^3.7"
mypy = "1.15.0"
pre-commit = "3.2.0"
pylint = "2.15.10"
pytest = "^8.3.5"
pytest-asyncio = "^0.25.3"
pytest-mock = "3.11.1"
ruff = "0.0.292"
tree-sitter-languages = "^1.8.0"
types-Deprecated = ">=0.1.0"
types-PyYAML = "^6.0.12.12"
types-protobuf = "^4.24.0.4"
types-redis = "4.5.5.0"
types-requests = "2.28.11.8" # TODO: unpin when mypy>0.991
types-setuptools = "67.1.0.0"
xhtml2pdf = "^0.2.17"
pytest-cov = "^6.0.0"
llama-cloud = "^0.1.17"
[dependency-groups]
dev = [
"llama-index-llms-openai>=0.3.38",
"black[jupyter]<=23.9.1,>=23.7.0",
"codespell[toml]>=2.2.6",
"e2b-code-interpreter>=1.1.1,<2.0.0",
"ipython==8.10.0",
"jupyter>=1.0.0,<2.0.0",
"markdown>=3.7,<4.0",
"mypy==1.15.0",
"pre-commit==3.2.0",
"pylint==2.15.10",
"pytest>=8.3.5,<9.0.0",
"pytest-asyncio>=0.25.3,<1.0.0",
"pytest-mock==3.11.1",
"ruff==0.0.292",
"tree-sitter-languages>=1.8.0,<2.0.0",
"types-Deprecated>=0.1.0",
"types-PyYAML>=6.0.12.12,<7.0.0.0",
"types-protobuf>=4.24.0.4,<5.0.0.0",
"types-redis==4.5.5.0",
"types-requests==2.28.11.8",
"types-setuptools==67.1.0.0",
"xhtml2pdf>=0.2.17,<1.0.0",
"pytest-cov>=6.0.0,<7.0.0",
"llama-cloud>=0.1.17,<1.0.0",
]
[tool.hatch.build.targets.wheel]
packages = ["llama_index/"]
@@ -1,10 +1,12 @@
import asyncio
import logging
from typing import Any, AsyncGenerator
from unittest.mock import AsyncMock, MagicMock
import pytest
from llama_index.core.agent.workflow.workflow_events import AgentStream
from llama_index.core.types import MessageRole
from llama_index.core.workflow import StopEvent
from llama_index.core.workflow.handler import WorkflowHandler
from llama_index.server.api.models import ChatAPIMessage, ChatRequest
@@ -13,17 +15,19 @@ from llama_index.server.api.utils.vercel_stream import VercelStreamResponse
@pytest.fixture()
def logger():
def logger() -> logging.Logger:
return logging.getLogger("test")
@pytest.fixture()
def chat_request():
return ChatRequest(messages=[ChatAPIMessage(role="user", content="test message")])
def chat_request() -> ChatRequest:
return ChatRequest(
messages=[ChatAPIMessage(role=MessageRole.USER, content="test message")]
)
@pytest.fixture()
def mock_workflow_handler():
def mock_workflow_handler() -> AsyncMock:
handler = AsyncMock(spec=WorkflowHandler)
handler.accumulate_text = MagicMock()
return handler
@@ -32,8 +36,11 @@ def mock_workflow_handler():
class TestEventStream:
@pytest.mark.asyncio()
async def test_stream_content_with_agent_stream(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
mock_workflow_handler.stream_events.return_value = (
self._mock_agent_stream_events()
@@ -48,15 +55,17 @@ class TestEventStream:
]
# Assert
assert len(result) == 3 # Empty start + 2 text chunks
assert result[0] == VercelStreamResponse.convert_text("")
assert result[1] == VercelStreamResponse.convert_text("Hello")
assert result[2] == VercelStreamResponse.convert_text(" World")
assert len(result) == 2
assert result[0] == VercelStreamResponse.convert_text("Hello")
assert result[1] == VercelStreamResponse.convert_text(" World")
@pytest.mark.asyncio()
async def test_stream_content_with_stop_event_string(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
mock_workflow_handler.stream_events.return_value = (
self._mock_stop_event_string()
@@ -71,14 +80,16 @@ class TestEventStream:
]
# Assert
assert len(result) == 2 # Empty start + result string
assert result[0] == VercelStreamResponse.convert_text("")
assert result[1] == VercelStreamResponse.convert_text("Final answer")
assert len(result) == 1
assert result[0] == VercelStreamResponse.convert_text("Final answer")
@pytest.mark.asyncio()
async def test_stream_content_with_stop_event_delta_objects(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
mock_workflow_handler.stream_events.return_value = (
self._mock_stop_event_delta_objects()
@@ -93,15 +104,17 @@ class TestEventStream:
]
# Assert
assert len(result) == 3 # Empty start + 2 delta chunks
assert result[0] == VercelStreamResponse.convert_text("")
assert result[1] == VercelStreamResponse.convert_text("Delta 1")
assert result[2] == VercelStreamResponse.convert_text("Delta 2")
assert len(result) == 2
assert result[0] == VercelStreamResponse.convert_text("Delta 1")
assert result[1] == VercelStreamResponse.convert_text("Delta 2")
@pytest.mark.asyncio()
async def test_stream_content_with_event_with_to_response(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
mock_workflow_handler.stream_events.return_value = (
self._mock_event_with_to_response()
@@ -116,14 +129,16 @@ class TestEventStream:
]
# Assert
assert len(result) == 2 # Empty start + event with to_response
assert result[0] == VercelStreamResponse.convert_text("")
assert result[1] == VercelStreamResponse.convert_data({"event_type": "test"})
assert len(result) == 1
assert result[0] == VercelStreamResponse.convert_data({"event_type": "test"})
@pytest.mark.asyncio()
async def test_stream_content_with_event_with_model_dump(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
mock_workflow_handler.stream_events.return_value = (
self._mock_event_with_model_dump()
@@ -138,17 +153,19 @@ class TestEventStream:
]
# Assert
assert len(result) == 2 # Empty start + event with model_dump
assert result[0] == VercelStreamResponse.convert_text("")
assert result[1] == VercelStreamResponse.convert_data(None)
assert len(result) == 1
assert result[0] == VercelStreamResponse.convert_data(None) # type: ignore
@pytest.mark.asyncio()
async def test_stream_content_with_cancelled_error(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
mock_workflow_handler.stream_events.side_effect = asyncio.CancelledError()
logger.warning = MagicMock()
logger.warning = MagicMock() # type: ignore
# Execute
result = [
@@ -165,12 +182,15 @@ class TestEventStream:
@pytest.mark.asyncio()
async def test_stream_content_with_exception(
self, mock_workflow_handler, chat_request, logger
):
self,
mock_workflow_handler: AsyncMock,
chat_request: ChatRequest,
logger: logging.Logger,
) -> None:
# Setup
error_message = "Test error"
mock_workflow_handler.stream_events.side_effect = Exception(error_message)
logger.error = MagicMock()
logger.error = MagicMock() # type: ignore
# Execute
result = [
@@ -186,7 +206,7 @@ class TestEventStream:
mock_workflow_handler.cancel_run.assert_called_once()
logger.error.assert_called_once()
async def _mock_agent_stream_events(self):
async def _mock_agent_stream_events(self) -> AsyncGenerator[AgentStream, Any]:
yield AgentStream(
delta="Hello", response="", current_agent_name="", tool_calls=[], raw=""
)
@@ -194,7 +214,9 @@ class TestEventStream:
delta=" World", response="", current_agent_name="", tool_calls=[], raw=""
)
async def _mock_agent_stream_with_empty_deltas(self):
async def _mock_agent_stream_with_empty_deltas(
self,
) -> AsyncGenerator[AgentStream, Any]:
yield AgentStream(
delta=" ", # Empty delta with spaces - should be filtered
response="",
@@ -217,14 +239,14 @@ class TestEventStream:
raw="",
)
async def _mock_stop_event_string(self):
async def _mock_stop_event_string(self) -> AsyncGenerator[StopEvent, Any]:
yield StopEvent(result="Final answer")
async def _mock_stop_event_delta_objects(self):
async def generator():
async def _mock_stop_event_delta_objects(self) -> AsyncGenerator[StopEvent, Any]:
async def generator() -> AsyncGenerator[Any, Any]:
# Create proper objects with delta attribute that can be serialized
class ObjectWithDelta:
def __init__(self, delta_value) -> None:
def __init__(self, delta_value: str) -> None:
self.delta = delta_value
yield ObjectWithDelta("Delta 1")
@@ -232,15 +254,15 @@ class TestEventStream:
yield StopEvent(result=generator())
async def _mock_dict_event(self):
async def _mock_dict_event(self) -> AsyncGenerator[dict[Any, Any], Any]:
yield {"key": "value"}
async def _mock_event_with_to_response(self):
async def _mock_event_with_to_response(self) -> AsyncGenerator[Any, Any]:
event = MagicMock()
event.to_response.return_value = {"event_type": "test"}
yield event
async def _mock_event_with_model_dump(self):
async def _mock_event_with_model_dump(self) -> AsyncGenerator[Any, Any]:
event = MagicMock()
event.model_dump.return_value = {"name": "test_event"}
# Override to_response to return None - this means convert_data(None) will be called
+5100
View File
File diff suppressed because it is too large Load Diff