feat: add streaming for choreography

This commit is contained in:
Marcus Schiesser
2024-09-04 15:05:28 +07:00
committed by Marcus Schiesser
parent 921a748904
commit 395afea8fa
4 changed files with 16 additions and 11 deletions
+1 -1
View File
@@ -96,7 +96,7 @@ class FunctionCallingAgent(Workflow):
self.memory.put(system_msg)
# set streaming
ctx.data["streaming"] = ev.streaming or False
ctx.data["streaming"] = getattr(ev, "streaming", False)
# get user input
user_input = ev.input
+3 -2
View File
@@ -15,7 +15,6 @@ chat_router = r = APIRouter()
logger = logging.getLogger("uvicorn")
# streaming endpoint - delete if not needed
@r.post("")
async def chat(
request: Request,
@@ -31,7 +30,9 @@ async def chat(
# params = data.data or {}
agent: Workflow = create_agent(chat_history=messages)
task = asyncio.create_task(agent.run(input=last_message_content))
task = asyncio.create_task(
agent.run(input=last_message_content, streaming=True)
)
return VercelStreamResponse(request, task, agent.stream_events, data)
except Exception as e:
+3 -3
View File
@@ -1,11 +1,11 @@
from typing import List
from typing import List, Optional
from app.agents.single import FunctionCallingAgent
from app.agents.multi import AgentCallingAgent
from app.examples.researcher import create_researcher
from llama_index.core.chat_engine.types import ChatMessage
def create_choreography(chat_history: List[ChatMessage]):
def create_choreography(chat_history: Optional[List[ChatMessage]] = None):
researcher = create_researcher(chat_history)
reviewer = FunctionCallingAgent(
name="reviewer",
@@ -19,7 +19,7 @@ def create_choreography(chat_history: List[ChatMessage]):
role="expert in writing blog posts",
system_prompt="""You are an expert in writing blog posts. You are given a task to write a blog post. Before starting to write the post, consult the researcher agent to get the information you need. Don't make up any information yourself.
After creating a draft for the post, send it to the reviewer agent to receive some feedback and make sure to incorporate the feedback from the reviewer.
You can consult the reviewer and researcher multiple times. Only finish the task once the reviewer is satisfied.""",
You can consult the reviewer and researcher maximal two times. Your output should just contain the blog post.""",
# TODO: add chat_history support to AgentCallingAgent
# chat_history=chat_history,
)
+9 -5
View File
@@ -52,6 +52,8 @@ class ReviewEvent(Event):
class BlogPostWorkflow(Workflow):
@step()
async def start(self, ctx: Context, ev: StartEvent) -> ResearchEvent:
# set streaming
ctx.data["streaming"] = getattr(ev, "streaming", False)
# start the workflow with researching about a topic
ctx.data["task"] = ev.input
return ResearchEvent(input=f"Research for this task: {ev.input}")
@@ -70,19 +72,21 @@ class BlogPostWorkflow(Workflow):
async def write(
self, ctx: Context, ev: WriteEvent, writer: FunctionCallingAgent
) -> ReviewEvent | StopEvent:
MAX_ATTEMPTS = 3
MAX_ATTEMPTS = 2
ctx.data["attempts"] = ctx.data.get("attempts", 0) + 1
too_many_attempts = ctx.data["attempts"] >= MAX_ATTEMPTS
too_many_attempts = ctx.data["attempts"] > MAX_ATTEMPTS
if too_many_attempts:
ctx.write_event_to_stream(
AgentRunEvent(
name=writer.name,
msg=f"Too many attempts ({ctx.data['attempts']}) to write the blog post. Proceeding with the current version.",
msg=f"Too many attempts ({MAX_ATTEMPTS}) to write the blog post. Proceeding with the current version.",
)
)
if ev.is_good or too_many_attempts:
# too many attempts or the blog post is good - stream the final response
result = await self.run_agent(ctx, writer, ev.input, streaming=True)
# too many attempts or the blog post is good - stream final response if requested
result = await self.run_agent(
ctx, writer, ev.input, streaming=ctx.data["streaming"]
)
return StopEvent(result=result)
result: AgentRunResult = await self.run_agent(ctx, writer, ev.input)
ctx.data["result"] = result