simplify repo, remove Next

This commit is contained in:
starmorph
2025-05-01 14:02:19 -07:00
parent 2521c40754
commit f2eb51566c
46 changed files with 205 additions and 6427 deletions
+30 -10
View File
@@ -9,7 +9,7 @@
## TypeScript Email Assistant Implementation
The TypeScript Email Assistant (`app/page.tsx`) implements an LLM-powered workflow to process emails:
This is a vanilla TypeScript project for email assistant workflows using LangChain and LangGraph.
### Architecture
- Uses `StateGraph` from LangGraph to create a multi-step workflow
@@ -43,17 +43,37 @@ The TypeScript Email Assistant (`app/page.tsx`) implements an LLM-powered workfl
- Properly typed with TypeScript for complete type safety
- Uses command pattern to transition between states
### P2
- [ ] notebooks
- [ ] video course [1,2,3,4]
### Project Structure
- `scripts/`: TypeScript scripts to run the email assistant
- `lib/`: Utility functions, tools, and shared types
- `lib/tools/`: Tool implementations
- `lib/prompts.ts`: Prompt templates
- `lib/schemas.ts`: TypeScript/Zod schemas
- `lib/utils.ts`: Utility functions
### New TS files
- `lib/tools/`
- `lib/prompts.ts`
- `lib/schemas.ts`
- `lib/utils.ts`
## Getting Started
### Course outline
First, install dependencies:
```bash
npm install
# or
yarn
# or
pnpm install
```
Then, run the development script:
```bash
npm run dev
# or
yarn dev
# or
pnpm dev
```
## Course outline
> BUILD
> EVAL
-27
View File
@@ -1,27 +0,0 @@
import { NextRequest, NextResponse } from "next/server";
import { getEmailAssistant } from "../../page";
import { EmailData } from "../../../lib/schemas";
// This is a temporary API route for testing the email assistant (before setting up LangSmith)
export async function POST(request: NextRequest) {
try {
const data = await request.json();
const { email_input } = data;
// Get the email assistant
const emailAssistant = await getEmailAssistant();
// Run the email assistant with the raw email text
const result = await emailAssistant.invoke({
email_input
});
return NextResponse.json(result);
} catch (error: any) {
console.error("Error running email assistant:", error);
return NextResponse.json(
{ error: error.message },
{ status: 500 }
);
}
}
BIN
View File
Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

-122
View File
@@ -1,122 +0,0 @@
@import "tailwindcss";
@import "tw-animate-css";
@custom-variant dark (&:is(.dark *));
@theme inline {
--color-background: var(--background);
--color-foreground: var(--foreground);
--font-sans: var(--font-geist-sans);
--font-mono: var(--font-geist-mono);
--color-sidebar-ring: var(--sidebar-ring);
--color-sidebar-border: var(--sidebar-border);
--color-sidebar-accent-foreground: var(--sidebar-accent-foreground);
--color-sidebar-accent: var(--sidebar-accent);
--color-sidebar-primary-foreground: var(--sidebar-primary-foreground);
--color-sidebar-primary: var(--sidebar-primary);
--color-sidebar-foreground: var(--sidebar-foreground);
--color-sidebar: var(--sidebar);
--color-chart-5: var(--chart-5);
--color-chart-4: var(--chart-4);
--color-chart-3: var(--chart-3);
--color-chart-2: var(--chart-2);
--color-chart-1: var(--chart-1);
--color-ring: var(--ring);
--color-input: var(--input);
--color-border: var(--border);
--color-destructive: var(--destructive);
--color-accent-foreground: var(--accent-foreground);
--color-accent: var(--accent);
--color-muted-foreground: var(--muted-foreground);
--color-muted: var(--muted);
--color-secondary-foreground: var(--secondary-foreground);
--color-secondary: var(--secondary);
--color-primary-foreground: var(--primary-foreground);
--color-primary: var(--primary);
--color-popover-foreground: var(--popover-foreground);
--color-popover: var(--popover);
--color-card-foreground: var(--card-foreground);
--color-card: var(--card);
--radius-sm: calc(var(--radius) - 4px);
--radius-md: calc(var(--radius) - 2px);
--radius-lg: var(--radius);
--radius-xl: calc(var(--radius) + 4px);
}
:root {
--radius: 0.625rem;
--background: oklch(1 0 0);
--foreground: oklch(0.145 0 0);
--card: oklch(1 0 0);
--card-foreground: oklch(0.145 0 0);
--popover: oklch(1 0 0);
--popover-foreground: oklch(0.145 0 0);
--primary: oklch(0.205 0 0);
--primary-foreground: oklch(0.985 0 0);
--secondary: oklch(0.97 0 0);
--secondary-foreground: oklch(0.205 0 0);
--muted: oklch(0.97 0 0);
--muted-foreground: oklch(0.556 0 0);
--accent: oklch(0.97 0 0);
--accent-foreground: oklch(0.205 0 0);
--destructive: oklch(0.577 0.245 27.325);
--border: oklch(0.922 0 0);
--input: oklch(0.922 0 0);
--ring: oklch(0.708 0 0);
--chart-1: oklch(0.646 0.222 41.116);
--chart-2: oklch(0.6 0.118 184.704);
--chart-3: oklch(0.398 0.07 227.392);
--chart-4: oklch(0.828 0.189 84.429);
--chart-5: oklch(0.769 0.188 70.08);
--sidebar: oklch(0.985 0 0);
--sidebar-foreground: oklch(0.145 0 0);
--sidebar-primary: oklch(0.205 0 0);
--sidebar-primary-foreground: oklch(0.985 0 0);
--sidebar-accent: oklch(0.97 0 0);
--sidebar-accent-foreground: oklch(0.205 0 0);
--sidebar-border: oklch(0.922 0 0);
--sidebar-ring: oklch(0.708 0 0);
}
.dark {
--background: oklch(0.145 0 0);
--foreground: oklch(0.985 0 0);
--card: oklch(0.205 0 0);
--card-foreground: oklch(0.985 0 0);
--popover: oklch(0.205 0 0);
--popover-foreground: oklch(0.985 0 0);
--primary: oklch(0.922 0 0);
--primary-foreground: oklch(0.205 0 0);
--secondary: oklch(0.269 0 0);
--secondary-foreground: oklch(0.985 0 0);
--muted: oklch(0.269 0 0);
--muted-foreground: oklch(0.708 0 0);
--accent: oklch(0.269 0 0);
--accent-foreground: oklch(0.985 0 0);
--destructive: oklch(0.704 0.191 22.216);
--border: oklch(1 0 0 / 10%);
--input: oklch(1 0 0 / 15%);
--ring: oklch(0.556 0 0);
--chart-1: oklch(0.488 0.243 264.376);
--chart-2: oklch(0.696 0.17 162.48);
--chart-3: oklch(0.769 0.188 70.08);
--chart-4: oklch(0.627 0.265 303.9);
--chart-5: oklch(0.645 0.246 16.439);
--sidebar: oklch(0.205 0 0);
--sidebar-foreground: oklch(0.985 0 0);
--sidebar-primary: oklch(0.488 0.243 264.376);
--sidebar-primary-foreground: oklch(0.985 0 0);
--sidebar-accent: oklch(0.269 0 0);
--sidebar-accent-foreground: oklch(0.985 0 0);
--sidebar-border: oklch(1 0 0 / 10%);
--sidebar-ring: oklch(0.556 0 0);
}
@layer base {
* {
@apply border-border outline-ring/50;
}
body {
@apply bg-background text-foreground;
}
}
-34
View File
@@ -1,34 +0,0 @@
import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
import "./globals.css";
const geistSans = Geist({
variable: "--font-geist-sans",
subsets: ["latin"],
});
const geistMono = Geist_Mono({
variable: "--font-geist-mono",
subsets: ["latin"],
});
export const metadata: Metadata = {
title: "Create Next App",
description: "Generated by create next app",
};
export default function RootLayout({
children,
}: Readonly<{
children: React.ReactNode;
}>) {
return (
<html lang="en">
<body
className={`${geistSans.variable} ${geistMono.variable} antialiased`}
>
{children}
</body>
</html>
);
}
-123
View File
@@ -1,123 +0,0 @@
"use client";
// this is a temporary page for testing the email assistant (before setting up LangSmith)
import { useState } from "react";
import { EmailData } from "../../lib/schemas";
import { BaseMessage } from "@langchain/core/messages";
export default function TestEmailAssistant() {
const [emailInput, setEmailInput] = useState<string>(
`From: john@example.com
To: assistant@company.com
Subject: Meeting Request
Hi there,
Can we schedule a meeting for tomorrow at 2pm to discuss the project?
Thanks,
John`
);
const [output, setOutput] = useState<any>({});
const [loading, setLoading] = useState<boolean>(false);
const [messages, setMessages] = useState<BaseMessage[]>([]);
async function runTest() {
setLoading(true);
try {
// Call the API route instead of directly invoking the assistant
const response = await fetch('/api/email-assistant', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
email_input: emailInput
}),
});
if (!response.ok) {
throw new Error(`API responded with status: ${response.status}`);
}
const result = await response.json();
setOutput(result);
// Extract messages if they exist
if (result.messages && Array.isArray(result.messages)) {
setMessages(result.messages);
}
} catch (error: any) {
console.error("Error running email assistant:", error);
setOutput({ error: error.message });
} finally {
setLoading(false);
}
}
return (
<div className="container mx-auto p-4 max-w-4xl">
<h1 className="text-2xl font-bold mb-4">Email Assistant Test</h1>
<div className="mb-4">
<label className="block text-sm font-medium mb-2">Email Input</label>
<textarea
className="w-full h-64 p-2 border rounded shadow-sm font-mono text-sm"
value={emailInput}
onChange={(e) => setEmailInput(e.target.value)}
/>
</div>
<button
className="px-4 py-2 bg-blue-600 text-white rounded hover:bg-blue-700 mb-4"
onClick={runTest}
disabled={loading}
>
{loading ? "Running..." : "Test Email Assistant"}
</button>
<div className="mb-4">
<h2 className="text-xl font-semibold mb-2">Classification Result</h2>
<div className="p-4 bg-gray-100 rounded">
{output.classification_decision && (
<div className="text-lg font-medium">
{output.classification_decision === "respond" && "📧 RESPOND - This email requires a response"}
{output.classification_decision === "ignore" && "🚫 IGNORE - This email can be safely ignored"}
{output.classification_decision === "notify" && "🔔 NOTIFY - This email contains important information"}
</div>
)}
</div>
</div>
{messages.length > 0 && (
<div>
<h2 className="text-xl font-semibold mb-2">Messages</h2>
<div className="space-y-4">
{messages.map((message, index) => (
<div key={index} className="p-4 border rounded">
<div className="font-medium">{message.constructor.name}</div>
<div className="whitespace-pre-wrap">{String(message.content)}</div>
{message.additional_kwargs?.tool_calls && (
<div className="mt-2">
<div className="font-medium">Tool Calls:</div>
<pre className="bg-gray-100 p-2 rounded overflow-auto text-sm">
{JSON.stringify(message.additional_kwargs.tool_calls, null, 2)}
</pre>
</div>
)}
</div>
))}
</div>
</div>
)}
<div className="mt-4">
<h2 className="text-xl font-semibold mb-2">Full Output</h2>
<pre className="bg-gray-100 p-4 rounded overflow-auto text-sm">
{JSON.stringify(output, null, 2)}
</pre>
</div>
</div>
);
}
-21
View File
@@ -1,21 +0,0 @@
{
"$schema": "https://ui.shadcn.com/schema.json",
"style": "new-york",
"rsc": true,
"tsx": true,
"tailwind": {
"config": "",
"css": "app/globals.css",
"baseColor": "neutral",
"cssVariables": true,
"prefix": ""
},
"aliases": {
"components": "@/components",
"utils": "@/lib/utils",
"ui": "@/components/ui",
"lib": "@/lib",
"hooks": "@/hooks"
},
"iconLibrary": "lucide"
}
+1 -1
View File
@@ -10,7 +10,7 @@ const compat = new FlatCompat({
});
const eslintConfig = [
...compat.extends("next/core-web-vitals", "next/typescript"),
...compat.extends("eslint:recommended", "plugin:@typescript-eslint/recommended"),
];
export default eslintConfig;
-7
View File
@@ -1,7 +0,0 @@
import type { NextConfig } from "next";
const nextConfig: NextConfig = {
/* config options here */
};
export default nextConfig;
+5 -18
View File
@@ -2,38 +2,25 @@
"name": "email-assistant",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"dev": "next dev --turbopack",
"build": "next build",
"start": "next start",
"lint": "next lint",
"dev": "tsx scripts/dev.ts",
"lint": "eslint .",
"agent": "langgraphjs dev"
},
"dependencies": {
"@langchain/core": "^0.3.49",
"@langchain/langgraph": "^0.2.67",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"langchain": "^0.3.24",
"lucide-react": "^0.503.0",
"next": "15.3.1",
"react": "^19.0.0",
"react-dom": "^19.0.0",
"tailwind-merge": "^3.2.0",
"zod": "^3.24.3"
},
"devDependencies": {
"@eslint/eslintrc": "^3",
"@langchain/langgraph-cli": "^0.0.30",
"@tailwindcss/postcss": "^4",
"@types/node": "^20",
"@types/react": "^19",
"@types/react-dom": "^19",
"eslint": "^9",
"eslint-config-next": "15.3.1",
"tailwindcss": "^4",
"tw-animate-css": "^1.2.8",
"typescript": "^5"
"typescript": "^5",
"tsx": "^4.7.0"
},
"packageManager": "pnpm@10.9.0+sha512.0486e394640d3c1fb3c9d43d49cf92879ff74f8516959c235308f5a8f62e2e19528a65cdc2a3058f587cde71eba3d5b56327c8c33a97e4c4051ca48a10ca2d5f"
}
+115 -2551
View File
File diff suppressed because it is too large Load Diff
-5
View File
@@ -1,5 +0,0 @@
const config = {
plugins: ["@tailwindcss/postcss"],
};
export default config;
-1
View File
@@ -1 +0,0 @@
<svg fill="none" viewBox="0 0 16 16" xmlns="http://www.w3.org/2000/svg"><path d="M14.5 13.5V5.41a1 1 0 0 0-.3-.7L9.8.29A1 1 0 0 0 9.08 0H1.5v13.5A2.5 2.5 0 0 0 4 16h8a2.5 2.5 0 0 0 2.5-2.5m-1.5 0v-7H8v-5H3v12a1 1 0 0 0 1 1h8a1 1 0 0 0 1-1M9.5 5V2.12L12.38 5zM5.13 5h-.62v1.25h2.12V5zm-.62 3h7.12v1.25H4.5zm.62 3h-.62v1.25h7.12V11z" clip-rule="evenodd" fill="#666" fill-rule="evenodd"/></svg>

Before

Width:  |  Height:  |  Size: 391 B

-1
View File
@@ -1 +0,0 @@
<svg fill="none" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16"><g clip-path="url(#a)"><path fill-rule="evenodd" clip-rule="evenodd" d="M10.27 14.1a6.5 6.5 0 0 0 3.67-3.45q-1.24.21-2.7.34-.31 1.83-.97 3.1M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16m.48-1.52a7 7 0 0 1-.96 0H7.5a4 4 0 0 1-.84-1.32q-.38-.89-.63-2.08a40 40 0 0 0 3.92 0q-.25 1.2-.63 2.08a4 4 0 0 1-.84 1.31zm2.94-4.76q1.66-.15 2.95-.43a7 7 0 0 0 0-2.58q-1.3-.27-2.95-.43a18 18 0 0 1 0 3.44m-1.27-3.54a17 17 0 0 1 0 3.64 39 39 0 0 1-4.3 0 17 17 0 0 1 0-3.64 39 39 0 0 1 4.3 0m1.1-1.17q1.45.13 2.69.34a6.5 6.5 0 0 0-3.67-3.44q.65 1.26.98 3.1M8.48 1.5l.01.02q.41.37.84 1.31.38.89.63 2.08a40 40 0 0 0-3.92 0q.25-1.2.63-2.08a4 4 0 0 1 .85-1.32 7 7 0 0 1 .96 0m-2.75.4a6.5 6.5 0 0 0-3.67 3.44 29 29 0 0 1 2.7-.34q.31-1.83.97-3.1M4.58 6.28q-1.66.16-2.95.43a7 7 0 0 0 0 2.58q1.3.27 2.95.43a18 18 0 0 1 0-3.44m.17 4.71q-1.45-.12-2.69-.34a6.5 6.5 0 0 0 3.67 3.44q-.65-1.27-.98-3.1" fill="#666"/></g><defs><clipPath id="a"><path fill="#fff" d="M0 0h16v16H0z"/></clipPath></defs></svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

-1
View File
@@ -1 +0,0 @@
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 394 80"><path fill="#000" d="M262 0h68.5v12.7h-27.2v66.6h-13.6V12.7H262V0ZM149 0v12.7H94v20.4h44.3v12.6H94v21h55v12.6H80.5V0h68.7zm34.3 0h-17.8l63.8 79.4h17.9l-32-39.7 32-39.6h-17.9l-23 28.6-23-28.6zm18.3 56.7-9-11-27.1 33.7h17.8l18.3-22.7z"/><path fill="#000" d="M81 79.3 17 0H0v79.3h13.6V17l50.2 62.3H81Zm252.6-.4c-1 0-1.8-.4-2.5-1s-1.1-1.6-1.1-2.6.3-1.8 1-2.5 1.6-1 2.6-1 1.8.3 2.5 1a3.4 3.4 0 0 1 .6 4.3 3.7 3.7 0 0 1-3 1.8zm23.2-33.5h6v23.3c0 2.1-.4 4-1.3 5.5a9.1 9.1 0 0 1-3.8 3.5c-1.6.8-3.5 1.3-5.7 1.3-2 0-3.7-.4-5.3-1s-2.8-1.8-3.7-3.2c-.9-1.3-1.4-3-1.4-5h6c.1.8.3 1.6.7 2.2s1 1.2 1.6 1.5c.7.4 1.5.5 2.4.5 1 0 1.8-.2 2.4-.6a4 4 0 0 0 1.6-1.8c.3-.8.5-1.8.5-3V45.5zm30.9 9.1a4.4 4.4 0 0 0-2-3.3 7.5 7.5 0 0 0-4.3-1.1c-1.3 0-2.4.2-3.3.5-.9.4-1.6 1-2 1.6a3.5 3.5 0 0 0-.3 4c.3.5.7.9 1.3 1.2l1.8 1 2 .5 3.2.8c1.3.3 2.5.7 3.7 1.2a13 13 0 0 1 3.2 1.8 8.1 8.1 0 0 1 3 6.5c0 2-.5 3.7-1.5 5.1a10 10 0 0 1-4.4 3.5c-1.8.8-4.1 1.2-6.8 1.2-2.6 0-4.9-.4-6.8-1.2-2-.8-3.4-2-4.5-3.5a10 10 0 0 1-1.7-5.6h6a5 5 0 0 0 3.5 4.6c1 .4 2.2.6 3.4.6 1.3 0 2.5-.2 3.5-.6 1-.4 1.8-1 2.4-1.7a4 4 0 0 0 .8-2.4c0-.9-.2-1.6-.7-2.2a11 11 0 0 0-2.1-1.4l-3.2-1-3.8-1c-2.8-.7-5-1.7-6.6-3.2a7.2 7.2 0 0 1-2.4-5.7 8 8 0 0 1 1.7-5 10 10 0 0 1 4.3-3.5c2-.8 4-1.2 6.4-1.2 2.3 0 4.4.4 6.2 1.2 1.8.8 3.2 2 4.3 3.4 1 1.4 1.5 3 1.5 5h-5.8z"/></svg>

Before

Width:  |  Height:  |  Size: 1.3 KiB

-1
View File
@@ -1 +0,0 @@
<svg fill="none" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1155 1000"><path d="m577.3 0 577.4 1000H0z" fill="#fff"/></svg>

Before

Width:  |  Height:  |  Size: 128 B

-1
View File
@@ -1 +0,0 @@
<svg fill="none" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16"><path fill-rule="evenodd" clip-rule="evenodd" d="M1.5 2.5h13v10a1 1 0 0 1-1 1h-11a1 1 0 0 1-1-1zM0 1h16v11.5a2.5 2.5 0 0 1-2.5 2.5h-11A2.5 2.5 0 0 1 0 12.5zm3.75 4.5a.75.75 0 1 0 0-1.5.75.75 0 0 0 0 1.5M7 4.75a.75.75 0 1 1-1.5 0 .75.75 0 0 1 1.5 0m1.75.75a.75.75 0 1 0 0-1.5.75.75 0 0 0 0 1.5" fill="#666"/></svg>

Before

Width:  |  Height:  |  Size: 385 B

-28
View File
@@ -1,28 +0,0 @@
"""Define the configurable parameters for the agent."""
import os
from dataclasses import dataclass, fields
from typing import Any, Optional
from langchain_core.runnables import RunnableConfig
@dataclass(kw_only=True)
class Configuration:
"""Main configuration class."""
# llm =
@classmethod
def from_runnable_config(
cls, config: Optional[RunnableConfig] = None
) -> "Configuration":
"""Create a Configuration instance from a RunnableConfig."""
configurable = (
config["configurable"] if config and "configurable" in config else {}
)
values: dict[str, Any] = {
f.name: os.environ.get(f.name.upper(), configurable.get(f.name))
for f in fields(cls)
if f.init
}
return cls(**{k: v for k, v in values.items() if v})
-159
View File
@@ -1,159 +0,0 @@
from typing import Literal
from langchain.chat_models import init_chat_model
from src.email_assistant.tools import get_tools, get_tools_by_name
from src.email_assistant.tools.default.prompt_templates import AGENT_TOOLS_PROMPT
from src.email_assistant.prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt, default_background, default_triage_instructions, default_response_preferences, default_cal_preferences
from src.email_assistant.schemas import State, RouterSchema, StateInput
from src.email_assistant.utils import parse_email, format_email_markdown
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
# Get tools
tools = get_tools()
tools_by_name = get_tools_by_name(tools)
# Initialize the LLM for use with router / structured output
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_router = llm.with_structured_output(RouterSchema)
# Initialize the LLM, enforcing tool use (of any available tools) for agent
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_with_tools = llm.bind_tools(tools, tool_choice="required")
# Nodes
def llm_call(state: State):
"""LLM decides whether to call a tool or not"""
return {
"messages": [
llm_with_tools.invoke(
[
{"role": "system", "content": agent_system_prompt.format(
tools_prompt=AGENT_TOOLS_PROMPT,
background=default_background,
response_preferences=default_response_preferences,
cal_preferences=default_cal_preferences)
},
]
+ state["messages"]
)
]
}
def tool_node(state: dict):
"""Performs the tool call"""
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append({"role": "tool", "content" : observation, "tool_call_id": tool_call["id"]})
return {"messages": result}
# Conditional edge function
def should_continue(state: State) -> Literal["Action", END]:
"""Route to Action, or end if Done tool called"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
for tool_call in last_message.tool_calls:
if tool_call["name"] == "Done":
return END
else:
return "Action"
# Build workflow
agent_builder = StateGraph(State)
# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)
# Add edges to connect nodes
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
{
# Name returned by should_continue : Name of next node to visit
"Action": "environment",
END: END,
},
)
agent_builder.add_edge("environment", "llm_call")
# Compile the agent
agent = agent_builder.compile()
def triage_router(state: State) -> Command[Literal["response_agent", "__end__"]]:
"""Analyze email content to decide if we should respond, notify, or ignore.
The triage step prevents the assistant from wasting time on:
- Marketing emails and spam
- Company-wide announcements
- Messages meant for other teams
"""
author, to, subject, email_thread = parse_email(state["email_input"])
system_prompt = triage_system_prompt.format(
background=default_background,
triage_instructions=default_triage_instructions
)
user_prompt = triage_user_prompt.format(
author=author, to=to, subject=subject, email_thread=email_thread
)
# Create email markdown for Agent Inbox in case of notification
email_markdown = format_email_markdown(subject, author, to, email_thread)
# Run the router LLM
result = llm_router.invoke(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
)
# Decision
classification = result.classification
if classification == "respond":
print("📧 Classification: RESPOND - This email requires a response")
goto = "response_agent"
# Add the email to the messages
update = {
"classification_decision": result.classification,
"messages": [{"role": "user",
"content": f"Respond to the email: {email_markdown}"
}],
}
elif result.classification == "ignore":
print("🚫 Classification: IGNORE - This email can be safely ignored")
update = {
"classification_decision": result.classification,
}
goto = END
elif result.classification == "notify":
# If real life, this would do something else
print("🔔 Classification: NOTIFY - This email contains important information")
update = {
"classification_decision": result.classification,
}
goto = END
else:
raise ValueError(f"Invalid classification: {result.classification}")
return Command(goto=goto, update=update)
# Build workflow
overall_workflow = (
StateGraph(State, input=StateInput)
.add_node(triage_router)
.add_node("response_agent", agent)
.add_edge(START, "triage_router")
)
email_assistant = overall_workflow.compile()
-391
View File
@@ -1,391 +0,0 @@
from typing import Literal
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from src.email_assistant.tools import get_tools, get_tools_by_name
from src.email_assistant.tools.default.prompt_templates import HITL_TOOLS_PROMPT
from src.email_assistant.prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt_hitl, default_background, default_triage_instructions, default_response_preferences, default_cal_preferences
from src.email_assistant.schemas import State, RouterSchema, StateInput
from src.email_assistant.utils import parse_email, format_for_display, format_email_markdown
# Get tools
tools = get_tools(["write_email", "schedule_meeting", "check_calendar_availability", "Question", "Done"])
tools_by_name = get_tools_by_name(tools)
# Initialize the LLM for use with router / structured output
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_router = llm.with_structured_output(RouterSchema)
# Initialize the LLM, enforcing tool use (of any available tools) for agent
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_with_tools = llm.bind_tools(tools, tool_choice="required")
# Nodes
def triage_router(state: State) -> Command[Literal["triage_interrupt_handler", "response_agent", "__end__"]]:
"""Analyze email content to decide if we should respond, notify, or ignore.
The triage step prevents the assistant from wasting time on:
- Marketing emails and spam
- Company-wide announcements
- Messages meant for other teams
"""
# Parse the email input
author, to, subject, email_thread = parse_email(state["email_input"])
user_prompt = triage_user_prompt.format(
author=author, to=to, subject=subject, email_thread=email_thread
)
# Create email markdown for Agent Inbox in case of notification
email_markdown = format_email_markdown(subject, author, to, email_thread)
# Format system prompt with background and triage instructions
system_prompt = triage_system_prompt.format(
background=default_background,
triage_instructions=default_triage_instructions
)
# Run the router LLM
result = llm_router.invoke(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
)
# Decision
classification = result.classification
# Process the classification decision
if classification == "respond":
print("📧 Classification: RESPOND - This email requires a response")
# Next node
goto = "response_agent"
# Update the state
update = {
"classification_decision": result.classification,
"messages": [{"role": "user",
"content": f"Respond to the email: {email_markdown}"
}],
}
elif classification == "ignore":
print("🚫 Classification: IGNORE - This email can be safely ignored")
# Next node
goto = END
# Update the state
update = {
"classification_decision": classification,
}
elif classification == "notify":
print("🔔 Classification: NOTIFY - This email contains important information")
# Next node
goto = "triage_interrupt_handler"
# Update the state
update = {
"classification_decision": classification,
}
else:
raise ValueError(f"Invalid classification: {classification}")
return Command(goto=goto, update=update)
def triage_interrupt_handler(state: State) -> Command[Literal["response_agent", "__end__"]]:
"""Handles interrupts from the triage step"""
# Parse the email input
author, to, subject, email_thread = parse_email(state["email_input"])
# Create email markdown for Agent Inbox in case of notification
email_markdown = format_email_markdown(subject, author, to, email_thread)
# Create messages
messages = [{"role": "user",
"content": f"Email to notify user about: {email_markdown}"
}]
# Create interrupt for Agent Inbox
request = {
"action_request": {
"action": f"Email Assistant: {state['classification_decision']}",
"args": {}
},
"config": {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": False,
"allow_accept": False,
},
# Email to show in Agent Inbox
"description": email_markdown,
}
# Agent Inbox responds with a list
response = interrupt([request])[0]
# If user provides feedback, go to response agent and use feedback to respond to email
if response["type"] == "response":
# Add feedback to messages
user_input = response["args"]
# Used by the response agent
messages.append({"role": "user",
"content": f"User wants to reply to the email. Use this feedback to respond: {user_input}"
})
# Go to response agent
goto = "response_agent"
# If user ignores email, go to END
elif response["type"] == "ignore":
goto = END
# Catch all other responses
else:
raise ValueError(f"Invalid response: {response}")
# Update the state
update = {
"messages": messages,
}
return Command(goto=goto, update=update)
def llm_call(state: State):
"""LLM decides whether to call a tool or not"""
return {
"messages": [
llm_with_tools.invoke(
[
{"role": "system", "content": agent_system_prompt_hitl.format(
tools_prompt=HITL_TOOLS_PROMPT,
background=default_background,
response_preferences=default_response_preferences,
cal_preferences=default_cal_preferences
)}
]
+ state["messages"]
)
]
}
def interrupt_handler(state: State) -> Command[Literal["llm_call", "__end__"]]:
"""Creates an interrupt for human review of tool calls"""
# Store messages
result = []
# Go to the LLM call node next
goto = "llm_call"
# Iterate over the tool calls in the last message
for tool_call in state["messages"][-1].tool_calls:
# Allowed tools for HITL
hitl_tools = ["write_email", "schedule_meeting", "Question"]
# If tool is not in our HITL list, execute it directly without interruption
if tool_call["name"] not in hitl_tools:
# Execute search_memory and other tools without interruption
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append({"role": "tool", "content": observation, "tool_call_id": tool_call["id"]})
continue
# Get original email from email_input in state
email_input = state["email_input"]
author, to, subject, email_thread = parse_email(email_input)
original_email_markdown = format_email_markdown(subject, author, to, email_thread)
# Format tool call for display and prepend the original email
tool_display = format_for_display(state, tool_call)
description = original_email_markdown + tool_display
# Configure what actions are allowed in Agent Inbox
if tool_call["name"] == "write_email":
config = {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": True,
"allow_accept": True,
}
elif tool_call["name"] == "schedule_meeting":
config = {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": True,
"allow_accept": True,
}
elif tool_call["name"] == "Question":
config = {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": False,
"allow_accept": False,
}
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
# Create the interrupt request
request = {
"action_request": {
"action": tool_call["name"],
"args": tool_call["args"]
},
"config": config,
"description": description,
}
# Send to Agent Inbox and wait for response
response = interrupt([request])[0]
# Handle the responses
if response["type"] == "accept":
# Execute the tool with original args
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append({"role": "tool", "content": observation, "tool_call_id": tool_call["id"]})
elif response["type"] == "edit":
# Tool selection
tool = tools_by_name[tool_call["name"]]
# Get edited args from Agent Inbox
edited_args = response["args"]["args"]
# Update the write_email tool call with the edited content from Agent Inbox
if tool_call["name"] == "write_email":
# Update the AI message's tool call with edited content (reference to the message in the state)
ai_message = state["messages"][-1]
current_id = tool_call["id"]
# Replace the original tool call with the edited one (any changes made to this reference affect the original object in the state)
ai_message.tool_calls = [tc for tc in ai_message.tool_calls if tc["id"] != current_id] + [
{"type": "tool_call", "name": tool_call["name"], "args": edited_args, "id": current_id}
]
# Execute the tool with edited args
observation = tool.invoke(edited_args)
# Add only the tool response message
result.append({"role": "tool", "content": observation, "tool_call_id": current_id})
# Update the schedule_meeting tool call with the edited content from Agent Inbox
elif tool_call["name"] == "schedule_meeting":
# Update the AI message's tool call with edited content
ai_message = state["messages"][-1]
current_id = tool_call["id"]
# Replace the original tool call with the edited one
ai_message.tool_calls = [tc for tc in ai_message.tool_calls if tc["id"] != current_id] + [
{"type": "tool_call", "name": tool_call["name"], "args": edited_args, "id": current_id}
]
# Execute the tool with edited args
observation = tool.invoke(edited_args)
# Add only the tool response message
result.append({"role": "tool", "content": observation, "tool_call_id": current_id})
# Catch all other tool calls
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
elif response["type"] == "ignore":
if tool_call["name"] == "write_email":
# Don't execute the tool, and tell the agent how to proceed
result.append({"role": "tool", "content": "User ignored this email draft. Ignore this email and end the workflow.", "tool_call_id": tool_call["id"]})
# Go to END
goto = END
elif tool_call["name"] == "schedule_meeting":
# Don't execute the tool, and tell the agent how to proceed
result.append({"role": "tool", "content": "User ignored this calendar meeting draft. Ignore this email and end the workflow.", "tool_call_id": tool_call["id"]})
# Go to END
goto = END
elif tool_call["name"] == "Question":
# Don't execute the tool, and tell the agent how to proceed
result.append({"role": "tool", "content": "User ignored this question. Ignore this email and end the workflow.", "tool_call_id": tool_call["id"]})
# Go to END
goto = END
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
elif response["type"] == "response":
# User provided feedback
user_feedback = response["args"]
if tool_call["name"] == "write_email":
# Don't execute the tool, and add a message with the user feedback to incorporate into the email
result.append({"role": "tool", "content": f"User gave feedback, which can we incorporate into the email. Feedback: {user_feedback}", "tool_call_id": tool_call["id"]})
elif tool_call["name"] == "schedule_meeting":
# Don't execute the tool, and add a message with the user feedback to incorporate into the email
result.append({"role": "tool", "content": f"User gave feedback, which can we incorporate into the meeting request. Feedback: {user_feedback}", "tool_call_id": tool_call["id"]})
elif tool_call["name"] == "Question":
# Don't execute the tool, and add a message with the user feedback to incorporate into the email
result.append({"role": "tool", "content": f"User answered the question, which can we can use for any follow up actions. Feedback: {user_feedback}", "tool_call_id": tool_call["id"]})
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
# Catch all other responses
else:
raise ValueError(f"Invalid response: {response}")
# Update the state
update = {
"messages": result,
}
return Command(goto=goto, update=update)
# Conditional edge function
def should_continue(state: State) -> Literal["interrupt_handler", END]:
"""Route to tool handler, or end if Done tool called"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
for tool_call in last_message.tool_calls:
if tool_call["name"] == "Done":
return END
else:
return "interrupt_handler"
# Build workflow
agent_builder = StateGraph(State)
# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("interrupt_handler", interrupt_handler)
# Add edges
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
{
"interrupt_handler": "interrupt_handler",
END: END,
},
)
# Compile the agent
response_agent = agent_builder.compile()
# Build overall workflow
overall_workflow = (
StateGraph(State, input=StateInput)
.add_node(triage_router)
.add_node(triage_interrupt_handler)
.add_node("response_agent", response_agent)
.add_edge(START, "triage_router")
)
email_assistant = overall_workflow.compile()
-580
View File
@@ -1,580 +0,0 @@
import os
from typing import Literal
from pydantic import BaseModel
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, END
from langgraph.store.base import BaseStore
from langgraph.types import interrupt, Command
from src.email_assistant.tools import get_tools, get_tools_by_name
from src.email_assistant.tools.default.prompt_templates import HITL_MEMORY_TOOLS_PROMPT
from src.email_assistant.prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt_hitl_memory, default_triage_instructions, default_background, default_response_preferences, default_cal_preferences
from src.email_assistant.schemas import State, RouterSchema, StateInput
from src.email_assistant.utils import parse_email, format_for_display, format_email_markdown
# Get tools
tools = get_tools(["write_email", "schedule_meeting", "check_calendar_availability", "Question", "Done"])
tools_by_name = get_tools_by_name(tools)
# Initialize the LLM for use with router / structured output
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_router = llm.with_structured_output(RouterSchema)
# Initialize the LLM, enforcing tool use (of any available tools) for agent
llm = init_chat_model("openai:gpt-4.1", temperature=0.0)
llm_with_tools = llm.bind_tools(tools, tool_choice="required")
def get_memory(store, namespace, default_content=None):
"""Get memory from the store or initialize with default if it doesn't exist.
Args:
store: LangGraph BaseStore instance to search for existing memory
namespace: Tuple defining the memory namespace, e.g. ("email_assistant", "triage_preferences")
default_content: Default content to use if memory doesn't exist
Returns:
str: The content of the memory profile, either from existing memory or the default
"""
# Search for existing memory with namespace and key
user_preferences = store.get(namespace, "user_preferences")
# If memory exists, return its content (the value)
if user_preferences:
return user_preferences.value
# If memory doesn't exist, add it to the store and return the default content
else:
# Namespace, key, value
store.put(namespace, "user_preferences", default_content)
user_preferences = default_content
# Return the default content
return user_preferences
class UserPreferences(BaseModel):
"""User preferences."""
preferences: str
justification: str
MEMORY_UPDATE_INSTRUCTIONS = """
# Role and Objective
You are a memory profile manager for an email assistant agent that selectively updates user preferences based on feedback messages from human-in-the-loop interactions with the email assistant.
# Instructions
- NEVER overwrite the entire memory profile
- ONLY make targeted additions of new information
- ONLY update specific facts that are directly contradicted by feedback messages
- PRESERVE all other existing information in the profile
- Format the profile consistently with the original style
- Generate the profile as a string
# Reasoning Steps
1. Analyze the current memory profile structure and content
2. Review feedback messages from human-in-the-loop interactions
3. Extract relevant user preferences from these feedback messages (such as edits to emails/calendar invites, explicit feedback on assistant performance, user decisions to ignore certain emails)
4. Compare new information against existing profile
5. Identify only specific facts to add or update
6. Preserve all other existing information
7. Output the complete updated profile
# Example
<memory_profile>
RESPOND:
- wife
- specific questions
- system admin notifications
NOTIFY:
- meeting invites
IGNORE:
- marketing emails
- company-wide announcements
- messages meant for other teams
</memory_profile>
<user_messages>
"The assistant shouldn't have responded to that system admin notification."
</user_messages>
<updated_profile>
RESPOND:
- wife
- specific questions
NOTIFY:
- meeting invites
- system admin notifications
IGNORE:
- marketing emails
- company-wide announcements
- messages meant for other teams
</updated_profile>
# Process current profile for {namespace}
<memory_profile>
{current_profile}
</memory_profile>
Think step by step about what specific feedback is being provided and what specific information should be added or updated in the profile while preserving everything else."""
MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT = """
Remember:
- NEVER overwrite the entire profile
- ONLY make targeted additions or changes based on explicit feedback
- PRESERVE all existing information not directly contradicted
- Output the complete updated profile as a string
"""
def update_memory(store, namespace, messages):
"""Update memory profile in the store.
Args:
store: LangGraph BaseStore instance to update memory
namespace: Tuple defining the memory namespace, e.g. ("email_assistant", "triage_preferences")
messages: List of messages to update the memory with
"""
# Get the existing memory
user_preferences = store.get(namespace, "user_preferences")
# Update the memory
llm = init_chat_model("openai:gpt-4.1", temperature=0.0).with_structured_output(UserPreferences)
result = llm.invoke(
[
{"role": "system", "content": MEMORY_UPDATE_INSTRUCTIONS.format(current_profile=user_preferences.value, namespace=namespace)},
{"role": "user", "content": f"Think carefully and update the memory profile based upon these user messages:"}
] + messages
)
# Save the updated memory to the store
store.put(namespace, "user_preferences", result.preferences)
# Nodes
def triage_router(state: State, store: BaseStore) -> Command[Literal["triage_interrupt_handler", "response_agent", "__end__"]]:
"""Analyze email content to decide if we should respond, notify, or ignore.
The triage step prevents the assistant from wasting time on:
- Marketing emails and spam
- Company-wide announcements
- Messages meant for other teams
"""
# Parse the email input
author, to, subject, email_thread = parse_email(state["email_input"])
user_prompt = triage_user_prompt.format(
author=author, to=to, subject=subject, email_thread=email_thread
)
# Create email markdown for Agent Inbox in case of notification
email_markdown = format_email_markdown(subject, author, to, email_thread)
# Search for existing triage_preferences memory
triage_instructions = get_memory(store, ("email_assistant", "triage_preferences"), default_triage_instructions)
# Format system prompt with background and triage instructions
system_prompt = triage_system_prompt.format(
background=default_background,
triage_instructions=triage_instructions,
)
# Run the router LLM
result = llm_router.invoke(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
)
# Decision
classification = result.classification
# Process the classification decision
if classification == "respond":
print("📧 Classification: RESPOND - This email requires a response")
# Next node
goto = "response_agent"
# Update the state
update = {
"classification_decision": result.classification,
"messages": [{"role": "user",
"content": f"Respond to the email: {email_markdown}"
}],
}
elif classification == "ignore":
print("🚫 Classification: IGNORE - This email can be safely ignored")
# Next node
goto = END
# Update the state
update = {
"classification_decision": classification,
}
elif classification == "notify":
print("🔔 Classification: NOTIFY - This email contains important information")
# Next node
goto = "triage_interrupt_handler"
# Update the state
update = {
"classification_decision": classification,
}
else:
raise ValueError(f"Invalid classification: {classification}")
return Command(goto=goto, update=update)
def triage_interrupt_handler(state: State, store: BaseStore) -> Command[Literal["response_agent", "__end__"]]:
"""Handles interrupts from the triage step"""
# Parse the email input
author, to, subject, email_thread = parse_email(state["email_input"])
# Create email markdown for Agent Inbox in case of notification
email_markdown = format_email_markdown(subject, author, to, email_thread)
# Create messages
messages = [{"role": "user",
"content": f"Email to notify user about: {email_markdown}"
}]
# Create interrupt for Agent Inbox
request = {
"action_request": {
"action": f"Email Assistant: {state['classification_decision']}",
"args": {}
},
"config": {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": False,
"allow_accept": False,
},
# Email to show in Agent Inbox
"description": email_markdown,
}
# Send to Agent Inbox and wait for response
response = interrupt([request])[0]
# If user provides feedback, go to response agent and use feedback to respond to email
if response["type"] == "response":
# Add feedback to messages
user_input = response["args"]
messages.append({"role": "user",
"content": f"User wants to reply to the email. Use this feedback to respond: {user_input}"
})
# Update memory with feedback
update_memory(store, ("email_assistant", "triage_preferences"), [{
"role": "user",
"content": f"The user decided to respond to the email, so update the triage preferences to capture this."
}] + messages)
goto = "response_agent"
# If user ignores email, go to END
elif response["type"] == "ignore":
# Make note of the user's decision to ignore the email
messages.append({"role": "user",
"content": f"The user decided to ignore the email even though it was classified as notify. Update triage preferences to capture this."
})
# Update memory with feedback using the memory manager
update_memory(store, ("email_assistant", "triage_preferences"), messages)
goto = END
# Catch all other responses
else:
raise ValueError(f"Invalid response: {response}")
# Update the state
update = {
"messages": messages,
}
return Command(goto=goto, update=update)
def llm_call(state: State, store: BaseStore):
"""LLM decides whether to call a tool or not"""
# Search for existing cal_preferences memory
cal_preferences = get_memory(store, ("email_assistant", "cal_preferences"), default_cal_preferences)
# Search for existing response_preferences memory
response_preferences = get_memory(store, ("email_assistant", "response_preferences"), default_response_preferences)
return {
"messages": [
llm_with_tools.invoke(
[
{"role": "system", "content": agent_system_prompt_hitl_memory.format(
tools_prompt=HITL_MEMORY_TOOLS_PROMPT,
background=default_background,
response_preferences=response_preferences,
cal_preferences=cal_preferences
)}
]
+ state["messages"]
)
]
}
def interrupt_handler(state: State, store: BaseStore) -> Command[Literal["llm_call", "__end__"]]:
"""Creates an interrupt for human review of tool calls"""
# Store messages
result = []
# Go to the LLM call node next
goto = "llm_call"
# Iterate over the tool calls in the last message
for tool_call in state["messages"][-1].tool_calls:
# Allowed tools for HITL
hitl_tools = ["write_email", "schedule_meeting", "Question"]
# If tool is not in our HITL list, execute it directly without interruption
if tool_call["name"] not in hitl_tools:
# Execute search_memory and other tools without interruption
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append({"role": "tool", "content": observation, "tool_call_id": tool_call["id"]})
continue
# Get original email from email_input in state
email_input = state["email_input"]
author, to, subject, email_thread = parse_email(email_input)
original_email_markdown = format_email_markdown(subject, author, to, email_thread)
# Format tool call for display and prepend the original email
tool_display = format_for_display(state, tool_call)
description = original_email_markdown + tool_display
# Configure what actions are allowed in Agent Inbox
if tool_call["name"] == "write_email":
config = {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": True,
"allow_accept": True,
}
elif tool_call["name"] == "schedule_meeting":
config = {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": True,
"allow_accept": True,
}
elif tool_call["name"] == "Question":
config = {
"allow_ignore": True,
"allow_respond": True,
"allow_edit": False,
"allow_accept": False,
}
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
# Create the interrupt request
request = {
"action_request": {
"action": tool_call["name"],
"args": tool_call["args"]
},
"config": config,
"description": description,
}
# Send to Agent Inbox and wait for response
response = interrupt([request])[0]
# Handle the responses
if response["type"] == "accept":
# Execute the tool with original args
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append({"role": "tool", "content": observation, "tool_call_id": tool_call["id"]})
elif response["type"] == "edit":
# Tool selection
tool = tools_by_name[tool_call["name"]]
# Get edited args from Agent Inbox
edited_args = response["args"]["args"]
# Save feedback in memory and update the write_email tool call with the edited content from Agent Inbox
if tool_call["name"] == "write_email":
# Capture the initial tool call
initial_tool_call = tool_call["name"] + ": " + str(tool_call["args"])
# Update the AI message's tool call with edited content (reference to the message in the state)
ai_message = state["messages"][-1]
current_id = tool_call["id"]
# Replace the original tool call with the edited one (any changes made to this reference affect the original object in the state)
ai_message.tool_calls = [tc for tc in ai_message.tool_calls if tc["id"] != current_id] + [
{"type": "tool_call", "name": tool_call["name"], "args": edited_args, "id": current_id}
]
# Execute the tool with edited args
observation = tool.invoke(edited_args)
# Add only the tool response message
result.append({"role": "tool", "content": observation, "tool_call_id": current_id})
# We update the memory
update_memory(store, ("email_assistant", "response_preferences"), [{
"role": "user",
"content": f"User edited the email response. Here is the initial email generated by the assistant: {initial_tool_call}. Here is the edited email: {edited_args}. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
# Save feedback in memory and update the schedule_meeting tool call with the edited content from Agent Inbox
elif tool_call["name"] == "schedule_meeting":
# Capture the initial tool call
initial_tool_call = tool_call["name"] + ": " + str(tool_call["args"])
# Update the AI message's tool call with edited content
ai_message = state["messages"][-1]
current_id = tool_call["id"]
# Replace the original tool call with the edited one
ai_message.tool_calls = [tc for tc in ai_message.tool_calls if tc["id"] != current_id] + [
{"type": "tool_call", "name": tool_call["name"], "args": edited_args, "id": current_id}
]
# Execute the tool with edited args
observation = tool.invoke(edited_args)
# Add only the tool response message
result.append({"role": "tool", "content": observation, "tool_call_id": current_id})
# Update the memory
update_memory(store, ("email_assistant", "cal_preferences"), [{
"role": "user",
"content": f"User edited the calendar invitation. Here is the initial calendar invitation generated by the assistant: {initial_tool_call}. Here is the edited calendar invitation: {edited_args}. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
# Catch all other tool calls
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
elif response["type"] == "ignore":
if tool_call["name"] == "write_email":
# Don't execute the tool, and tell the agent how to proceed
result.append({"role": "tool", "content": "User ignored this email draft. Ignore this email and end the workflow.", "tool_call_id": tool_call["id"]})
# Go to END
goto = END
# Update the memory by reflecting on the email tool call
update_memory(store, ("email_assistant", "triage_preferences"), state["messages"] + result + [{
"role": "user",
"content": f"The user ignored the email draft. That means they did not want to respond to the email. Update the triage preferences to ensure emails of this type are not classified as respond. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
elif tool_call["name"] == "schedule_meeting":
# Don't execute the tool, and tell the agent how to proceed
result.append({"role": "tool", "content": "User ignored this calendar meeting draft. Ignore this email and end the workflow.", "tool_call_id": tool_call["id"]})
# Go to END
goto = END
# Update the memory by reflecting on the full message history including the schedule_meeting tool call
update_memory(store, ("email_assistant", "triage_preferences"), state["messages"] + result + [{
"role": "user",
"content": f"The user ignored the calendar meeting draft. That means they did not want to schedule a meeting for this email. Update the triage preferences to ensure emails of this type are not classified as respond. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
elif tool_call["name"] == "Question":
# Don't execute the tool, and tell the agent how to proceed
result.append({"role": "tool", "content": "User ignored this question. Ignore this email and end the workflow.", "tool_call_id": tool_call["id"]})
# Go to END
goto = END
# Update the memory by reflecting on the full message history including the Question tool call
update_memory(store, ("email_assistant", "triage_preferences"), state["messages"] + result + [{
"role": "user",
"content": f"The user ignored the Question. That means they did not want to answer the question or deal with this email. Update the triage preferences to ensure emails of this type are not classified as respond. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
elif response["type"] == "response":
# User provided feedback
user_feedback = response["args"]
if tool_call["name"] == "write_email":
# Don't execute the tool, and add a message with the user feedback to incorporate into the email
result.append({"role": "tool", "content": f"User gave feedback, which can we incorporate into the email. Feedback: {user_feedback}", "tool_call_id": tool_call["id"]})
update_memory(store, ("email_assistant", "response_preferences"), state["messages"] + result + [{
"role": "user",
"content": f"User gave feedback, which we can use to update the response preferences. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
elif tool_call["name"] == "schedule_meeting":
# Don't execute the tool, and add a message with the user feedback to incorporate into the email
result.append({"role": "tool", "content": f"User gave feedback, which can we incorporate into the meeting request. Feedback: {user_feedback}", "tool_call_id": tool_call["id"]})
update_memory(store, ("email_assistant", "cal_preferences"), state["messages"] + result + [{
"role": "user",
"content": f"User gave feedback, which we can use to update the calendar preferences. Follow all instructions above, and remember: {MEMORY_UPDATE_INSTRUCTIONS_REINFORCEMENT}."
}])
elif tool_call["name"] == "Question":
# Don't execute the tool, and add a message with the user feedback to incorporate into the email
result.append({"role": "tool", "content": f"User answered the question, which can we can use for any follow up actions. Feedback: {user_feedback}", "tool_call_id": tool_call["id"]})
else:
raise ValueError(f"Invalid tool call: {tool_call['name']}")
# Update the state
update = {
"messages": result,
}
return Command(goto=goto, update=update)
# Conditional edge function
def should_continue(state: State, store: BaseStore) -> Literal["interrupt_handler", END]:
"""Route to tool handler, or end if Done tool called"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
for tool_call in last_message.tool_calls:
if tool_call["name"] == "Done":
# TODO: Here, we could update the background memory with the email-response for follow up actions.
return END
else:
return "interrupt_handler"
# Build workflow
agent_builder = StateGraph(State)
# Add nodes - with store parameter
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("interrupt_handler", interrupt_handler)
# Add edges
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
{
"interrupt_handler": "interrupt_handler",
END: END,
},
)
# Compile the agent
response_agent = agent_builder.compile()
# Build overall workflow with store and checkpointer
overall_workflow = (
StateGraph(State, input=StateInput)
.add_node(triage_router)
.add_node(triage_interrupt_handler)
.add_node("response_agent", response_agent)
.add_edge(START, "triage_router")
)
email_assistant = overall_workflow.compile()
-262
View File
@@ -1,262 +0,0 @@
# Baseline agent prompt
from datetime import datetime
from src.email_assistant.tools.default.prompt_templates import (
STANDARD_TOOLS_PROMPT,
HITL_TOOLS_PROMPT,
HITL_MEMORY_TOOLS_PROMPT,
AGENT_TOOLS_PROMPT
)
agent_system_prompt_baseline = """
< Role >
You are a top-notch executive assistant who cares about helping your executive perform as well as possible.
</ Role >
< Tools >
You have access to the following tools to help manage communications and schedule:
{tools_prompt}
</ Tools >
< Instructions >
When handling emails, follow these steps:
1. Carefully analyze the email content and purpose
2. IMPORTANT --- always call a tool and call one tool at a time until the task is complete:
3. For responding to the email, draft a response email with the write_email tool
4. For meeting requests, use the check_calendar_availability tool to find open time slots
5. To schedule a meeting, use the schedule_meeting tool with a datetime object for the preferred_day parameter
- Today's date is """ + datetime.now().strftime("%Y-%m-%d") + """ - use this for scheduling meetings accurately
6. If you scheduled a meeting, then draft a short response email using the write_email tool
7. After using the write_email tool, the task is complete
8. If you have sent the email, then use the Done tool to indicate that the task is complete
</ Instructions >
< Triage Instructions >
{triage_instructions}
</ Triage Instructions >
< Background >
{background}
</ Background >
< Response Preferences >
{response_preferences}
</ Response Preferences >
< Calendar Preferences >
{cal_preferences}
</ Calendar Preferences >
"""
# Agentic workflow triage prompt
triage_system_prompt = """
< Role >
Your role is to triage incoming emails based upon instructs and background information below.
</ Role >
< Background >
{background}.
</ Background >
< Instructions >
Categorize each email into one of three categories:
1. IGNORE - Emails that are not worth responding to or tracking
2. NOTIFY - Important information that worth notification but doesn't require a response
3. RESPOND - Emails that need a direct response
Classify the below email into one of these categories.
</ Instructions >
< Rules >
{triage_instructions}
</ Rules >
"""
# Agentic workflow triage user prompt
triage_user_prompt = """
Please determine how to handle the below email thread:
From: {author}
To: {to}
Subject: {subject}
{email_thread}"""
# Agentic workflow prompt
agent_system_prompt = """
< Role >
You are a top-notch executive assistant who cares about helping your executive perform as well as possible.
</ Role >
< Tools >
You have access to the following tools to help manage communications and schedule:
{tools_prompt}
</ Tools >
< Instructions >
When handling emails, follow these steps:
1. Carefully analyze the email content and purpose
2. IMPORTANT --- always call a tool and call one tool at a time until the task is complete:
3. For responding to the email, draft a response email with the write_email tool
4. For meeting requests, use the check_calendar_availability tool to find open time slots
5. To schedule a meeting, use the schedule_meeting tool with a datetime object for the preferred_day parameter
- Today's date is """ + datetime.now().strftime("%Y-%m-%d") + """ - use this for scheduling meetings accurately
6. If you scheduled a meeting, then draft a short response email using the write_email tool
7. After using the write_email tool, the task is complete
8. If you have sent the email, then use the Done tool to indicate that the task is complete
</ Instructions >
< Background >
{background}
</ Background >
< Response Preferences >
{response_preferences}
</ Response Preferences >
< Calendar Preferences >
{cal_preferences}
</ Calendar Preferences >
"""
# Agentic workflow with HITL prompt
agent_system_prompt_hitl = """
< Role >
You are a top-notch executive assistant who cares about helping your executive perform as well as possible.
</ Role >
< Tools >
You have access to the following tools to help manage communications and schedule:
{tools_prompt}
</ Tools >
< Instructions >
When handling emails, follow these steps:
1. Carefully analyze the email content and purpose
2. IMPORTANT --- always call a tool and call one tool at a time until the task is complete:
3. If you need more information to complete the task, use the Question tool to ask a follow-up question to the user
4. For responding to the email, draft a response email with the write_email tool
5. For meeting requests, use the check_calendar_availability tool to find open time slots
6. To schedule a meeting, use the schedule_meeting tool with a datetime object for the preferred_day parameter
- Today's date is """ + datetime.now().strftime("%Y-%m-%d") + """ - use this for scheduling meetings accurately
7. If you scheduled a meeting, then draft a short response email using the write_email tool
8. After using the write_email tool, the task is complete
9. If you have sent the email, then use the Done tool to indicate that the task is complete
</ Instructions >
< Background >
{background}
</ Background >
< Response Preferences >
{response_preferences}
</ Response Preferences >
< Calendar Preferences >
{cal_preferences}
</ Calendar Preferences >
"""
# Agentic workflow with HITL and memory prompt
agent_system_prompt_hitl_memory = """
< Role >
You are a top-notch executive assistant.
</ Role >
< Tools >
You have access to the following tools to help manage communications and schedule:
{tools_prompt}
</ Tools >
< Instructions >
When handling emails, follow these steps:
1. Carefully analyze the email content and purpose
2. IMPORTANT --- always call a tool and call one tool at a time until the task is complete:
3. To gather information background information use the "background" tool
4. To gather information about meeting preferences use the "cal_preferences" tool
5. To gather information about response preferences use the "response_preferences" tool
6. If the provided background information, meeting preferences, or response preferences are not sufficient, use the Question tool to ask follow-up questions
7. For meeting requests, use the check_calendar_availability tool to find open time slots
8. Schedule meetings with the schedule_meeting tool when appropriate
- Today's date is """ + datetime.now().strftime("%Y-%m-%d") + """ - use this for scheduling meetings accurately
9. If you scheduled a meeting, then draft a short response email using the write_email tool
10. Draft response emails using the write_email tool
11. After calling the write_email tool, the task is complete
12. If you have sent the email, then use the Done tool to indicate that the task is complete
</ Instructions >
< Response Preferences >
{response_preferences}
</ Response Preferences >
< Calendar Preferences >
{cal_preferences}
</ Calendar Preferences >
< Background >
{background}
</ Background >
"""
# Default background information
default_background = """
I'm Lance, a software engineer at LangChain.
"""
# Default response preferences
default_response_preferences = """
Use professional and concise language. If the e-mail mentions a deadline, make sure to explicitly acknowledge and reference the deadline in your response.
When responding to technical questions that require investigation:
- Clearly state whether you will investigate or who you will ask
- Provide an estimated timeline for when you'll have more information or complete the task
When responding to event or conference invitations:
- Always acknowledge any mentioned deadlines (particularly registration deadlines)
- If workshops or specific topics are mentioned, ask for more specific details about them
- If discounts (group or early bird) are mentioned, explicitly request information about them
- Don't commit
When responding to collaboration or project-related requests:
- Acknowledge any existing work or materials mentioned (drafts, slides, documents, etc.)
- Explicitly mention reviewing these materials before or during the meeting
- When scheduling meetings, clearly state the specific day, date, and time proposed
When responding to meeting scheduling requests:
- If times are proposed, verify calendar availability for all time slots mentioned in the original email and then commit to one of the proposed times based on your availability by scheduling the meeting. Or, say you can't make it at the time proposed.
- If no times are proposed, then check your calendar for availability and propose multiple time options when available instead of selecting just one.
- Mention the meeting duration in your response to confirm you've noted it correctly.
- Reference the meeting's purpose in your response.
"""
# Default calendar preferences
default_cal_preferences = """
30 minute meetings are preferred, but 15 minute meetings are also acceptable.
"""
# Default triage instructions
default_triage_instructions = """
Emails that are not worth responding to:
- Marketing newsletters and promotional emails
- Spam or suspicious emails
- CC'd on FYI threads with no direct questions
There are also other things that should be known about, but don't require an email response. For these, you should notify (using the `notify` response). Examples of this include:
- Team member out sick or on vacation
- Build system notifications or deployments
- Project status updates without action items
- Important company announcements
- FYI emails that contain relevant information for current projects
- HR Department deadline reminders
- Subscription status / renewal reminders
- GitHub notifications
Emails that are worth responding to:
- Direct questions from team members requiring expertise
- Meeting requests requiring confirmation
- Critical bug reports related to team's projects
- Requests from management requiring acknowledgment
- Client inquiries about project status or features
- Technical questions about documentation, code, or APIs (especially questions about missing endpoints or features)
- Personal reminders related to family (wife / daughter)
- Personal reminder related to self-care (doctor appointments, etc)
"""
-34
View File
@@ -1,34 +0,0 @@
from pydantic import BaseModel, Field
from typing import Optional
from typing_extensions import TypedDict, Literal, Annotated
from langgraph.graph import MessagesState
class RouterSchema(BaseModel):
"""Analyze the unread email and route it according to its content."""
reasoning: str = Field(
description="Step-by-step reasoning behind the classification."
)
classification: Literal["ignore", "respond", "notify"] = Field(
description="The classification of an email: 'ignore' for irrelevant emails, "
"'notify' for important information that doesn't need a response, "
"'respond' for emails that need a reply",
)
class StateInput(TypedDict):
# This is the input to the state
email_input: dict
class State(MessagesState):
# This state class has the messages key build in
email_input: dict
classification_decision: Literal["ignore", "respond", "notify"]
class EmailData(TypedDict):
id: str
thread_id: str
from_email: str
subject: str
page_content: str
send_time: str
to_email: str
-13
View File
@@ -1,13 +0,0 @@
from src.email_assistant.tools.base import get_tools, get_tools_by_name
from src.email_assistant.tools.default.email_tools import write_email, triage_email, Done
from src.email_assistant.tools.default.calendar_tools import schedule_meeting, check_calendar_availability
__all__ = [
"get_tools",
"get_tools_by_name",
"write_email",
"triage_email",
"Done",
"schedule_meeting",
"check_calendar_availability",
]
Binary file not shown.
Binary file not shown.
-57
View File
@@ -1,57 +0,0 @@
from typing import Dict, List, Callable, Any
from langchain_core.tools import BaseTool
def get_tools(tool_names: List[str] = None, include_gmail: bool = False) -> List[BaseTool]:
"""Get specified tools or all tools if tool_names is None.
Args:
tool_names: Optional list of tool names to include. If None, returns all tools.
include_gmail: Whether to include Gmail tools. Defaults to False.
Returns:
List of tool objects
"""
# Import default tools
from src.email_assistant.tools.default.email_tools import write_email, triage_email, Done
from src.email_assistant.tools.default.calendar_tools import schedule_meeting, check_calendar_availability
# Base tools dictionary
all_tools = {
"write_email": write_email,
"triage_email": triage_email,
"Done": Done,
"schedule_meeting": schedule_meeting,
"check_calendar_availability": check_calendar_availability,
}
# Add Gmail tools if requested
if include_gmail:
try:
from src.email_assistant.tools.gmail.gmail_tools import (
fetch_emails_tool,
send_email_tool,
check_calendar_tool,
schedule_meeting_tool
)
all_tools.update({
"fetch_emails_tool": fetch_emails_tool,
"send_email_tool": send_email_tool,
"check_calendar_tool": check_calendar_tool,
"schedule_meeting_tool": schedule_meeting_tool,
})
except ImportError:
# If Gmail tools aren't available, continue without them
pass
if tool_names is None:
return list(all_tools.values())
return [all_tools[name] for name in tool_names if name in all_tools]
def get_tools_by_name(tools: List[BaseTool] = None) -> Dict[str, BaseTool]:
"""Get a dictionary of tools mapped by name."""
if tools is None:
tools = get_tools()
return {tool.name: tool for tool in tools}
-22
View File
@@ -1,22 +0,0 @@
"""Default tools for email assistant."""
from src.email_assistant.tools.default.email_tools import write_email, triage_email, Done
from src.email_assistant.tools.default.calendar_tools import schedule_meeting, check_calendar_availability
from src.email_assistant.tools.default.prompt_templates import (
STANDARD_TOOLS_PROMPT,
AGENT_TOOLS_PROMPT,
HITL_TOOLS_PROMPT,
HITL_MEMORY_TOOLS_PROMPT
)
__all__ = [
"write_email",
"triage_email",
"Done",
"schedule_meeting",
"check_calendar_availability",
"STANDARD_TOOLS_PROMPT",
"AGENT_TOOLS_PROMPT",
"HITL_TOOLS_PROMPT",
"HITL_MEMORY_TOOLS_PROMPT"
]
-17
View File
@@ -1,17 +0,0 @@
from datetime import datetime
from langchain_core.tools import tool
@tool
def schedule_meeting(
attendees: list[str], subject: str, duration_minutes: int, preferred_day: datetime, start_time: int
) -> str:
"""Schedule a calendar meeting."""
# Placeholder response - in real app would check calendar and schedule
date_str = preferred_day.strftime("%A, %B %d, %Y")
return f"Meeting '{subject}' scheduled on {date_str} at {start_time} for {duration_minutes} minutes with {len(attendees)} attendees"
@tool
def check_calendar_availability(day: str) -> str:
"""Check calendar availability for a given day."""
# Placeholder response - in real app would check actual calendar
return f"Available times on {day}: 9:00 AM, 2:00 PM, 4:00 PM"
-19
View File
@@ -1,19 +0,0 @@
from typing import Literal
from pydantic import BaseModel
from langchain_core.tools import tool
@tool
def write_email(to: str, subject: str, content: str) -> str:
"""Write and send an email."""
# Placeholder response - in real app would send email
return f"Email sent to {to} with subject '{subject}'"
@tool
def triage_email(category: Literal["ignore", "notify", "respond"]) -> str:
"""Triage an email into one of three categories: ignore, notify, respond."""
return f"Classification Decision: {category}"
@tool
class Done(BaseModel):
"""E-mail has been sent."""
done: bool
-37
View File
@@ -1,37 +0,0 @@
"""Tool prompt templates for the email assistant."""
# Standard tool descriptions for insertion into prompts
STANDARD_TOOLS_PROMPT = """
1. triage_email(ignore, notify, respond) - Triage emails into one of three categories
2. write_email(to, subject, content) - Send emails to specified recipients
3. schedule_meeting(attendees, subject, duration_minutes, preferred_day, start_time) - Schedule calendar meetings where preferred_day is a datetime object
4. check_calendar_availability(day) - Check available time slots for a given day
5. Done - E-mail has been sent
"""
# Tool descriptions for HITL workflow
HITL_TOOLS_PROMPT = """
1. write_email(to, subject, content) - Send emails to specified recipients
2. schedule_meeting(attendees, subject, duration_minutes, preferred_day, start_time) - Schedule calendar meetings where preferred_day is a datetime object
3. check_calendar_availability(day) - Check available time slots for a given day
4. Question(content) - Ask the user any follow-up questions
5. Done - E-mail has been sent
"""
# Tool descriptions for HITL with memory workflow
HITL_MEMORY_TOOLS_PROMPT = """
1. write_email(to, subject, content) - Send emails to specified recipients
2. schedule_meeting(attendees, subject, duration_minutes, preferred_day, start_time) - Schedule calendar meetings where preferred_day is a datetime object
3. check_calendar_availability(day) - Check available time slots for a given day
4. Question(content) - Ask the user any follow-up questions
5. background - Search for background information about the user and their contacts
6. Done - E-mail has been sent
"""
# Tool descriptions for agent workflow without triage
AGENT_TOOLS_PROMPT = """
1. write_email(to, subject, content) - Send emails to specified recipients
2. schedule_meeting(attendees, subject, duration_minutes, preferred_day, start_time) - Schedule calendar meetings where preferred_day is a datetime object
3. check_calendar_availability(day) - Check available time slots for a given day
4. Done - E-mail has been sent
"""
-303
View File
@@ -1,303 +0,0 @@
# Gmail Integration Tools
This directory contains tools for integrating with Gmail and Google Calendar APIs to enable the email assistant to work with real emails and calendar events.
## Features
- **Email Fetching**: Retrieve recent emails from your Gmail account
- **Email Sending**: Send replies to email threads
- **Calendar Availability**: Check your Google Calendar for availability on specific dates
- **Meeting Scheduling**: Create calendar events and send invites to attendees
## Setup Instructions
### 1. Set up Google Cloud Project and Enable Gmail API
1. Enable the Gmail API by clicking the blue "Enable API" button [here](https://developers.google.com/gmail/api/quickstart/python#enable_the_api)
2. Authorize credentials for a desktop application [here](https://developers.google.com/workspace/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application)
- Go to Clients
- Create Client
- Application type > Desktop app
- Create
- Under "Audience" select "External" if you're using a personal email (non-Google Workspace)
<img width="1496" alt="Screenshot 2025-04-26 at 7 43 57 AM" src="https://github.com/user-attachments/assets/718da39e-9b10-4a2a-905c-eda87c1c1126" />
- Add yourself as a test user
<img width="1622" alt="Screenshot 2025-04-26 at 7 46 32 AM" src="https://github.com/user-attachments/assets/0489ad7e-0acd-4abd-b309-7c97ce705932" />
3. Save the downloaded JSON file
### 3. Set Up Authentication Files
1. Move your downloaded client secret JSON file to the `.secrets` directory
```bash
# Create a secrets directory
mkdir -p src/email_assistant/tools/gmail/.secrets
# Move your downloaded client secret to the secrets directory
mv /path/to/downloaded/client_secret.json src/email_assistant/tools/gmail/.secrets/secrets.json
```
2. Run the Gmail setup script
```bash
# Run the Gmail setup script
python src/email_assistant/tools/gmail/setup_gmail.py
```
- This will open a browser window for you to authenticate with your Google account
- This will create a `token.json` file in the `.secrets` directory
- This token will be used for Gmail API access
### 4. Run the Gmail Ingestion Script
1. Once you have authentication set up, you can run the Gmail ingestion script.
2. Start the locally running LangGraph server in one terminal:
```
langgraph dev
```
3. Run the ingestion script in another terminal:
```bash
python src/email_assistant/tools/gmail/run_ingest.py --email rlance.martin@gmail.com --minutes-since 1000
```
- This will fetch emails from the past 1000 minutes and process them with your email assistant.
- It will use the LangGraph SDK to pass each email to the locally running email assistant.
#### Important Ingestion Parameters:
- `--graph-name`: Name of the LangGraph to use (default: "email_assistant_hitl_memory")
- `--email`: The email address to fetch messages from (alternative to setting EMAIL_ADDRESS)
- `--minutes-since`: Only process emails that are newer than this many minutes (default: 60)
- `--url`: URL of the LangGraph deployment (default: http://127.0.0.1:2024)
- `--rerun`: Process emails that have already been processed (default: false)
- `--early`: Stop after processing one email (default: false)
- `--mock`: Run in mock mode without requiring a LangGraph server
- `--include-read`: Include emails that have already been read (by default only unread emails are processed)
- `--skip-filters`: Process all emails without filtering (by default only latest messages in threads where you're not the sender are processed)
- `--enable-tracing`: Enable LangSmith tracing (requires LANGCHAIN_API_KEY to be set)
- `--langsmith-api-key`: LangSmith API key for tracing (alternative to setting LANGCHAIN_API_KEY)
- `--langsmith-project`: LangSmith project name for tracing (default: "gmail-assistant")
#### Flag Combinations:
- `--rerun --early`: Process one email (regardless if it was processed before) and stop
- `--rerun`: Process all emails, including ones previously processed by LangGraph
- `--early`: Process only one new (previously unprocessed) email and stop
- (no flags): Process only new (previously unprocessed) emails
- `--include-read --skip-filters`: Process all emails, including ones marked as read and ones that would normally be filtered out
- `--minutes-since 1000 --include-read --skip-filters`: Process all emails from the past ~16 hours without any filtering
- `--enable-tracing --langsmith-project "my-project"`: Process emails with LangSmith tracing enabled
#### Troubleshooting:
- **Missing emails?** The Gmail API applies filters to show only important/primary emails by default. You can:
- Increase the `--minutes-since` parameter to a larger value (e.g., 1000) to fetch emails from a longer time period
- Use the `--include-read` flag to process emails marked as "read" (by default only unread emails are processed)
- Use the `--skip-filters` flag to include all messages (not just the latest in a thread, and including ones you sent)
- Try running with all options to process everything: `--include-read --skip-filters --minutes-since 1000`
- Use the `--mock` flag to test the system with simulated emails
## How Gmail Ingestion Works
The Gmail ingestion process works in three main stages:
### 1. CLI Parameters → Gmail Search Query
CLI parameters are translated into a Gmail search query:
- `--minutes-since 1440``after:TIMESTAMP` (emails from the last 24 hours)
- `--email you@example.com``to:you@example.com OR from:you@example.com` (emails where you're sender or recipient)
- `--include-read` → removes `is:unread` filter (includes read messages)
For example, running:
```
python run_ingest.py --email you@example.com --minutes-since 1440 --include-read
```
Creates a Gmail API search query like:
```
(to:you@example.com OR from:you@example.com) after:1745432245
```
### 2. Search Results → Thread Processing
For each message returned by the search:
1. The script obtains the thread ID
2. Using this thread ID, it fetches the **complete thread** with all messages
3. Messages in the thread are sorted by date to identify the latest message
4. Depending on filtering options, it processes either:
- The specific message found in the search (default behavior)
- The latest message in the thread (when using `--skip-filters`)
### 3. Default Filters and `--skip-filters` Behavior
#### Default Filters Applied
Without `--skip-filters`, the system applies these three filters in sequence:
1. **Unread Filter** (controlled by `--include-read`):
- Default behavior: Only processes unread messages
- With `--include-read`: Processes both read and unread messages
- Implementation: Adds `is:unread` to the Gmail search query
- This filter happens at the search level before any messages are retrieved
2. **Sender Filter**:
- Default behavior: Skips messages sent by your own email address
- Implementation: Checks if your email appears in the "From" header
- Logic: `is_from_user = email_address in from_header`
- This prevents the assistant from responding to your own emails
3. **Thread-Position Filter**:
- Default behavior: Only processes the most recent message in each thread
- Implementation: Compares message ID with the last message in thread
- Logic: `is_latest_in_thread = message["id"] == last_message["id"]`
- Prevents processing older messages when a newer reply exists
The combination of these filters means only the latest message in each thread that was not sent by you and is unread (unless `--include-read` is specified) will be processed.
#### Effect of `--skip-filters` Flag
When `--skip-filters` is enabled:
1. **Bypasses Sender and Thread-Position Filters**:
- Messages sent by you will be processed
- Messages that aren't the latest in thread will be processed
- Logic: `should_process = skip_filters or (not is_from_user and is_latest_in_thread)`
2. **Changes Which Message Is Processed**:
- Without `--skip-filters`: Uses the specific message found by search
- With `--skip-filters`: Always uses the latest message in the thread
- Even if the latest message wasn't found in the search results
3. **Unread Filter Still Applies (unless overridden)**:
- `--skip-filters` does NOT bypass the unread filter
- To process read messages, you must still use `--include-read`
- This is because the unread filter happens at the search level
In summary:
- Default: Process only unread messages where you're not the sender and that are the latest in their thread
- `--skip-filters`: Process all messages found by search, using the latest message in each thread
- `--include-read`: Include read messages in the search
- `--include-read --skip-filters`: Most comprehensive, processes the latest message in all threads found by search
## Important Gmail API Limitations
The Gmail API has several limitations that affect email ingestion:
1. **Search-Based API**: Gmail doesn't provide a direct "get all emails from timeframe" endpoint
- All email retrieval relies on Gmail's search functionality
- Search results can be delayed for very recent messages (indexing lag)
- Search results might not include all messages that technically match criteria
2. **Two-Stage Retrieval Process**:
- Initial search to find relevant message IDs
- Secondary thread retrieval to get complete conversations
- This two-stage process is necessary because search doesn't guarantee complete thread information
## When to Use `--skip-filters`
### Use `--skip-filters` When:
- **Latest Messages Are Missing**: The thread contains newer messages that aren't being processed
- **Complete Thread Context Needed**: You want to ensure you have the most up-to-date conversation context
- **Debugging Thread Issues**: You need to see which messages exist in threads vs. which are being processed
- **Initial Data Loading**: You're populating the system with existing conversations
- **Inconsistent Results**: You notice some messages are being skipped or processed out of order
### When NOT to Use `--skip-filters`:
- **Day-to-Day Operation**: For routine email processing, the default filters provide a natural workflow
- **Avoiding Duplicates**: To prevent reprocessing messages that have already been handled
- **Targeting Specific Messages**: When you want to process exactly the messages that match your search criteria
- **Processing Only New Correspondence**: When you want to handle only new, unread messages directed to you
### Behavior With `--skip-filters` Enabled:
1. The system still uses search to find relevant thread IDs
2. For each thread found, it fetches ALL messages in that thread
3. It sorts all messages by timestamp to identify the truly latest message
4. It processes the latest message in each thread, even if:
- That message wasn't in the original search results
- That message was sent by you
- That message isn't the latest in the original search results
This ensures you're always working with the most current state of each conversation.
## Known Limitations and Troubleshooting
- **Indexing Delays**: The Gmail API's search might miss very recent messages (added in the last few minutes)
- **Inconsistent Threading**: Gmail's thread IDs are consistent within a session but might change across API calls
- **Message Visibility**: Some messages might be excluded due to Gmail's categorization (Promotions, Updates, etc.)
- **Rate Limits**: The Gmail API has rate limits that could affect processing of large email volumes
If messages appear to be missing:
- Use a larger `--minutes-since` value to cast a wider time net
- Enable `--include-read` to include messages you've already read
- Enable `--skip-filters` to process the latest message in each thread
- Try the combination: `--minutes-since 1440 --include-read --skip-filters`
- **Connection errors:** If you get "Connection refused" or "All connection attempts failed" errors:
- Make sure the LangGraph server is running with `langgraph start` in a separate terminal
- Verify the port number matches in your script (default is 2024)
- Use the `--mock` flag to test without a LangGraph server: `--mock`
## Recent Updates and Fixes
The Gmail integration has been updated with several improvements:
1. **Improved Thread Processing**: Now properly retrieves all messages in a thread, not just the ones found by search
- Added comprehensive logging of thread messages with dates and senders
- Fixed sorting to ensure the truly latest message is identified
2. **Enhanced `--skip-filters` Behavior**: When enabled, the system now:
- Processes the absolute latest message in the thread, even if it wasn't found in search
- Uses thread-based retrieval to bypass Gmail search limitations
- Shows detailed information about which messages are being processed
3. **Thread ID Handling**: Improved how thread IDs are mapped between Gmail and LangGraph
- Uses MD5 hash to ensure consistent ID generation across runs
- Better error handling for thread ID mapping issues
4. **Simplified Command-Line Interface**:
- Improved flag handling with boolean flags for better usability
- Added LangSmith tracing options for better observability
- Simplified parameters and added clearer documentation
5. **LangSmith Tracing Integration**:
- Added support for tracing email processing through LangSmith
- Ensured tracing context is maintained across workflow interrupts
- Added explicit flags for controlling tracing behavior
- **Authentication issues:** If you encounter a "Token has been expired or revoked" error, delete the existing `token.json` file and run the setup script again to generate a fresh token.
- **Tracing issues:** If you're not seeing traces in LangSmith after interrupts, ensure you're using the latest version of LangSmith.
## Using Gmail Tools in Your Agent
To use Gmail tools in your agent, modify your agent code as follows:
```python
from src.email_assistant.tools import get_tools, get_tools_by_name
from src.email_assistant.tools.gmail.prompt_templates import COMBINED_TOOLS_PROMPT
# Get tools with Gmail integration enabled
tools = get_tools(include_gmail=True)
tools_by_name = get_tools_by_name(tools)
# Use the combined tools prompt in your agent's system prompt
system_prompt = agent_system_prompt.format(
tools_prompt=COMBINED_TOOLS_PROMPT,
# other parameters...
)
```
See `src/email_assistant/gmail_assistant.py` for a complete example.
-18
View File
@@ -1,18 +0,0 @@
"""Gmail tools for email assistant."""
from src.email_assistant.tools.gmail.gmail_tools import (
fetch_emails_tool,
send_email_tool,
check_calendar_tool,
schedule_meeting_tool
)
from src.email_assistant.tools.gmail.prompt_templates import GMAIL_TOOLS_PROMPT
__all__ = [
"fetch_emails_tool",
"send_email_tool",
"check_calendar_tool",
"schedule_meeting_tool",
"GMAIL_TOOLS_PROMPT"
]
-879
View File
@@ -1,879 +0,0 @@
"""
Gmail tools implementation module.
This module formats the Gmail API functions into LangChain tools.
"""
import os
import sys
import base64
import email.utils
import json
import logging
from datetime import datetime
from typing import List, Optional, Dict, Any, Iterator
from pathlib import Path
from pydantic import Field, BaseModel
from langchain_core.tools import tool
# Setup basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define paths for credentials and tokens
_ROOT = Path(__file__).parent.absolute()
_SECRETS_DIR = _ROOT / ".secrets"
# We need to try importing the Gmail API libraries
# If they're not available, we'll use a mock implementation
try:
import logging
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from datetime import timedelta
from dateutil.parser import parse as parse_time
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Email content extraction function
def extract_message_part(payload):
"""Extract content from a message part."""
if payload.get("body", {}).get("data"):
# Handle base64 encoded content
data = payload["body"]["data"]
decoded = base64.urlsafe_b64decode(data).decode("utf-8")
return decoded
# Handle multipart messages
if payload.get("parts"):
text_parts = []
for part in payload["parts"]:
# Recursively process parts
content = extract_message_part(part)
if content:
text_parts.append(content)
return "\n".join(text_parts)
return ""
# Function to get credentials from token.json
def get_credentials(gmail_token=None, gmail_secret=None):
"""Get Gmail API credentials from token.json"""
token_path = _SECRETS_DIR / "token.json"
if not os.path.exists(token_path):
logger.error(f"Token file not found at {token_path}")
return None
try:
with open(token_path, "r") as f:
token_data = json.load(f)
from google.oauth2.credentials import Credentials
# Create credentials object with specific format
# Using token format from the token.json file
credentials = Credentials(
token=token_data.get("token"),
refresh_token=token_data.get("refresh_token"),
token_uri=token_data.get("token_uri", "https://oauth2.googleapis.com/token"),
client_id=token_data.get("client_id"),
client_secret=token_data.get("client_secret"),
scopes=token_data.get("scopes", ["https://www.googleapis.com/auth/gmail.modify"])
)
# Add authorize method to make it compatible with old code
credentials.authorize = lambda request: request
return credentials
except Exception as e:
logger.error(f"Error loading credentials: {str(e)}")
return None
# Type alias for better readability
EmailData = Dict[str, Any]
GMAIL_API_AVAILABLE = True
except ImportError:
# If Gmail API libraries aren't available, set flag to use mock implementation
GMAIL_API_AVAILABLE = False
logger = logging.getLogger(__name__)
# Helper function that is used by the tool and can be imported elsewhere
def fetch_group_emails(
email_address: str,
minutes_since: int = 30,
gmail_token: Optional[str] = None,
gmail_secret: Optional[str] = None,
include_read: bool = False,
skip_filters: bool = False,
) -> Iterator[Dict[str, Any]]:
"""
Fetch recent emails from Gmail that involve the specified email address.
This function retrieves emails where the specified address is either a sender
or recipient, processes them, and returns them in a format suitable for the
email assistant to process.
Args:
email_address: Email address to fetch messages for
minutes_since: Only retrieve emails newer than this many minutes
gmail_token: Optional token for Gmail API authentication
gmail_secret: Optional credentials for Gmail API authentication
include_read: Whether to include already read emails (default: False)
skip_filters: Skip thread and sender filtering (return all messages, default: False)
Yields:
Dict objects containing processed email information
"""
use_mock = False
# Check if we need to use mock implementation
if not GMAIL_API_AVAILABLE:
logger.info("Gmail API not available, using mock implementation")
use_mock = True
# Check if required credential files exist
if not use_mock and not gmail_token and not gmail_secret:
token_path = str(_SECRETS_DIR / "token.json")
secrets_path = str(_SECRETS_DIR / "secrets.json")
if not os.path.exists(token_path) and not os.path.exists(secrets_path):
logger.warning(f"No Gmail API credentials found. Looking for token.json or secrets.json in .secrets directory")
logger.warning("Using mock implementation instead")
use_mock = True
# Return mock data if needed
if use_mock:
# For demo purposes, we return a mock email
mock_email = {
"from_email": "sender@example.com",
"to_email": email_address,
"subject": "Sample Email Subject",
"page_content": "This is a sample email body for testing the email assistant.",
"id": "mock-email-id-123",
"thread_id": "mock-thread-id-123",
"send_time": datetime.now().isoformat()
}
yield mock_email
return
try:
# Get Gmail API credentials
creds = get_credentials(gmail_token, gmail_secret)
# Check if credentials are valid
if not creds or not hasattr(creds, 'authorize'):
logger.warning("Invalid Gmail credentials, using mock implementation")
mock_email = {
"from_email": "sender@example.com",
"to_email": email_address,
"subject": "Sample Email Subject - Invalid Credentials",
"page_content": "This is a mock email because the Gmail credentials are invalid.",
"id": "mock-email-id-123",
"thread_id": "mock-thread-id-123",
"send_time": datetime.now().isoformat()
}
yield mock_email
return
service = build("gmail", "v1", credentials=creds)
# Calculate timestamp for filtering
after = int((datetime.now() - timedelta(minutes=minutes_since)).timestamp())
# Construct Gmail search query
# This query searches for:
# - Emails sent to or from the specified address
# - Emails after the specified timestamp
# - Including emails from all categories (inbox, updates, promotions, etc.)
# Base query with time filter
query = f"(to:{email_address} OR from:{email_address}) after:{after}"
# Only include unread emails unless include_read is True
if not include_read:
query += " is:unread"
else:
logger.info("Including read emails in search")
# Log the final query for debugging
logger.info(f"Gmail search query: {query}")
# Additional filter options (commented out by default)
# If you want to include emails from specific categories, use:
# query += " category:(primary OR updates OR promotions)"
# Retrieve all matching messages (handling pagination)
messages = []
nextPageToken = None
logger.info(f"Fetching emails for {email_address} from last {minutes_since} minutes")
while True:
results = (
service.users()
.messages()
.list(userId="me", q=query, pageToken=nextPageToken)
.execute()
)
if "messages" in results:
new_messages = results["messages"]
messages.extend(new_messages)
logger.info(f"Found {len(new_messages)} messages in this page")
else:
logger.info("No messages found in this page")
nextPageToken = results.get("nextPageToken")
if not nextPageToken:
logger.info(f"Total messages found: {len(messages)}")
break
# Process each message
count = 0
for message in messages:
try:
# Get full message details
msg = service.users().messages().get(userId="me", id=message["id"]).execute()
thread_id = msg["threadId"]
payload = msg["payload"]
headers = payload.get("headers", [])
# Get thread details to determine conversation context
# Directly fetch the complete thread without any format restriction
# This matches the exact approach in the test code that successfully gets all messages
thread = service.users().threads().get(userId="me", id=thread_id).execute()
messages_in_thread = thread["messages"]
logger.info(f"Retrieved thread {thread_id} with {len(messages_in_thread)} messages")
# Sort messages by internalDate to ensure proper chronological ordering
# This ensures we correctly identify the latest message
if all("internalDate" in msg for msg in messages_in_thread):
messages_in_thread.sort(key=lambda m: int(m.get("internalDate", 0)))
logger.info(f"Sorted {len(messages_in_thread)} messages by internalDate")
else:
# Fallback to ID-based sorting if internalDate is missing
messages_in_thread.sort(key=lambda m: m["id"])
logger.info(f"Sorted {len(messages_in_thread)} messages by ID (internalDate missing)")
# Log details about the messages in the thread for debugging
for idx, msg in enumerate(messages_in_thread):
headers = msg["payload"]["headers"]
subject = next((h["value"] for h in headers if h["name"] == "Subject"), "No Subject")
from_email = next((h["value"] for h in headers if h["name"] == "From"), "Unknown")
date = next((h["value"] for h in headers if h["name"] == "Date"), "Unknown")
logger.info(f" Message {idx+1}/{len(messages_in_thread)}: ID={msg['id']}, Date={date}, From={from_email}")
# Log thread information for debugging
logger.info(f"Thread {thread_id} has {len(messages_in_thread)} messages")
# Analyze the last message in the thread to determine if we need to process it
last_message = messages_in_thread[-1]
last_headers = last_message["payload"]["headers"]
# Get sender of last message
from_header = next(
header["value"] for header in last_headers if header["name"] == "From"
)
last_from_header = next(
header["value"]
for header in last_message["payload"].get("headers")
if header["name"] == "From"
)
# If the last message was sent by the user, mark this as a user response
# and don't process it further (assistant doesn't need to respond to user's own emails)
if email_address in last_from_header:
yield {
"id": message["id"],
"thread_id": message["threadId"],
"user_respond": True,
}
continue
# Check if this is a message we should process
is_from_user = email_address in from_header
is_latest_in_thread = message["id"] == last_message["id"]
# Modified logic for skip_filters:
# 1. When skip_filters is True, process all messages regardless of position in thread
# 2. When skip_filters is False, only process if it's not from user AND is latest in thread
should_process = skip_filters or (not is_from_user and is_latest_in_thread)
if not should_process:
if is_from_user:
logger.debug(f"Skipping message {message['id']}: sent by the user")
elif not is_latest_in_thread:
logger.debug(f"Skipping message {message['id']}: not the latest in thread")
# Process the message if it passes our filters (or if filters are skipped)
if should_process:
# Log detailed information about this message
logger.info(f"Processing message {message['id']} from thread {thread_id}")
logger.info(f" Is latest in thread: {is_latest_in_thread}")
logger.info(f" Skip filters enabled: {skip_filters}")
# If the user wants to process the latest message in the thread,
# use the last_message from the thread API call instead of the original message
# that matched the search query
if not skip_filters:
# Use original message if skip_filters is False
process_message = message
process_payload = payload
process_headers = headers
else:
# Use the latest message in the thread if skip_filters is True
process_message = last_message
process_payload = last_message["payload"]
process_headers = process_payload.get("headers", [])
logger.info(f"Using latest message in thread: {process_message['id']}")
# Extract email metadata from headers
subject = next(
header["value"] for header in process_headers if header["name"] == "Subject"
)
from_email = next(
(header["value"] for header in process_headers if header["name"] == "From"),
"",
).strip()
_to_email = next(
(header["value"] for header in process_headers if header["name"] == "To"),
"",
).strip()
# Use Reply-To header if present
if reply_to := next(
(
header["value"]
for header in process_headers
if header["name"] == "Reply-To"
),
"",
).strip():
from_email = reply_to
# Extract and parse email timestamp
send_time = next(
header["value"] for header in process_headers if header["name"] == "Date"
)
parsed_time = parse_time(send_time)
# Extract email body content
body = extract_message_part(process_payload)
# Yield the processed email data
yield {
"from_email": from_email,
"to_email": _to_email,
"subject": subject,
"page_content": body,
"id": process_message["id"],
"thread_id": process_message["threadId"],
"send_time": parsed_time.isoformat(),
}
count += 1
except Exception as e:
logger.warning(f"Failed to process message {message['id']}: {str(e)}")
logger.info(f"Found {count} emails to process out of {len(messages)} total messages.")
except Exception as e:
logger.error(f"Error accessing Gmail API: {str(e)}")
# Fall back to mock implementation
mock_email = {
"from_email": "sender@example.com",
"to_email": email_address,
"subject": "Sample Email Subject",
"page_content": "This is a sample email body for testing the email assistant.",
"id": "mock-email-id-123",
"thread_id": "mock-thread-id-123",
"send_time": datetime.now().isoformat()
}
yield mock_email
class FetchEmailsInput(BaseModel):
"""
Input schema for the fetch_emails_tool.
"""
email_address: str = Field(
description="Email address to fetch emails for"
)
minutes_since: int = Field(
default=30,
description="Only retrieve emails newer than this many minutes"
)
@tool(args_schema=FetchEmailsInput)
def fetch_emails_tool(email_address: str, minutes_since: int = 30) -> str:
"""
Fetches recent emails from Gmail for the specified email address.
Args:
email_address: Email address to fetch messages for
minutes_since: Only retrieve emails newer than this many minutes (default: 30)
Returns:
String summary of fetched emails
"""
emails = list(fetch_group_emails(email_address, minutes_since))
if not emails:
return "No new emails found."
result = f"Found {len(emails)} new emails:\n\n"
for i, email in enumerate(emails, 1):
if email.get("user_respond", False):
result += f"{i}. You already responded to this email (Thread ID: {email['thread_id']})\n\n"
continue
result += f"{i}. From: {email['from_email']}\n"
result += f" To: {email['to_email']}\n"
result += f" Subject: {email['subject']}\n"
result += f" Time: {email['send_time']}\n"
result += f" ID: {email['id']}\n"
result += f" Thread ID: {email['thread_id']}\n"
result += f" Content: {email['page_content'][:200]}...\n\n"
return result
class SendEmailInput(BaseModel):
"""
Input schema for the send_email_tool.
"""
email_id: str = Field(
description="Gmail message ID to reply to. This must be a valid Gmail message ID obtained from the fetch_emails_tool. If you're creating a new email (not replying), you can use any string like 'NEW_EMAIL'."
)
response_text: str = Field(
description="Content of the reply"
)
email_address: str = Field(
description="Current user's email address"
)
additional_recipients: Optional[List[str]] = Field(
default=None,
description="Optional additional recipients to include"
)
# Helper function for sending emails
def send_email(
email_id: str,
response_text: str,
email_address: str,
addn_receipients: Optional[List[str]] = None
) -> bool:
"""
Send a reply to an existing email thread or create a new email.
Args:
email_id: Gmail message ID to reply to. If this is not a valid Gmail ID (e.g., when creating a new email),
the function will create a new email instead of replying to an existing thread.
response_text: Content of the reply or new email
email_address: Current user's email address (the sender)
addn_receipients: Optional additional recipients
Returns:
Success flag (True if email was sent)
"""
if not GMAIL_API_AVAILABLE:
logger.info("Gmail API not available, simulating email send")
logger.info(f"Would send: {response_text[:100]}...")
return True
try:
# Get Gmail API credentials
creds = get_credentials()
service = build("gmail", "v1", credentials=creds)
try:
# Try to get the original message to extract headers
message = service.users().messages().get(userId="me", id=email_id).execute()
headers = message["payload"]["headers"]
# Extract subject with Re: prefix if not already present
subject = next(header["value"] for header in headers if header["name"] == "Subject")
if not subject.startswith("Re:"):
subject = f"Re: {subject}"
# Create a reply message
original_from = next(header["value"] for header in headers if header["name"] == "From")
# Get thread ID from message
thread_id = message["threadId"]
except Exception as e:
logger.warning(f"Could not retrieve original message with ID {email_id}. Error: {str(e)}")
# If we can't get the original message, create a new message with minimal info
subject = "Response"
original_from = "recipient@example.com" # Will be overridden by user input
thread_id = None
# Create a message object
msg = MIMEText(response_text)
msg["to"] = original_from
msg["from"] = email_address
msg["subject"] = subject
# Add additional recipients if specified
if addn_receipients:
msg["cc"] = ", ".join(addn_receipients)
# Encode the message
raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
# Prepare message body
body = {"raw": raw}
# Only add threadId if it exists
if thread_id:
body["threadId"] = thread_id
# Send the message
sent_message = (
service.users()
.messages()
.send(
userId="me",
body=body,
)
.execute()
)
logger.info(f"Email sent: Message ID {sent_message['id']}")
return True
except Exception as e:
logger.error(f"Error sending email: {str(e)}")
return False
@tool(args_schema=SendEmailInput)
def send_email_tool(
email_id: str,
response_text: str,
email_address: str,
additional_recipients: Optional[List[str]] = None
) -> str:
"""
Send a reply to an existing email thread or create a new email in Gmail.
Args:
email_id: Gmail message ID to reply to. This should be a valid Gmail message ID obtained from the fetch_emails_tool.
If creating a new email rather than replying, you can use any string identifier like "NEW_EMAIL".
response_text: Content of the reply or new email
email_address: Current user's email address (the sender)
additional_recipients: Optional additional recipients to include
Returns:
Confirmation message
"""
try:
success = send_email(
email_id,
response_text,
email_address,
addn_receipients=additional_recipients
)
if success:
return f"Email reply sent successfully to message ID: {email_id}"
else:
return "Failed to send email due to an API error"
except Exception as e:
return f"Failed to send email: {str(e)}"
class CheckCalendarInput(BaseModel):
"""
Input schema for the check_calendar_tool.
"""
dates: List[str] = Field(
description="List of dates to check in DD-MM-YYYY format"
)
def get_calendar_events(dates: List[str]) -> str:
"""
Check Google Calendar for events on specified dates.
Args:
dates: List of dates to check in DD-MM-YYYY format
Returns:
Formatted calendar events for the specified dates
"""
if not GMAIL_API_AVAILABLE:
logger.info("Gmail API not available, simulating calendar check")
result = "Calendar events:\n\n"
for date in dates:
result += f"Events for {date}:\n"
result += " - 9:00 AM - 10:00 AM: Team Meeting\n"
result += " - 2:00 PM - 3:00 PM: Project Review\n"
result += "Available slots: 10:00 AM - 2:00 PM, after 3:00 PM\n\n"
return result
try:
# Get Gmail API credentials
creds = get_credentials()
service = build("calendar", "v3", credentials=creds)
result = "Calendar events:\n\n"
for date_str in dates:
# Parse date string (DD-MM-YYYY)
day, month, year = date_str.split("-")
# Format start and end times for the API
start_time = f"{year}-{month}-{day}T00:00:00Z"
end_time = f"{year}-{month}-{day}T23:59:59Z"
# Call the Calendar API
events_result = (
service.events()
.list(
calendarId="primary",
timeMin=start_time,
timeMax=end_time,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
result += f"Events for {date_str}:\n"
if not events:
result += " No events found for this day\n"
result += " Available all day\n\n"
continue
# Process events
busy_slots = []
for event in events:
start = event["start"].get("dateTime", event["start"].get("date"))
end = event["end"].get("dateTime", event["end"].get("date"))
# Convert to datetime objects
if "T" in start: # dateTime format
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
# Format for display
start_display = start_dt.strftime("%I:%M %p")
end_display = end_dt.strftime("%I:%M %p")
result += f" - {start_display} - {end_display}: {event['summary']}\n"
busy_slots.append((start_dt, end_dt))
else: # all-day event
result += f" - All day: {event['summary']}\n"
busy_slots.append(("all-day", "all-day"))
# Calculate available slots
if "all-day" in [slot[0] for slot in busy_slots]:
result += " Available: No availability (all-day events)\n\n"
else:
# Sort busy slots by start time
busy_slots.sort(key=lambda x: x[0])
# Define working hours (9 AM to 5 PM)
work_start = datetime(
year=int(year),
month=int(month),
day=int(day),
hour=9,
minute=0
)
work_end = datetime(
year=int(year),
month=int(month),
day=int(day),
hour=17,
minute=0
)
# Calculate available slots
available_slots = []
current = work_start
for start, end in busy_slots:
if current < start:
available_slots.append((current, start))
current = max(current, end)
if current < work_end:
available_slots.append((current, work_end))
# Format available slots
if available_slots:
result += " Available: "
for i, (start, end) in enumerate(available_slots):
start_display = start.strftime("%I:%M %p")
end_display = end.strftime("%I:%M %p")
result += f"{start_display} - {end_display}"
if i < len(available_slots) - 1:
result += ", "
result += "\n\n"
else:
result += " Available: No availability during working hours\n\n"
return result
except Exception as e:
logger.error(f"Error checking calendar: {str(e)}")
# Return mock data in case of error
result = "Calendar events (mock due to error):\n\n"
for date in dates:
result += f"Events for {date}:\n"
result += " - 9:00 AM - 10:00 AM: Team Meeting\n"
result += " - 2:00 PM - 3:00 PM: Project Review\n"
result += "Available slots: 10:00 AM - 2:00 PM, after 3:00 PM\n\n"
return result
@tool(args_schema=CheckCalendarInput)
def check_calendar_tool(dates: List[str]) -> str:
"""
Check Google Calendar for events on specified dates.
Args:
dates: List of dates to check in DD-MM-YYYY format
Returns:
Formatted calendar events for the specified dates
"""
try:
events = get_calendar_events(dates)
return events
except Exception as e:
return f"Failed to check calendar: {str(e)}"
class ScheduleMeetingInput(BaseModel):
"""
Input schema for the schedule_meeting_tool.
"""
attendees: List[str] = Field(
description="Email addresses of meeting attendees"
)
title: str = Field(
description="Meeting title/subject"
)
start_time: str = Field(
description="Meeting start time in ISO format (YYYY-MM-DDTHH:MM:SS)"
)
end_time: str = Field(
description="Meeting end time in ISO format (YYYY-MM-DDTHH:MM:SS)"
)
organizer_email: str = Field(
description="Email address of the meeting organizer"
)
timezone: str = Field(
default="America/Los_Angeles",
description="Timezone for the meeting"
)
def send_calendar_invite(
attendees: List[str],
title: str,
start_time: str,
end_time: str,
organizer_email: str,
timezone: str = "America/Los_Angeles"
) -> bool:
"""
Schedule a meeting with Google Calendar and send invites.
Args:
attendees: Email addresses of meeting attendees
title: Meeting title/subject
start_time: Meeting start time in ISO format (YYYY-MM-DDTHH:MM:SS)
end_time: Meeting end time in ISO format (YYYY-MM-DDTHH:MM:SS)
organizer_email: Email address of the meeting organizer
timezone: Timezone for the meeting
Returns:
Success flag (True if meeting was scheduled)
"""
if not GMAIL_API_AVAILABLE:
logger.info("Gmail API not available, simulating calendar invite")
logger.info(f"Would schedule: {title} from {start_time} to {end_time}")
logger.info(f"Attendees: {', '.join(attendees)}")
return True
try:
# Get Gmail API credentials
creds = get_credentials()
service = build("calendar", "v3", credentials=creds)
# Create event details
event = {
"summary": title,
"start": {
"dateTime": start_time,
"timeZone": timezone,
},
"end": {
"dateTime": end_time,
"timeZone": timezone,
},
"attendees": [{"email": email} for email in attendees],
"organizer": {
"email": organizer_email,
"self": True,
},
"reminders": {
"useDefault": True,
},
"sendUpdates": "all", # Send email notifications to attendees
}
# Create the event
event = service.events().insert(calendarId="primary", body=event).execute()
logger.info(f"Meeting created: {event.get('htmlLink')}")
return True
except Exception as e:
logger.error(f"Error scheduling meeting: {str(e)}")
return False
@tool(args_schema=ScheduleMeetingInput)
def schedule_meeting_tool(
attendees: List[str],
title: str,
start_time: str,
end_time: str,
organizer_email: str,
timezone: str = "America/Los_Angeles"
) -> str:
"""
Schedule a meeting with Google Calendar and send invites.
Args:
attendees: Email addresses of meeting attendees
title: Meeting title/subject
start_time: Meeting start time in ISO format (YYYY-MM-DDTHH:MM:SS)
end_time: Meeting end time in ISO format (YYYY-MM-DDTHH:MM:SS)
organizer_email: Email address of the meeting organizer
timezone: Timezone for the meeting (default: America/Los_Angeles)
Returns:
Success or failure message
"""
try:
success = send_calendar_invite(
attendees,
title,
start_time,
end_time,
organizer_email,
timezone
)
if success:
return f"Meeting '{title}' scheduled successfully from {start_time} to {end_time} with {len(attendees)} attendees"
else:
return "Failed to schedule meeting"
except Exception as e:
return f"Error scheduling meeting: {str(e)}"
-23
View File
@@ -1,23 +0,0 @@
"""Tool prompt templates for Gmail integration."""
# Gmail tools prompt for insertion into agent system prompts
GMAIL_TOOLS_PROMPT = """
1. fetch_emails_tool(email_address, minutes_since) - Fetch recent emails from Gmail
2. send_email_tool(email_id, response_text, email_address, additional_recipients) - Send a reply to an email thread
3. check_calendar_tool(dates) - Check Google Calendar availability for specific dates
4. schedule_meeting_tool(attendees, title, start_time, end_time, organizer_email, timezone) - Schedule a meeting and send invites
5. triage_email(ignore, notify, respond) - Triage emails into one of three categories
6. Done - E-mail has been sent
"""
# Combined tools prompt (default + Gmail) for full integration
COMBINED_TOOLS_PROMPT = """
1. fetch_emails_tool(email_address, minutes_since) - Fetch recent emails from Gmail
2. send_email_tool(email_id, response_text, email_address, additional_recipients) - Send a reply to an email thread
3. check_calendar_tool(dates) - Check Google Calendar availability for specific dates
4. schedule_meeting_tool(attendees, title, start_time, end_time, organizer_email, timezone) - Schedule a meeting and send invites
5. write_email(to, subject, content) - Draft emails to specified recipients
6. triage_email(ignore, notify, respond) - Triage emails into one of three categories
7. check_calendar_availability(day) - Check available time slots for a given day
8. Done - E-mail has been sent
"""
-285
View File
@@ -1,285 +0,0 @@
#!/usr/bin/env python
"""
Simple Gmail ingestion script based directly on test.ipynb that works with LangSmith tracing.
This script provides a minimal implementation for ingesting emails to the LangGraph server,
with reliable LangSmith tracing.
"""
import base64
import json
import uuid
import hashlib
import asyncio
import argparse
from pathlib import Path
from datetime import datetime
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langgraph_sdk import get_client
# Setup nest_asyncio for asyncio support
try:
import nest_asyncio
nest_asyncio.apply()
print("Applied nest_asyncio patch")
except ImportError:
print("Warning: nest_asyncio not available. Install with: pip install nest_asyncio")
# Setup paths
_ROOT = Path(__file__).parent.absolute()
_SECRETS_DIR = _ROOT / ".secrets"
TOKEN_PATH = _SECRETS_DIR / "token.json"
def extract_message_part(payload):
"""Extract content from a message part."""
if payload.get("body", {}).get("data"):
# Handle base64 encoded content
data = payload["body"]["data"]
decoded = base64.urlsafe_b64decode(data).decode("utf-8")
return decoded
# Handle multipart messages
if payload.get("parts"):
text_parts = []
for part in payload["parts"]:
# Recursively process parts
content = extract_message_part(part)
if content:
text_parts.append(content)
return "\n".join(text_parts)
return ""
def load_gmail_credentials():
"""Load Gmail credentials from token.json"""
if not TOKEN_PATH.exists():
print(f"Error: Token file not found at {TOKEN_PATH}")
return None
try:
with open(TOKEN_PATH, "r") as f:
token_data = json.load(f)
credentials = Credentials(
token=token_data.get("token"),
refresh_token=token_data.get("refresh_token"),
token_uri=token_data.get("token_uri", "https://oauth2.googleapis.com/token"),
client_id=token_data.get("client_id"),
client_secret=token_data.get("client_secret"),
scopes=token_data.get("scopes", ["https://www.googleapis.com/auth/gmail.modify"])
)
return credentials
except Exception as e:
print(f"Error loading credentials: {str(e)}")
return None
def extract_email_data(message):
"""Extract key information from a Gmail message."""
headers = message['payload']['headers']
# Extract key headers
subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
from_email = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown Sender')
to_email = next((h['value'] for h in headers if h['name'] == 'To'), 'Unknown Recipient')
date = next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown Date')
# Extract message content
content = extract_message_part(message['payload'])
# Create email data object
email_data = {
"from_email": from_email,
"to_email": to_email,
"subject": subject,
"page_content": content,
"id": message['id'],
"thread_id": message['threadId'],
"send_time": date
}
return email_data
async def ingest_email_to_langgraph(email_data, graph_name, url="http://127.0.0.1:2024"):
"""Ingest an email to LangGraph."""
# Connect to LangGraph server
client = get_client(url=url)
# Create a consistent UUID for the thread
raw_thread_id = email_data["thread_id"]
thread_id = str(
uuid.UUID(hex=hashlib.md5(raw_thread_id.encode("UTF-8")).hexdigest())
)
print(f"Gmail thread ID: {raw_thread_id} → LangGraph thread ID: {thread_id}")
try:
# Try to get existing thread info
thread_info = await client.threads.get(thread_id)
print(f"Found existing thread: {thread_id}")
except Exception as e:
# If thread doesn't exist, create it
print(f"Creating new thread: {thread_id}")
thread_info = await client.threads.create(thread_id=thread_id)
# Update thread metadata with current email ID
await client.threads.update(thread_id, metadata={"email_id": email_data["id"]})
# Create a run for this email
print(f"Creating run for thread {thread_id} with graph {graph_name}")
run = await client.runs.create(
thread_id,
graph_name,
input={"email_input": {
"from": email_data["from_email"],
"to": email_data["to_email"],
"subject": email_data["subject"],
"body": email_data["page_content"],
"id": email_data["id"]
}},
multitask_strategy="rollback",
)
print(f"Run created successfully")
print(f"View in LangGraph Studio: http://127.0.0.1:2024/threads/{thread_id}")
return thread_id, run
async def fetch_and_process_emails(args):
"""Fetch emails from Gmail and process them through LangGraph."""
# Load Gmail credentials
credentials = load_gmail_credentials()
if not credentials:
print("Failed to load Gmail credentials")
return 1
# Build Gmail service
service = build("gmail", "v1", credentials=credentials)
# Process emails
processed_count = 0
try:
# Get messages from the specified email address
email_address = args.email
# Construct Gmail search query
query = f"to:{email_address} OR from:{email_address}"
# Add time constraint if specified
if args.minutes_since > 0:
# Calculate timestamp for filtering
from datetime import timedelta
after = int((datetime.now() - timedelta(minutes=args.minutes_since)).timestamp())
query += f" after:{after}"
# Only include unread emails unless include_read is True
if not args.include_read:
query += " is:unread"
print(f"Gmail search query: {query}")
# Execute the search
results = service.users().messages().list(userId="me", q=query).execute()
messages = results.get("messages", [])
if not messages:
print("No emails found matching the criteria")
return 0
print(f"Found {len(messages)} emails")
# Process each email
for i, message_info in enumerate(messages):
# Stop early if requested
if args.early and i > 0:
print(f"Early stop after processing {i} emails")
break
# Check if we should reprocess this email
if not args.rerun:
# TODO: Add check for already processed emails
pass
# Get the full message
message = service.users().messages().get(userId="me", id=message_info["id"]).execute()
# Extract email data
email_data = extract_email_data(message)
print(f"\nProcessing email {i+1}/{len(messages)}:")
print(f"From: {email_data['from_email']}")
print(f"Subject: {email_data['subject']}")
# Ingest to LangGraph
thread_id, run = await ingest_email_to_langgraph(
email_data,
args.graph_name,
url=args.url
)
processed_count += 1
print(f"\nProcessed {processed_count} emails successfully")
return 0
except Exception as e:
print(f"Error processing emails: {str(e)}")
return 1
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Simple Gmail ingestion for LangGraph with reliable tracing")
parser.add_argument(
"--email",
type=str,
required=True,
help="Email address to fetch messages for"
)
parser.add_argument(
"--minutes-since",
type=int,
default=120,
help="Only retrieve emails newer than this many minutes"
)
parser.add_argument(
"--graph-name",
type=str,
default="email_assistant_hitl_memory",
help="Name of the LangGraph to use"
)
parser.add_argument(
"--url",
type=str,
default="http://127.0.0.1:2024",
help="URL of the LangGraph deployment"
)
parser.add_argument(
"--early",
action="store_true",
help="Early stop after processing one email"
)
parser.add_argument(
"--include-read",
action="store_true",
help="Include emails that have already been read"
)
parser.add_argument(
"--rerun",
action="store_true",
help="Process the same emails again even if already processed"
)
parser.add_argument(
"--skip-filters",
action="store_true",
help="Skip filtering of emails"
)
return parser.parse_args()
if __name__ == "__main__":
# Get command line arguments
args = parse_args()
# Run the script
exit(asyncio.run(fetch_and_process_emails(args)))
-87
View File
@@ -1,87 +0,0 @@
#!/usr/bin/env python
"""
Setup script for Gmail API integration.
This script handles the OAuth flow for Gmail API access by:
1. Creating a .secrets directory if it doesn't exist
2. Using credentials from .secrets/secrets.json to authenticate
3. Opening a browser window for user authentication
4. Storing the access token in .secrets/token.json
"""
import os
import sys
import json
from pathlib import Path
# Add project root to sys.path for imports to work correctly
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../")))
# Import required Google libraries
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2.credentials import Credentials
def main():
"""Run Gmail authentication setup."""
# Create .secrets directory
secrets_dir = Path(__file__).parent.absolute() / ".secrets"
secrets_dir.mkdir(parents=True, exist_ok=True)
# Check for secrets.json
secrets_path = secrets_dir / "secrets.json"
if not secrets_path.exists():
print(f"Error: Client secrets file not found at {secrets_path}")
print("Please download your OAuth client ID JSON from Google Cloud Console")
print("and save it as .secrets/secrets.json")
return 1
print("Starting Gmail API authentication flow...")
print("A browser window will open for you to authorize access.")
# This will trigger the OAuth flow and create token.json
try:
# Define the scopes we need
SCOPES = [
'https://www.googleapis.com/auth/gmail.modify',
'https://www.googleapis.com/auth/calendar'
]
# Load client secrets
with open(secrets_path, 'r') as f:
client_config = json.load(f)
# Create the flow using the client_secrets.json format
flow = InstalledAppFlow.from_client_secrets_file(
str(secrets_path),
SCOPES
)
# Run the OAuth flow
credentials = flow.run_local_server(port=0)
# Save the credentials to token.json
token_path = secrets_dir / "token.json"
token_data = {
'token': credentials.token,
'refresh_token': credentials.refresh_token,
'token_uri': credentials.token_uri,
'client_id': credentials.client_id,
'client_secret': credentials.client_secret,
'scopes': credentials.scopes,
'universe_domain': 'googleapis.com',
'account': '',
'expiry': credentials.expiry.isoformat() + "Z"
}
with open(token_path, 'w') as token_file:
json.dump(token_data, token_file)
print("\nAuthentication successful!")
print(f"Access token stored at {token_path}")
return 0
except Exception as e:
print(f"Authentication failed: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())
-243
View File
@@ -1,243 +0,0 @@
from typing import List, Any
import io
import sys
import json
def format_email_markdown(subject, author, to, email_thread):
"""Format email details into a nicely formatted markdown string for display"""
return f"""
**Subject**: {subject}
**From**: {author}
**To**: {to}
{email_thread}
---
"""
def format_for_display(state, tool_call):
"""Format content for display in Agent Inbox
Args:
state: Current message state
tool_call: The tool call to format
"""
# Initialize empty display
display = ""
# Add tool call information
if tool_call["name"] == "write_email":
display += f"""# Email Draft
**To**: {tool_call["args"].get("to")}
**Subject**: {tool_call["args"].get("subject")}
{tool_call["args"].get("content")}
"""
elif tool_call["name"] == "schedule_meeting":
display += f"""# Calendar Invite
**Meeting**: {tool_call["args"].get("subject")}
**Attendees**: {', '.join(tool_call["args"].get("attendees"))}
**Duration**: {tool_call["args"].get("duration_minutes")} minutes
**Day**: {tool_call["args"].get("preferred_day")}
"""
elif tool_call["name"] == "Question":
# Special formatting for questions to make them clear
display += f"""# Question for User
{tool_call["args"].get("content")}
"""
else:
# Generic format for other tools
display += f"""# Tool Call: {tool_call["name"]}
Arguments:"""
# Check if args is a dictionary or string
if isinstance(tool_call["args"], dict):
display += f"\n{json.dumps(tool_call['args'], indent=2)}\n"
else:
display += f"\n{tool_call['args']}\n"
return display
def parse_email(email_input: dict) -> tuple[str, str, str, str]:
"""Parse an email input dictionary, supporting multiple schemas.
Supports multiple schema formats:
- Standard schema (author, to, subject, email_thread)
- Gmail-specific schema (from_email, to_email, subject, page_content)
- Direct API schema (from, to, subject, body)
Args:
email_input (dict): Dictionary containing email fields in any of these formats:
Standard schema:
- author: Sender's name and email
- to: Recipient's name and email
- subject: Email subject line
- email_thread: Full email content
Gmail schema:
- from_email: Sender's email
- to_email: Recipient's email
- subject: Email subject line
- page_content: Full email content
- id: Gmail message ID
- thread_id: Gmail thread ID
- send_time: Time the email was sent
Direct API schema:
- from: Sender's email
- to: Recipient's email
- subject: Email subject line
- body: Full email content
Returns:
tuple[str, str, str, str]: Tuple containing:
- author: Sender's name and email
- to: Recipient's name and email
- subject: Email subject line
- email_thread: Full email content
"""
# Detect schema based on keys present in the input
if "author" in email_input and "email_thread" in email_input:
# Standard schema
return (
email_input["author"],
email_input["to"],
email_input["subject"],
email_input["email_thread"],
)
elif "from_email" in email_input and "page_content" in email_input:
# Gmail schema
return (
email_input["from_email"],
email_input["to_email"],
email_input["subject"],
email_input["page_content"],
)
elif "from" in email_input and "body" in email_input:
# Direct API schema
return (
email_input["from"],
email_input["to"],
email_input["subject"],
email_input["body"],
)
else:
# Unknown schema, try to handle gracefully by looking for equivalent fields
author = (
email_input.get("author") or
email_input.get("from_email") or
email_input.get("from") or
"Unknown Sender"
)
to = (
email_input.get("to") or
email_input.get("to_email") or
"Unknown Recipient"
)
subject = email_input.get("subject") or "No Subject"
content = (
email_input.get("email_thread") or
email_input.get("page_content") or
email_input.get("body") or
email_input.get("content") or
"No content available"
)
return (author, to, subject, content)
def extract_message_content(message) -> str:
"""Extract content from different message types as clean string.
Args:
message: A message object (HumanMessage, AIMessage, ToolMessage)
Returns:
str: Extracted content as clean string
"""
content = message.content
# Check for recursion marker in string
if isinstance(content, str) and '<Recursion on AIMessage with id=' in content:
return "[Recursive content]"
# Handle string content
if isinstance(content, str):
return content
# Handle list content (AIMessage format)
elif isinstance(content, list):
text_parts = []
for item in content:
if isinstance(item, dict) and 'text' in item:
text_parts.append(item['text'])
return "\n".join(text_parts)
# Don't try to handle recursion to avoid infinite loops
# Just return string representation instead
return str(content)
def format_few_shot_examples(examples):
"""Format examples into a readable string representation.
Args:
examples (List[Item]): List of example items from the vector store, where each item
contains a value string with the format:
'Email: {...} Original routing: {...} Correct routing: {...}'
Returns:
str: A formatted string containing all examples, with each example formatted as:
Example:
Email: {email_details}
Original Classification: {original_routing}
Correct Classification: {correct_routing}
---
"""
formatted = []
for example in examples:
# Parse the example value string into components
email_part = example.value.split('Original routing:')[0].strip()
original_routing = example.value.split('Original routing:')[1].split('Correct routing:')[0].strip()
correct_routing = example.value.split('Correct routing:')[1].strip()
# Format into clean string
formatted_example = f"""Example:
Email: {email_part}
Original Classification: {original_routing}
Correct Classification: {correct_routing}
---"""
formatted.append(formatted_example)
return "\n".join(formatted)
def extract_tool_calls(messages: List[Any]) -> List[str]:
"""Extract tool call names from messages, safely handling messages without tool_calls."""
tool_call_names = []
for message in messages:
# Check if message is a dict and has tool_calls
if isinstance(message, dict) and message.get("tool_calls"):
tool_call_names.extend([call["name"].lower() for call in message["tool_calls"]])
# Check if message is an object with tool_calls attribute
elif hasattr(message, "tool_calls") and message.tool_calls:
tool_call_names.extend([call["name"].lower() for call in message.tool_calls])
return tool_call_names
def format_messages_string(messages: List[Any]) -> str:
"""Format messages into a single string for analysis."""
# Redirect stdout to capture output
old_stdout = sys.stdout
new_stdout = io.StringIO()
sys.stdout = new_stdout
# Run the pretty_print calls
for m in messages:
m.pretty_print()
# Get the captured output
output = new_stdout.getvalue()
# Restore original stdout
sys.stdout = old_stdout
return output
+35
View File
@@ -0,0 +1,35 @@
import { getEmailAssistant } from './email_assistant.js';
import { EmailData } from '../lib/schemas.js';
async function main() {
console.log('Starting email assistant...');
try {
// Initialize the email assistant
const agent = await getEmailAssistant();
// Example email data
const exampleEmail: EmailData = {
id: "123456",
thread_id: "thread_123456",
from_email: "example@example.com",
to_email: "assistant@yourcompany.com",
subject: "Meeting Request Tomorrow",
page_content: "Hi there,\n\nI was wondering if we could schedule a meeting tomorrow to discuss the project.\n\nThanks,\nExample User",
send_time: new Date().toISOString()
};
// Invoke the agent with example data
console.log('Processing email...');
const result = await agent.invoke({
email_input: exampleEmail
});
console.log('Processing complete!');
console.log('Result:', JSON.stringify(result, null, 2));
} catch (error) {
console.error('Error running email assistant:', error);
}
}
main().catch(console.error);
+5 -27
View File
@@ -1,4 +1,3 @@
import Image from "next/image";
// LangChain imports for chat models
import { initChatModel } from "langchain/chat_models/universal";
@@ -27,7 +26,7 @@ import { ToolNode } from "@langchain/langgraph/prebuilt";
import {
getTools,
getToolsByName
} from "../lib/tools/base";
} from "../lib/tools/base.js";
import {
agentSystemPromptBaseline,
triageSystemPrompt,
@@ -40,40 +39,19 @@ import {
defaultCalPreferences,
defaultTriageInstructions,
AGENT_TOOLS_PROMPT
} from "../lib/prompts";
} from "../lib/prompts.js";
import {
RouterSchema,
RouterOutput,
EmailData,
StateInput,
State,
} from "../lib/schemas";
} from "../lib/schemas.js";
import {
parseEmail,
formatEmailMarkdown
} from "../lib/utils";
} from "../lib/utils.js";
export default function Home() {
return (
<div className="grid grid-rows-[20px_1fr_20px] items-center justify-items-center min-h-screen p-8 pb-20 gap-16 sm:p-20 font-[family-name:var(--font-geist-sans)]">
<h1>Hello World</h1>
<div className="flex flex-col w-full max-w-3xl gap-8">
<h2 className="text-2xl font-bold">Email Assistant</h2>
<p>This page contains the TypeScript implementation of the email assistant workflow.</p>
<div className="mt-4">
<a
href="/test-email-assistant"
className="px-4 py-2 bg-blue-600 text-white rounded hover:bg-blue-700"
>
Test Email Assistant
</a>
</div>
</div>
</div>
);
}
// Create the email assistant workflow
export const createEmailAssistant = async () => {
@@ -140,7 +118,7 @@ export const createEmailAssistant = async () => {
if (isAIMessage(lastMessage) && lastMessage.tool_calls && lastMessage.tool_calls.length > 0) {
// Check if any tool call is the "Done" tool
if (lastMessage.tool_calls.some(toolCall => toolCall.name === "Done")) {
if (lastMessage.tool_calls.some((toolCall: ToolCall) => toolCall.name === "Done")) {
return END;
}
return "environment";
@@ -26,7 +26,7 @@ import { ToolNode } from "@langchain/langgraph/prebuilt";
import {
getTools,
getToolsByName
} from "../../lib/tools/base";
} from "../lib/tools/base.js";
import {
HITL_TOOLS_PROMPT,
triageSystemPrompt,
@@ -36,19 +36,19 @@ import {
defaultResponsePreferences,
defaultCalPreferences,
defaultTriageInstructions
} from "../../lib/prompts";
} from "../lib/prompts.js";
import {
RouterSchema,
RouterOutput,
EmailData,
StateInput,
State,
} from "../../lib/schemas";
} from "../lib/schemas.js";
import {
parseEmail,
formatEmailMarkdown,
formatForDisplay
} from "../../lib/utils";
} from "../lib/utils.js";
/**
* Create the Human-in-the-Loop Email Assistant
+10 -15
View File
@@ -1,27 +1,22 @@
{
"compilerOptions": {
"target": "ES2017",
"lib": ["dom", "dom.iterable", "esnext"],
"target": "ES2020",
"lib": ["ES2020", "DOM"],
"allowJs": true,
"skipLibCheck": true,
"strict": true,
"noEmit": true,
"forceConsistentCasingInFileNames": true,
"esModuleInterop": true,
"module": "esnext",
"moduleResolution": "bundler",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"resolveJsonModule": true,
"isolatedModules": true,
"jsx": "preserve",
"incremental": true,
"plugins": [
{
"name": "next"
}
],
"baseUrl": ".",
"paths": {
"@/*": ["./*"]
}
},
"outDir": "dist"
},
"include": ["next-env.d.ts", "**/*.ts", "**/*.tsx", ".next/types/**/*.ts"],
"exclude": ["node_modules"]
"include": ["**/*.ts"],
"exclude": ["node_modules", "dist"]
}