Files
2025-07-10 00:22:24 -07:00

157 lines
5.2 KiB
Python

from typing import Literal
from email_assistant.tools import get_tools, get_tools_by_name
from email_assistant.tools.default.prompt_templates import AGENT_TOOLS_PROMPT
from email_assistant.prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt, default_background, default_triage_instructions, default_response_preferences, default_cal_preferences
from email_assistant.schemas import State, RouterSchema, StateInput
from email_assistant.utils import llm, parse_email, format_email_markdown
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from dotenv import load_dotenv
# Load environment settings from the local ".env" file before the tools and
# model bindings below are created.
load_dotenv(".env")
# Collect the tools the agent may use, plus a lookup table that tool_node
# indexes by the "name" field of each tool call.
tools = get_tools()
tools_by_name = get_tools_by_name(tools)
# Router model: the shared LLM wrapped so that its replies follow RouterSchema
# (triage_router reads the `classification` attribute of the result).
llm_router = llm.with_structured_output(RouterSchema)
# Agent model: the shared LLM bound to the tools, with tool_choice="any" so
# that every reply is a tool call (of any available tool).
llm_with_tools = llm.bind_tools(tools, tool_choice="any")
# Nodes
def llm_call(state: State):
    """Ask the tool-bound model for its next step.

    The agent system prompt is rendered with the default tools prompt,
    background, response preferences and calendar preferences, placed ahead
    of the messages already held in ``state``, and sent to ``llm_with_tools``.

    Args:
        state: Graph state; its ``"messages"`` list is the conversation so far.

    Returns:
        A state update whose ``"messages"`` list contains the single model
        reply.
    """
    system_text = agent_system_prompt.format(
        tools_prompt=AGENT_TOOLS_PROMPT,
        background=default_background,
        response_preferences=default_response_preferences,
        cal_preferences=default_cal_preferences,
    )
    # The system message always leads; the running conversation follows it.
    conversation = [{"role": "system", "content": system_text}] + state["messages"]
    reply = llm_with_tools.invoke(conversation)
    return {"messages": [reply]}
def tool_node(state: State):
    """Execute every tool call requested by the most recent message.

    Each call is resolved through ``tools_by_name`` and invoked with the
    call's ``"args"``; the outputs are returned in the same order as the
    calls were requested.

    Args:
        state: Graph state; the last entry of ``"messages"`` must expose
            ``tool_calls``.

    Returns:
        A state update whose ``"messages"`` list holds one ``"tool"`` role
        message per call, tagged with the originating ``tool_call_id``.
    """
    requested_calls = state["messages"][-1].tool_calls
    tool_messages = [
        {
            "role": "tool",
            "content": tools_by_name[call["name"]].invoke(call["args"]),
            "tool_call_id": call["id"],
        }
        for call in requested_calls
    ]
    return {"messages": tool_messages}
# Conditional edge function
def should_continue(state: State) -> Literal["Action", "__end__"]:
    """Decide whether the agent loop runs tools next or stops.

    The decision is taken from the first tool call of the latest message:
    a call to the ``Done`` tool ends the run, any other tool routes to
    ``"Action"``.

    Args:
        state: Graph state; the last entry of ``"messages"`` is inspected.

    Returns:
        ``END`` when the first tool call is ``Done`` or when the latest
        message carries no tool calls at all; ``"Action"`` otherwise.
    """
    last_message = state["messages"][-1]
    tool_calls = getattr(last_message, "tool_calls", None)

    # Without tool calls there is nothing for the tool node to execute.
    # Return END explicitly instead of falling through and returning None,
    # which is neither "Action" nor END and so has no entry in the
    # conditional-edge mapping.
    if not tool_calls:
        return END

    # Only the first call decides the route; any remaining calls are handled
    # by the tool node when the route is "Action".
    if tool_calls[0]["name"] == "Done":
        return END
    return "Action"
# Build the response agent: a loop between the model ("llm_call") and the tool
# executor ("environment") that runs until should_continue returns END.
agent_builder = StateGraph(State)
# Add nodes: "llm_call" asks the model for the next tool call and
# "environment" executes the requested tool calls.
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)
# Add edges to connect nodes; every run starts with a model call.
agent_builder.add_edge(START, "llm_call")
# After each model call, should_continue picks the next step.
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
{
# Name returned by should_continue : Name of next node to visit
"Action": "environment",
END: END,
},
)
# Tool results are always handed back to the model, closing the loop.
agent_builder.add_edge("environment", "llm_call")
# Compile the agent so it can be used as a node in the overall workflow.
agent = agent_builder.compile()
def triage_router(state: State) -> Command[Literal["response_agent", "__end__"]]:
    """Classify the incoming email and route the graph accordingly.

    Triage keeps the assistant from spending effort on:
    - Marketing emails and spam
    - Company-wide announcements
    - Messages meant for other teams

    Args:
        state: Graph state; ``"email_input"`` holds the email to classify.

    Returns:
        A ``Command`` that records the decision under
        ``"classification_decision"``. For ``"respond"`` it also adds a user
        message containing the email and routes to ``"response_agent"``;
        for ``"ignore"`` and ``"notify"`` it routes to ``END``.

    Raises:
        ValueError: If the router returns any other classification.
    """
    author, to, subject, email_thread = parse_email(state["email_input"])

    router_messages = [
        {
            "role": "system",
            "content": triage_system_prompt.format(
                background=default_background,
                triage_instructions=default_triage_instructions,
            ),
        },
        {
            "role": "user",
            "content": triage_user_prompt.format(
                author=author, to=to, subject=subject, email_thread=email_thread
            ),
        },
    ]

    # Markdown rendering of the email; embedded in the request that is handed
    # to the response agent when a reply is needed.
    email_markdown = format_email_markdown(subject, author, to, email_thread)

    # Run the router LLM and read its structured decision.
    decision = llm_router.invoke(router_messages).classification

    if decision == "respond":
        print("📧 Classification: RESPOND - This email requires a response")
        return Command(
            goto="response_agent",
            update={
                "classification_decision": decision,
                # Seed the agent's conversation with the email itself.
                "messages": [
                    {
                        "role": "user",
                        "content": f"Respond to the email: {email_markdown}",
                    }
                ],
            },
        )

    if decision == "ignore":
        print("🚫 Classification: IGNORE - This email can be safely ignored")
    elif decision == "notify":
        # In real life, this would do something else
        print("🔔 Classification: NOTIFY - This email contains important information")
    else:
        raise ValueError(f"Invalid classification: {decision}")

    # Both non-respond outcomes only record the decision and stop the run.
    return Command(goto=END, update={"classification_decision": decision})
# Build the top-level workflow: triage first, then (optionally) the response
# agent. There is no explicit edge out of triage_router; the Command it
# returns names the next node ("response_agent") or END.
# NOTE(review): newer LangGraph releases appear to name the `input` keyword
# `input_schema` — confirm against the pinned LangGraph version.
overall_workflow = (
StateGraph(State, input=StateInput)
# Registered without an explicit name; the START edge below refers to this
# node as "triage_router".
.add_node(triage_router)
# The compiled tool-calling agent is used as a single node.
.add_node("response_agent", agent)
.add_edge(START, "triage_router")
)
# Compile the workflow into the runnable email assistant graph.
email_assistant = overall_workflow.compile()