chore: replace Python examples with llama-deploy (#701)

This commit is contained in:
Thuc Pham
2025-07-11 10:50:54 +07:00
committed by GitHub
parent b618e91e99
commit 28b46be22a
49 changed files with 2546 additions and 772 deletions
+5
View File
@@ -0,0 +1,5 @@
---
"create-llama": patch
---
chore: replace Python examples with llama-deploy
-9
View File
@@ -63,15 +63,6 @@ jobs:
run: pnpm run pack-install
working-directory: packages/create-llama
- name: Build and store server package
run: |
pnpm run build
wheel_file=$(ls dist/*.whl | head -n 1)
mkdir -p "${{ runner.temp }}"
cp "$wheel_file" "${{ runner.temp }}/"
echo "SERVER_PACKAGE_PATH=${{ runner.temp }}/$(basename "$wheel_file")" >> $GITHUB_ENV
working-directory: python/llama-index-server
- name: Run Playwright tests for Python
run: pnpm run e2e:python
env:
@@ -3,11 +3,8 @@ import { exec } from "child_process";
import fs from "fs";
import path from "path";
import util from "util";
import {
ALL_USE_CASES,
TemplateFramework,
TemplateVectorDB,
} from "../../helpers/types";
import { TemplateFramework, TemplateUseCase, TemplateVectorDB } from "../../helpers";
import { ALL_PYTHON_USE_CASES } from "../../helpers/use-case";
import { RunCreateLlamaOptions, createTestDir, runCreateLlama } from "../utils";
const execAsync = util.promisify(exec);
@@ -17,11 +14,15 @@ const vectorDb: TemplateVectorDB = process.env.VECTORDB
? (process.env.VECTORDB as TemplateVectorDB)
: "none";
const useCases: TemplateUseCase[] = vectorDb === "llamacloud" ? [
"agentic_rag", "deep_research", "financial_report"
] : ALL_PYTHON_USE_CASES
test.describe("Mypy check", () => {
test.describe.configure({ retries: 0 });
test.describe("LlamaIndexServer", async () => {
for (const useCase of ALL_USE_CASES) {
for (const useCase of useCases) {
test(`should pass mypy for use case: ${useCase}`, async () => {
const cwd = await createTestDir();
await createAndCheckLlamaProject({
@@ -2,11 +2,11 @@ import { expect, test } from "@playwright/test";
import { ChildProcess } from "child_process";
import fs from "fs";
import path from "path";
import { type TemplateFramework, type TemplateVectorDB } from "../../helpers";
import {
ALL_USE_CASES,
type TemplateFramework,
type TemplateVectorDB,
} from "../../helpers";
ALL_PYTHON_USE_CASES,
ALL_TYPESCRIPT_USE_CASES,
} from "../../helpers/use-case";
import { createTestDir, runCreateLlama } from "../utils";
const templateFramework: TemplateFramework = process.env.FRAMEWORK
@@ -17,10 +17,15 @@ const vectorDb: TemplateVectorDB = process.env.VECTORDB
: "none";
const llamaCloudProjectName = "create-llama";
const llamaCloudIndexName = "e2e-test";
const allUseCases =
templateFramework === "nextjs"
? ALL_TYPESCRIPT_USE_CASES
: ALL_PYTHON_USE_CASES;
const isPythonLlamaDeploy = templateFramework === "fastapi";
const userMessage = "Write a blog post about physical standards for letters";
for (const useCase of ALL_USE_CASES) {
for (const useCase of allUseCases) {
test.describe(`Test use case ${useCase} ${templateFramework} ${vectorDb}`, async () => {
let port: number;
let cwd: string;
@@ -35,7 +40,7 @@ for (const useCase of ALL_USE_CASES) {
templateFramework,
vectorDb,
port,
postInstallAction: "runApp",
postInstallAction: isPythonLlamaDeploy ? "dependencies" : "runApp",
useCase,
llamaCloudProjectName,
llamaCloudIndexName,
@@ -50,6 +55,11 @@ for (const useCase of ALL_USE_CASES) {
});
test("Frontend should have a title", async ({ page }) => {
test.skip(
isPythonLlamaDeploy,
"Skip frontend tests for Python LllamaDeploy",
);
await page.goto(`http://localhost:${port}`);
await expect(page.getByText("Built by LlamaIndex")).toBeVisible({
timeout: 5 * 60 * 1000,
@@ -60,8 +70,10 @@ for (const useCase of ALL_USE_CASES) {
page,
}) => {
test.skip(
useCase === "financial_report" || useCase === "deep_research",
"Skip chat tests for financial report and deep research.",
useCase === "financial_report" ||
useCase === "deep_research" ||
isPythonLlamaDeploy,
"Skip chat tests for financial report and deep research. Also skip for Python LlamaDeploy",
);
await page.goto(`http://localhost:${port}`);
await page.fill("form textarea", userMessage);
@@ -4,11 +4,11 @@ import fs from "fs";
import path from "path";
import util from "util";
import {
ALL_USE_CASES,
TemplateFramework,
TemplateUseCase,
TemplateVectorDB,
} from "../../helpers/types";
import { ALL_TYPESCRIPT_USE_CASES } from "../../helpers/use-case";
import { createTestDir, runCreateLlama } from "../utils";
const execAsync = util.promisify(exec);
@@ -21,7 +21,7 @@ const vectorDb: TemplateVectorDB = process.env.VECTORDB
test.describe("Test resolve TS dependencies", () => {
test.describe.configure({ retries: 0 });
for (const useCase of ALL_USE_CASES) {
for (const useCase of ALL_TYPESCRIPT_USE_CASES) {
const optionDescription = `useCase: ${useCase}, vectorDb: ${vectorDb}`;
test.describe(`${optionDescription}`, () => {
test(`${optionDescription}`, async () => {
+65 -23
View File
@@ -1,20 +1,17 @@
import fs from "fs/promises";
import path from "path";
import {
EnvVar,
InstallTemplateArgs,
ModelConfig,
TemplateFramework,
TemplateType,
TemplateUseCase,
TemplateVectorDB,
} from "./types";
import { TSYSTEMS_LLMHUB_API_URL } from "./providers/llmhub";
export type EnvVar = {
name?: string;
description?: string;
value?: string;
};
import { USE_CASE_CONFIGS } from "./use-case";
const renderEnvVar = (envVars: EnvVar[]): string => {
return envVars.reduce(
@@ -228,7 +225,15 @@ Otherwise, use CHROMA_HOST and CHROMA_PORT config above`,
}
};
const getModelEnvs = (modelConfig: ModelConfig): EnvVar[] => {
const getModelEnvs = (
modelConfig: ModelConfig,
framework: TemplateFramework,
template: TemplateType,
useCase: TemplateUseCase,
): EnvVar[] => {
const isPythonLlamaDeploy =
framework === "fastapi" && template === "llamaindexserver";
return [
{
name: "MODEL",
@@ -240,10 +245,25 @@ const getModelEnvs = (modelConfig: ModelConfig): EnvVar[] => {
description: "Name of the embedding model to use.",
value: modelConfig.embeddingModel,
},
{
name: "CONVERSATION_STARTERS",
description: "The questions to help users get started (multi-line).",
},
...(isPythonLlamaDeploy
? [
{
name: "NEXT_PUBLIC_STARTER_QUESTIONS",
description:
"Initial questions to display in the chat (`starterQuestions`)",
value: JSON.stringify(
USE_CASE_CONFIGS[useCase]?.starterQuestions ?? [],
),
},
]
: [
{
name: "CONVERSATION_STARTERS",
description:
"The questions to help users get started (multi-line).",
},
]),
...(USE_CASE_CONFIGS[useCase]?.additionalEnvVars ?? []),
...(modelConfig.provider === "openai"
? [
{
@@ -251,14 +271,18 @@ const getModelEnvs = (modelConfig: ModelConfig): EnvVar[] => {
description: "The OpenAI API key to use.",
value: modelConfig.apiKey,
},
{
name: "LLM_TEMPERATURE",
description: "Temperature for sampling from the model.",
},
{
name: "LLM_MAX_TOKENS",
description: "Maximum number of tokens to generate.",
},
...(isPythonLlamaDeploy
? []
: [
{
name: "LLM_TEMPERATURE",
description: "Temperature for sampling from the model.",
},
{
name: "LLM_MAX_TOKENS",
description: "Maximum number of tokens to generate.",
},
]),
]
: []),
...(modelConfig.provider === "anthropic"
@@ -367,11 +391,12 @@ const getModelEnvs = (modelConfig: ModelConfig): EnvVar[] => {
const getFrameworkEnvs = (
framework: TemplateFramework,
template?: TemplateType,
port?: number,
): EnvVar[] => {
const sPort = port?.toString() || "8000";
const result: EnvVar[] = [];
if (framework === "fastapi") {
if (framework === "fastapi" && template !== "llamaindexserver") {
result.push(
...[
{
@@ -403,6 +428,7 @@ export const createBackendEnvFile = async (
| "template"
| "port"
| "useLlamaParse"
| "useCase"
>,
) => {
// Init env values
@@ -418,11 +444,27 @@ export const createBackendEnvFile = async (
]
: []),
...getVectorDBEnvs(opts.vectorDb, opts.framework, opts.template),
...getFrameworkEnvs(opts.framework, opts.port),
...getModelEnvs(opts.modelConfig),
...getFrameworkEnvs(opts.framework, opts.template, opts.port),
...getModelEnvs(
opts.modelConfig,
opts.framework,
opts.template,
opts.useCase,
),
];
// Render and write env file
const content = renderEnvVar(envVars);
await fs.writeFile(path.join(root, envFileName), content);
const isPythonLlamaDeploy =
opts.framework === "fastapi" && opts.template === "llamaindexserver";
// each llama-deploy service will need a .env inside its directory
// this .env will be copied along with workflow code when service is deployed
// so that we need to put the .env file inside src/ instead of root
const envPath = isPythonLlamaDeploy
? path.join(root, "src", envFileName)
: path.join(root, envFileName);
await fs.writeFile(envPath, content);
console.log(`Created '${envFileName}' file. Please check the settings.`);
};
+18 -12
View File
@@ -117,8 +117,13 @@ const downloadFile = async (url: string, destPath: string) => {
const prepareContextData = async (
root: string,
dataSources: TemplateDataSource[],
isPythonLlamaDeploy: boolean,
) => {
await makeDir(path.join(root, "data"));
const dataDir = isPythonLlamaDeploy
? path.join(root, "ui", "data")
: path.join(root, "data");
await makeDir(dataDir);
for (const dataSource of dataSources) {
const dataSourceConfig = dataSource?.config as FileSourceConfig;
// If the path is URLs, download the data and save it to the data directory
@@ -128,8 +133,7 @@ const prepareContextData = async (
dataSourceConfig.url.toString(),
);
const destPath = path.join(
root,
"data",
dataDir,
dataSourceConfig.filename ??
path.basename(dataSourceConfig.url.toString()),
);
@@ -137,11 +141,7 @@ const prepareContextData = async (
} else {
// Copy local data
console.log("Copying data from path:", dataSourceConfig.path);
const destPath = path.join(
root,
"data",
path.basename(dataSourceConfig.path),
);
const destPath = path.join(dataDir, path.basename(dataSourceConfig.path));
await fsExtra.copy(dataSourceConfig.path, destPath);
}
}
@@ -156,6 +156,9 @@ export const installTemplate = async (props: InstallTemplateArgs) => {
await installTSTemplate(props);
}
const isPythonLlamaDeploy =
props.framework === "fastapi" && props.template === "llamaindexserver";
// This is a backend, so we need to copy the test data and create the env file.
// Copy the environment file to the target directory.
@@ -164,6 +167,7 @@ export const installTemplate = async (props: InstallTemplateArgs) => {
await prepareContextData(
props.root,
props.dataSources.filter((ds) => ds.type === "file"),
isPythonLlamaDeploy,
);
if (
@@ -183,10 +187,12 @@ export const installTemplate = async (props: InstallTemplateArgs) => {
);
}
// Create outputs directory
await makeDir(path.join(props.root, "output/tools"));
await makeDir(path.join(props.root, "output/uploaded"));
await makeDir(path.join(props.root, "output/llamacloud"));
if (!isPythonLlamaDeploy) {
// Create outputs directory (llama-deploy doesn't need this)
await makeDir(path.join(props.root, "output/tools"));
await makeDir(path.join(props.root, "output/uploaded"));
await makeDir(path.join(props.root, "output/llamacloud"));
}
};
export * from "./types";
+51 -38
View File
@@ -7,27 +7,33 @@ import { isUvAvailable, tryUvSync } from "./uv";
import { assetRelocator, copy } from "./copy";
import { templatesDir } from "./dir";
import {
InstallTemplateArgs,
ModelConfig,
TemplateDataSource,
TemplateVectorDB,
} from "./types";
interface Dependency {
name: string;
version?: string;
extras?: string[];
constraints?: Record<string, string>;
}
import { Dependency, InstallTemplateArgs } from "./types";
import { USE_CASE_CONFIGS } from "./use-case";
const getAdditionalDependencies = (
modelConfig: ModelConfig,
vectorDb?: TemplateVectorDB,
dataSources?: TemplateDataSource[],
opts: Pick<
InstallTemplateArgs,
| "framework"
| "template"
| "useCase"
| "modelConfig"
| "vectorDb"
| "dataSources"
>,
) => {
const { framework, template, useCase, modelConfig, vectorDb, dataSources } =
opts;
const dependencies: Dependency[] = [];
const isPythonLlamaDeploy =
framework === "fastapi" && template === "llamaindexserver";
const useCaseDependencies =
USE_CASE_CONFIGS[useCase]?.additionalDependencies ?? [];
if (isPythonLlamaDeploy && useCaseDependencies.length > 0) {
dependencies.push(...useCaseDependencies);
}
// Add vector db dependencies
switch (vectorDb) {
case "mongo": {
@@ -412,13 +418,17 @@ const installLlamaIndexServerTemplate = async ({
process.exit(1);
}
await copy("*.py", path.join(root, "app"), {
const srcDir = path.join(root, "src");
const uiDir = path.join(root, "ui");
// copy workflow code to src folder
await copy("*.py", srcDir, {
parents: true,
cwd: path.join(templatesDir, "components", "use-cases", "python", useCase),
});
// copy model provider settings to app folder
await copy("**", path.join(root, "app"), {
// copy model provider settings to src folder
await copy("**", srcDir, {
cwd: path.join(
templatesDir,
"components",
@@ -428,32 +438,26 @@ const installLlamaIndexServerTemplate = async ({
),
});
// Copy custom UI component code
await copy(`*`, path.join(root, "components"), {
// copy ts server to ui folder
await copy("**", uiDir, {
parents: true,
cwd: path.join(templatesDir, "components", "ts-proxy"),
});
// Copy custom UI components to ui/components folder
await copy(`*`, path.join(uiDir, "components"), {
parents: true,
cwd: path.join(templatesDir, "components", "ui", "use-cases", useCase),
});
// Copy layout components to layout folder in root
await copy("*", path.join(root, "layout"), {
// Copy layout components to ui/layout folder
await copy("*", path.join(uiDir, "layout"), {
parents: true,
cwd: path.join(templatesDir, "components", "ui", "layout"),
});
if (useLlamaParse) {
await copy("index.py", path.join(root, "app"), {
parents: true,
cwd: path.join(
templatesDir,
"components",
"vectordbs",
"llamaindexserver",
"llamacloud",
"python",
),
});
// TODO: Consider moving generate.py to app folder.
await copy("generate.py", path.join(root), {
await copy("**", srcDir, {
parents: true,
cwd: path.join(
templatesDir,
@@ -471,6 +475,12 @@ const installLlamaIndexServerTemplate = async ({
cwd: path.join(templatesDir, "components", "use-cases", "python", useCase),
rename: assetRelocator,
});
// Clean up, remove generate.py and index.py for non-data use cases
if (["code_generator", "document_generator", "hitl"].includes(useCase)) {
await fs.unlink(path.join(srcDir, "generate.py"));
await fs.unlink(path.join(srcDir, "index.py"));
}
};
export const installPythonTemplate = async ({
@@ -517,11 +527,14 @@ export const installPythonTemplate = async ({
}
console.log("Adding additional dependencies");
const addOnDependencies = getAdditionalDependencies(
const addOnDependencies = getAdditionalDependencies({
framework,
template,
useCase,
modelConfig,
vectorDb,
dataSources,
);
});
await addDependencies(root, addOnDependencies);
+59 -1
View File
@@ -1,4 +1,5 @@
import { SpawnOptions, spawn } from "child_process";
import { SpawnOptions, exec, spawn } from "child_process";
import waitPort from "wait-port";
import { TemplateFramework, TemplateType } from "./types";
const createProcess = (
@@ -47,6 +48,58 @@ export function runTSApp(appPath: string, port: number) {
});
}
// TODO: support run multiple LlamaDeploy server in the same machine
/**
 * Starts a local llama_deploy API server for a generated Python project and
 * creates the deployment described by `llama_deploy.yml`.
 *
 * The returned promise never settles: it keeps the CLI process alive until the
 * user sends SIGINT, at which point the server process is killed and the CLI
 * exits with code 0.
 *
 * @param appPath - Directory of the generated project; used as the working
 *   directory for the server and for `llamactl`.
 * @param port - Port passed to the server through `LLAMA_DEPLOY_APISERVER_PORT`.
 * @throws Re-throws any error raised while waiting for the port or while
 *   creating the deployment, after killing the server process.
 */
async function runPythonLlamaDeployServer(
  appPath: string,
  port: number = 4501,
) {
  console.log("Starting llama_deploy server...", port);
  // Use spawn rather than exec: exec buffers the child's output and terminates
  // the child once stdout/stderr exceed `maxBuffer`, which a long-running,
  // logging server will eventually do.
  const serverProcess = spawn("uv run -m llama_deploy.apiserver", {
    cwd: appPath,
    env: {
      ...process.env,
      LLAMA_DEPLOY_APISERVER_PORT: `${port}`,
    },
    shell: true,
    // Forward server output straight to this process's stdout/stderr.
    stdio: ["ignore", "inherit", "inherit"],
  });

  try {
    // Wait for the server to be ready
    // NOTE(review): depending on the installed wait-port version, a timeout may
    // resolve (with a falsy/`open: false` result) instead of rejecting — confirm.
    console.log("Waiting for server to be ready...");
    await waitPort({ port, host: "localhost", timeout: 30000 });

    // Create the deployment with explicit host configuration
    console.log("llama_deploy server started, creating deployment...", port);
    await createProcess(
      "uv",
      [
        "run",
        "llamactl",
        "-s",
        `http://localhost:${port}`,
        "deploy",
        "llama_deploy.yml",
      ],
      {
        stdio: "inherit",
        cwd: appPath,
        shell: true,
      },
    );
  } catch (error) {
    // Do not leave an orphaned API server behind when startup fails.
    serverProcess.kill();
    throw error;
  }
  console.log(`Deployment created successfully!`);

  // Keep the main process alive and handle cleanup
  return new Promise(() => {
    process.on("SIGINT", () => {
      console.log("\nShutting down...");
      serverProcess.kill();
      process.exit(0);
    });
  });
}
export async function runApp(
appPath: string,
template: TemplateType,
@@ -57,6 +110,11 @@ export async function runApp(
// Start the app
const defaultPort = framework === "nextjs" ? 3000 : 8000;
if (template === "llamaindexserver" && framework === "fastapi") {
await runPythonLlamaDeployServer(appPath, port);
return;
}
const appRunner = framework === "fastapi" ? runFastAPIApp : runTSApp;
await appRunner(appPath, port || defaultPort, template);
} catch (error) {
+13 -8
View File
@@ -49,14 +49,6 @@ export type TemplateUseCase =
| "document_generator"
| "hitl";
export const ALL_USE_CASES: TemplateUseCase[] = [
"agentic_rag",
"deep_research",
"financial_report",
"code_generator",
"document_generator",
"hitl",
];
// Config for both file and folder
export type FileSourceConfig =
| {
@@ -97,3 +89,16 @@ export interface InstallTemplateArgs {
postInstallAction: TemplatePostInstallAction;
useCase: TemplateUseCase;
}
/**
 * One entry of a generated env file. Lists of these are passed to
 * `renderEnvVar` when the backend env file is created.
 * Every field is optional.
 */
export type EnvVar = {
// Name of the environment variable, e.g. "MODEL".
name?: string;
// Human-readable explanation of the variable.
description?: string;
// Value to write; some built-in entries (e.g. "CONVERSATION_STARTERS") omit it.
value?: string;
};
/**
 * A package requirement that can be added to a generated Python project
 * (see `getAdditionalDependencies` / `addDependencies`).
 */
export interface Dependency {
// Package name, e.g. "e2b-code-interpreter".
name: string;
// Version specifier, e.g. ">=1.1.1,<2.0.0".
version?: string;
// NOTE(review): presumably the package extras to enable — usage is not visible here.
extras?: string[];
// NOTE(review): presumably extra per-key constraints for the requirement — confirm against addDependencies.
constraints?: Record<string, string>;
}
+84
View File
@@ -0,0 +1,84 @@
import { Dependency, EnvVar, TemplateUseCase } from "./types";
/** Use cases iterated by the tests for the TypeScript (Next.js) templates. */
export const ALL_TYPESCRIPT_USE_CASES: TemplateUseCase[] = [
  "agentic_rag",
  "deep_research",
  "financial_report",
  "code_generator",
  "document_generator",
  "hitl",
];

/**
 * Use cases iterated by the tests for the Python templates: the same list, in
 * the same order, without "hitl".
 */
export const ALL_PYTHON_USE_CASES: TemplateUseCase[] =
  ALL_TYPESCRIPT_USE_CASES.filter((candidate) => candidate !== "hitl");
/**
 * Per-use-case settings consumed when generating a project.
 *
 * - `starterQuestions`: serialized as JSON into `NEXT_PUBLIC_STARTER_QUESTIONS`
 *   for Python llama-deploy projects.
 * - `additionalEnvVars`: extra entries appended to the generated env file.
 * - `additionalDependencies`: extra packages added to Python llama-deploy
 *   projects.
 */
export const USE_CASE_CONFIGS: Record<
  TemplateUseCase,
  {
    starterQuestions: string[];
    additionalEnvVars?: EnvVar[];
    additionalDependencies?: Dependency[];
  }
> = {
  agentic_rag: {
    starterQuestions: [
      "Letter standard in the document",
      "Summarize the document",
    ],
  },
  financial_report: {
    starterQuestions: [
      "Compare Apple and Tesla financial performance",
      "Generate a PDF report for Tesla financial",
    ],
    additionalEnvVars: [
      {
        name: "E2B_API_KEY",
        // Fixed duplicated wording ("to use to use") in this user-facing text.
        description: "The E2B API key to use with the code interpreter tool",
      },
    ],
    additionalDependencies: [
      {
        name: "e2b-code-interpreter",
        version: ">=1.1.1,<2.0.0",
      },
      {
        name: "markdown",
        version: ">=3.7,<4.0",
      },
      {
        name: "xhtml2pdf",
        version: ">=0.2.17,<1.0.0",
      },
    ],
  },
  deep_research: {
    starterQuestions: [
      "Research about Apple and Tesla",
      "Financial performance of Tesla",
    ],
  },
  code_generator: {
    starterQuestions: [
      "Generate a code for a simple calculator",
      "Generate a code for a todo list app",
    ],
  },
  document_generator: {
    starterQuestions: [
      "Generate a document about LlamaIndex",
      "Generate a document about LLM",
    ],
  },
  hitl: {
    starterQuestions: [
      "List all the files in the current directory",
      "Check git status",
    ],
  },
};
+25 -12
View File
@@ -21,7 +21,7 @@ export const askQuestions = async (
askModels: askModelsFromArgs,
} = args;
const { useCase, framework } = await prompts(
const { useCase } = await prompts(
[
{
type: useCaseFromArgs ? null : "select",
@@ -65,20 +65,28 @@ export const askQuestions = async (
],
initial: 0,
},
{
type: frameworkFromArgs ? null : "select",
name: "framework",
message: "What language do you want to use?",
choices: [
{ title: "Python (FastAPI)", value: "fastapi" },
{ title: "Typescript (NextJS)", value: "nextjs" },
],
initial: 0,
},
],
questionHandlers,
);
const { framework } = await prompts(
{
type: frameworkFromArgs ? null : "select",
name: "framework",
message: "What language do you want to use?",
choices: [
// For Python Human in the Loop use case, please refer to this chat-ui example:
// https://github.com/run-llama/chat-ui/blob/main/examples/llamadeploy/chat/src/cli_workflow.py
...(useCase !== "hitl"
? [{ title: "Python (FastAPI)", value: "fastapi" }]
: []),
{ title: "Typescript (NextJS)", value: "nextjs" },
],
initial: 0,
},
questionHandlers,
);
const finalUseCase = (useCaseFromArgs ?? useCase) as TemplateUseCase;
const finalFramework = (frameworkFromArgs ?? framework) as TemplateFramework;
if (!finalUseCase) {
@@ -102,7 +110,12 @@ export const askQuestions = async (
// Ask for LlamaCloud
let llamaCloudKey = llamaCloudKeyFromArgs ?? process.env.LLAMA_CLOUD_API_KEY;
let vectorDb: TemplateVectorDB = vectorDbFromArgs ?? "none";
if (!vectorDbFromArgs && useCaseConfig.dataSources) {
if (
!vectorDbFromArgs &&
useCaseConfig.dataSources &&
!["code_generator", "document_generator", "hitl"].includes(finalUseCase) // these use cases don't use data so no need to ask for LlamaCloud
) {
const { useLlamaCloud } = await prompts(
{
type: "toggle",
@@ -0,0 +1,9 @@
import { LlamaIndexServer } from "@llamaindex/server";
// UI server: loads custom components from "components" and layouts from
// "layout", and targets the llama-deploy deployment "chat" / workflow "workflow".
const uiServer = new LlamaIndexServer({
  uiConfig: {
    componentsDir: "components",
    layoutDir: "layout",
    llamaDeploy: { deployment: "chat", workflow: "workflow" },
  },
});

uiServer.start();
@@ -0,0 +1,18 @@
{
"name": "llamaindex-server-ui",
"version": "0.0.1",
"private": true,
"scripts": {
"dev": "nodemon --exec tsx index.ts"
},
"dependencies": {
"@llamaindex/server": "0.2.10",
"dotenv": "^16.4.7"
},
"devDependencies": {
"@types/node": "^20.10.3",
"nodemon": "^3.1.10",
"tsx": "4.7.2",
"typescript": "^5.3.2"
}
}
@@ -21,7 +21,7 @@ export default function Header() {
</a>
<img
className="h-[24px] w-[24px] rounded-sm"
src="/llama.png"
src="https://ui.llamaindex.ai/llama.png"
alt="Llama Logo"
/>
</div>
@@ -1,59 +1,106 @@
This is a [LlamaIndex](https://www.llamaindex.ai/) simple agentic RAG project using [Agent Workflows](https://docs.llamaindex.ai/en/stable/examples/agent/agent_workflow_basic/).
# LlamaIndex Workflow Example
## Getting Started
This is a [LlamaIndex](https://www.llamaindex.ai/) project that uses [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/) deployed with [LlamaDeploy](https://github.com/run-llama/llama_deploy).
First, setup the environment with uv:
LlamaDeploy is a system for deploying and managing LlamaIndex workflows, while LlamaIndexServer provides a pre-built TypeScript server with an integrated chat UI that can connect directly to LlamaDeploy deployments. This example shows how you can quickly set up a complete chat application by combining these two technologies.
> **_Note:_** This step is not needed if you are using the dev-container.
## Prerequisites
```shell
If you haven't installed uv, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/) to install it.
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [src/settings.py](src/settings.py).
Please setup their API keys in the `src/.env` file.
## Installation
Both the SDK and the CLI are part of the LlamaDeploy Python package. To install, just run:
```bash
uv sync
```
Then check the parameters that have been pre-configured in the `.env` file in this directory.
Make sure you have set the `OPENAI_API_KEY` for the LLM.
If you don't have uv installed, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/).
Second, generate the embeddings of the documents in the `./data` directory:
## Generate Index
Generate the embeddings of the documents in the `./data` directory:
```shell
uv run generate
```
Third, run the development server:
## Running the Deployment
```shell
uv run fastapi dev
```
Then open [http://localhost:8000](http://localhost:8000) with your browser to start the chat UI.
To start the app optimized for **production**, run:
At this point we have all we need to run this deployment. Ideally, we would have the API server already running
somewhere in the cloud, but to get started let's start an instance locally. Run the following python script
from a shell:
```
uv run fastapi run
$ uv run -m llama_deploy.apiserver
INFO: Started server process [10842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:4501 (Press CTRL+C to quit)
```
## Configure LLM and Embedding Model
From another shell, use the CLI, `llamactl`, to create the deployment:
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [settings.py](app/settings.py).
```
$ uv run llamactl deploy llama_deploy.yml
Deployment successful: chat
```
## UI Interface
LlamaDeploy will serve the UI through the apiserver. Point the browser to [http://localhost:4501/deployments/chat/ui](http://localhost:4501/deployments/chat/ui) to interact with your deployment through a user-friendly interface.
## API endpoints
You can find all the endpoints in the [API documentation](http://localhost:4501/docs). To get started, you can try the following endpoints:
Create a new task:
```bash
curl -X POST 'http://localhost:4501/deployments/chat/tasks/create' \
-H 'Content-Type: application/json' \
-d '{
"input": "{\"user_msg\":\"Hello\",\"chat_history\":[]}",
"service_id": "workflow"
}'
```
Stream events:
```bash
curl 'http://localhost:4501/deployments/chat/tasks/0b411be6-005d-43f0-9b6b-6a0017f08002/events?session_id=dd36442c-45ca-4eaa-8d75-b4e6dad1a83e&raw_event=true' \
-H 'Content-Type: application/json'
```
Note that the task_id and session_id are returned when creating a new task.
## Use Case
We have prepared an [example workflow](./app/workflow.py) for the agentic RAG use case, where you can ask questions about the example documents in the [./data](./data) directory.
We have prepared an [example workflow](./src/workflow.py) for the agentic RAG use case, where you can ask questions about the example documents in the [./data](./data) directory.
To update the workflow, you can modify the code in [`src/workflow.py`](src/workflow.py).
You can start by sending an request on the [chat UI](http://localhost:8000) or you can test the `/api/chat` endpoint with the following curl request:
## Customize the UI
```
curl --location 'localhost:8000/api/chat' \
--header 'Content-Type: application/json' \
--data '{ "messages": [{ "role": "user", "content": "What standards for a letter exist?" }] }'
```
The UI is served by the LlamaIndexServer package. You can configure the UI by modifying the `uiConfig` in the [ui/index.ts](ui/index.ts) file.
The following are the available options:
- `starterQuestions`: Predefined questions for chat interface
- `componentsDir`: Directory for custom event components
- `layoutDir`: Directory for custom layout components
- `llamaCloudIndexSelector`: Enable LlamaCloud integration
- `llamaDeploy`: The LlamaDeploy configuration (the deployment name and workflow name that are defined in the [llama_deploy.yml](llama_deploy.yml) file)
## Learn More
To learn more about LlamaIndex, take a look at the following resources:
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows.
- [LlamaDeploy GitHub Repository](https://github.com/run-llama/llama_deploy)
- [Chat-UI Documentation](https://ts.llamaindex.ai/docs/chat-ui)
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
@@ -0,0 +1,106 @@
from typing import Any, List, Optional
from llama_index.core import QueryBundle
from llama_index.core.postprocessor.types import BaseNodePostprocessor
from llama_index.core.prompts import PromptTemplate
from llama_index.core.query_engine.retriever_query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import Accumulate
from llama_index.core.schema import NodeWithScore
from llama_index.core.tools.query_engine import QueryEngineTool
# Prompt template used by the response synthesizer to answer with citations.
# `CitationSynthesizer` uses it as its default `text_qa_template`; pass your own
# `text_qa_template` to `CitationSynthesizer` to override it.
# NOTE(review): an earlier comment said this could be overridden with a
# `CITATION_PROMPT` environment variable, but nothing in this module reads one.
# Placeholders filled in at query time: {context_str} and {query_str}.
CITATION_PROMPT = """
Context information is below.
------------------
{context_str}
------------------
The context are multiple text chunks, each text chunk has its own citation_id at the beginning.
Use the citation_id for citation construction.
Answer the following query with citations:
------------------
{query_str}
------------------
## Citation format
[citation:id]
Where:
- [citation:] is a matching pattern which is required for all citations.
- `id` is the `citation_id` provided in the context or previous response.
Example:
```
Here is a response that uses context information [citation:90ca859f-4f32-40ca-8cd0-edfad4fb298b]
and other ideas that don't use context information [citation:17b2cc9a-27ae-4b6d-bede-5ca60fc00ff4] .\n
The citation block will be displayed automatically with useful information for the user in the UI [citation:1c606612-e75f-490e-8374-44e79f818d19] .
```
## Requirements:
1. Always include citations for every fact from the context information in your response.
2. Make sure that the citation_id is correct with the context, don't mix up the citation_id with other information.
Now, you answer the query with citations:
"""
class NodeCitationProcessor(BaseNodePostprocessor):
    """
    Node postprocessor that stores each node's own id in its metadata under
    the `citation_id` key, so the id can be used when building citations.
    """

    def _postprocess_nodes(
        self,
        nodes: List[NodeWithScore],
        query_bundle: Optional[QueryBundle] = None,
    ) -> List[NodeWithScore]:
        # The nodes are tagged in place and the same list is handed back.
        for scored in nodes:
            inner = scored.node
            inner.metadata["citation_id"] = inner.node_id
        return nodes
class CitationSynthesizer(Accumulate):
    """
    `Accumulate` synthesizer whose `text_qa_template` defaults to the citation
    prompt (`CITATION_PROMPT`) when the caller does not supply one.
    """

    def __init__(self, **kwargs: Any) -> None:
        supplied = kwargs.pop("text_qa_template", None)
        # Only an explicit None (or a missing argument) selects the default.
        template = (
            PromptTemplate(template=CITATION_PROMPT) if supplied is None else supplied
        )
        super().__init__(text_qa_template=template, **kwargs)
# Text to append to the agent's system prompt so that it keeps the citation
# markers produced by the query tool attached to the right facts.
CITATION_SYSTEM_PROMPT = "".join(
    (
        "\nAnswer the user question using the response from the query tool. ",
        "It's important to respect the citation information in the response. ",
        "Don't mix up the citation_id, keep them at the correct fact.",
    )
)
def enable_citation(query_engine_tool: QueryEngineTool) -> QueryEngineTool:
    """
    Turn on citations for a query engine tool.

    The tool's query engine gets a `CitationSynthesizer` as its response
    synthesizer (replacing the existing one) and a `NodeCitationProcessor`
    appended to its node postprocessors. The tool description is extended to
    mention the citation format. The same tool object is returned.

    Raises:
        ValueError: if the tool's query engine is not a `RetrieverQueryEngine`.
    """
    engine = query_engine_tool.query_engine
    if not isinstance(engine, RetrieverQueryEngine):
        raise ValueError(
            "Citation feature requires a RetrieverQueryEngine. Your tool's query engine is a "
            f"{type(engine)}."
        )

    # Swap the synthesizer and register the citation postprocessor.
    engine._response_synthesizer = CitationSynthesizer()
    engine._node_postprocessors += [NodeCitationProcessor()]
    query_engine_tool._query_engine = engine

    # Tell the agent what the tool output will look like.
    citation_note = "\nThe output will include citations with the format [citation:id] for each chunk of information in the knowledge base."
    query_engine_tool.metadata.description += citation_note
    return query_engine_tool
@@ -0,0 +1,47 @@
import os
from typing import Any, Optional
from llama_index.core.base.base_query_engine import BaseQueryEngine
from llama_index.core.indices.base import BaseIndex
from llama_index.core.tools.query_engine import QueryEngineTool
def create_query_engine(index: BaseIndex, **kwargs: Any) -> BaseQueryEngine:
    """
    Create a query engine for the given index.

    Args:
        index: The index to create a query engine for.
        **kwargs: Additional parameters forwarded to `index.as_query_engine`,
            e.g. `similarity_top_k`.

    Returns:
        The query engine returned by `index.as_query_engine`.

    Raises:
        ValueError: if the `TOP_K` environment variable is set to a non-empty
            value that is not an integer.
    """
    # An unset or empty TOP_K (e.g. `TOP_K=` in a .env file) means "no override";
    # previously an empty value crashed with ValueError.
    raw_top_k = (os.getenv("TOP_K") or "").strip()
    top_k = int(raw_top_k) if raw_top_k else 0
    # TOP_K is only applied when no `filters` are passed.
    if top_k != 0 and kwargs.get("filters") is None:
        kwargs["similarity_top_k"] = top_k
    return index.as_query_engine(**kwargs)
def get_query_engine_tool(
    index: BaseIndex,
    name: Optional[str] = None,
    description: Optional[str] = None,
    **kwargs: Any,
) -> QueryEngineTool:
    """
    Build a QueryEngineTool backed by a query engine for ``index``.

    Args:
        index: The index to create a query engine for.
        name (optional): The name of the tool. Defaults to "query_index".
        description (optional): The description of the tool. Defaults to a
            generic knowledge-base retrieval description.
        **kwargs: Forwarded to ``create_query_engine``.

    Returns:
        The tool created by ``QueryEngineTool.from_defaults``.
    """
    tool_name = "query_index" if name is None else name
    tool_description = (
        "Use this tool to retrieve information from a knowledge base. Provide a specific query and can call the tool multiple times if necessary."
        if description is None
        else description
    )
    return QueryEngineTool.from_defaults(
        query_engine=create_query_engine(index, **kwargs),
        name=tool_name,
        description=tool_description,
    )
@@ -1,18 +1,18 @@
from typing import Optional
from dotenv import load_dotenv
from app.index import get_index
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.settings import Settings
from llama_index.server.api.models import ChatRequest
from llama_index.server.tools.index import get_query_engine_tool
from llama_index.server.tools.index.citation import (
CITATION_SYSTEM_PROMPT,
enable_citation,
)
from src.index import get_index
from src.query import get_query_engine_tool
from src.citation import CITATION_SYSTEM_PROMPT, enable_citation
from src.settings import init_settings
def create_workflow(chat_request: Optional[ChatRequest] = None) -> AgentWorkflow:
index = get_index(chat_request=chat_request)
def create_workflow() -> AgentWorkflow:
load_dotenv()
init_settings()
index = get_index()
if index is None:
raise RuntimeError(
"Index not found! Please run `uv run generate` to index the data first."
@@ -30,3 +30,6 @@ def create_workflow(chat_request: Optional[ChatRequest] = None) -> AgentWorkflow
llm=Settings.llm,
system_prompt=system_prompt,
)
workflow = create_workflow()
@@ -1,65 +1,99 @@
This is a [LlamaIndex](https://www.llamaindex.ai/) project using [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/).
# LlamaIndex Workflow Example
## Getting Started
This is a [LlamaIndex](https://www.llamaindex.ai/) project that uses [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/) deployed with [LlamaDeploy](https://github.com/run-llama/llama_deploy).
First, setup the environment with uv:
LlamaDeploy is a system for deploying and managing LlamaIndex workflows, while LlamaIndexServer provides a pre-built TypeScript server with an integrated chat UI that can connect directly to LlamaDeploy deployments. This example shows how you can quickly set up a complete chat application by combining these two technologies.
> **_Note:_** This step is not needed if you are using the dev-container.
## Prerequisites
```shell
If you haven't installed uv, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/) to install it.
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [src/settings.py](src/settings.py).
Please setup their API keys in the `src/.env` file.
## Installation
Both the SDK and the CLI are part of the LlamaDeploy Python package. To install, just run:
```bash
uv sync
```
Then check the parameters that have been pre-configured in the `.env` file in this directory.
Make sure you have set the `OPENAI_API_KEY` for the LLM.
If you don't have uv installed, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/).
Then, run the development server:
## Running the Deployment
```shell
uv run fastapi dev
```
Then open [http://localhost:8000](http://localhost:8000) with your browser to start the chat UI.
To start the app optimized for **production**, run:
At this point we have all we need to run this deployment. Ideally, we would have the API server already running
somewhere in the cloud, but to get started let's start an instance locally. Run the following python script
from a shell:
```
uv run fastapi run
$ uv run -m llama_deploy.apiserver
INFO: Started server process [10842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:4501 (Press CTRL+C to quit)
```
## Configure LLM and Embedding Model
From another shell, use the CLI, `llamactl`, to create the deployment:
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [settings.py](app/settings.py).
```
$ uv run llamactl deploy llama_deploy.yml
Deployment successful: chat
```
## UI Interface
LlamaDeploy will serve the UI through the apiserver. Point the browser to [http://localhost:4501/deployments/chat/ui](http://localhost:4501/deployments/chat/ui) to interact with your deployment through a user-friendly interface.
## API endpoints
You can find all the endpoints in the [API documentation](http://localhost:4501/docs). To get started, you can try the following endpoints:
Create a new task:
```bash
curl -X POST 'http://localhost:4501/deployments/chat/tasks/create' \
-H 'Content-Type: application/json' \
-d '{
"input": "{\"user_msg\":\"Hello\",\"chat_history\":[]}",
"service_id": "workflow"
}'
```
Stream events:
```bash
curl 'http://localhost:4501/deployments/chat/tasks/0b411be6-005d-43f0-9b6b-6a0017f08002/events?session_id=dd36442c-45ca-4eaa-8d75-b4e6dad1a83e&raw_event=true' \
-H 'Content-Type: application/json'
```
Note that the task_id and session_id are returned when creating a new task.
## Use Case
AI-powered code generator that can help you generate app with a chat interface, code editor and app preview.
To update the workflow, you can modify the code in [`workflow.py`](app/workflow.py).
You can start by sending a request on the [chat UI](http://localhost:8000) or you can test the `/api/chat` endpoint with the following curl request:
```
curl --location 'localhost:8000/api/chat' \
--header 'Content-Type: application/json' \
--data '{ "messages": [{ "role": "user", "content": "Create a report comparing the finances of Apple and Tesla" }] }'
```
To update the workflow, you can modify the code in [`src/workflow.py`](src/workflow.py).
## Customize the UI
To customize the UI, you can start by modifying the [./components/ui_event.jsx](./components/ui_event.jsx) file.
The UI is served by the LlamaIndexServer package. You can configure the UI by modifying the `uiConfig` in the [ui/index.ts](ui/index.ts) file.
You can also generate a new code for the workflow using LLM by running the following command:
The following are the available options:
```
uv run generate_ui
```
- `starterQuestions`: Predefined questions for chat interface
- `componentsDir`: Directory for custom event components
- `layoutDir`: Directory for custom layout components
- `llamaCloudIndexSelector`: Enable LlamaCloud integration
- `llamaDeploy`: The LlamaDeploy configuration (the deployment name and workflow name that are defined in the [llama_deploy.yml](llama_deploy.yml) file)
## Learn More
To learn more about LlamaIndex, take a look at the following resources:
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows.
- [LlamaIndex Server](https://pypi.org/project/llama-index-server/)
- [LlamaDeploy GitHub Repository](https://github.com/run-llama/llama_deploy)
- [Chat-UI Documentation](https://ts.llamaindex.ai/docs/chat-ui)
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
@@ -0,0 +1,131 @@
import json
import re
from typing import List, Optional, Any
from pydantic import ValidationError
from llama_index.core.chat_ui.models.artifact import (
Artifact,
ArtifactType,
CodeArtifactData,
DocumentArtifactData,
)
from llama_index.core.llms import ChatMessage
# Language tag of the fenced code blocks that carry inline annotations
INLINE_ANNOTATION_KEY = "annotation"


def get_inline_annotations(message: ChatMessage) -> List[Any]:
    """
    Extract inline annotations from a chat message.

    An inline annotation is a fenced code block tagged with
    ``INLINE_ANNOTATION_KEY`` whose body is a JSON object containing both a
    "type" and a "data" key. Blocks that are empty, are not valid JSON, or
    lack those keys are skipped.

    Args:
        message: The chat message whose ``content`` is scanned. A message
            whose content is None is treated as having no annotations.

    Returns:
        The parsed annotation dicts, in the order they appear in the message.
    """
    # ``content`` can be None; the regex scan needs a string, so fall back to ""
    markdown_content = message.content or ""
    inline_annotations: List[Any] = []

    # Regex to match annotation code blocks
    # Matches ```annotation followed by content until closing ```
    annotation_regex = re.compile(
        rf"```{re.escape(INLINE_ANNOTATION_KEY)}\s*\n([\s\S]*?)\n```", re.MULTILINE
    )

    for match in annotation_regex.finditer(markdown_content):
        json_content = match.group(1).strip() if match.group(1) else None
        if not json_content:
            continue

        try:
            # Parse the JSON content
            parsed = json.loads(json_content)

            # Check for required fields in the parsed annotation
            if (
                not isinstance(parsed, dict)
                or "type" not in parsed
                or "data" not in parsed
            ):
                continue

            # Extract the annotation data
            inline_annotations.append(parsed)
        except (json.JSONDecodeError, ValidationError) as error:
            # Skip invalid annotations - they might be malformed JSON or invalid schema
            print(f"Failed to parse annotation: {error}")

    return inline_annotations
def artifact_from_message(message: ChatMessage) -> Optional[Artifact]:
    """
    Build an Artifact from the first usable artifact annotation in a message.

    Annotations are read with ``get_inline_annotations``. Only annotations
    whose "type" is "artifact" are considered, and only "code" and "document"
    artifact payloads are converted. An annotation that fails to convert is
    reported with ``print`` and the next one is tried.

    Returns:
        The first artifact that could be built, or None if there is none.
    """
    for annotation in get_inline_annotations(message):
        if not isinstance(annotation, dict) or annotation.get("type") != "artifact":
            continue

        try:
            payload = annotation.get("data")
            if not payload:
                continue

            kind = payload.get("type")
            if kind not in ("code", "document"):
                continue

            # The nested "data" object holds the actual code/document fields
            details = payload.get("data", {})
            if kind == "code":
                content = CodeArtifactData(
                    file_name=details.get("file_name", ""),
                    code=details.get("code", ""),
                    language=details.get("language", ""),
                )
                kind_tag = ArtifactType.CODE
            else:
                content = DocumentArtifactData(
                    title=details.get("title", ""),
                    content=details.get("content", ""),
                    type=details.get("type", "markdown"),
                    sources=details.get("sources"),
                )
                kind_tag = ArtifactType.DOCUMENT

            return Artifact(
                created_at=payload.get("created_at"),
                type=kind_tag,
                data=content,
            )
        except Exception as e:
            print(
                f"Failed to parse artifact from annotation: {annotation}. Error: {e}"
            )

    return None
def get_artifacts(chat_history: List[ChatMessage]) -> List[Artifact]:
    """
    Collect the artifacts found in a chat history, ordered by creation time.

    Each message contributes at most one artifact (see
    ``artifact_from_message``). Artifacts whose ``created_at`` is None sort
    after all artifacts that have a creation time.
    """
    candidates = [artifact_from_message(message) for message in chat_history]
    found = [artifact for artifact in candidates if artifact is not None]
    # The leading boolean pushes artifacts without a timestamp to the end
    found.sort(key=lambda a: (a.created_at is None, a.created_at))
    return found
def get_last_artifact(chat_history: List[ChatMessage]) -> Optional[Artifact]:
    """Return the final entry of ``get_artifacts(chat_history)``, or None when it is empty."""
    ordered = get_artifacts(chat_history)
    if not ordered:
        return None
    return ordered[-1]
@@ -2,11 +2,10 @@ import re
import time
from typing import Any, Literal, Optional, Union
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.llms import LLM
from llama_index.core import Settings
from llama_index.core.llms import LLM, ChatMessage
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI
from llama_index.core.workflow import (
Context,
Event,
@@ -15,25 +14,25 @@ from llama_index.core.workflow import (
Workflow,
step,
)
from llama_index.server.api.models import (
from llama_index.core.chat_ui.models.artifact import (
Artifact,
ArtifactEvent,
ArtifactType,
ChatRequest,
CodeArtifactData,
UIEvent,
)
from llama_index.server.api.utils import get_last_artifact
from llama_index.core.chat_ui.events import (
UIEvent,
ArtifactEvent,
)
from src.utils import get_last_artifact
from src.settings import init_settings
from pydantic import BaseModel, Field
from dotenv import load_dotenv
def create_workflow(chat_request: ChatRequest) -> Workflow:
workflow = CodeArtifactWorkflow(
llm=OpenAI(model="gpt-4.1"),
chat_request=chat_request,
timeout=120.0,
)
return workflow
def create_workflow() -> Workflow:
load_dotenv()
init_settings()
return CodeArtifactWorkflow(timeout=120.0)
class Requirement(BaseModel):
@@ -83,8 +82,6 @@ class CodeArtifactWorkflow(Workflow):
def __init__(
self,
llm: LLM,
chat_request: ChatRequest,
**kwargs: Any,
):
"""
@@ -93,9 +90,8 @@ class CodeArtifactWorkflow(Workflow):
chat_request: The chat request from the chat app to use.
"""
super().__init__(**kwargs)
self.llm = llm
self.chat_request = chat_request
self.last_artifact = get_last_artifact(chat_request)
self.llm: LLM = Settings.llm
self.last_artifact: Optional[Artifact] = None
@step
async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> PlanEvent:
@@ -103,13 +99,21 @@ class CodeArtifactWorkflow(Workflow):
if user_msg is None:
raise ValueError("user_msg is required to run the workflow")
await ctx.set("user_msg", user_msg)
chat_history = ev.chat_history or []
chat_history.append(
# prepare chat history from StartEvent
messages = [
ChatMessage(
role="user",
content=user_msg,
role=msg.get("role", "user"),
content=msg.get("content", ""),
)
)
for msg in ev.get("chat_history", [])
]
chat_history = [*messages, ChatMessage(role="user", content=user_msg)]
# extract inline artifact from chat history
last_artifact = get_last_artifact(messages)
self.last_artifact = last_artifact
memory = ChatMemoryBuffer.from_defaults(
chat_history=chat_history,
llm=self.llm,
@@ -373,3 +377,6 @@ class CodeArtifactWorkflow(Workflow):
)
)
return StopEvent(result=response_stream)
workflow = create_workflow()
@@ -1,69 +1,106 @@
This is a [LlamaIndex](https://www.llamaindex.ai/) multi-agents project using [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/).
# LlamaIndex Workflow Example
## Getting Started
This is a [LlamaIndex](https://www.llamaindex.ai/) project that uses [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/) deployed with [LlamaDeploy](https://github.com/run-llama/llama_deploy).
First, setup the environment with uv:
LlamaDeploy is a system for deploying and managing LlamaIndex workflows, while LlamaIndexServer provides a pre-built TypeScript server with an integrated chat UI that can connect directly to LlamaDeploy deployments. This example shows how you can quickly set up a complete chat application by combining these two technologies.
> **_Note:_** This step is not needed if you are using the dev-container.
## Prerequisites
```shell
If you haven't installed uv, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/) to install it.
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [src/settings.py](src/settings.py).
Please setup their API keys in the `src/.env` file.
## Installation
Both the SDK and the CLI are part of the LlamaDeploy Python package. To install, just run:
```bash
uv sync
```
Then check the parameters that have been pre-configured in the `.env` file in this directory.
Make sure you have set the `OPENAI_API_KEY` for the LLM.
If you don't have uv installed, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/).
Second, generate the embeddings of the documents in the `./data` directory:
## Generate Index
Generate the embeddings of the documents in the `./data` directory:
```shell
uv run generate
```
Third, run the development server:
## Running the Deployment
```shell
uv run fastapi dev
```
Then open [http://localhost:8000](http://localhost:8000) with your browser to start the chat UI.
To start the app optimized for **production**, run:
At this point we have all we need to run this deployment. Ideally, we would have the API server already running
somewhere in the cloud, but to get started let's start an instance locally. Run the following python script
from a shell:
```
uv run fastapi run
$ uv run -m llama_deploy.apiserver
INFO: Started server process [10842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:4501 (Press CTRL+C to quit)
```
## Configure LLM and Embedding Model
From another shell, use the CLI, `llamactl`, to create the deployment:
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [settings.py](app/settings.py).
```
$ uv run llamactl deploy llama_deploy.yml
Deployment successful: chat
```
## UI Interface
LlamaDeploy will serve the UI through the apiserver. Point the browser to [http://localhost:4501/deployments/chat/ui](http://localhost:4501/deployments/chat/ui) to interact with your deployment through a user-friendly interface.
## API endpoints
You can find all the endpoints in the [API documentation](http://localhost:4501/docs). To get started, you can try the following endpoints:
Create a new task:
```bash
curl -X POST 'http://localhost:4501/deployments/chat/tasks/create' \
-H 'Content-Type: application/json' \
-d '{
"input": "{\"user_msg\":\"Hello\",\"chat_history\":[]}",
"service_id": "workflow"
}'
```
Stream events:
```bash
curl 'http://localhost:4501/deployments/chat/tasks/0b411be6-005d-43f0-9b6b-6a0017f08002/events?session_id=dd36442c-45ca-4eaa-8d75-b4e6dad1a83e&raw_event=true' \
-H 'Content-Type: application/json'
```
Note that the task_id and session_id are returned when creating a new task.
## Use Case
We have prepared an [example workflow](./app/workflow.py) for the deep research use case, where you can ask questions about the example documents in the [./data](./data) directory.
You can start by sending a request on the [chat UI](http://localhost:8000) or you can test the `/api/chat` endpoint with the following curl request:
```
curl --location 'localhost:8000/api/chat' \
--header 'Content-Type: application/json' \
--data '{ "messages": [{ "role": "user", "content": "Create a report comparing the finances of Apple and Tesla" }] }'
```
To update the workflow, you can modify the code in [`src/workflow.py`](src/workflow.py).
## Customize the UI
To customize the UI, you can start by modifying the [./components/ui_event.jsx](./components/ui_event.jsx) file.
The UI is served by the LlamaIndexServer package. You can configure the UI by modifying the `uiConfig` in the [ui/index.ts](ui/index.ts) file.
You can also generate a new code for the workflow using LLM by running the following command:
The following are the available options:
```
uv run generate_ui
```
- `starterQuestions`: Predefined questions for chat interface
- `componentsDir`: Directory for custom event components
- `layoutDir`: Directory for custom layout components
- `llamaCloudIndexSelector`: Enable LlamaCloud integration
- `llamaDeploy`: The LlamaDeploy configuration (the deployment name and workflow name that are defined in the [llama_deploy.yml](llama_deploy.yml) file)
## Learn More
To learn more about LlamaIndex, take a look at the following resources:
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows.
- [LlamaDeploy GitHub Repository](https://github.com/run-llama/llama_deploy)
- [Chat-UI Documentation](https://ts.llamaindex.ai/docs/chat-ui)
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
@@ -0,0 +1,46 @@
from typing import AsyncGenerator, Union
from llama_index.core.base.llms.types import (
CompletionResponse,
CompletionResponseAsyncGen,
ChatResponse,
)
from llama_index.core.workflow import Context
from llama_index.core.agent.workflow.workflow_events import AgentStream
async def write_response_to_stream(
    res: Union[CompletionResponse, CompletionResponseAsyncGen, AsyncGenerator[ChatResponse, None]],
    ctx: Context,
    current_agent_name: str = "assistant",
) -> str:
    """
    Turn an LLM response into its final text, emitting stream events if it streams.

    Args:
        res: The LLM response. An async generator is consumed chunk by chunk;
            anything else is read through its ``text`` attribute.
        ctx: The workflow context that receives one AgentStream event per chunk.
            It is not used for non-streaming responses.
        current_agent_name: Agent name stamped on each event (default: "assistant").

    Returns:
        The full response text.
    """
    if not isinstance(res, AsyncGenerator):
        # Non-streaming response: the text is already complete, nothing to emit
        return res.text

    accumulated = ""
    async for chunk in res:
        piece = chunk.delta or ""
        # `response` carries the text gathered before this chunk; the current
        # delta is appended only after the event has been written.
        ctx.write_event_to_stream(
            AgentStream(
                delta=piece,
                response=accumulated,
                current_agent_name=current_agent_name,
                tool_calls=[],
                raw=getattr(chunk, 'raw', None) or "",
            )
        )
        accumulated += piece
    return accumulated
@@ -1,9 +1,11 @@
import logging
import os
import uuid
import time
from typing import List, Literal, Optional
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from app.index import get_index
from llama_index.core.base.llms.types import (
CompletionResponse,
CompletionResponseAsyncGen,
@@ -23,26 +25,31 @@ from llama_index.core.workflow import (
Workflow,
step,
)
from llama_index.server.api.models import (
ArtifactEvent,
ArtifactType,
ChatRequest,
SourceNodesEvent,
UIEvent,
from llama_index.core.chat_ui.models.artifact import (
Artifact,
ArtifactType,
DocumentArtifactData,
DocumentArtifactSource,
)
import time
from llama_index.server.utils.stream import write_response_to_stream
from pydantic import BaseModel, Field
from llama_index.core.chat_ui.events import (
UIEvent,
ArtifactEvent,
SourceNodesEvent,
)
from src.index import get_index
from src.settings import init_settings
from src.utils import write_response_to_stream
logger = logging.getLogger("uvicorn")
logger.setLevel(logging.INFO)
def create_workflow(chat_request: Optional[ChatRequest] = None) -> Workflow:
index = get_index(chat_request=chat_request)
def create_workflow() -> Workflow:
load_dotenv()
init_settings()
# TODO: load index in StartEvent
index = get_index()
if index is None:
raise ValueError(
"Index is not found. Try run generation script to create the index first."
@@ -140,21 +147,23 @@ class DeepResearchWorkflow(Workflow):
"""
self.stream = ev.get("stream", True)
self.user_request = ev.get("user_msg")
chat_history = ev.get("chat_history")
if chat_history is not None:
self.memory.put_messages(chat_history)
messages = [
ChatMessage(
role=msg.get("role", "user"),
content=msg.get("content", ""),
)
for msg in ev.get("chat_history", [])
]
user_message = ChatMessage(role="user", content=self.user_request)
chat_history = [*messages, user_message]
self.memory.put_messages(chat_history)
await ctx.set("total_questions", 0)
# Add user message to memory
self.memory.put_messages(
messages=[
ChatMessage(
role=MessageRole.USER,
content=self.user_request,
)
]
)
self.memory.put_messages(messages=[user_message])
ctx.write_event_to_stream(
UIEvent(
type="ui_event",
@@ -574,3 +583,6 @@ def _get_text_node_content_for_citation(node: NodeWithScore) -> str:
node_id = node.node.node_id
content = f"<Citation id='{node_id}'>\n{node.get_content(metadata_mode=MetadataMode.LLM)}</Citation id='{node_id}'>"
return content
workflow = create_workflow()
@@ -1,66 +1,100 @@
This is a [LlamaIndex](https://www.llamaindex.ai/) project using [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/).
# LlamaIndex Workflow Example
## Getting Started
This is a [LlamaIndex](https://www.llamaindex.ai/) project that uses [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/) deployed with [LlamaDeploy](https://github.com/run-llama/llama_deploy).
First, setup the environment with uv:
LlamaDeploy is a system for deploying and managing LlamaIndex workflows, while LlamaIndexServer provides a pre-built TypeScript server with an integrated chat UI that can connect directly to LlamaDeploy deployments. This example shows how you can quickly set up a complete chat application by combining these two technologies.
> **_Note:_** This step is not needed if you are using the dev-container.
## Prerequisites
```shell
If you haven't installed uv, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/) to install it.
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [src/settings.py](src/settings.py).
Please setup their API keys in the `src/.env` file.
## Installation
Both the SDK and the CLI are part of the LlamaDeploy Python package. To install, just run:
```bash
uv sync
```
Then check the parameters that have been pre-configured in the `.env` file in this directory.
Make sure you have set the `OPENAI_API_KEY` for the LLM.
If you don't have uv installed, you can follow the instructions [here](https://docs.astral.sh/uv/getting-started/installation/).
Then, run the development server:
## Running the Deployment
```shell
uv run fastapi dev
```
Then open [http://localhost:8000](http://localhost:8000) with your browser to start the chat UI.
To start the app optimized for **production**, run:
At this point we have all we need to run this deployment. Ideally, we would have the API server already running
somewhere in the cloud, but to get started let's start an instance locally. Run the following python script
from a shell:
```
uv run fastapi run
$ uv run -m llama_deploy.apiserver
INFO: Started server process [10842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:4501 (Press CTRL+C to quit)
```
## Configure LLM and Embedding Model
From another shell, use the CLI, `llamactl`, to create the deployment:
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [settings.py](app/settings.py).
```
$ uv run llamactl deploy llama_deploy.yml
Deployment successful: chat
```
## UI Interface
LlamaDeploy will serve the UI through the apiserver. Point the browser to [http://localhost:4501/deployments/chat/ui](http://localhost:4501/deployments/chat/ui) to interact with your deployment through a user-friendly interface.
## API endpoints
You can find all the endpoints in the [API documentation](http://localhost:4501/docs). To get started, you can try the following endpoints:
Create a new task:
```bash
curl -X POST 'http://localhost:4501/deployments/chat/tasks/create' \
-H 'Content-Type: application/json' \
-d '{
"input": "{\"user_msg\":\"Hello\",\"chat_history\":[]}",
"service_id": "workflow"
}'
```
Stream events:
```bash
curl 'http://localhost:4501/deployments/chat/tasks/0b411be6-005d-43f0-9b6b-6a0017f08002/events?session_id=dd36442c-45ca-4eaa-8d75-b4e6dad1a83e&raw_event=true' \
-H 'Content-Type: application/json'
```
Note that the task_id and session_id are returned when creating a new task.
## Use Case
AI-powered document generator that can help you generate documents with a chat interface and simple markdown editor.
To update the workflow, you can modify the code in [`workflow.py`](app/workflow.py).
You can start by sending a request on the [chat UI](http://localhost:8000) or you can test the `/api/chat` endpoint with the following curl request:
```
curl --location 'localhost:8000/api/chat' \
--header 'Content-Type: application/json' \
--data '{ "messages": [{ "role": "user", "content": "Create a report comparing the finances of Apple and Tesla" }] }'
```
To update the workflow, you can modify the code in [`src/workflow.py`](src/workflow.py).
## Customize the UI
To customize the UI, you can start by modifying the [./components/ui_event.jsx](./components/ui_event.jsx) file.
The UI is served by the LlamaIndexServer package. You can configure the UI by modifying the `uiConfig` in the [ui/index.ts](ui/index.ts) file.
You can also generate a new code for the workflow using LLM by running the following command:
The following are the available options:
```
uv run generate_ui
```
- `starterQuestions`: Predefined questions for chat interface
- `componentsDir`: Directory for custom event components
- `layoutDir`: Directory for custom layout components
- `llamaCloudIndexSelector`: Enable LlamaCloud integration
- `llamaDeploy`: The LlamaDeploy configuration (the deployment name and workflow name that are defined in the [llama_deploy.yml](llama_deploy.yml) file)
To customize the UI, you can start by modifying the [./ui/components/ui_event.jsx](./ui/components/ui_event.jsx) file.
## Learn More
To learn more about LlamaIndex, take a look at the following resources:
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows.
- [LlamaIndex Server](https://pypi.org/project/llama-index-server/)
- [LlamaDeploy GitHub Repository](https://github.com/run-llama/llama_deploy)
- [Chat-UI Documentation](https://ts.llamaindex.ai/docs/chat-ui)
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
@@ -0,0 +1,131 @@
import json
import re
from typing import List, Optional, Any
from pydantic import ValidationError
from llama_index.core.chat_ui.models.artifact import (
Artifact,
ArtifactType,
CodeArtifactData,
DocumentArtifactData,
)
from llama_index.core.llms import ChatMessage
# Language tag of the fenced code blocks that carry inline annotations
INLINE_ANNOTATION_KEY = "annotation"


def get_inline_annotations(message: ChatMessage) -> List[Any]:
    """
    Extract inline annotations from a chat message.

    An inline annotation is a fenced code block tagged with
    ``INLINE_ANNOTATION_KEY`` whose body is a JSON object containing both a
    "type" and a "data" key. Blocks that are empty, are not valid JSON, or
    lack those keys are skipped.

    Args:
        message: The chat message whose ``content`` is scanned. A message
            whose content is None is treated as having no annotations.

    Returns:
        The parsed annotation dicts, in the order they appear in the message.
    """
    # ``content`` can be None; the regex scan needs a string, so fall back to ""
    markdown_content = message.content or ""
    inline_annotations: List[Any] = []

    # Regex to match annotation code blocks
    # Matches ```annotation followed by content until closing ```
    annotation_regex = re.compile(
        rf"```{re.escape(INLINE_ANNOTATION_KEY)}\s*\n([\s\S]*?)\n```", re.MULTILINE
    )

    for match in annotation_regex.finditer(markdown_content):
        json_content = match.group(1).strip() if match.group(1) else None
        if not json_content:
            continue

        try:
            # Parse the JSON content
            parsed = json.loads(json_content)

            # Check for required fields in the parsed annotation
            if (
                not isinstance(parsed, dict)
                or "type" not in parsed
                or "data" not in parsed
            ):
                continue

            # Extract the annotation data
            inline_annotations.append(parsed)
        except (json.JSONDecodeError, ValidationError) as error:
            # Skip invalid annotations - they might be malformed JSON or invalid schema
            print(f"Failed to parse annotation: {error}")

    return inline_annotations
def artifact_from_message(message: ChatMessage) -> Optional[Artifact]:
    """
    Build an Artifact from the first usable artifact annotation in a message.

    Annotations are read with ``get_inline_annotations``. Only dict annotations
    whose ``"type"`` is ``"artifact"`` are considered; their ``"data"`` payload
    must itself declare a ``"type"`` of ``"code"`` or ``"document"``.

    Returns:
        The first artifact that could be built, or None when the message holds
        no such annotation. Annotations that fail to convert are reported with
        ``print`` and skipped.
    """
    for annotation in get_inline_annotations(message):
        if not isinstance(annotation, dict) or annotation.get("type") != "artifact":
            continue

        try:
            payload = annotation.get("data")
            if not payload:
                continue

            kind = payload.get("type")

            if kind == "code":
                # The nested "data" object carries the actual code fields
                details = payload.get("data", {})
                return Artifact(
                    created_at=payload.get("created_at"),
                    type=ArtifactType.CODE,
                    data=CodeArtifactData(
                        file_name=details.get("file_name", ""),
                        code=details.get("code", ""),
                        language=details.get("language", ""),
                    ),
                )

            if kind == "document":
                # The nested "data" object carries the actual document fields
                details = payload.get("data", {})
                return Artifact(
                    created_at=payload.get("created_at"),
                    type=ArtifactType.DOCUMENT,
                    data=DocumentArtifactData(
                        title=details.get("title", ""),
                        content=details.get("content", ""),
                        type=details.get("type", "markdown"),
                        sources=details.get("sources"),
                    ),
                )

            # Any other artifact type: fall through to the next annotation
        except Exception as e:
            print(
                f"Failed to parse artifact from annotation: {annotation}. Error: {e}"
            )

    return None
def get_artifacts(chat_history: List[ChatMessage]) -> List[Artifact]:
    """
    Collect the artifacts found in a chat history, ordered by creation time.

    Each message contributes at most one artifact (see
    ``artifact_from_message``). Artifacts whose ``created_at`` is None are
    placed after all artifacts that have a creation time.
    """
    collected = [
        found
        for found in map(artifact_from_message, chat_history)
        if found is not None
    ]
    # The boolean first key pushes None timestamps to the end and keeps None
    # from ever being compared against a real timestamp.
    collected.sort(key=lambda a: (a.created_at is None, a.created_at))
    return collected
def get_last_artifact(chat_history: List[ChatMessage]) -> Optional[Artifact]:
    """
    Return the final artifact of ``get_artifacts(chat_history)``.

    Returns None when the history contains no artifact.
    """
    ordered = get_artifacts(chat_history)
    if not ordered:
        return None
    return ordered[-1]
@@ -1,10 +1,9 @@
import re
import time
from typing import Any, Literal, Optional
from typing import Any, Literal, Optional, Union
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.llms import LLM
from llama_index.llms.openai import OpenAI
from llama_index.core import Settings
from llama_index.core.llms import LLM, ChatMessage
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.prompts import PromptTemplate
from llama_index.core.workflow import (
@@ -15,26 +14,25 @@ from llama_index.core.workflow import (
Workflow,
step,
)
from llama_index.server.api.models import (
from llama_index.core.chat_ui.models.artifact import (
Artifact,
ArtifactEvent,
ArtifactType,
ChatRequest,
DocumentArtifactData,
UIEvent,
)
from llama_index.server.api.utils import get_last_artifact
from llama_index.core.chat_ui.events import (
UIEvent,
ArtifactEvent,
)
from src.utils import get_last_artifact
from src.settings import init_settings
from pydantic import BaseModel, Field
from dotenv import load_dotenv
def create_workflow(chat_request: ChatRequest) -> Workflow:
workflow = DocumentArtifactWorkflow(
llm=OpenAI(model="gpt-4.1"),
chat_request=chat_request,
timeout=120.0,
)
return workflow
def create_workflow() -> Workflow:
load_dotenv()
init_settings()
return DocumentArtifactWorkflow(timeout=120.0)
class DocumentRequirement(BaseModel):
type: Literal["markdown", "html"]
@@ -81,8 +79,6 @@ class DocumentArtifactWorkflow(Workflow):
def __init__(
self,
llm: LLM,
chat_request: ChatRequest,
**kwargs: Any,
):
"""
@@ -91,9 +87,8 @@ class DocumentArtifactWorkflow(Workflow):
chat_request: The chat request from the chat app to use.
"""
super().__init__(**kwargs)
self.llm = llm
self.chat_request = chat_request
self.last_artifact = get_last_artifact(chat_request)
self.llm: LLM = Settings.llm
self.last_artifact: Optional[Artifact] = None
@step
async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> PlanEvent:
@@ -101,13 +96,21 @@ class DocumentArtifactWorkflow(Workflow):
if user_msg is None:
raise ValueError("user_msg is required to run the workflow")
await ctx.set("user_msg", user_msg)
chat_history = ev.chat_history or []
chat_history.append(
# prepare chat history from StartEvent
messages = [
ChatMessage(
role="user",
content=user_msg,
role=msg.get("role", "user"),
content=msg.get("content", ""),
)
)
for msg in ev.get("chat_history", [])
]
chat_history = [*messages, ChatMessage(role="user", content=user_msg)]
# extract inline artifact from chat history
last_artifact = get_last_artifact(messages)
self.last_artifact = last_artifact
memory = ChatMemoryBuffer.from_defaults(
chat_history=chat_history,
llm=self.llm,
@@ -115,9 +118,9 @@ class DocumentArtifactWorkflow(Workflow):
await ctx.set("memory", memory)
return PlanEvent(
user_msg=user_msg,
context=str(self.last_artifact.model_dump_json())
if self.last_artifact
else "",
context=(
str(self.last_artifact.model_dump_json()) if self.last_artifact else ""
),
)
@step
@@ -135,7 +138,8 @@ class DocumentArtifactWorkflow(Workflow):
),
)
)
prompt = PromptTemplate("""
prompt = PromptTemplate(
"""
You are a documentation analyst responsible for analyzing the user's request and providing requirements for document generation or update.
Follow these instructions:
1. Carefully analyze the conversation history and the user's request to determine what has been done and what the next step should be.
@@ -176,10 +180,13 @@ class DocumentArtifactWorkflow(Workflow):
Now, please plan for the user's request:
{user_msg}
""").format(
context=""
if event.context is None
else f"## The context is: \n{event.context}\n",
"""
).format(
context=(
""
if event.context is None
else f"## The context is: \n{event.context}\n"
),
user_msg=event.user_msg,
)
response = await self.llm.acomplete(
@@ -232,7 +239,8 @@ class DocumentArtifactWorkflow(Workflow):
),
)
)
prompt = PromptTemplate("""
prompt = PromptTemplate(
"""
You are a skilled technical writer who can help users with documentation.
You are given a task to generate or update a document for a given requirement.
@@ -265,10 +273,11 @@ class DocumentArtifactWorkflow(Workflow):
Now, please generate the document for the following requirement:
{requirement}
""").format(
previous_artifact=self.last_artifact.model_dump_json()
if self.last_artifact
else "",
"""
).format(
previous_artifact=(
self.last_artifact.model_dump_json() if self.last_artifact else ""
),
requirement=event.requirement,
)
response = await self.llm.acomplete(
@@ -345,3 +354,6 @@ class DocumentArtifactWorkflow(Workflow):
)
)
return StopEvent(result=response_stream)
workflow = create_workflow()
@@ -0,0 +1,254 @@
import logging
import uuid
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Optional
from pydantic import BaseModel, ConfigDict
from llama_index.core.base.llms.types import ChatMessage, ChatResponse
from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.tools import (
BaseTool,
FunctionTool,
ToolOutput,
ToolSelection,
)
from llama_index.core.workflow import Context
from llama_index.core.agent.workflow.workflow_events import ToolCall, ToolCallResult
from src.events import AgentRunEvent, AgentRunEventType
logger = logging.getLogger("uvicorn")
class ToolCallOutput(BaseModel):
    """Pairs the output of one tool invocation with the id of the call that produced it."""

    # Id of the originating tool call (call_tool fills it from ToolSelection.tool_id).
    tool_call_id: str
    # Output returned by the tool, or an error ToolOutput when the call failed.
    tool_output: ToolOutput
class ContextAwareTool(FunctionTool, ABC):
    """
    Abstract FunctionTool whose ``acall`` also receives the workflow Context.

    ``call_tool`` detects instances of this class and passes ``ctx`` to them.
    """

    @abstractmethod
    async def acall(self, ctx: Context, input: Any) -> ToolOutput: # type: ignore
        """Run the tool with access to ``ctx``; subclasses must implement this."""
        pass
class ChatWithToolsResponse(BaseModel):
    """
    Result of ``chat_with_tools``.

    Either the LLM asked for tools (``tool_calls`` and ``tool_call_message``
    are set, ``generator`` is None) or it answered directly (``generator``
    streams the answer and the other two fields are None).
    """

    tool_calls: Optional[list[ToolSelection]]
    tool_call_message: Optional[ChatMessage]
    generator: Optional[AsyncGenerator[ChatResponse | None, None]]

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def is_calling_different_tools(self) -> bool:
        """Return True when the requested calls target more than one tool name."""
        distinct_names = set()
        for selection in self.tool_calls or []:
            distinct_names.add(selection.tool_name)
        return len(distinct_names) > 1

    def has_tool_calls(self) -> bool:
        """Return True when at least one tool call was requested."""
        if self.tool_calls is None:
            return False
        return len(self.tool_calls) > 0

    def tool_name(self) -> str:
        """
        Return the name of the single tool being called.

        Raises:
            ValueError: If there are no tool calls, or the calls target
                different tools.
        """
        if not self.has_tool_calls():
            raise ValueError("No tool calls")
        if self.is_calling_different_tools():
            raise ValueError("Calling different tools")
        first_call = self.tool_calls[0]  # type: ignore
        return first_call.tool_name

    async def full_response(self) -> str:
        """Drain ``generator`` and return the concatenation of all non-empty deltas."""
        assert self.generator is not None
        text = ""
        async for chunk in self.generator:
            piece = chunk.delta  # type: ignore
            if not piece:
                continue
            text += piece
        return text
async def chat_with_tools(  # type: ignore
    llm: FunctionCallingLLM,
    tools: list[BaseTool],
    chat_history: list[ChatMessage],
) -> ChatWithToolsResponse:
    """
    Ask the LLM to either call tools or answer directly.

    This function doesn't change the memory.

    Returns:
        A ChatWithToolsResponse holding the requested tool calls, or the
        still-unconsumed response generator when no tool was requested.
    """
    stream = _tool_call_generator(llm, tools, chat_history)
    # The first item of the stream is the boolean tool-call indicator.
    wants_tool_call = await stream.__anext__()

    if not wants_tool_call:
        # Plain answer: hand the rest of the stream to the caller untouched.
        return ChatWithToolsResponse(
            tool_calls=None,
            tool_call_message=None,
            generator=stream,  # type: ignore
        )

    # Tool call: the last chunk is the full response, so drain the stream.
    final_chunk = None
    async for item in stream:
        final_chunk = item
    assert isinstance(final_chunk, ChatResponse)

    return ChatWithToolsResponse(
        tool_calls=llm.get_tool_calls_from_response(final_chunk),
        tool_call_message=final_chunk.message,
        generator=None,
    )
def _unknown_tool_output(tool_call: ToolSelection) -> ToolCallOutput:
    """Build the error output returned when a requested tool is not registered."""
    message = f"Tool {tool_call.tool_name} does not exist"
    return ToolCallOutput(
        tool_call_id=tool_call.tool_id,
        tool_output=ToolOutput(
            is_error=True,
            content=message,
            tool_name=tool_call.tool_name,
            raw_input=tool_call.tool_kwargs,
            raw_output={
                "error": message,
            },
        ),
    )


async def call_tools(
    ctx: Context,
    agent_name: str,
    tools: list[BaseTool],
    tool_calls: list[ToolSelection],
    emit_agent_events: bool = True,
) -> list[ToolCallOutput]:
    """
    Call tools and return the tool call responses.

    Args:
        ctx: Workflow context used to write events to the stream.
        agent_name: Name reported in the emitted AgentRunEvent objects.
        tools: Available tools; they are looked up by ``metadata.get_name()``.
        tool_calls: The tool calls to execute, run sequentially in order.
        emit_agent_events: When False, no AgentRunEvent is written by this
            function.

    Returns:
        One ToolCallOutput per tool call, in the same order. A call that names
        an unknown tool produces an error output instead of raising; this now
        also holds when there is exactly one tool call.
    """
    if len(tool_calls) == 0:
        return []

    tools_by_name = {tool.metadata.get_name(): tool for tool in tools}

    if len(tool_calls) == 1:
        only_call = tool_calls[0]
        if emit_agent_events:
            ctx.write_event_to_stream(
                AgentRunEvent(
                    name=agent_name,
                    msg=f"{only_call.tool_name}: {only_call.tool_kwargs}",
                )
            )
        tool = tools_by_name.get(only_call.tool_name)
        if tool is None:
            # Same graceful handling as the multi-call path below.
            return [_unknown_tool_output(only_call)]
        return [await call_tool(ctx, tool, only_call)]

    # Multiple tool calls, show progress
    tool_call_outputs: list[ToolCallOutput] = []
    progress_id = str(uuid.uuid4())
    total_steps = len(tool_calls)
    if emit_agent_events:
        ctx.write_event_to_stream(
            AgentRunEvent(
                name=agent_name,
                msg=f"Making {total_steps} tool calls",
            )
        )

    for i, tool_call in enumerate(tool_calls):
        tool = tools_by_name.get(tool_call.tool_name)
        if tool is None:
            tool_call_outputs.append(_unknown_tool_output(tool_call))
            continue

        tool_call_output = await call_tool(
            ctx,
            tool,
            tool_call,
        )
        if emit_agent_events:
            ctx.write_event_to_stream(
                AgentRunEvent(
                    name=agent_name,
                    msg=f"{tool_call.tool_name}: {tool_call.tool_kwargs}",
                    event_type=AgentRunEventType.PROGRESS,
                    data={
                        "id": progress_id,
                        "total": total_steps,
                        # zero-based index of the call that just finished
                        "current": i,
                    },
                )
            )
        tool_call_outputs.append(tool_call_output)

    return tool_call_outputs
async def call_tool(
    ctx: Context,
    tool: BaseTool,
    tool_call: ToolSelection,
) -> ToolCallOutput:
    """
    Execute a single tool call and report it on the event stream.

    A ToolCall event is written before the tool runs and a ToolCallResult
    event afterwards. Any exception raised by the tool is logged and turned
    into an error ToolOutput rather than propagated.

    Returns:
        The tool output paired with the id of the tool call.
    """
    started = ToolCall(
        tool_name=tool_call.tool_name,
        tool_id=tool_call.tool_id,
        tool_kwargs=tool_call.tool_kwargs,
    )
    ctx.write_event_to_stream(started)

    try:
        if not isinstance(tool, ContextAwareTool):
            output = await tool.acall(**tool_call.tool_kwargs)  # type: ignore
        elif ctx is None:
            raise ValueError("Context is required for context aware tool")
        else:
            # inject context for calling an context aware tool
            output = await tool.acall(ctx=ctx, **tool_call.tool_kwargs)
    except Exception as e:
        logger.error(f"Got error in tool {tool_call.tool_name}: {e!s}")
        output = ToolOutput(
            is_error=True,
            content=f"Error: {e!s}",
            tool_name=tool.metadata.get_name(),
            raw_input=tool_call.tool_kwargs,
            raw_output={
                "error": str(e),
            },
        )

    finished = ToolCallResult(
        tool_name=tool_call.tool_name,
        tool_kwargs=tool_call.tool_kwargs,
        tool_id=tool_call.tool_id,
        tool_output=output,
        return_direct=False,
    )
    ctx.write_event_to_stream(finished)

    return ToolCallOutput(tool_call_id=tool_call.tool_id, tool_output=output)
async def _tool_call_generator(
    llm: FunctionCallingLLM,
    tools: list[BaseTool],
    chat_history: list[ChatMessage],
) -> AsyncGenerator[ChatResponse | bool, None]:
    """
    Stream an LLM chat-with-tools response, prefixed by a tool-call indicator.

    Yields, in order:
      1. One bool, decided by the first chunk: True if its
         ``message.additional_kwargs`` contains "tool_calls", else False.
      2. Every chunk that does not contain "tool_calls" (chunks that do are
         not yielded individually).
      3. The last chunk received, if it is truthy.

    Parallel tool calls are disabled for the underlying LLM request.
    """
    response_stream = await llm.astream_chat_with_tools(
        tools,
        chat_history=chat_history,
        allow_parallel_tool_calls=False,
    )

    full_response = None
    yielded_indicator = False
    async for chunk in response_stream:
        if "tool_calls" not in chunk.message.additional_kwargs:
            # Yield a boolean to indicate whether the response is a tool call
            if not yielded_indicator:
                yield False
                yielded_indicator = True

            # if not a tool call, yield the chunks!
            yield chunk # type: ignore
        elif not yielded_indicator:
            # Yield the indicator for a tool call
            yield True
            yielded_indicator = True

        # Remember the most recent chunk; chat_with_tools treats the last one
        # as the full response.
        full_response = chunk

    # NOTE(review): for a non-tool-call response the last chunk was already
    # yielded inside the loop, so it is yielded a second time here - confirm
    # that consumers summing deltas tolerate this.
    if full_response:
        yield full_response # type: ignore
@@ -0,0 +1,252 @@
import logging
import os
import tempfile
import re
from enum import Enum
from io import BytesIO
from llama_index.core.tools.function_tool import FunctionTool
# use nextjs for file server
WORKFLOW="chat"
# define nextjs file server url prefix
FILE_SERVER_URL_PREFIX = f"/deployments/{WORKFLOW}/ui/api/files/output/tools"
# When deploying to llama_deploy, ui folder will be copied to deployments folder in the temp directory
# We need to save generated documents to that exact ui directory to make it accessible to the file server
# eg: /tmp/llama_deploy/deployments/chat/ui/output/tools/generated_report.pdf
LLAMA_DEPLOY_DIR = os.path.join(tempfile.gettempdir(), "llama_deploy", "deployments")
OUTPUT_DIR = os.path.join(LLAMA_DEPLOY_DIR, WORKFLOW, "ui", "output", "tools")
class DocumentType(Enum):
    """Output formats accepted by ``DocumentGenerator.generate_document``."""

    PDF = "pdf"
    HTML = "html"
COMMON_STYLES = """
body {
font-family: Arial, sans-serif;
line-height: 1.3;
color: #333;
}
h1, h2, h3, h4, h5, h6 {
margin-top: 1em;
margin-bottom: 0.5em;
}
p {
margin-bottom: 0.7em;
}
code {
background-color: #f4f4f4;
padding: 2px 4px;
border-radius: 4px;
}
pre {
background-color: #f4f4f4;
padding: 10px;
border-radius: 4px;
overflow-x: auto;
}
table {
border-collapse: collapse;
width: 100%;
margin-bottom: 1em;
}
th, td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
font-weight: bold;
}
"""
HTML_SPECIFIC_STYLES = """
body {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
"""
PDF_SPECIFIC_STYLES = """
@page {
size: letter;
margin: 2cm;
}
body {
font-size: 11pt;
}
h1 { font-size: 18pt; }
h2 { font-size: 16pt; }
h3 { font-size: 14pt; }
h4, h5, h6 { font-size: 12pt; }
pre, code {
font-family: Courier, monospace;
font-size: 0.9em;
}
"""
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
{common_styles}
{specific_styles}
</style>
</head>
<body>
{content}
</body>
</html>
"""
class DocumentGenerator:
    """
    Render markdown content to a PDF or HTML file and expose that as a tool.

    Generated files are written under the module-level ``OUTPUT_DIR``; the URL
    returned to the caller is built from ``file_server_url_prefix``.
    """

    def __init__(self, file_server_url_prefix: str | None = FILE_SERVER_URL_PREFIX):
        """
        Args:
            file_server_url_prefix: URL prefix prepended to generated file names.

        Raises:
            ValueError: If the prefix is None or empty.
        """
        if not file_server_url_prefix:
            raise ValueError("file_server_url_prefix is required")
        self.file_server_url_prefix = file_server_url_prefix

    @classmethod
    def _generate_html_content(cls, original_content: str) -> str:
        """
        Generate HTML content from the original markdown content.
        """
        try:
            import markdown  # type: ignore
        except ImportError:
            raise ImportError(
                "Failed to import required modules. Please install markdown."
            )

        # Convert markdown to HTML with fenced code and table extensions
        return markdown.markdown(original_content, extensions=["fenced_code", "tables"])

    @classmethod
    def _generate_pdf(cls, html_content: str) -> BytesIO:
        """
        Generate a PDF from the HTML content.

        Returns:
            A buffer positioned at the start of the PDF bytes.

        Raises:
            ValueError: If xhtml2pdf reports an error.
        """
        try:
            from xhtml2pdf import pisa
        except ImportError:
            raise ImportError(
                "Failed to import required modules. Please install xhtml2pdf."
            )

        pdf_html = HTML_TEMPLATE.format(
            common_styles=COMMON_STYLES,
            specific_styles=PDF_SPECIFIC_STYLES,
            content=html_content,
        )

        buffer = BytesIO()
        pdf = pisa.pisaDocument(
            BytesIO(pdf_html.encode("UTF-8")), buffer, encoding="UTF-8"
        )

        if pdf.err:
            logging.error(f"PDF generation failed: {pdf.err}")
            raise ValueError("PDF generation failed")

        buffer.seek(0)
        return buffer

    @classmethod
    def _generate_html(cls, html_content: str) -> str:
        """
        Generate a complete HTML document with the given HTML content.
        """
        return HTML_TEMPLATE.format(
            common_styles=COMMON_STYLES,
            specific_styles=HTML_SPECIFIC_STYLES,
            content=html_content,
        )

    # NOTE: the docstring below is kept verbatim on purpose - to_tool() hands
    # this method to FunctionTool.from_defaults, which derives the tool
    # description shown to the LLM from it.
    def generate_document(
        self, original_content: str, document_type: str, file_name: str
    ) -> str:
        """
        To generate document as PDF or HTML file.
        Parameters:
            original_content: str (markdown style)
            document_type: str (pdf or html) specify the type of the file format based on the use case
            file_name: str (name of the document file) must be a valid file name, no extensions needed
        Returns:
            str (URL to the document file): A file URL ready to serve.
        """
        try:
            doc_type = DocumentType(document_type.lower())
        except ValueError:
            raise ValueError(
                f"Invalid document type: {document_type}. Must be 'pdf' or 'html'."
            )

        # Reject a bad file name before doing any (potentially slow) rendering
        file_name = self._validate_file_name(file_name)

        # Always generate html content first
        html_content = self._generate_html_content(original_content)

        # Based on the type of document, generate the corresponding file
        if doc_type == DocumentType.PDF:
            content = self._generate_pdf(html_content)
            file_extension = "pdf"
        elif doc_type == DocumentType.HTML:
            content = BytesIO(self._generate_html(html_content).encode("utf-8"))
            file_extension = "html"
        else:
            raise ValueError(f"Unexpected document type: {document_type}")

        file_path = os.path.join(OUTPUT_DIR, f"{file_name}.{file_extension}")

        self._write_to_file(content, file_path)

        return f"{self.file_server_url_prefix}/{file_name}.{file_extension}"

    @staticmethod
    def _write_to_file(content: BytesIO, file_path: str) -> None:
        """
        Write the content to a file, creating the parent directory if needed.

        Any OS error is propagated to the caller unchanged.
        """
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as file:
            file.write(content.getvalue())

    @staticmethod
    def _validate_file_name(file_name: str) -> str:
        """
        Validate the file name.

        Returns:
            The unchanged file name when it is acceptable.

        Raises:
            ValueError: If the name is an absolute path or contains anything
                other than ASCII letters, digits, ``_``, ``.`` and ``-``.
        """
        # Don't allow directory traversal
        if os.path.isabs(file_name):
            raise ValueError("File name is not allowed.")
        # Don't allow special characters. fullmatch (rather than match with
        # "$") also rejects a trailing newline and the empty string.
        if re.fullmatch(r"[a-zA-Z0-9_.-]+", file_name):
            return file_name
        raise ValueError("File name is not allowed to contain special characters.")

    @classmethod
    def _validate_packages(cls) -> None:
        """Raise ImportError unless both ``markdown`` and ``xhtml2pdf`` are importable."""
        try:
            import markdown  # noqa: F401
            import xhtml2pdf  # noqa: F401
        except ImportError:
            raise ImportError(
                "Failed to import required modules. Please install markdown and xhtml2pdf "
                "using `pip install markdown xhtml2pdf`"
            )

    def to_tool(self) -> FunctionTool:
        """Return ``generate_document`` wrapped as a FunctionTool after checking the optional packages."""
        self._validate_packages()
        return FunctionTool.from_defaults(self.generate_document)
@@ -0,0 +1,32 @@
from typing import List, Optional
from enum import Enum
from llama_index.core.base.llms.types import ChatMessage
from llama_index.core.tools import ToolSelection
from llama_index.core.workflow import Event
class AgentRunEventType(Enum):
    """Kinds of AgentRunEvent; TEXT is the default used by AgentRunEvent."""

    TEXT = "text"
    PROGRESS = "progress"
class AgentRunEvent(Event):
    """Workflow event carrying a named agent's message, optionally with extra data."""

    # Name of the agent the event is attributed to.
    name: str
    # Message text of the event.
    msg: str
    # Kind of event; defaults to TEXT.
    event_type: AgentRunEventType = AgentRunEventType.TEXT
    # Optional extra payload for the event.
    data: Optional[dict] = None
class InputEvent(Event):
    """Workflow event carrying a list of chat messages."""

    input: List[ChatMessage]
    # Defaults to False. NOTE(review): meaning is set by the consuming workflow - confirm there.
    response: bool = False
class ResearchEvent(Event):
    """Workflow event carrying a list of tool selections."""

    # presumably consumed by a research step - verify against the workflow
    input: list[ToolSelection]
class AnalyzeEvent(Event):
    """Workflow event carrying either a list of tool selections or a single chat message."""

    # presumably consumed by an analysis step - verify against the workflow
    input: list[ToolSelection] | ChatMessage
class ReportEvent(Event):
    """Workflow event carrying a list of tool selections."""

    # presumably consumed by a report step - verify against the workflow
    input: list[ToolSelection]
@@ -0,0 +1,280 @@
import base64
import logging
import os
import re
import uuid
from pathlib import Path
from typing import Any, List, Optional
from pydantic import BaseModel
from llama_index.core.tools import FunctionTool
logger = logging.getLogger("uvicorn")
class FileMetadata(BaseModel):
    """Simple file metadata model"""

    # Generated file id (sanitized name + uuid + extension, see _process_file_name).
    id: str
    # File extension without the leading dot.
    type: str
    # Size of the saved file in bytes.
    size: int
    # URL under which the file is exposed (see _get_file_url).
    url: str
    # Path of the file on local disk.
    path: str
class InterpreterExtraResult(BaseModel):
    """One result format produced by an interpreter run."""

    # Format name of the result (e.g. "png").
    type: str
    # String form of the result; set for formats that are not saved as files.
    content: Optional[str] = None
    # Id of the saved file; set for formats that are saved to disk.
    filename: Optional[str] = None
    # URL of the saved file; set for formats that are saved to disk.
    url: Optional[str] = None
class E2BToolOutput(BaseModel):
    """Value returned by ``E2BCodeInterpreter.interpret``."""

    # True when the code failed or the retry limit was reached.
    is_error: bool
    # Logs of the execution; the type comes from e2b_code_interpreter.models.
    logs: "Logs" # type: ignore # noqa: F821
    # Explanation of the failure, if any.
    error_message: Optional[str] = None
    # Parsed results of the execution.
    results: List[InterpreterExtraResult] = []
    # Retry counter reported back to the caller.
    retry_count: int = 0
class E2BCodeInterpreter:
    """
    Tool wrapper that runs Python code in an E2B sandbox.

    The sandbox is created lazily on the first ``interpret`` call and is
    killed after a failed execution or when this object is finalized. Files
    produced by the executed code are saved under ``output_dir``.
    """

    output_dir = "output/tools"
    uploaded_files_dir = "output/uploaded"
    # Sandbox instance; stays None until the first ``interpret`` call.
    interpreter: Optional["Sandbox"] = None  # type: ignore  # noqa: F821

    def __init__(
        self,
        api_key: str,
        output_dir: Optional[str] = None,
        uploaded_files_dir: Optional[str] = None,
    ):
        """
        Args:
            api_key: The API key for the E2B Code Interpreter.
            output_dir: The directory for the output files. Default is `output/tools`.
            uploaded_files_dir: The directory for the files to be uploaded to the sandbox. Default is `output/uploaded`.

        Raises:
            ImportError: If e2b_code_interpreter is not installed.
            ValueError: If ``api_key`` is empty.
        """
        self._validate_package()
        if not api_key:
            raise ValueError(
                "api_key is required to run code interpreter. Get it here: https://e2b.dev/docs/getting-started/api-key"
            )
        self.api_key = api_key
        self.output_dir = output_dir or "output/tools"
        self.uploaded_files_dir = uploaded_files_dir or "output/uploaded"

    @classmethod
    def _validate_package(cls) -> None:
        """Raise ImportError with an install hint unless e2b_code_interpreter is importable."""
        try:
            from e2b_code_interpreter import Sandbox  # noqa: F401
            from e2b_code_interpreter.models import Logs  # noqa: F401
        except ImportError:
            raise ImportError(
                "e2b_code_interpreter is not installed. Please install it using `pip install e2b-code-interpreter`."
            )

    def __del__(self) -> None:
        """
        Kill the interpreter when the tool is no longer in use.
        """
        if self.interpreter is not None:
            self.interpreter.kill()

    def _init_interpreter(self, sandbox_files: Optional[List[str]] = None) -> None:
        """
        Lazily initialize the interpreter.

        Creates the sandbox and copies each requested file into it. The local
        copy is read from ``uploaded_files_dir`` using the base name of the
        given path; the file is written to the sandbox at the given path.
        """
        from e2b_code_interpreter import Sandbox

        files = sandbox_files or []
        logger.info(f"Initializing interpreter with {len(files)} files")
        self.interpreter = Sandbox(api_key=self.api_key)
        if not files:
            return

        for file_path in files:
            file_name = os.path.basename(file_path)
            local_file_path = os.path.join(self.uploaded_files_dir, file_name)
            with open(local_file_path, "rb") as f:
                content = f.read()
            if self.interpreter and self.interpreter.files:
                self.interpreter.files.write(file_path, content)
        logger.info(f"Uploaded {len(files)} files to sandbox")

    def _process_file_name(self, file_name: str) -> tuple[str, str]:
        """
        Process original file name to generate a unique file id and extension.

        Raises:
            ValueError: If the file name has no extension.
        """
        _id = str(uuid.uuid4())
        name, extension = os.path.splitext(file_name)
        extension = extension.lstrip(".")
        if extension == "":
            raise ValueError("File name is not valid! It must have an extension.")
        # sanitize the name
        name = re.sub(r"[^a-zA-Z0-9.]", "_", name)
        file_id = f"{name}_{_id}.{extension}"
        return file_id, extension

    def _get_file_url(self, file_id: str, save_dir: str) -> str:
        """
        Get the URL of a file.
        """
        # Ensure the path uses forward slashes for URLs
        url_path = f"{save_dir}/{file_id}".replace("\\", "/")
        return f"/api/files/{url_path}"

    def _save_file(self, content: bytes, file_name: str, save_dir: str) -> FileMetadata:
        """
        Write ``content`` to ``save_dir`` under a generated unique name.

        Returns:
            Metadata (id, extension, size, URL, path) of the saved file.

        Raises:
            ValueError: If ``file_name`` has no extension.
            OSError: If writing fails; the error is logged and re-raised.
        """
        file_id, extension = self._process_file_name(file_name)
        file_path = os.path.join(save_dir, file_id)

        # Write the file directly
        try:
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(content)
        except PermissionError as e:
            logger.error(f"Permission denied when writing to file {file_path}: {e!s}")
            raise
        except OSError as e:
            logger.error(f"IO error occurred when writing to file {file_path}: {e!s}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error when writing to file {file_path}: {e!s}")
            raise

        logger.info(f"Saved file to {file_path}")

        file_size = os.path.getsize(file_path)
        file_url = self._get_file_url(file_id, save_dir)

        return FileMetadata(
            id=file_id,
            type=extension,
            size=file_size,
            url=file_url,
            path=file_path,
        )

    def _save_to_disk(self, base64_data: str, ext: str) -> FileMetadata:
        """Decode base64 data and save it to ``output_dir`` with the given extension."""
        buffer = base64.b64decode(base64_data)

        # Output from e2b doesn't have a name. Create a random name for it.
        filename = f"e2b_file_{uuid.uuid4()}.{ext}"

        return self._save_file(buffer, file_name=filename, save_dir=self.output_dir)

    def _parse_result(self, result: Any) -> List[InterpreterExtraResult]:
        """
        The result could include multiple formats (e.g. png, svg, etc.) but encoded in base64
        We save each result to disk and return saved file metadata (extension, filename, url).

        Formats other than png/svg/jpeg/pdf are returned as their string form.
        Errors are logged; whatever was parsed before the error is returned.
        """
        if not result:
            return []

        output = []

        try:
            formats = result.formats()
            results = [result[format] for format in formats]

            for ext, data in zip(formats, results):
                if ext in ["png", "svg", "jpeg", "pdf"]:
                    document_file = self._save_to_disk(data, ext)
                    output.append(
                        InterpreterExtraResult(
                            type=ext,
                            filename=document_file.id,
                            url=document_file.url,
                        )
                    )
                else:
                    # Try serialize data to string
                    try:
                        data = str(data)
                    except Exception as e:
                        data = f"Error when serializing data: {e}"
                    output.append(
                        InterpreterExtraResult(
                            type=ext,
                            content=data,
                        )
                    )
        except Exception as error:
            # logger.exception records the traceback. The error is passed via
            # a %s placeholder; an argument without a placeholder makes the
            # logging module itself report a formatting error.
            logger.exception(
                "Error when parsing output from E2b interpreter tool: %s", error
            )

        return output

    # NOTE: signature and docstring are kept as they are on purpose - to_tool()
    # hands this method to FunctionTool.from_defaults, which derives the tool
    # schema and description shown to the LLM from them.
    def interpret(
        self,
        code: str,
        sandbox_files: List[str] = [],
        retry_count: int = 0,
    ) -> E2BToolOutput:
        """
        Execute Python code in a Jupyter notebook cell. The tool will return the result, stdout, stderr, display_data, and error.
        If the code needs to use a file, ALWAYS pass the file path in the sandbox_files argument.
        You have a maximum of 3 retries to get the code to run successfully.

        Parameters:
            code (str): The Python code to be executed in a single cell.
            sandbox_files (List[str]): List of local file paths to be used by the code. The tool will throw an error if a file is not found.
            retry_count (int): Number of times the tool has been retried.
        """
        from e2b_code_interpreter.models import Logs

        if retry_count > 2:
            return E2BToolOutput(
                is_error=True,
                logs=Logs(
                    stdout="",
                    stderr="",
                    display_data="",
                    error="",
                ),
                error_message="Failed to execute the code after 3 retries. Explain the error to the user and suggest a fix.",
                retry_count=retry_count,
            )

        if self.interpreter is None:
            self._init_interpreter(sandbox_files)

        if not self.interpreter:
            raise ValueError("Interpreter is not initialized.")

        logger.info(
            f"\n{'=' * 50}\n> Running following AI-generated code:\n{code}\n{'=' * 50}"
        )
        execution = self.interpreter.run_code(code)

        if execution.error:
            error_message = f"The code failed to execute successfully. Error: {execution.error}. Try to fix the code and run again."
            logger.error(error_message)
            # Calling the generated code caused an error. Kill the interpreter and return the error to the LLM so it can try to fix the error
            try:
                self.interpreter.kill()  # type: ignore
            except Exception as kill_error:
                # Best effort: a failed kill must not hide the execution error.
                logger.warning(f"Failed to kill interpreter: {kill_error!s}")
            finally:
                self.interpreter = None
            return E2BToolOutput(
                is_error=True,
                logs=execution.logs,
                results=[],
                error_message=error_message,
                retry_count=retry_count + 1,
            )

        if len(execution.results) == 0:
            return E2BToolOutput(is_error=False, logs=execution.logs, results=[])

        # Only the first result of the execution is parsed.
        results = self._parse_result(execution.results[0])
        return E2BToolOutput(
            is_error=False,
            logs=execution.logs,
            results=results,
            retry_count=retry_count + 1,
        )

    def to_tool(self) -> FunctionTool:
        """Return ``interpret`` wrapped as a FunctionTool after checking the e2b package."""
        self._validate_package()
        return FunctionTool.from_defaults(self.interpret)
@@ -0,0 +1,47 @@
import os
from typing import Any, Optional
from llama_index.core.base.base_query_engine import BaseQueryEngine
from llama_index.core.indices.base import BaseIndex
from llama_index.core.tools.query_engine import QueryEngineTool
def create_query_engine(index: BaseIndex, **kwargs: Any) -> BaseQueryEngine:
    """
    Create a query engine for the given index.

    Args:
        index: The index to create a query engine for.
        **kwargs: Additional parameters passed to ``index.as_query_engine``,
            e.g. ``similarity_top_k``.

    When the ``TOP_K`` environment variable holds a non-zero integer and no
    ``filters`` argument is given, it is used as ``similarity_top_k``.

    Raises:
        ValueError: If ``TOP_K`` is set to something that is not an integer.
    """
    # An unset, empty or blank TOP_K (e.g. "TOP_K=" in a .env file) means
    # "not configured"; int("") would raise ValueError.
    raw_top_k = (os.getenv("TOP_K") or "").strip()
    top_k = int(raw_top_k) if raw_top_k else 0
    if top_k != 0 and kwargs.get("filters") is None:
        kwargs["similarity_top_k"] = top_k

    return index.as_query_engine(**kwargs)
def get_query_engine_tool(
    index: BaseIndex,
    name: Optional[str] = None,
    description: Optional[str] = None,
    **kwargs: Any,
) -> QueryEngineTool:
    """
    Wrap a query engine over ``index`` as a QueryEngineTool.

    Args:
        index: The index to create a query engine for.
        name (optional): The name of the tool; "query_index" when omitted.
        description (optional): The description of the tool; a generic
            knowledge-base description when omitted.
        **kwargs: Forwarded to ``create_query_engine``.
    """
    return QueryEngineTool.from_defaults(
        query_engine=create_query_engine(index, **kwargs),
        name="query_index" if name is None else name,
        description=(
            "Use this tool to retrieve information from a knowledge base. Provide a specific query and can call the tool multiple times if necessary."
            if description is None
            else description
        ),
    )
@@ -0,0 +1,46 @@
from typing import AsyncGenerator, Union
from llama_index.core.base.llms.types import (
CompletionResponse,
CompletionResponseAsyncGen,
ChatResponse,
)
from llama_index.core.workflow import Context
from llama_index.core.agent.workflow.workflow_events import AgentStream
async def write_response_to_stream(
    res: Union[CompletionResponse, CompletionResponseAsyncGen, AsyncGenerator[ChatResponse, None]],
    ctx: Context,
    current_agent_name: str = "assistant",
) -> str:
    """
    Handle both streaming and non-streaming LLM responses.

    For a streaming response, one AgentStream event is written to the context
    stream per chunk. Each event carries the chunk's delta and the response
    text accumulated so far, including that delta.

    Args:
        res: The LLM response (either streaming or non-streaming)
        ctx: The workflow context for writing events to stream
        current_agent_name: The name of the current agent (default: "assistant")

    Returns:
        The final response text as a string
    """
    final_response = ""

    if isinstance(res, AsyncGenerator):
        # Handle streaming response (CompletionResponseAsyncGen or ChatResponse AsyncGenerator)
        async for chunk in res:
            delta = chunk.delta or ""
            # Accumulate before emitting so the last event holds the complete
            # response instead of lagging one chunk behind.
            final_response += delta
            ctx.write_event_to_stream(
                AgentStream(
                    delta=delta,
                    response=final_response,
                    current_agent_name=current_agent_name,
                    tool_calls=[],
                    raw=getattr(chunk, 'raw', None) or "",
                )
            )
    else:
        # Handle non-streaming response (CompletionResponse)
        final_response = res.text

    return final_response
@@ -1,7 +1,9 @@
import os
from typing import List, Optional
from enum import Enum
from dotenv import load_dotenv
from app.index import get_index
from llama_index.core import Settings
from llama_index.core.base.llms.types import ChatMessage, MessageRole
from llama_index.core.llms.function_calling import FunctionCallingLLM
@@ -15,19 +17,27 @@ from llama_index.core.workflow import (
Workflow,
step,
)
from llama_index.server.api.models import AgentRunEvent, ChatRequest
from llama_index.server.settings import server_settings
from llama_index.server.tools.document_generator import DocumentGenerator
from llama_index.server.tools.index import get_query_engine_tool
from llama_index.server.tools.interpreter import E2BCodeInterpreter
from llama_index.server.utils.agent_tool import (
call_tools,
chat_with_tools,
from src.index import get_index
from src.settings import init_settings
from src.query import get_query_engine_tool
from src.document_generator import DocumentGenerator
from src.interpreter import E2BCodeInterpreter
from src.events import (
InputEvent,
ResearchEvent,
AnalyzeEvent,
ReportEvent,
AgentRunEvent,
)
from src.agent_tool import call_tools, chat_with_tools
from src.utils import write_response_to_stream
def create_workflow(chat_request: Optional[ChatRequest] = None) -> Workflow:
index = get_index(chat_request=chat_request)
def create_workflow() -> Workflow:
load_dotenv()
init_settings()
index = get_index()
if index is None:
raise ValueError(
"Index is not found. Try run generation script to create the index first."
@@ -39,9 +49,7 @@ def create_workflow(chat_request: Optional[ChatRequest] = None) -> Workflow:
"E2B_API_KEY is required to use the code interpreter tool. Please check README.md to know how to get the key."
)
code_interpreter_tool = E2BCodeInterpreter(api_key=e2b_api_key).to_tool()
document_generator_tool = DocumentGenerator(
file_server_url_prefix=server_settings.file_server_url_prefix,
).to_tool()
document_generator_tool = DocumentGenerator().to_tool()
return FinancialReportWorkflow(
query_engine_tool=query_engine_tool,
@@ -51,23 +59,6 @@ def create_workflow(chat_request: Optional[ChatRequest] = None) -> Workflow:
)
class InputEvent(Event):
input: List[ChatMessage]
response: bool = False
class ResearchEvent(Event):
input: list[ToolSelection]
class AnalyzeEvent(Event):
input: list[ToolSelection] | ChatMessage
class ReportEvent(Event):
    """Workflow event whose payload is a list of tool selections."""

    # Tool selections carried by this event.
    # NOTE(review): presumably the tool calls for the report step - confirm against the consuming step.
    input: list[ToolSelection]
class FinancialReportWorkflow(Workflow):
"""
A workflow to generate a financial report using indexed documents.
@@ -129,10 +120,14 @@ class FinancialReportWorkflow(Workflow):
async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> InputEvent:
self.stream = ev.get("stream", True)
user_msg = ev.get("user_msg")
chat_history = ev.get("chat_history")
if chat_history is not None:
self.memory.put_messages(chat_history)
messages = [
ChatMessage(
role=msg.get("role", "user"),
content=msg.get("content", ""),
)
for msg in ev.get("chat_history", [])
]
self.memory.put_messages(messages)
# Add user message to memory
self.memory.put(ChatMessage(role=MessageRole.USER, content=user_msg))
@@ -164,7 +159,8 @@ class FinancialReportWorkflow(Workflow):
)
if not response.has_tool_calls():
if self.stream:
return StopEvent(result=response.generator)
final_response = await write_response_to_stream(response.generator, ctx)
return StopEvent(result=final_response)
else:
return StopEvent(result=await response.full_response())
# calling different tools at the same time is not supported at the moment
@@ -331,3 +327,6 @@ class FinancialReportWorkflow(Workflow):
)
# After the tool calls, fallback to the input with the latest chat history
return InputEvent(input=self.memory.get())
workflow = create_workflow()
@@ -1,109 +0,0 @@
This is a [LlamaIndex](https://www.llamaindex.ai/) project using [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/).
## Getting Started
First, setup the environment with uv:
> **_Note:_** This step is not needed if you are using the dev-container.
```shell
uv sync
```
Then check the parameters that have been pre-configured in the `.env` file in this directory.
Make sure you have set the `OPENAI_API_KEY` for the LLM.
Then, run the development server:
```shell
uv run fastapi dev
```
Then open [http://localhost:8000](http://localhost:8000) with your browser to start the chat UI.
To start the app optimized for **production**, run:
```
uv run fastapi run
```
## Configure LLM and Embedding Model
You can configure [LLM model](https://docs.llamaindex.ai/en/stable/module_guides/models/llms) and [embedding model](https://docs.llamaindex.ai/en/stable/module_guides/models/embeddings) in [settings.py](app/settings.py).
## Use Case
This example shows how to use the LlamaIndexServer with a human in the loop. It allows you to start CLI commands that are reviewed by a human before execution.
To update the workflow, you can modify the code in [`workflow.py`](app/workflow.py).
You can start by sending a request on the [chat UI](http://localhost:8000) or you can test the `/api/chat` endpoint with the following curl request:
```
curl --location 'localhost:8000/api/chat' \
--header 'Content-Type: application/json' \
--data '{ "messages": [{ "role": "user", "content": "Show me the files in the current directory" }] }'
```
## How does HITL work?
### Events
The human-in-the-loop approach used here is based on a simple idea: the workflow pauses and waits for a human response before proceeding to the next step.
To do this, you will need to implement two custom events:
- [HumanInputEvent](https://github.com/run-llama/create-llama/blob/main/packages/server/src/utils/hitl/events.ts): This event is used to request input from the user.
- [HumanResponseEvent](https://github.com/run-llama/create-llama/blob/main/packages/server/src/utils/hitl/events.ts): This event is sent to the workflow to resume execution with input from the user.
In this example, we have implemented these two custom events in [`events.ts`](src/app/events.ts):
- `cliHumanInputEvent` to request input from the user for CLI command execution.
- `cliHumanResponseEvent` to resume the workflow with the response from the user.
```typescript
export const cliHumanInputEvent = humanInputEvent<{
type: "cli_human_input";
data: { command: string };
response: typeof cliHumanResponseEvent;
}>();
export const cliHumanResponseEvent = humanResponseEvent<{
type: "human_response";
data: { execute: boolean; command: string };
}>();
```
### UI Component
HITL also needs a custom UI component that is shown when the LlamaIndexServer receives the `cliHumanInputEvent`. The name of the component is defined in the `type` field of the `cliHumanInputEvent` - in our case, it is `cli_human_input`, which corresponds to the [cli_human_input.tsx](./components/cli_human_input.tsx) component.
The custom component must use `append` to send a message with a `human_response` annotation. The data of the annotation must match the format of the response event `cliHumanResponseEvent`. In our case, to request execution of the command `ls -l`, we would send:
```tsx
append({
content: "Yes",
role: "user",
annotations: [
{
type: "human_response",
data: {
execute: true,
command: "ls -l", // The command to execute
},
},
],
});
```
This component displays the command to execute and the user can choose to execute or cancel the command execution.
## Learn More
To learn more about LlamaIndex, take a look at the following resources:
- [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex.
- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows.
- [LlamaIndex Server](https://pypi.org/project/llama-index-server/)
You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome!
@@ -1,34 +0,0 @@
from typing import Type
from pydantic import BaseModel, Field
from llama_index.server.models import HumanInputEvent, HumanResponseEvent
class CLIHumanResponseEvent(HumanResponseEvent):
    """Human reply to a CLI confirmation request: the decision plus the command it refers to."""

    # Whether the human approved running the command.
    execute: bool = Field(
        description="True if the human wants to execute the command, False otherwise."
    )
    # The command the decision applies to.
    command: str = Field(description="The command to execute.")
class CLICommand(BaseModel):
    """Data model holding a single CLI command string."""

    # The command text.
    command: str = Field(description="The command to execute.")
# We need an event that extends HumanInputEvent for the HITL feature
class CLIHumanInputEvent(HumanInputEvent):
    """
    CLIHumanInputEvent is sent when the agent needs the user's permission before executing a CLI command.
    Render this event by showing the command and a boolean button to execute the command or not.
    """

    event_type: str = (
        "cli_human_input"  # used by the UI to pick the matching component
    )
    response_event_type: Type = (
        CLIHumanResponseEvent  # used by the workflow to resume with the correct event
    )
    data: CLICommand = Field(  # the data that is sent to the UI for rendering
        description="The command to execute.",
    )
@@ -1,87 +0,0 @@
import platform
import subprocess
from typing import Any
from app.events import CLICommand, CLIHumanInputEvent, CLIHumanResponseEvent
from llama_index.core.prompts import PromptTemplate
from llama_index.core.settings import Settings
from llama_index.core.workflow import (
Context,
StartEvent,
StopEvent,
Workflow,
step,
)
def create_workflow() -> Workflow:
    """Return a new CLIWorkflow instance."""
    return CLIWorkflow()
class CLIWorkflow(Workflow):
    """
    Workflow that asks the LLM for a CLI command and runs it only after a human confirms.
    """

    default_prompt = PromptTemplate(
        template="""
You are a helpful assistant who can write CLI commands to execute using {cli_language}.
Your task is to analyze the user's request and write a CLI command to execute.
## User Request
{user_request}
Don't be verbose, only respond with the CLI command without any other text.
"""
    )

    def __init__(self, **kwargs: Any) -> None:
        # Waiting for a human can take arbitrarily long, so the workflow timeout
        # is always disabled, overriding whatever the caller passed.
        super().__init__(**{**kwargs, "timeout": None})

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> CLIHumanInputEvent:
        """Generate a command for the user's request and ask the human to confirm it."""
        request = ev.user_msg
        if request is None:
            raise ValueError("Missing user_msg in StartEvent")
        await ctx.set("user_msg", request)

        # Pick the shell dialect for the prompt from the host operating system.
        shell = "bash" if platform.system() in ("Linux", "Darwin") else "cmd"
        prompt = self.default_prompt.format(user_request=request, cli_language=shell)

        llm = Settings.llm
        if llm is None:
            raise ValueError("Missing LLM in Settings")
        completion = await llm.acomplete(prompt, formatted=True)

        command = completion.text.strip()
        if not command:
            raise ValueError("Couldn't generate a command")

        # Store the command in the context, then hand it to the user for confirmation.
        await ctx.set("command", command)
        return CLIHumanInputEvent(  # type: ignore
            data=CLICommand(command=command),
            response_event_type=CLIHumanResponseEvent,
        )

    @step
    async def handle_human_response(
        self,
        ctx: Context,
        ev: CLIHumanResponseEvent,  # delivered by LlamaIndexServer once the user has answered
    ) -> StopEvent:
        """Run the confirmed command and return its output, or stop with no result if declined."""
        if not ev.execute:
            return StopEvent(result=None)

        if not ev.command:
            raise ValueError("Missing command in CLIExecutionEvent")

        # The command runs through the shell exactly as confirmed by the human.
        completed = subprocess.run(
            ev.command, shell=True, capture_output=True, text=True
        )
        return StopEvent(result=completed.stdout or completed.stderr)
@@ -6,12 +6,12 @@ load_dotenv()
import logging
from app.index import get_index
from app.settings import init_settings
from llama_index.server.services.llamacloud.generate import (
load_to_llamacloud,
)
from llama_index.core.readers import SimpleDirectoryReader
from tqdm import tqdm
from src.index import get_index
from src.service import LLamaCloudFileService
from src.settings import init_settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
@@ -25,29 +25,41 @@ def generate_index():
if index is None:
raise ValueError("Index not found and could not be created")
load_to_llamacloud(index, logger=logger)
# use SimpleDirectoryReader to retrieve the files to process
reader = SimpleDirectoryReader(
"ui/data",
recursive=True,
)
files_to_process = reader.input_files
# add each file to the LlamaCloud pipeline
error_files = []
for input_file in tqdm(
files_to_process,
desc="Processing files",
unit="file",
):
with open(input_file, "rb") as f:
logger.debug(
f"Adding file {input_file} to pipeline {index.name} in project {index.project_name}"
)
try:
LLamaCloudFileService.add_file_to_pipeline(
index.project.id,
index.pipeline.id,
f,
custom_metadata={},
wait_for_processing=False,
)
except Exception as e:
error_files.append(input_file)
logger.error(f"Error adding file {input_file}: {e}")
if error_files:
logger.error(f"Failed to add the following files: {error_files}")
logger.info("Finished generating the index")
def generate_ui_for_workflow():
    """
    Generate the UI component for the UIEventData event defined in app/workflow.py
    and write it to the component directory as ui_event.jsx.
    """
    import asyncio

    from llama_index.llms.openai import OpenAI
    from main import COMPONENT_DIR

    # To cover further event types, import their data model (e.g. MyCustomEventData)
    # and run the generator with it. The output filename of the generated component
    # must match the event type (here `ui_event`).
    try:
        from app.workflow import UIEventData  # type: ignore
    except ImportError:
        raise ImportError("Couldn't generate UI component for the current workflow.")
    from llama_index.server.gen_ui import generate_event_component

    # Claude 3.7 Sonnet or Gemini Pro 2.5 also work well here
    generator_llm = OpenAI(model="gpt-4.1")
    component_code = asyncio.run(
        generate_event_component(event_cls=UIEventData, llm=generator_llm)
    )
    output_path = f"{COMPONENT_DIR}/ui_event.jsx"
    with open(output_path, "w") as out:
        out.write(component_code)
if __name__ == "__main__":
generate_index()
@@ -1,7 +1,146 @@
from llama_index.server.services.llamacloud import (
LlamaCloudIndex,
get_client,
get_index,
)
import logging
import os
from typing import Optional
__all__ = ["LlamaCloudIndex", "get_client", "get_index"]
from llama_cloud import PipelineType
from llama_index.core.callbacks import CallbackManager
from llama_index.core.ingestion.api_utils import (
get_client as llama_cloud_get_client,
)
from llama_index.core.settings import Settings
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
from pydantic import BaseModel, Field, field_validator
logger = logging.getLogger("uvicorn")
class LlamaCloudConfig(BaseModel):
    """
    Connection and pipeline settings for LlamaCloud.

    Any field not passed explicitly is read from its LLAMA_CLOUD_* environment
    variable when the model is constructed.
    """

    # Credentials / connection details, marked with exclude=True
    api_key: str = Field(
        exclude=True,
    )
    base_url: Optional[str] = Field(
        exclude=True,
    )
    organization_id: Optional[str] = Field(
        exclude=True,
    )
    # User-configurable settings
    pipeline: str = Field(
        description="The name of the pipeline to use",
    )
    project: str = Field(
        description="The name of the LlamaCloud project",
    )

    def __init__(self, **kwargs):
        # (field name, environment variable) pairs used to fill in missing arguments
        env_fallbacks = (
            ("api_key", "LLAMA_CLOUD_API_KEY"),
            ("base_url", "LLAMA_CLOUD_BASE_URL"),
            ("organization_id", "LLAMA_CLOUD_ORGANIZATION_ID"),
            ("pipeline", "LLAMA_CLOUD_INDEX_NAME"),
            ("project", "LLAMA_CLOUD_PROJECT_NAME"),
        )
        for field_name, env_var in env_fallbacks:
            if field_name not in kwargs:
                kwargs[field_name] = os.getenv(env_var)
        super().__init__(**kwargs)

    # Fail at construction time when a required setting is still unset
    @field_validator("pipeline", "project", "api_key", mode="before")
    @classmethod
    def validate_fields(cls, value):
        if value is not None:
            return value
        raise ValueError(
            "Please set LLAMA_CLOUD_INDEX_NAME, LLAMA_CLOUD_PROJECT_NAME and LLAMA_CLOUD_API_KEY"
            " to your environment variables or config them in .env file"
        )

    def to_client_kwargs(self) -> dict:
        """Return the keyword arguments used to build the LlamaCloud client."""
        return dict(api_key=self.api_key, base_url=self.base_url)
class IndexConfig(BaseModel):
    """Settings needed to construct a LlamaCloudIndex: the pipeline config plus an optional callback manager."""

    llama_cloud_pipeline_config: LlamaCloudConfig = Field(
        default_factory=lambda: LlamaCloudConfig(),
        alias="llamaCloudPipeline",
    )
    callback_manager: Optional[CallbackManager] = Field(
        default=None,
    )

    def to_index_kwargs(self) -> dict:
        """Return the keyword arguments passed to the LlamaCloudIndex constructor."""
        cloud = self.llama_cloud_pipeline_config
        return dict(
            name=cloud.pipeline,
            project_name=cloud.project,
            api_key=cloud.api_key,
            base_url=cloud.base_url,
            organization_id=cloud.organization_id,
            callback_manager=self.callback_manager,
        )
def get_index(
    config: IndexConfig = None,
    create_if_missing: bool = False,
):
    """
    Return the LlamaCloudIndex described by `config`.

    A default IndexConfig is built when `config` is None. If constructing the
    index raises ValueError, the index is treated as missing: it is created and
    returned when `create_if_missing` is true, otherwise None is returned.
    """
    if config is None:
        config = IndexConfig()

    try:
        return LlamaCloudIndex(**config.to_index_kwargs())
    except ValueError:
        logger.warning("Index not found")
        if not create_if_missing:
            return None
        logger.info("Creating index")
        _create_index(config)
        return LlamaCloudIndex(**config.to_index_kwargs())
def get_client():
    """Build a LlamaCloud client from a freshly constructed LlamaCloudConfig."""
    client_kwargs = LlamaCloudConfig().to_client_kwargs()
    return llama_cloud_get_client(**client_kwargs)
def _create_index(
    config: IndexConfig,
):
    """
    Create the managed pipeline named in `config` unless a search finds one already.

    Raises ValueError when the configured embedding model is not an OpenAIEmbedding.
    """
    client = get_client()
    pipeline_name = config.llama_cloud_pipeline_config.pipeline
    existing = client.pipelines.search_pipelines(
        pipeline_name=pipeline_name,
        pipeline_type=PipelineType.MANAGED.value,
    )
    if len(existing) != 0:
        # The search already returned a pipeline with this name
        return

    from llama_index.embeddings.openai import OpenAIEmbedding

    if not isinstance(Settings.embed_model, OpenAIEmbedding):
        raise ValueError(
            "Creating a new pipeline with a non-OpenAI embedding model is not supported."
        )

    embedding_config = {
        "type": "OPENAI_EMBEDDING",
        "component": {
            "api_key": os.getenv("OPENAI_API_KEY"),  # editable
            "model_name": os.getenv("EMBEDDING_MODEL"),
        },
    }
    transform_config = {
        "mode": "auto",
        "config": {
            "chunk_size": Settings.chunk_size,  # editable
            "chunk_overlap": Settings.chunk_overlap,  # editable
        },
    }
    client.pipelines.upsert_pipeline(
        request={
            "name": pipeline_name,
            "embedding_config": embedding_config,
            "transform_config": transform_config,
        },
    )
@@ -0,0 +1,74 @@
import logging
import os
import time
import typing
from io import BytesIO
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from llama_cloud import ManagedIngestionStatus, PipelineFileCreateCustomMetadataValue
from pydantic import BaseModel
from src.index import get_client
logger = logging.getLogger("uvicorn")
class LlamaCloudFile(BaseModel):
    """A file identified by its name and the pipeline it belongs to; equality and hashing use both fields."""

    file_name: str
    pipeline_id: str

    def __eq__(self, other):
        # Leave comparison with unrelated types to the other operand
        if isinstance(other, LlamaCloudFile):
            return (self.file_name, self.pipeline_id) == (
                other.file_name,
                other.pipeline_id,
            )
        return NotImplemented

    def __hash__(self):
        # Hash over the same pair that __eq__ compares
        identity = (self.file_name, self.pipeline_id)
        return hash(identity)
class LLamaCloudFileService:
    """Helpers for uploading files to LlamaCloud and attaching them to a pipeline."""

    LOCAL_STORE_PATH = "output/llamacloud"
    DOWNLOAD_FILE_NAME_TPL = "{pipeline_id}${filename}"

    @classmethod
    def add_file_to_pipeline(
        cls,
        project_id: str,
        pipeline_id: str,
        upload_file: Union[typing.IO, Tuple[str, BytesIO]],
        custom_metadata: Optional[Dict[str, PipelineFileCreateCustomMetadataValue]],
        wait_for_processing: bool = True,
    ) -> str:
        """
        Upload a file to the project and add it to the pipeline.

        The file id is always stored under the "file_id" metadata key unless
        `custom_metadata` overrides it. When `wait_for_processing` is true, the
        ingestion status is polled up to 20 times, 100ms apart.

        Returns the id of the uploaded file.
        Raises Exception when ingestion reports an error or does not finish in time.
        """
        client = get_client()
        uploaded = client.files.upload_file(
            project_id=project_id, upload_file=upload_file
        )
        file_id = uploaded.id

        metadata = {"file_id": file_id, **(custom_metadata or {})}
        pipeline_files = [{"file_id": file_id, "custom_metadata": metadata}]
        client.pipelines.add_files_to_pipeline_api(pipeline_id, request=pipeline_files)

        if not wait_for_processing:
            return file_id

        # Poll for roughly 2s in total (20 attempts x 100ms)
        max_attempts = 20
        for _ in range(max_attempts):
            result = client.pipelines.get_pipeline_file_status(
                file_id=file_id, pipeline_id=pipeline_id
            )
            if result.status == ManagedIngestionStatus.ERROR:
                raise Exception(f"File processing failed: {str(result)}")
            if result.status == ManagedIngestionStatus.SUCCESS:
                # Ingestion finished - hand back the file id
                return file_id
            time.sleep(0.1)  # Sleep for 100ms
        raise Exception(
            f"File processing did not complete after {max_attempts} attempts."
        )
@@ -1,68 +0,0 @@
import logging
import os
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def generate_index():
    """
    Build a vector index from the documents in the data directory and persist it to STORAGE_DIR.

    The data directory is taken from the DATA_DIR environment variable, defaulting to "data".
    """
    from app.index import STORAGE_DIR
    from app.settings import init_settings
    from llama_index.core.indices import VectorStoreIndex
    from llama_index.core.readers import SimpleDirectoryReader

    load_dotenv()
    init_settings()
    logger.info("Creating new index")

    # Read every document under the data directory, including subdirectories
    data_dir = os.environ.get("DATA_DIR", "data")
    documents = SimpleDirectoryReader(data_dir, recursive=True).load_data()

    index = VectorStoreIndex.from_documents(documents, show_progress=True)

    # Persist the index so it can be loaded again later
    index.storage_context.persist(STORAGE_DIR)
    logger.info(f"Finished creating new index. Stored in {STORAGE_DIR}")
def generate_ui_for_workflow():
    """
    Generate the UI component for the UIEventData event defined in app/workflow.py
    and write it to the component directory as ui_event.jsx.
    """
    import asyncio

    from app.settings import init_settings
    from llama_index.core.settings import Settings
    from main import COMPONENT_DIR

    load_dotenv()
    init_settings()

    # To cover further event types, import their data model (e.g. MyCustomEventData)
    # and run the generator with it. The output filename of the generated component
    # must match the event type (here `ui_event`).
    try:
        from app.workflow import UIEventData  # type: ignore
    except ImportError:
        raise ImportError("Couldn't generate UI component for the current workflow.")
    from llama_index.server.gen_ui import generate_event_component

    # OpenAI gpt-4.1, Claude 3.7 Sonnet and Gemini Pro 2.5 all work well here
    generation = generate_event_component(event_cls=UIEventData, llm=Settings.llm)
    component_code = asyncio.run(generation)

    output_path = f"{COMPONENT_DIR}/ui_event.jsx"
    with open(output_path, "w") as out:
        out.write(component_code)
@@ -0,0 +1,24 @@
name: chat
control-plane:
port: 8000
default-service: workflow
services:
workflow:
name: Workflow
source:
type: local
name: src
path: src/workflow:workflow
python-dependencies:
- llama-index-llms-openai>=0.4.5
- llama-index-core>=0.12.45
ui:
name: My Nextjs App
port: 3000
source:
type: local
name: ui
@@ -1,32 +0,0 @@
import logging
from app.settings import init_settings
from app.workflow import create_workflow
from dotenv import load_dotenv
from llama_index.server import LlamaIndexServer, UIConfig
logger = logging.getLogger("uvicorn")
# A path to a directory where the customized UI code is stored
COMPONENT_DIR = "components"
def create_app():
    """Build the LlamaIndexServer app with the custom UI configuration and a health route."""
    ui_config = UIConfig(
        component_dir=COMPONENT_DIR,
        dev_mode=True,  # Please disable this in production
        layout_dir="layout",
    )
    server = LlamaIndexServer(
        workflow_factory=create_workflow,  # called to create a new workflow for each request
        ui_config=ui_config,
        logger=logger,
        env="dev",
    )
    # Custom FastAPI routes can be added to the app as well
    server.add_api_route("/api/health", lambda: {"message": "OK"}, status_code=200)
    return server
load_dotenv()
init_settings()
app = create_app()
@@ -9,12 +9,17 @@ readme = "README.md"
requires-python = ">=3.11,<3.14"
dependencies = [
"python-dotenv>=1.0.0,<2.0.0",
"pydantic<2.10",
"pydantic>=2.11.5",
"aiostream>=0.5.2,<0.6.0",
"llama-index-core>=0.12.28,<0.13.0",
"llama-index-server>=0.1.17,<0.2.0",
"llama-index-readers-file>=0.4.6,<1.0.0",
"llama-index-indices-managed-llama-cloud>=0.6.3,<1.0.0",
"llama-deploy",
]
[tool.uv.sources]
llama-deploy = { git = "https://github.com/run-llama/llama_deploy" }
[project.optional-dependencies]
dev = [
"mypy>=1.8.0,<2.0.0",
@@ -23,9 +28,7 @@ dev = [
]
[project.scripts]
generate = "generate:generate_index"
generate_index = "generate:generate_index"
generate_ui = "generate:generate_ui_for_workflow"
generate = "src.generate:generate_index"
[tool.mypy]
@@ -43,7 +46,7 @@ strict_optional = false
disable_error_code = [ "return-value", "assignment" ]
[[tool.mypy.overrides]]
module = "app.*"
module = "src.*"
ignore_missing_imports = false
[tool.hatch.metadata]
@@ -51,4 +54,7 @@ allow-direct-references = true
[build-system]
requires = [ "hatchling>=1.24" ]
build-backend = "hatchling.build"
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]
@@ -0,0 +1,37 @@
import logging
import os
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def generate_index():
    """
    Build a vector index from the documents in the data directory and persist it to STORAGE_DIR.

    The data directory is taken from the DATA_DIR environment variable, defaulting to "ui/data".
    """
    from src.index import STORAGE_DIR
    from src.settings import init_settings
    from llama_index.core.indices import VectorStoreIndex
    from llama_index.core.readers import SimpleDirectoryReader

    load_dotenv()
    init_settings()
    logger.info("Creating new index")

    # Read every document under the data directory, including subdirectories
    data_dir = os.environ.get("DATA_DIR", "ui/data")
    documents = SimpleDirectoryReader(data_dir, recursive=True).load_data()

    index = VectorStoreIndex.from_documents(documents, show_progress=True)

    # Persist the index so it can be loaded again later
    index.storage_context.persist(STORAGE_DIR)
    logger.info(f"Finished creating new index. Stored in {STORAGE_DIR}")
@@ -1,23 +1,21 @@
import logging
import os
from typing import Optional
from llama_index.core.indices import load_index_from_storage
from llama_index.server.api.models import ChatRequest
from llama_index.server.tools.index.utils import get_storage_context
from llama_index.core.storage import StorageContext
logger = logging.getLogger("uvicorn")
STORAGE_DIR = "storage"
STORAGE_DIR = "src/storage"
def get_index(chat_request: Optional[ChatRequest] = None):
def get_index():
# check if storage already exists
if not os.path.exists(STORAGE_DIR):
return None
# load the existing index
logger.info(f"Loading index from {STORAGE_DIR}...")
storage_context = get_storage_context(STORAGE_DIR)
storage_context = StorageContext.from_defaults(persist_dir=STORAGE_DIR)
index = load_index_from_storage(storage_context)
logger.info(f"Finished loading index from {STORAGE_DIR}")
return index