# Mirror of https://github.com/langchain-ai/ambient-agent-101.git
# (synced 2026-07-01 21:14:02 -04:00; 157 lines, 5.2 KiB, Python)
from typing import Literal
|
|
|
|
from email_assistant.tools import get_tools, get_tools_by_name
|
|
from email_assistant.tools.default.prompt_templates import AGENT_TOOLS_PROMPT
|
|
from email_assistant.prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt, default_background, default_triage_instructions, default_response_preferences, default_cal_preferences
|
|
from email_assistant.schemas import State, RouterSchema, StateInput
|
|
from email_assistant.utils import llm, parse_email, format_email_markdown
|
|
|
|
from langgraph.graph import StateGraph, START, END
|
|
from langgraph.types import Command
|
|
from dotenv import load_dotenv
|
|
# Read settings from the local ".env" file before anything else is set up.
load_dotenv(".env")

# Tool registry: the tool list itself, plus a lookup keyed by tool name that
# tool_node uses to dispatch each requested call.
tools = get_tools()
tools_by_name = get_tools_by_name(tools)

# Router model: replies are returned as RouterSchema structured output.
llm_router = llm.with_structured_output(RouterSchema)

# Agent model: every tool is bound and tool_choice="any" enforces tool use,
# so the agent is expected to answer with tool calls.
llm_with_tools = llm.bind_tools(tools, tool_choice="any")
|
|
|
|
# Nodes
|
|
def llm_call(state: State):
    """Ask the tool-bound LLM for its next step.

    A system message is built from ``agent_system_prompt`` (filled with the
    tools prompt and the default background, response and calendar
    preferences) and placed ahead of the messages already held in ``state``.

    Args:
        state: Graph state; its ``"messages"`` list is sent to the model.

    Returns:
        A state update whose ``"messages"`` entry is a one-element list
        containing the model's reply.
    """
    system_message = {
        "role": "system",
        "content": agent_system_prompt.format(
            tools_prompt=AGENT_TOOLS_PROMPT,
            background=default_background,
            response_preferences=default_response_preferences,
            cal_preferences=default_cal_preferences,
        ),
    }
    reply = llm_with_tools.invoke([system_message] + state["messages"])
    return {"messages": [reply]}
|
|
|
|
def tool_node(state: State):
    """Execute every tool call requested by the most recent message.

    Args:
        state: Graph state; the last entry of ``"messages"`` must expose a
            ``tool_calls`` sequence of dicts with ``"name"``, ``"args"`` and
            ``"id"`` keys.

    Returns:
        A state update whose ``"messages"`` entry holds one ``"tool"`` role
        message per call, in request order, each tagged with the id of the
        call it answers.
    """
    requested_calls = state["messages"][-1].tool_calls
    tool_messages = [
        {
            "role": "tool",
            # Look the tool up by name and run it with the supplied arguments.
            "content": tools_by_name[call["name"]].invoke(call["args"]),
            "tool_call_id": call["id"],
        }
        for call in requested_calls
    ]
    return {"messages": tool_messages}
|
|
|
|
# Conditional edge function
|
|
def should_continue(state: State) -> Literal["Action", "__end__"]:
    """Route to the tool node, or end the run once ``Done`` is called.

    Only the first tool call of the last message decides the route, which is
    what the previous return-on-first-iteration loop did as well.

    Args:
        state: Graph state; the last entry of ``"messages"`` must expose a
            ``tool_calls`` sequence of dicts with a ``"name"`` key.

    Returns:
        ``END`` when the last message carries no tool calls or its first
        tool call is named ``"Done"``; ``"Action"`` otherwise.
    """
    tool_calls = state["messages"][-1].tool_calls

    if not tool_calls:
        # Previously this case fell through and returned None, which matches
        # neither key of the routing map given to add_conditional_edges.
        # With nothing to execute, ending the run is the only valid route.
        return END

    if tool_calls[0]["name"] == "Done":
        return END
    return "Action"
|
|
|
|
# Response agent graph: the LLM node and the tool-executing node loop until
# should_continue routes to END.
agent_builder = StateGraph(State)

# Nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)

# Edges: start at the LLM, branch on its output, and return to the LLM after
# the tools have run.
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    # Value returned by should_continue -> node to visit next.
    {"Action": "environment", END: END},
)
agent_builder.add_edge("environment", "llm_call")

# Compiled agent, used as the "response_agent" node of the overall workflow.
agent = agent_builder.compile()
|
|
|
|
def triage_router(state: State) -> Command[Literal["response_agent", "__end__"]]:
    """Classify the incoming email and route the graph accordingly.

    The email in ``state["email_input"]`` is parsed, rendered into the triage
    prompts and sent to the structured-output router model. Its
    ``classification`` selects the next step:

    - ``"respond"``: go to ``response_agent`` with the email added as a user
      message.
    - ``"ignore"`` / ``"notify"``: end the run; only the decision is stored.

    Args:
        state: Graph state containing ``"email_input"``.

    Returns:
        A ``Command`` carrying the next node and the state update.

    Raises:
        ValueError: If the router returns any other classification.
    """
    author, to, subject, email_thread = parse_email(state["email_input"])

    router_messages = [
        {
            "role": "system",
            "content": triage_system_prompt.format(
                background=default_background,
                triage_instructions=default_triage_instructions,
            ),
        },
        {
            "role": "user",
            "content": triage_user_prompt.format(
                author=author, to=to, subject=subject, email_thread=email_thread
            ),
        },
    ]

    # Markdown rendering of the email; handed to the response agent below.
    email_markdown = format_email_markdown(subject, author, to, email_thread)

    # Ask the router model for its decision.
    result = llm_router.invoke(router_messages)
    decision = result.classification

    if decision == "respond":
        print("📧 Classification: RESPOND - This email requires a response")
        return Command(
            goto="response_agent",
            update={
                "classification_decision": decision,
                # Seed the agent's conversation with the email itself.
                "messages": [
                    {
                        "role": "user",
                        "content": f"Respond to the email: {email_markdown}",
                    }
                ],
            },
        )

    if decision == "ignore":
        print("🚫 Classification: IGNORE - This email can be safely ignored")
        return Command(goto=END, update={"classification_decision": decision})

    if decision == "notify":
        # Placeholder: a real deployment would surface the email somewhere.
        print("🔔 Classification: NOTIFY - This email contains important information")
        return Command(goto=END, update={"classification_decision": decision})

    raise ValueError(f"Invalid classification: {decision}")
|
|
|
|
# Top-level graph: triage runs first; triage_router's Command then decides
# whether to continue into the compiled response agent or to stop.
overall_workflow = (
    StateGraph(State, input=StateInput)
    .add_node(triage_router)
    .add_node("response_agent", agent)
    .add_edge(START, "triage_router")
)

# Compiled email assistant graph.
email_assistant = overall_workflow.compile()