feat(ai): Show recordings filters from within session_summarization (#41466)

This commit is contained in:
Michael Matloka
2025-11-13 20:31:31 +01:00
committed by GitHub
parent de43b1efa4
commit 80356b50b0

View File

@@ -59,12 +59,24 @@ class SessionSummarizationNode(AssistantNode):
self._session_search = _SessionSearch(self)
self._session_summarizer = _SessionSummarizer(self)
async def _stream_progress(self, progress_message: str) -> None:
def _stream_progress(self, progress_message: str) -> None:
"""Push summarization progress as reasoning messages"""
content = prepare_reasoning_progress_message(progress_message)
if content:
self.dispatcher.update(content)
def _stream_filters(self, filters: MaxRecordingUniversalFilters) -> None:
"""Stream filters to the user"""
self.dispatcher.message(
AssistantToolCallMessage(
content="",
ui_payload={"search_session_recordings": filters.model_dump(exclude_none=True)},
# Randomized tool call ID, as we don't want this to be THE result of the actual session summarization tool call
# - it's OK because this is only dispatched ephemerally, so the tool message doesn't get added to the state
tool_call_id=str(uuid4()),
)
)
async def _stream_notebook_content(self, content: dict, state: AssistantState, partial: bool = True) -> None:
"""Stream TipTap content directly to a notebook if notebook_id is present in state."""
# Check if we have a notebook_id in the state
@@ -384,6 +396,7 @@ class _SessionSearch:
root_tool_call_id=None,
)
# Use filters when generated successfully
self._node._stream_filters(filter_generation_result)
replay_filters = self._convert_max_filters_to_recordings_query(filter_generation_result)
# Query the filters to get session ids
query_limit = state.session_summarization_limit
@@ -427,13 +440,13 @@ class _SessionSummarizer:
)
completed += 1
# Update the user on the progress
await self._node._stream_progress(progress_message=f"Watching sessions ({completed}/{total})")
self._node._stream_progress(progress_message=f"Watching sessions ({completed}/{total})")
return result
# Run all tasks concurrently
tasks = [_summarize(sid) for sid in session_ids]
summaries = await asyncio.gather(*tasks)
await self._node._stream_progress(progress_message=f"Generating a summary, almost there")
self._node._stream_progress(progress_message=f"Generating a summary, almost there")
# Stringify, as chat doesn't need full JSON to be context-aware, while providing it could overload the context
stringified_summaries = []
for summary in summaries:
@@ -481,7 +494,7 @@ class _SessionSummarizer:
# Update intermediate state based on step enum (no content, as it's just a status message)
self._intermediate_state.update_step_progress(content=None, step=step)
# Status message - stream to user
await self._node._stream_progress(progress_message=data)
self._node._stream_progress(progress_message=data)
# Notebook intermediate data update messages
elif update_type == SessionSummaryStreamUpdate.NOTEBOOK_UPDATE:
if not isinstance(data, dict):
@@ -541,7 +554,7 @@ class _SessionSummarizer:
base_message = f"Found sessions ({len(session_ids)})"
if len(session_ids) <= GROUP_SUMMARIES_MIN_SESSIONS:
# If small amount of sessions - there are no patterns to extract, so summarize them individually and return as is
await self._node._stream_progress(
self._node._stream_progress(
progress_message=f"{base_message}. We will do a quick summary, as the scope is small",
)
summaries_content = await self._summarize_sessions_individually(session_ids=session_ids)
@@ -556,7 +569,7 @@ class _SessionSummarizer:
state.notebook_short_id = notebook.short_id
# For large groups, process in detail, searching for patterns
# TODO: Allow users to define the pattern themselves (or rather catch it from the query)
await self._node._stream_progress(
self._node._stream_progress(
progress_message=f"{base_message}. We will analyze in detail, and store the report in a notebook",
)
summaries_content = await self._summarize_sessions_as_group(