mirror of
https://github.com/langchain-ai/voice-agents-tracing.git
synced 2026-07-01 19:54:16 -04:00
rename
This commit is contained in:
@@ -84,10 +84,10 @@ python pipecat-test/pipecat_langsmith.py
|
||||
|
||||
```
|
||||
audio/
|
||||
├── livekit-test/
|
||||
├── livekit/
|
||||
│ ├── langsmith_processor.py
|
||||
│ └── livekitagents_langsmith.py
|
||||
├── pipecat-test/
|
||||
├── pipecat/
|
||||
│ ├── langsmith_processor.py
|
||||
│ ├── pipecat_langsmith.py
|
||||
│ └── recordings/
|
||||
|
||||
@@ -1,617 +0,0 @@
|
||||
"""
|
||||
LangSmith span processor for LiveKit Agents.
|
||||
|
||||
Enriches OpenTelemetry spans from LiveKit Agents with LangSmith-compatible attributes
|
||||
for proper conversation tracking and visualization.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from typing import Optional
|
||||
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
|
||||
# Optional verbose logging for local debugging
|
||||
DEBUG = os.getenv("LANGSMITH_PROCESSOR_DEBUG", "false").lower() in ("true", "1", "yes")
|
||||
logger = logging.getLogger("langsmith_processor")
|
||||
if DEBUG and not logger.handlers:
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter('%(asctime)s [LANGSMITH] %(levelname)s %(message)s'))
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class LangSmithSpanProcessor(SpanProcessor):
|
||||
"""
|
||||
Custom OpenTelemetry span processor that enriches LiveKit Agents spans with LangSmith-compatible attributes.
|
||||
This enables proper conversation tracking and message visualization in LangSmith's UI.
|
||||
"""
|
||||
|
||||
def __init__(self, downstream_processor: Optional[SpanProcessor] = None):
|
||||
super().__init__()
|
||||
if downstream_processor is None:
|
||||
downstream_processor = BatchSpanProcessor(OTLPSpanExporter())
|
||||
self.downstream = downstream_processor
|
||||
# Track conversation messages across spans for proper LangSmith grouping
|
||||
self.conversation_messages = {} # trace_id -> list of messages
|
||||
self.trace_to_conversation_id = {} # trace_id -> conversation_id
|
||||
# Hold root/job spans until conversation data is ready
|
||||
self.deferred_job_spans = {} # trace_id -> ReadableSpan
|
||||
|
||||
def on_start(self, span: ReadableSpan, parent_context=None) -> None:
|
||||
if self.downstream:
|
||||
self.downstream.on_start(span, parent_context)
|
||||
|
||||
def on_end(self, span: ReadableSpan) -> None:
|
||||
"""
|
||||
Enriches spans with LangSmith-compatible attributes before they're exported.
|
||||
Maps LiveKit Agents span types to LangSmith's expected format.
|
||||
"""
|
||||
# Always log that we're processing a span (even without DEBUG mode)
|
||||
# Use print to stderr to ensure it's visible
|
||||
import sys
|
||||
print(f"[LANGSMITH-PROCESSOR] Processing span: {span.name}", file=sys.stderr, flush=True)
|
||||
|
||||
# Track each conversation as a thread in LangSmith
|
||||
trace_id = format(span.context.trace_id, '032x')
|
||||
span._attributes["langsmith.metadata.thread_id"] = trace_id
|
||||
|
||||
# Link all spans to their conversation for proper grouping in LangSmith
|
||||
if trace_id in self.trace_to_conversation_id:
|
||||
conversation_id = self.trace_to_conversation_id[trace_id]
|
||||
span._attributes["conversation.id"] = conversation_id
|
||||
span._attributes["langsmith.parent_span_id"] = "conversation"
|
||||
|
||||
span_name = span.name.lower()
|
||||
|
||||
# STT span: audio input -> transcribed text
|
||||
if "stt" in span_name or "speech_to_text" in span_name or "transcription" in span_name:
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
transcript = span.attributes.get("transcript") or span.attributes.get("text") or span.attributes.get("output", "")
|
||||
self._set_prompt_attributes(span, [{"role": "user", "content": "audio_segment"}])
|
||||
if transcript:
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": str(transcript)}])
|
||||
|
||||
# LLM span: conversation messages -> AI response
|
||||
elif "llm" in span_name or "chat" in span_name or "completion" in span_name or "openai" in span_name:
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
messages = self._extract_llm_messages(span)
|
||||
if not messages:
|
||||
messages = self._fallback_messages(span, span_name)
|
||||
self._set_prompt_attributes(span, messages)
|
||||
|
||||
output_data = self._extract_llm_output(span)
|
||||
if output_data:
|
||||
completion = [{"role": "assistant", "content": str(output_data)}]
|
||||
self._set_completion_attributes(span, completion)
|
||||
self._track_messages(self.conversation_messages, trace_id, messages, str(output_data))
|
||||
|
||||
# TTS span: text -> audio
|
||||
elif "tts" in span_name or "text_to_speech" in span_name or "synthesis" in span_name:
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
|
||||
# Debug TTS spans - always print attributes to see what LiveKit uses
|
||||
import sys
|
||||
print(f"\n[LANGSMITH-PROCESSOR] 🔊 TTS SPAN: {span.name}", file=sys.stderr, flush=True)
|
||||
print(f" 📋 All attributes for {span.name} ({len(span.attributes)} total):", file=sys.stderr, flush=True)
|
||||
for key, value in sorted(span.attributes.items()):
|
||||
value_str = str(value)
|
||||
if len(value_str) > 500:
|
||||
value_str = value_str[:500] + "... (truncated)"
|
||||
print(f" • {key} = {value_str}", file=sys.stderr, flush=True)
|
||||
|
||||
# Try LiveKit-specific attributes first
|
||||
text = (
|
||||
span.attributes.get("lk.input_text") or
|
||||
span.attributes.get("lk.request.text") or
|
||||
span.attributes.get("lk.text") or
|
||||
span.attributes.get("text") or
|
||||
span.attributes.get("input") or
|
||||
span.attributes.get("prompt") or
|
||||
""
|
||||
)
|
||||
|
||||
# Extract voice/model from lk.tts_metrics or other attributes
|
||||
voice_id = "unknown"
|
||||
tts_metrics = span.attributes.get("lk.tts_metrics")
|
||||
if tts_metrics:
|
||||
try:
|
||||
if isinstance(tts_metrics, str):
|
||||
metrics_data = json.loads(tts_metrics)
|
||||
else:
|
||||
metrics_data = tts_metrics
|
||||
if isinstance(metrics_data, dict):
|
||||
metadata = metrics_data.get("metadata", {})
|
||||
model_name = metadata.get("model_name") or metrics_data.get("model_name")
|
||||
if model_name:
|
||||
voice_id = str(model_name)
|
||||
except (json.JSONDecodeError, TypeError, KeyError):
|
||||
pass
|
||||
|
||||
# Fallback to other voice attributes
|
||||
if voice_id == "unknown":
|
||||
voice_id = (
|
||||
span.attributes.get("lk.voice") or
|
||||
span.attributes.get("voice") or
|
||||
span.attributes.get("voice_id") or
|
||||
"unknown"
|
||||
)
|
||||
|
||||
print(f" ✅ Extracted text: length={len(str(text))}, voice={voice_id}", file=sys.stderr, flush=True)
|
||||
|
||||
self._set_prompt_attributes(span, [
|
||||
{"role": "system", "content": f"Convert to speech with voice: {voice_id}"},
|
||||
{"role": "user", "content": str(text) if text else "text_to_speech"}
|
||||
])
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": f"Generated audio for: {text}"}])
|
||||
|
||||
# Agent/Chain/Job spans: aggregate conversation
|
||||
elif (
|
||||
"agent" in span_name
|
||||
or "session" in span_name
|
||||
or "conversation" in span_name
|
||||
or "job" in span_name
|
||||
):
|
||||
span._attributes["langsmith.span.kind"] = "chain"
|
||||
is_job_span = "job" in span_name
|
||||
|
||||
# Try to extract conversation ID
|
||||
conversation_id = (
|
||||
span.attributes.get("conversation.id") or
|
||||
span.attributes.get("conversation_id") or
|
||||
span.attributes.get("session_id") or
|
||||
(span.attributes.get("lk.job_id") if is_job_span else "") or
|
||||
""
|
||||
)
|
||||
if conversation_id:
|
||||
self.trace_to_conversation_id[trace_id] = str(conversation_id)
|
||||
span._attributes["conversation.id"] = str(conversation_id)
|
||||
span._attributes["langsmith.root_span"] = True
|
||||
elif is_job_span:
|
||||
# Ensure the root job span is treated as the LangSmith conversation root
|
||||
span._attributes["conversation.id"] = trace_id
|
||||
span._attributes["langsmith.root_span"] = True
|
||||
|
||||
# Aggregate messages from conversation
|
||||
conv_msgs = self.conversation_messages.get(trace_id, [])
|
||||
if conv_msgs:
|
||||
system_msg, first_user_msg, remaining_msgs = self._split_conversation_messages(conv_msgs)
|
||||
|
||||
# Add input (first user message only, exclude system message)
|
||||
# System message is only shown in LLM call spans, not in job entrypoint
|
||||
prompt_msgs = []
|
||||
if first_user_msg:
|
||||
prompt_msgs.append(first_user_msg)
|
||||
if prompt_msgs:
|
||||
self._set_prompt_attributes(span, prompt_msgs)
|
||||
|
||||
# Add output (remaining conversation)
|
||||
if remaining_msgs:
|
||||
self._set_completion_attributes(span, remaining_msgs)
|
||||
self._release_job_span_if_waiting(trace_id, prompt_msgs, remaining_msgs)
|
||||
elif is_job_span:
|
||||
# Defer export until conversation data becomes available
|
||||
self._defer_job_span(trace_id, span)
|
||||
return
|
||||
|
||||
# Cleanup
|
||||
should_cleanup_trace = is_job_span or span.parent is None
|
||||
if should_cleanup_trace:
|
||||
if trace_id in self.conversation_messages:
|
||||
del self.conversation_messages[trace_id]
|
||||
if trace_id in self.trace_to_conversation_id:
|
||||
del self.trace_to_conversation_id[trace_id]
|
||||
|
||||
# Default: mark as chain if no specific type detected
|
||||
else:
|
||||
# Check if it has LLM-like attributes
|
||||
if span.attributes.get("input") or span.attributes.get("output"):
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
input_val = span.attributes.get("input", "")
|
||||
output_val = span.attributes.get("output", "")
|
||||
if input_val:
|
||||
self._set_prompt_attributes(span, [{"role": "user", "content": str(input_val)}])
|
||||
if output_val:
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": str(output_val)}])
|
||||
else:
|
||||
span._attributes["langsmith.span.kind"] = "chain"
|
||||
|
||||
# Export span downstream (unless it was deferred earlier)
|
||||
self._export_span(span)
|
||||
|
||||
def _set_prompt_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0, log: bool = False):
|
||||
"""Set gen_ai.prompt.* attributes from a list of messages."""
|
||||
import sys
|
||||
for i, msg in enumerate(messages):
|
||||
idx = start_idx + i
|
||||
if isinstance(msg, dict):
|
||||
role = msg.get("role", "user")
|
||||
content = str(msg.get("content", ""))
|
||||
span._attributes[f"gen_ai.prompt.{idx}.role"] = role
|
||||
span._attributes[f"gen_ai.prompt.{idx}.content"] = content
|
||||
if log:
|
||||
content_preview = content[:100] + "..." if len(content) > 100 else content
|
||||
print(f" Set gen_ai.prompt.{idx}.role = '{role}', gen_ai.prompt.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
||||
else:
|
||||
# Handle string messages
|
||||
content = str(msg)
|
||||
span._attributes[f"gen_ai.prompt.{idx}.role"] = "user"
|
||||
span._attributes[f"gen_ai.prompt.{idx}.content"] = content
|
||||
if log:
|
||||
content_preview = content[:100] + "..." if len(content) > 100 else content
|
||||
print(f" Set gen_ai.prompt.{idx}.role = 'user', gen_ai.prompt.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
||||
|
||||
def _set_completion_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0, log: bool = False):
|
||||
"""Set gen_ai.completion.* attributes from a list of messages."""
|
||||
import sys
|
||||
for i, msg in enumerate(messages):
|
||||
idx = start_idx + i
|
||||
if isinstance(msg, dict):
|
||||
role = msg.get("role", "assistant")
|
||||
content = str(msg.get("content", ""))
|
||||
span._attributes[f"gen_ai.completion.{idx}.role"] = role
|
||||
span._attributes[f"gen_ai.completion.{idx}.content"] = content
|
||||
if log:
|
||||
content_preview = content[:200] + "..." if len(content) > 200 else content
|
||||
print(f" Set gen_ai.completion.{idx}.role = '{role}', gen_ai.completion.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
||||
else:
|
||||
# Handle string messages
|
||||
content = str(msg)
|
||||
span._attributes[f"gen_ai.completion.{idx}.role"] = "assistant"
|
||||
span._attributes[f"gen_ai.completion.{idx}.content"] = content
|
||||
if log:
|
||||
content_preview = content[:200] + "..." if len(content) > 200 else content
|
||||
print(f" Set gen_ai.completion.{idx}.role = 'assistant', gen_ai.completion.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
||||
|
||||
def _fallback_messages(self, span: ReadableSpan, span_name: str) -> list:
|
||||
"""Use system/user attributes or span name when no chat context is available."""
|
||||
system_prompt = span.attributes.get("gen_ai.system") or span.attributes.get("system") or ""
|
||||
user_prompt = (
|
||||
span.attributes.get("gen_ai.user")
|
||||
or span.attributes.get("user")
|
||||
or span.attributes.get("input")
|
||||
or ""
|
||||
)
|
||||
fallback = []
|
||||
if system_prompt:
|
||||
fallback.append({"role": "system", "content": str(system_prompt)})
|
||||
if user_prompt:
|
||||
fallback.append({"role": "user", "content": str(user_prompt)})
|
||||
if not fallback:
|
||||
fallback.append({"role": "user", "content": f"LLM request: {span_name}"})
|
||||
return fallback
|
||||
|
||||
def _split_conversation_messages(self, messages: list) -> tuple:
|
||||
"""
|
||||
Split conversation messages into system, first user, and remaining messages.
|
||||
Returns: (system_msg, first_user_msg, remaining_msgs)
|
||||
"""
|
||||
system_msg = None
|
||||
first_user_msg = None
|
||||
remaining_msgs = []
|
||||
first_user_found = False
|
||||
|
||||
for msg in messages:
|
||||
role = msg.get("role", "") if isinstance(msg, dict) else "user"
|
||||
if role == "system" and system_msg is None:
|
||||
system_msg = msg
|
||||
elif role == "user" and not first_user_found:
|
||||
first_user_msg = msg
|
||||
first_user_found = True
|
||||
elif first_user_found:
|
||||
remaining_msgs.append(msg)
|
||||
|
||||
return (system_msg, first_user_msg, remaining_msgs)
|
||||
|
||||
def _extract_llm_messages(self, span: ReadableSpan) -> list:
|
||||
"""
|
||||
Extract LLM input messages from span attributes using multiple strategies.
|
||||
Returns a list of message dicts with 'role' and 'content' keys.
|
||||
"""
|
||||
import sys
|
||||
print(f" 🔍 Strategy 1: Checking lk.chat_ctx...", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 1: LiveKit-specific attribute: lk.chat_ctx
|
||||
chat_ctx = span.attributes.get("lk.chat_ctx")
|
||||
if chat_ctx:
|
||||
print(f" ✓ Found lk.chat_ctx, type={type(chat_ctx)}, length={len(str(chat_ctx)) if isinstance(chat_ctx, str) else 'N/A'}", file=sys.stderr, flush=True)
|
||||
try:
|
||||
if isinstance(chat_ctx, str):
|
||||
ctx_data = json.loads(chat_ctx)
|
||||
else:
|
||||
ctx_data = chat_ctx
|
||||
|
||||
# Extract messages from items array
|
||||
if isinstance(ctx_data, dict) and "items" in ctx_data:
|
||||
messages = []
|
||||
for item in ctx_data["items"]:
|
||||
if isinstance(item, dict) and item.get("type") == "message":
|
||||
role = item.get("role", "user")
|
||||
content = item.get("content", "")
|
||||
# Content might be a list of strings or a single string
|
||||
if isinstance(content, list):
|
||||
content = " ".join(str(c) for c in content)
|
||||
if content:
|
||||
messages.append({"role": str(role), "content": str(content)})
|
||||
|
||||
if messages:
|
||||
print(f" ✅ Strategy 1 SUCCESS: Found {len(messages)} messages from lk.chat_ctx", file=sys.stderr, flush=True)
|
||||
return messages
|
||||
except (json.JSONDecodeError, TypeError, KeyError, AttributeError) as e:
|
||||
print(f" ✗ Strategy 1 FAILED: {type(e).__name__}: {e}", file=sys.stderr, flush=True)
|
||||
else:
|
||||
print(f" ✗ lk.chat_ctx not found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 2: Check for OpenTelemetry semantic convention attributes
|
||||
# gen_ai.request.prompt.* or gen_ai.prompt.*
|
||||
print(f" 🔍 Strategy 2: Checking gen_ai.request.prompt.*...", file=sys.stderr, flush=True)
|
||||
messages = []
|
||||
idx = 0
|
||||
while True:
|
||||
role_key = f"gen_ai.request.prompt.{idx}.role"
|
||||
content_key = f"gen_ai.request.prompt.{idx}.content"
|
||||
if role_key in span.attributes or content_key in span.attributes:
|
||||
role = span.attributes.get(role_key, "user")
|
||||
content = span.attributes.get(content_key, "")
|
||||
if content:
|
||||
messages.append({"role": str(role), "content": str(content)})
|
||||
idx += 1
|
||||
else:
|
||||
break
|
||||
|
||||
if messages:
|
||||
print(f" ✅ Strategy 2 SUCCESS: Found {len(messages)} messages from gen_ai.request.prompt.*", file=sys.stderr, flush=True)
|
||||
return messages
|
||||
else:
|
||||
print(f" ✗ No gen_ai.request.prompt.* attributes found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 2b: Check for gen_ai.prompt.* (alternative format)
|
||||
print(f" 🔍 Strategy 2b: Checking gen_ai.prompt.*...", file=sys.stderr, flush=True)
|
||||
idx = 0
|
||||
while True:
|
||||
role_key = f"gen_ai.prompt.{idx}.role"
|
||||
content_key = f"gen_ai.prompt.{idx}.content"
|
||||
if role_key in span.attributes or content_key in span.attributes:
|
||||
role = span.attributes.get(role_key, "user")
|
||||
content = span.attributes.get(content_key, "")
|
||||
if content:
|
||||
messages.append({"role": str(role), "content": str(content)})
|
||||
idx += 1
|
||||
else:
|
||||
break
|
||||
|
||||
if messages:
|
||||
print(f" ✅ Strategy 2b SUCCESS: Found {len(messages)} messages from gen_ai.prompt.*", file=sys.stderr, flush=True)
|
||||
return messages
|
||||
else:
|
||||
print(f" ✗ No gen_ai.prompt.* attributes found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 3: Check for messages attribute (JSON string or list)
|
||||
print(f" 🔍 Strategy 3: Checking messages/llm.messages/input attributes...", file=sys.stderr, flush=True)
|
||||
messages_attr = span.attributes.get("messages") or span.attributes.get("llm.messages") or span.attributes.get("input")
|
||||
print(f" Checking: messages={bool(span.attributes.get('messages'))}, llm.messages={bool(span.attributes.get('llm.messages'))}, input={bool(span.attributes.get('input'))}", file=sys.stderr, flush=True)
|
||||
if messages_attr:
|
||||
try:
|
||||
if isinstance(messages_attr, str):
|
||||
if DEBUG:
|
||||
logger.debug(f" Parsing JSON string, length={len(messages_attr)}")
|
||||
parsed = json.loads(messages_attr)
|
||||
if isinstance(parsed, list):
|
||||
# Validate and normalize message format
|
||||
normalized = []
|
||||
for msg in parsed:
|
||||
if isinstance(msg, dict) and "content" in msg:
|
||||
normalized.append({
|
||||
"role": msg.get("role", "user"),
|
||||
"content": str(msg.get("content", ""))
|
||||
})
|
||||
if normalized:
|
||||
print(f" ✅ Strategy 3 SUCCESS: Found {len(normalized)} messages from JSON string", file=sys.stderr, flush=True)
|
||||
return normalized
|
||||
elif isinstance(messages_attr, list):
|
||||
print(f" Found list type, length={len(messages_attr)}", file=sys.stderr, flush=True)
|
||||
# Validate and normalize message format
|
||||
normalized = []
|
||||
for msg in messages_attr:
|
||||
if isinstance(msg, dict) and "content" in msg:
|
||||
normalized.append({
|
||||
"role": msg.get("role", "user"),
|
||||
"content": str(msg.get("content", ""))
|
||||
})
|
||||
if normalized:
|
||||
print(f" ✅ Strategy 3 SUCCESS: Found {len(normalized)} messages from list", file=sys.stderr, flush=True)
|
||||
return normalized
|
||||
except (json.JSONDecodeError, TypeError, AttributeError) as e:
|
||||
print(f" ✗ Strategy 3 FAILED: {type(e).__name__}: {e}", file=sys.stderr, flush=True)
|
||||
else:
|
||||
print(f" ✗ No messages attribute found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 4: Check for individual system/user/assistant attributes
|
||||
print(f" 🔍 Strategy 4: Checking individual system/user/assistant attributes...", file=sys.stderr, flush=True)
|
||||
system = span.attributes.get("gen_ai.system") or span.attributes.get("system") or span.attributes.get("system_prompt")
|
||||
user = span.attributes.get("gen_ai.user") or span.attributes.get("user") or span.attributes.get("user_input")
|
||||
assistant = span.attributes.get("gen_ai.assistant") or span.attributes.get("assistant")
|
||||
|
||||
print(f" system={bool(system)}, user={bool(user)}, assistant={bool(assistant)}", file=sys.stderr, flush=True)
|
||||
|
||||
if system or user or assistant:
|
||||
result = []
|
||||
if system:
|
||||
result.append({"role": "system", "content": str(system)})
|
||||
if user:
|
||||
result.append({"role": "user", "content": str(user)})
|
||||
if assistant:
|
||||
result.append({"role": "assistant", "content": str(assistant)})
|
||||
if result:
|
||||
print(f" ✅ Strategy 4 SUCCESS: Found {len(result)} messages from individual attributes", file=sys.stderr, flush=True)
|
||||
return result
|
||||
else:
|
||||
print(f" ✗ No individual attributes found", file=sys.stderr, flush=True)
|
||||
|
||||
print(f" ⚠️ All strategies failed - no messages extracted", file=sys.stderr, flush=True)
|
||||
return []
|
||||
|
||||
def _extract_llm_output(self, span: ReadableSpan) -> str:
|
||||
"""
|
||||
Extract LLM output/completion from span attributes using multiple strategies.
|
||||
Returns the output as a string.
|
||||
"""
|
||||
import sys
|
||||
print(f" 🔍 EXTRACTING LLM OUTPUT:", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 1: LiveKit-specific attribute: lk.response.text
|
||||
print(f" Strategy 1: Checking lk.response.text...", file=sys.stderr, flush=True)
|
||||
output = span.attributes.get("lk.response.text")
|
||||
if output:
|
||||
print(f" ✅ Strategy 1 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
|
||||
return str(output)
|
||||
else:
|
||||
print(f" ✗ lk.response.text not found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 2: OpenTelemetry semantic convention
|
||||
print(f" Strategy 2: Checking gen_ai.response.text / gen_ai.completion.text...", file=sys.stderr, flush=True)
|
||||
output = span.attributes.get("gen_ai.response.text") or span.attributes.get("gen_ai.completion.text")
|
||||
if output:
|
||||
print(f" ✅ Strategy 2 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
|
||||
return str(output)
|
||||
else:
|
||||
print(f" ✗ gen_ai.response.text and gen_ai.completion.text not found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 3: Common attribute names
|
||||
print(f" Strategy 3: Checking common attribute names...", file=sys.stderr, flush=True)
|
||||
output = (
|
||||
span.attributes.get("gen_ai.response") or
|
||||
span.attributes.get("gen_ai.completion") or
|
||||
span.attributes.get("output") or
|
||||
span.attributes.get("response") or
|
||||
span.attributes.get("completion") or
|
||||
span.attributes.get("llm.output") or
|
||||
span.attributes.get("llm.response") or
|
||||
span.attributes.get("text") or
|
||||
""
|
||||
)
|
||||
|
||||
if output:
|
||||
print(f" ✅ Strategy 3 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
|
||||
return str(output)
|
||||
else:
|
||||
print(f" ✗ No common output attributes found", file=sys.stderr, flush=True)
|
||||
|
||||
# Strategy 4: Check for completion.* attributes
|
||||
print(f" Strategy 4: Checking gen_ai.completion.* attributes...", file=sys.stderr, flush=True)
|
||||
idx = 0
|
||||
completion_parts = []
|
||||
while True:
|
||||
content_key = f"gen_ai.completion.{idx}.content"
|
||||
if content_key in span.attributes:
|
||||
completion_parts.append(str(span.attributes[content_key]))
|
||||
idx += 1
|
||||
else:
|
||||
break
|
||||
|
||||
if completion_parts:
|
||||
print(f" ✅ Strategy 4 SUCCESS: Found {len(completion_parts)} completion parts", file=sys.stderr, flush=True)
|
||||
return "\n".join(completion_parts)
|
||||
else:
|
||||
print(f" ✗ No gen_ai.completion.* attributes found", file=sys.stderr, flush=True)
|
||||
|
||||
print(f" ⚠️ All strategies failed - no output extracted", file=sys.stderr, flush=True)
|
||||
return ""
|
||||
|
||||
def _get_messages_from_attributes(self, span: ReadableSpan) -> list:
|
||||
"""Extract messages from span attributes as fallback."""
|
||||
messages = []
|
||||
system = span.attributes.get("gen_ai.system") or span.attributes.get("system")
|
||||
user = span.attributes.get("gen_ai.user") or span.attributes.get("user") or span.attributes.get("input")
|
||||
|
||||
if system:
|
||||
messages.append({"role": "system", "content": str(system)})
|
||||
if user:
|
||||
messages.append({"role": "user", "content": str(user)})
|
||||
|
||||
return messages
|
||||
|
||||
def _track_messages(self, target_dict: dict, key: str, messages: list, output_data: str):
|
||||
"""
|
||||
Track messages in target_dict, avoiding duplicates.
|
||||
Preserves deduplication logic: case-insensitive content comparison.
|
||||
"""
|
||||
if key not in target_dict:
|
||||
target_dict[key] = []
|
||||
# Add system prompt once at the start
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict) and msg.get("role") == "system":
|
||||
target_dict[key].append(msg)
|
||||
break
|
||||
|
||||
# Add the latest user message if it's new
|
||||
last_user_msg = next(
|
||||
(msg for msg in reversed(messages) if isinstance(msg, dict) and msg.get("role") == "user"),
|
||||
None
|
||||
)
|
||||
if last_user_msg:
|
||||
new_content = str(last_user_msg.get("content", "")).strip().lower()
|
||||
existing_contents = [
|
||||
str(m.get("content", "")).strip().lower()
|
||||
for m in target_dict[key]
|
||||
if isinstance(m, dict) and m.get("role") == "user"
|
||||
]
|
||||
if new_content and new_content not in existing_contents:
|
||||
target_dict[key].append(last_user_msg)
|
||||
|
||||
# Add the assistant response if it's new
|
||||
if output_data:
|
||||
new_assistant_content = str(output_data).strip().lower()
|
||||
existing_assistant_contents = [
|
||||
str(m.get("content", "")).strip().lower()
|
||||
for m in target_dict[key]
|
||||
if isinstance(m, dict) and m.get("role") == "assistant"
|
||||
]
|
||||
if new_assistant_content not in existing_assistant_contents:
|
||||
target_dict[key].append({"role": "assistant", "content": output_data})
|
||||
|
||||
def shutdown(self) -> None:
|
||||
self._flush_deferred_job_spans()
|
||||
if self.downstream:
|
||||
self.downstream.shutdown()
|
||||
|
||||
def force_flush(self, timeout_millis: int = 30000) -> bool:
|
||||
self._flush_deferred_job_spans()
|
||||
if self.downstream:
|
||||
return self.downstream.force_flush(timeout_millis)
|
||||
return True
|
||||
|
||||
def _defer_job_span(self, trace_id: str, span: ReadableSpan):
|
||||
import sys
|
||||
self.deferred_job_spans[trace_id] = span
|
||||
print(f"⏸️ Deferring export of job span for trace {trace_id}", file=sys.stderr, flush=True)
|
||||
|
||||
def _release_job_span_if_waiting(self, trace_id: str, prompt_msgs: list, completion_msgs: list):
|
||||
job_span = self.deferred_job_spans.pop(trace_id, None)
|
||||
if not job_span:
|
||||
return
|
||||
import sys
|
||||
print(f"🧩 Releasing deferred job span for trace {trace_id}", file=sys.stderr, flush=True)
|
||||
if prompt_msgs:
|
||||
self._set_prompt_attributes(job_span, deepcopy(prompt_msgs))
|
||||
if completion_msgs:
|
||||
self._set_completion_attributes(job_span, deepcopy(completion_msgs))
|
||||
self._export_span(job_span)
|
||||
|
||||
def _flush_deferred_job_spans(self):
|
||||
if not self.deferred_job_spans:
|
||||
return
|
||||
import sys
|
||||
print(f"⚠️ Flushing {len(self.deferred_job_spans)} deferred job span(s) without conversation data", file=sys.stderr, flush=True)
|
||||
for trace_id, span in list(self.deferred_job_spans.items()):
|
||||
self._set_prompt_attributes(span, [{"role": "system", "content": "Conversation not captured"}])
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": "No conversation turns recorded."}])
|
||||
self._export_span(span)
|
||||
del self.deferred_job_spans[trace_id]
|
||||
|
||||
def _export_span(self, span: ReadableSpan):
|
||||
if self.downstream:
|
||||
self.downstream.on_end(span)
|
||||
|
||||
@@ -1,138 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from livekit import agents, rtc
|
||||
from livekit.agents import AgentServer, AgentSession, Agent, room_io
|
||||
from livekit.agents.telemetry import set_tracer_provider
|
||||
from livekit.plugins import noise_cancellation, silero
|
||||
from livekit.plugins.turn_detector.multilingual import MultilingualModel
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
|
||||
from langsmith_processor import LangSmithSpanProcessor
|
||||
|
||||
# Try loading .env.local first, then .env, then check parent directory
|
||||
script_dir = Path(__file__).parent
|
||||
env_loaded = False
|
||||
env_files_tried = []
|
||||
|
||||
for env_file in [".env.local", ".env"]:
|
||||
env_path = script_dir / env_file
|
||||
if env_path.exists():
|
||||
try:
|
||||
load_dotenv(env_path, override=True)
|
||||
env_loaded = True
|
||||
break
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Warning: Error loading {env_path}: {e}\n"
|
||||
f" Continuing without loading this file...",
|
||||
file=__import__("sys").stderr
|
||||
)
|
||||
env_files_tried.append(str(env_path))
|
||||
|
||||
# Also try parent directory
|
||||
if not env_loaded:
|
||||
parent_env = script_dir.parent / ".env.local"
|
||||
if parent_env.exists():
|
||||
try:
|
||||
load_dotenv(parent_env, override=True)
|
||||
env_loaded = True
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Warning: Error loading {parent_env}: {e}\n"
|
||||
f" Continuing without loading this file...",
|
||||
file=__import__("sys").stderr
|
||||
)
|
||||
env_files_tried.append(str(parent_env))
|
||||
else:
|
||||
parent_env = script_dir.parent / ".env"
|
||||
if parent_env.exists():
|
||||
try:
|
||||
load_dotenv(parent_env, override=True)
|
||||
env_loaded = True
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Warning: Error loading {parent_env}: {e}\n"
|
||||
f" Continuing without loading this file...",
|
||||
file=__import__("sys").stderr
|
||||
)
|
||||
env_files_tried.append(str(parent_env))
|
||||
|
||||
# Fallback: try loading from current directory without specifying file
|
||||
if not env_loaded:
|
||||
try:
|
||||
load_dotenv(override=True)
|
||||
except Exception as e:
|
||||
print(
|
||||
f"⚠️ Warning: Error loading .env file: {e}\n"
|
||||
f" Continuing without environment file...",
|
||||
file=__import__("sys").stderr
|
||||
)
|
||||
|
||||
|
||||
def setup_langsmith():
|
||||
"""Setup OpenTelemetry tracing to export spans to LangSmith."""
|
||||
endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
|
||||
headers = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")
|
||||
|
||||
if not endpoint or not headers:
|
||||
print(
|
||||
"⚠️ Warning: OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS not set. "
|
||||
"LangSmith tracing will be disabled. To enable, set these environment variables:\n"
|
||||
" - OTEL_EXPORTER_OTLP_ENDPOINT=https://api.smith.langchain.com/otel\n"
|
||||
" - OTEL_EXPORTER_OTLP_HEADERS=x-api-key=your_langsmith_api_key\n"
|
||||
"You can set them in a .env.local or .env file in this directory.",
|
||||
file=__import__("sys").stderr
|
||||
)
|
||||
return
|
||||
|
||||
trace_provider = TracerProvider()
|
||||
# Register LangSmith processor (handles enrichment + forwarding to OTLP)
|
||||
trace_provider.add_span_processor(LangSmithSpanProcessor())
|
||||
set_tracer_provider(trace_provider)
|
||||
print("✅ LangSmith tracing enabled", file=__import__("sys").stderr)
|
||||
|
||||
|
||||
# Setup LangSmith tracing before creating the server (optional if env vars not set)
|
||||
setup_langsmith()
|
||||
|
||||
|
||||
class Assistant(Agent):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
instructions="""You are a helpful voice AI assistant.
|
||||
You eagerly assist users with their questions by providing information from your extensive knowledge.
|
||||
Your responses are concise, to the point, and without any complex formatting or punctuation including emojis, asterisks, or other symbols.
|
||||
You are curious, friendly, and have a sense of humor.""",
|
||||
)
|
||||
|
||||
server = AgentServer()
|
||||
|
||||
@server.rtc_session()
|
||||
async def my_agent(ctx: agents.JobContext):
|
||||
session = AgentSession(
|
||||
stt="assemblyai/universal-streaming:en",
|
||||
llm="openai/gpt-4.1-mini",
|
||||
tts="cartesia/sonic-3:9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
|
||||
vad=silero.VAD.load(),
|
||||
turn_detection=MultilingualModel(),
|
||||
)
|
||||
|
||||
await session.start(
|
||||
room=ctx.room,
|
||||
agent=Assistant(),
|
||||
room_options=room_io.RoomOptions(
|
||||
audio_input=room_io.AudioInputOptions(
|
||||
noise_cancellation=lambda params: noise_cancellation.BVCTelephony() if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP else noise_cancellation.BVC(),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run in console mode (local terminal demo)
|
||||
# Override sys.argv to always use console mode, ignoring any command-line args
|
||||
sys.argv = [sys.argv[0], "console"]
|
||||
agents.cli.run_app(server)
|
||||
@@ -1,127 +0,0 @@
|
||||
"""
|
||||
Audio recorder for capturing and saving conversation audio.
|
||||
|
||||
Handles sample rate mismatches between input (microphone) and output (TTS) audio
|
||||
by automatically resampling to the highest detected rate.
|
||||
|
||||
Uses a streaming approach with a bounded buffer to avoid memory leaks on long conversations.
|
||||
"""
|
||||
import wave
|
||||
import numpy as np
|
||||
from scipy import signal
|
||||
from loguru import logger
|
||||
from pipecat.frames.frames import Frame, AudioRawFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class AudioRecorder(FrameProcessor):
|
||||
"""
|
||||
Custom frame processor that captures all audio (user input + TTS output) and saves to WAV.
|
||||
Handles sample rate mismatches by resampling to the highest detected rate.
|
||||
|
||||
Uses a bounded buffer to prevent memory leaks - flushes to disk periodically.
|
||||
"""
|
||||
|
||||
# Maximum frames to buffer before flushing to disk (prevents memory leak)
|
||||
MAX_BUFFER_FRAMES = 500 # ~10 seconds at typical frame rates
|
||||
|
||||
def __init__(self, output_path: str, sample_rate: int = 24000, channels: int = 1):
|
||||
super().__init__()
|
||||
self.output_path = output_path
|
||||
self._default_sample_rate = sample_rate # Store default, never mutate
|
||||
self._target_sample_rate = None # Will be set during initialization
|
||||
self.channels = channels
|
||||
self.audio_frames = [] # (audio_data, frame_sample_rate) tuples
|
||||
self.detected_sample_rates = set()
|
||||
self._wav_file = None
|
||||
self._is_initialized = False
|
||||
|
||||
def _initialize_wav_file(self):
|
||||
"""Initialize the WAV file with headers when first frame arrives."""
|
||||
if self._is_initialized:
|
||||
return
|
||||
|
||||
try:
|
||||
# Determine target sample rate: use highest detected, or fall back to default
|
||||
# Store in separate variable to avoid mutating the default
|
||||
self._target_sample_rate = (
|
||||
max(self.detected_sample_rates)
|
||||
if self.detected_sample_rates
|
||||
else self._default_sample_rate
|
||||
)
|
||||
|
||||
self._wav_file = wave.open(self.output_path, 'wb')
|
||||
self._wav_file.setnchannels(self.channels)
|
||||
self._wav_file.setsampwidth(2) # 16-bit PCM
|
||||
self._wav_file.setframerate(self._target_sample_rate)
|
||||
self._is_initialized = True
|
||||
logger.debug(f"Initialized WAV file at {self.output_path} with sample rate {self._target_sample_rate}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize WAV file {self.output_path}: {e}")
|
||||
|
||||
def _flush_buffer(self):
|
||||
"""Flush buffered frames to disk and clear buffer."""
|
||||
if not self.audio_frames or not self._wav_file:
|
||||
return
|
||||
|
||||
try:
|
||||
for audio_data, frame_sample_rate in self.audio_frames:
|
||||
# Resample if needed
|
||||
if frame_sample_rate and frame_sample_rate != self._target_sample_rate:
|
||||
audio_array = np.frombuffer(audio_data, dtype=np.int16)
|
||||
num_samples = int(len(audio_array) * self._target_sample_rate / frame_sample_rate)
|
||||
resampled = signal.resample(audio_array, num_samples)
|
||||
audio_data = resampled.astype(np.int16).tobytes()
|
||||
|
||||
self._wav_file.writeframes(audio_data)
|
||||
|
||||
# Clear buffer after writing to disk
|
||||
self.audio_frames.clear()
|
||||
logger.debug(f"Flushed audio buffer to {self.output_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to flush audio buffer to {self.output_path}: {e}")
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Capture all audio frames
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
# Pipecat uses 'sample_rate' attribute on AudioRawFrame
|
||||
frame_sample_rate = getattr(frame, 'sample_rate', None)
|
||||
if frame_sample_rate:
|
||||
self.detected_sample_rates.add(frame_sample_rate)
|
||||
|
||||
# Initialize file on first frame
|
||||
if not self._is_initialized:
|
||||
self._initialize_wav_file()
|
||||
|
||||
# Buffer the frame
|
||||
self.audio_frames.append((frame.audio, frame_sample_rate))
|
||||
|
||||
# Flush to disk if buffer is getting too large (prevents memory leak)
|
||||
if len(self.audio_frames) >= self.MAX_BUFFER_FRAMES:
|
||||
self._flush_buffer()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def save_recording(self):
|
||||
"""Flush any remaining buffered audio and close the WAV file."""
|
||||
if not self._is_initialized:
|
||||
logger.warning("No audio frames to save - WAV file was never initialized")
|
||||
return
|
||||
|
||||
try:
|
||||
# Flush any remaining buffered frames
|
||||
self._flush_buffer()
|
||||
|
||||
# Close the WAV file
|
||||
if self._wav_file:
|
||||
self._wav_file.close()
|
||||
self._wav_file = None
|
||||
|
||||
logger.info(f"Recording saved to: {self.output_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save recording to {self.output_path}: {e}")
|
||||
finally:
|
||||
self._is_initialized = False
|
||||
|
||||
@@ -1,400 +0,0 @@
|
||||
"""
|
||||
LangSmith span processor for Pipecat.
|
||||
|
||||
Enriches OpenTelemetry spans from Pipecat with LangSmith-compatible attributes
|
||||
for proper conversation tracking and visualization.
|
||||
"""
|
||||
import base64
|
||||
import json
|
||||
from pathlib import Path
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan, TracerProvider
|
||||
from opentelemetry import trace
|
||||
from loguru import logger
|
||||
from pipecat.utils.tracing.setup import setup_tracing
|
||||
|
||||
|
||||
class LangSmithSTTSpanProcessor(SpanProcessor):
|
||||
"""
|
||||
Custom OpenTelemetry span processor that enriches Pipecat spans with LangSmith-compatible attributes.
|
||||
This enables proper conversation tracking and message visualization in LangSmith's UI.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
# Track conversation messages across spans for proper LangSmith grouping
|
||||
self.conversation_messages = {} # trace_id -> list of messages
|
||||
self.turn_messages = {} # parent_span_id -> list of messages
|
||||
self.trace_to_conversation_id = {} # trace_id -> conversation_id
|
||||
self.conversation_recordings = {} # conversation_id -> recording_path
|
||||
self.conversation_recorders = {} # conversation_id -> AudioRecorder instance
|
||||
self.turn_recordings = {} # conversation_id -> {turn_number: {user: path, ai: path}}
|
||||
self.turn_audio_recorders = {} # conversation_id -> TurnAudioRecorder instance
|
||||
|
||||
def on_start(self, span: ReadableSpan, parent_context=None) -> None:
|
||||
pass
|
||||
|
||||
def register_recording(self, conversation_id: str, recording_path: str, audio_recorder=None):
|
||||
"""Register a recording path and optional audio recorder for a conversation to attach to the root span."""
|
||||
self.conversation_recordings[conversation_id] = recording_path
|
||||
if audio_recorder:
|
||||
self.conversation_recorders[conversation_id] = audio_recorder
|
||||
|
||||
def register_turn_audio_recorder(self, conversation_id: str, turn_audio_recorder):
|
||||
"""
|
||||
Register the TurnAudioRecorder instance for a conversation.
|
||||
This allows the span processor to call save_turn_audio_sync() when turn spans end.
|
||||
|
||||
Args:
|
||||
conversation_id: The conversation ID
|
||||
turn_audio_recorder: TurnAudioRecorder instance
|
||||
"""
|
||||
self.turn_audio_recorders[conversation_id] = turn_audio_recorder
|
||||
|
||||
def register_turn_recording(self, conversation_id: str, turn_number: int, recording_paths: dict):
|
||||
"""
|
||||
Register turn-specific audio recordings for attachment to turn spans.
|
||||
|
||||
Args:
|
||||
conversation_id: The conversation ID
|
||||
turn_number: The turn number
|
||||
recording_paths: Dict with 'user' and/or 'ai' keys pointing to WAV file paths
|
||||
"""
|
||||
if conversation_id not in self.turn_recordings:
|
||||
self.turn_recordings[conversation_id] = {}
|
||||
self.turn_recordings[conversation_id][turn_number] = recording_paths
|
||||
|
||||
def on_end(self, span: ReadableSpan) -> None:
|
||||
"""
|
||||
Enriches spans with LangSmith-compatible attributes before they're exported.
|
||||
Maps Pipecat span types (stt, llm, tts, turn, conversation) to LangSmith's expected format.
|
||||
"""
|
||||
# Track each conversation as a thread in LangSmith
|
||||
trace_id = format(span.context.trace_id, '032x')
|
||||
span._attributes["langsmith.metadata.thread_id"] = trace_id
|
||||
|
||||
# Link all spans to their conversation for proper grouping in LangSmith
|
||||
if trace_id in self.trace_to_conversation_id:
|
||||
conversation_id = self.trace_to_conversation_id[trace_id]
|
||||
span._attributes["conversation.id"] = conversation_id
|
||||
span._attributes["langsmith.parent_span_id"] = "conversation"
|
||||
|
||||
# STT span: audio input -> transcribed text
|
||||
if span.name == "stt":
|
||||
transcript = span.attributes.get("transcript", "")
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
self._set_prompt_attributes(span, [{"role": "user", "content": "audio_segment"}])
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": transcript}])
|
||||
|
||||
# LLM span: conversation messages -> AI response
|
||||
elif span.name == "llm":
|
||||
input_data = span.attributes.get("input", "")
|
||||
output_data = span.attributes.get("output", "")
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
|
||||
# Parse and add input messages
|
||||
messages = []
|
||||
try:
|
||||
messages = json.loads(input_data)
|
||||
self._set_prompt_attributes(span, messages)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Add LLM output
|
||||
if output_data:
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": output_data}])
|
||||
|
||||
# Track messages for aggregating into turn and conversation spans
|
||||
parent_span_id = format(span.parent.span_id, '016x') if span.parent else None
|
||||
self._track_messages(self.conversation_messages, trace_id, messages, output_data)
|
||||
if parent_span_id:
|
||||
self._track_messages(self.turn_messages, parent_span_id, messages, output_data)
|
||||
|
||||
# TTS span: text -> audio
|
||||
elif span.name == "tts":
|
||||
text = span.attributes.get("text", "")
|
||||
voice_id = span.attributes.get("voice_id", "")
|
||||
span._attributes["langsmith.span.kind"] = "llm"
|
||||
self._set_prompt_attributes(span, [
|
||||
{"role": "system", "content": f"Convert to speech with voice: {voice_id}"},
|
||||
{"role": "user", "content": text}
|
||||
])
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": f"Generated audio for: {text}"}])
|
||||
|
||||
# Turn span: represents a single user-assistant interaction
|
||||
elif span.name == "turn":
|
||||
turn_number = span.attributes.get("turn.number", 0)
|
||||
was_interrupted = span.attributes.get("turn.was_interrupted", False)
|
||||
span._attributes["langsmith.span.kind"] = "chain"
|
||||
|
||||
# Aggregate messages from this turn's child spans
|
||||
span_id = format(span.context.span_id, '016x')
|
||||
turn_msgs = self.turn_messages.get(span_id, [])
|
||||
user_msgs = self._get_messages_by_role(turn_msgs, "user")
|
||||
assistant_msgs = self._get_messages_by_role(turn_msgs, "assistant")
|
||||
|
||||
# Add user input(s)
|
||||
if user_msgs:
|
||||
self._set_prompt_attributes(span, user_msgs)
|
||||
else:
|
||||
self._set_prompt_attributes(span, [{"role": "user", "content": f"Turn {turn_number}"}])
|
||||
|
||||
# Add assistant response(s)
|
||||
if assistant_msgs:
|
||||
self._set_completion_attributes(span, assistant_msgs)
|
||||
else:
|
||||
status = "interrupted" if was_interrupted else "no response"
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": status}])
|
||||
|
||||
# Attach turn audio files - save them NOW before span is exported
|
||||
conversation_id = span.attributes.get("conversation.id", "")
|
||||
if not conversation_id:
|
||||
conversation_id = self.trace_to_conversation_id.get(trace_id, "")
|
||||
|
||||
# Get the turn audio recorder and save files synchronously RIGHT NOW
|
||||
if conversation_id and conversation_id in self.turn_audio_recorders:
|
||||
turn_audio_recorder = self.turn_audio_recorders[conversation_id]
|
||||
|
||||
# Save files synchronously before span is exported
|
||||
turn_files = turn_audio_recorder.save_turn_audio_sync(turn_number)
|
||||
|
||||
if turn_files:
|
||||
attachments = []
|
||||
|
||||
# Attach user audio
|
||||
if 'user' in turn_files:
|
||||
user_audio = self._load_audio_file(turn_files['user'])
|
||||
if user_audio:
|
||||
attachments.append({
|
||||
"name": f"turn_{turn_number}_user.wav",
|
||||
"content": user_audio,
|
||||
"mime_type": "audio/wav"
|
||||
})
|
||||
|
||||
# Attach AI audio
|
||||
if 'ai' in turn_files:
|
||||
ai_audio = self._load_audio_file(turn_files['ai'])
|
||||
if ai_audio:
|
||||
attachments.append({
|
||||
"name": f"turn_{turn_number}_ai.wav",
|
||||
"content": ai_audio,
|
||||
"mime_type": "audio/wav"
|
||||
})
|
||||
|
||||
if attachments:
|
||||
span._attributes["langsmith.attachments"] = json.dumps(attachments)
|
||||
|
||||
# Cleanup
|
||||
if span_id in self.turn_messages:
|
||||
del self.turn_messages[span_id]
|
||||
|
||||
# Conversation span: represents the entire conversation session
|
||||
elif span.name == "conversation":
|
||||
conversation_id = span.attributes.get("conversation.id", "")
|
||||
conversation_type = span.attributes.get("conversation.type", "voice")
|
||||
|
||||
# Try alternative conversation_id keys if the standard one is empty
|
||||
if not conversation_id:
|
||||
conversation_id = span.attributes.get("conversation_id", "")
|
||||
|
||||
# Store conversation_id for linking child spans
|
||||
self.trace_to_conversation_id[trace_id] = conversation_id
|
||||
|
||||
span._attributes["langsmith.span.kind"] = "chain"
|
||||
span._attributes["langsmith.root_span"] = True
|
||||
|
||||
# Aggregate all messages from the conversation
|
||||
conv_msgs = self.conversation_messages.get(trace_id, [])
|
||||
|
||||
if conv_msgs:
|
||||
system_msg, first_user_msg, remaining_msgs = self._split_conversation_messages(conv_msgs)
|
||||
|
||||
# Add input (first user message only, exclude system message)
|
||||
prompt_msgs = []
|
||||
if first_user_msg:
|
||||
prompt_msgs.append(first_user_msg)
|
||||
self._set_prompt_attributes(span, prompt_msgs)
|
||||
|
||||
# Add output (remaining conversation)
|
||||
if remaining_msgs:
|
||||
self._set_completion_attributes(span, remaining_msgs)
|
||||
else:
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": "No responses yet"}])
|
||||
else:
|
||||
self._set_prompt_attributes(span, [{"role": "system", "content": f"Starting {conversation_type} conversation"}])
|
||||
self._set_completion_attributes(span, [{"role": "assistant", "content": "No messages"}])
|
||||
|
||||
# Save and attach recording if available
|
||||
# First, try to save the recording if we have the audio recorder instance
|
||||
if conversation_id and conversation_id in self.conversation_recorders:
|
||||
audio_recorder = self.conversation_recorders[conversation_id]
|
||||
try:
|
||||
audio_recorder.save_recording()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to save recording for conversation {conversation_id}: {e}")
|
||||
|
||||
# Try to find recording by conversation_id, or use single available recording
|
||||
recording_path_str = None
|
||||
if conversation_id and conversation_id in self.conversation_recordings:
|
||||
recording_path_str = self.conversation_recordings[conversation_id]
|
||||
elif len(self.conversation_recordings) == 1:
|
||||
# If there's only one recording, use it (likely this conversation)
|
||||
recording_path_str = list(self.conversation_recordings.values())[0]
|
||||
|
||||
if recording_path_str:
|
||||
recording_path = Path(recording_path_str)
|
||||
|
||||
# Try to read the file once - don't block the telemetry pipeline with retries
|
||||
# The file should already exist since save_recording() is called before span ends
|
||||
if recording_path.exists():
|
||||
try:
|
||||
with open(recording_path, 'rb') as f:
|
||||
audio_data = f.read()
|
||||
if audio_data: # Ensure file is not empty
|
||||
audio_base64 = base64.b64encode(audio_data).decode('utf-8')
|
||||
|
||||
attachments = [{
|
||||
"name": recording_path.name,
|
||||
"content": audio_base64,
|
||||
"mime_type": "audio/wav"
|
||||
}]
|
||||
|
||||
span._attributes["langsmith.attachments"] = json.dumps(attachments)
|
||||
logger.debug(f"Attached recording {recording_path.name} to conversation span")
|
||||
else:
|
||||
logger.warning(f"Recording file {recording_path} exists but is empty")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to read recording file {recording_path}: {e}")
|
||||
else:
|
||||
logger.warning(f"Recording file {recording_path} does not exist when attaching to span")
|
||||
|
||||
# Cleanup
|
||||
if trace_id in self.conversation_messages:
|
||||
del self.conversation_messages[trace_id]
|
||||
if trace_id in self.trace_to_conversation_id:
|
||||
del self.trace_to_conversation_id[trace_id]
|
||||
if conversation_id in self.conversation_recordings:
|
||||
del self.conversation_recordings[conversation_id]
|
||||
if conversation_id in self.conversation_recorders:
|
||||
del self.conversation_recorders[conversation_id]
|
||||
if conversation_id in self.turn_recordings:
|
||||
del self.turn_recordings[conversation_id]
|
||||
if conversation_id in self.turn_audio_recorders:
|
||||
del self.turn_audio_recorders[conversation_id]
|
||||
|
||||
def _set_prompt_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0):
|
||||
"""Set gen_ai.prompt.* attributes from a list of messages."""
|
||||
for i, msg in enumerate(messages):
|
||||
idx = start_idx + i
|
||||
span._attributes[f"gen_ai.prompt.{idx}.role"] = msg.get("role", "")
|
||||
span._attributes[f"gen_ai.prompt.{idx}.content"] = msg.get("content", "")
|
||||
|
||||
def _set_completion_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0):
|
||||
"""Set gen_ai.completion.* attributes from a list of messages."""
|
||||
for i, msg in enumerate(messages):
|
||||
idx = start_idx + i
|
||||
span._attributes[f"gen_ai.completion.{idx}.role"] = msg.get("role", "")
|
||||
span._attributes[f"gen_ai.completion.{idx}.content"] = msg.get("content", "")
|
||||
|
||||
def _get_messages_by_role(self, messages: list, role: str) -> list:
|
||||
"""Filter messages by role."""
|
||||
return [msg for msg in messages if msg.get("role") == role]
|
||||
|
||||
def _split_conversation_messages(self, messages: list) -> tuple:
|
||||
"""
|
||||
Split conversation messages into system, first user, and remaining messages.
|
||||
Returns: (system_msg, first_user_msg, remaining_msgs)
|
||||
"""
|
||||
system_msg = None
|
||||
first_user_msg = None
|
||||
remaining_msgs = []
|
||||
first_user_found = False
|
||||
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
if role == "system" and system_msg is None:
|
||||
system_msg = msg
|
||||
elif role == "user" and not first_user_found:
|
||||
first_user_msg = msg
|
||||
first_user_found = True
|
||||
elif first_user_found:
|
||||
remaining_msgs.append(msg)
|
||||
|
||||
return (system_msg, first_user_msg, remaining_msgs)
|
||||
|
||||
def _track_messages(self, target_dict: dict, key: str, messages: list, output_data: str):
|
||||
"""
|
||||
Track messages in target_dict, avoiding duplicates.
|
||||
Preserves all deduplication logic: case-insensitive content comparison,
|
||||
system prompt handling, and duplicate detection.
|
||||
"""
|
||||
if key not in target_dict:
|
||||
target_dict[key] = []
|
||||
# Add system prompt once at the start
|
||||
for msg in messages:
|
||||
if msg.get("role") == "system":
|
||||
target_dict[key].append(msg)
|
||||
break
|
||||
|
||||
# Add the latest user message if it's new
|
||||
last_user_msg = next((msg for msg in reversed(messages) if msg.get("role") == "user"), None)
|
||||
if last_user_msg:
|
||||
new_content = last_user_msg.get("content", "").strip().lower()
|
||||
existing_contents = [m.get("content", "").strip().lower()
|
||||
for m in target_dict[key] if m.get("role") == "user"]
|
||||
if new_content and new_content not in existing_contents:
|
||||
target_dict[key].append(last_user_msg)
|
||||
|
||||
# Add the assistant response if it's new
|
||||
if output_data:
|
||||
new_assistant_content = output_data.strip().lower()
|
||||
existing_assistant_contents = [m.get("content", "").strip().lower()
|
||||
for m in target_dict[key] if m.get("role") == "assistant"]
|
||||
if new_assistant_content not in existing_assistant_contents:
|
||||
target_dict[key].append({"role": "assistant", "content": output_data})
|
||||
|
||||
def _load_audio_file(self, file_path: str):
|
||||
"""
|
||||
Load and base64 encode an audio file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the audio file
|
||||
|
||||
Returns:
|
||||
Base64-encoded audio data or None if file doesn't exist/can't be read
|
||||
"""
|
||||
try:
|
||||
recording_path = Path(file_path)
|
||||
if recording_path.exists():
|
||||
with open(recording_path, 'rb') as f:
|
||||
audio_data = f.read()
|
||||
if audio_data:
|
||||
return base64.b64encode(audio_data).decode('utf-8')
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load audio file {file_path}: {e}")
|
||||
return None
|
||||
|
||||
def shutdown(self) -> None:
|
||||
pass
|
||||
|
||||
def force_flush(self, timeout_millis: int = 30000) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
# Setup OpenTelemetry tracing with LangSmith
|
||||
# Configure OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS in your .env
|
||||
setup_tracing(
|
||||
service_name="pipecat-langsmith-demo",
|
||||
exporter=OTLPSpanExporter(),
|
||||
console_export=False, # Disable console export to avoid base64 audio spam
|
||||
)
|
||||
|
||||
# Register our custom span processor to enrich Pipecat spans for LangSmith
|
||||
tracer_provider = trace.get_tracer_provider()
|
||||
if isinstance(tracer_provider, TracerProvider):
|
||||
span_processor = LangSmithSTTSpanProcessor()
|
||||
tracer_provider.add_span_processor(span_processor)
|
||||
else:
|
||||
# Fallback if tracer provider is not the expected type
|
||||
span_processor = LangSmithSTTSpanProcessor()
|
||||
|
||||
@@ -1,174 +0,0 @@
|
||||
"""
|
||||
Pipecat + LangSmith Demo: Voice Agent with Full Observability
|
||||
|
||||
This demo shows how to build a voice agent using Pipecat and send telemetry to LangSmith
|
||||
for observability. It uses a local Whisper model that takes in microphone input for Speech To Text (STT), OpenAI for the LLM call and for Text To Speech (TTS), and records conversations.
|
||||
|
||||
Core flow: Audio input -> STT -> LLM -> TTS -> Audio output
|
||||
|
||||
Setup: Configure these environment variables in your .env file:
|
||||
- OPENAI_API_KEY: Your OpenAI API key
|
||||
- OTEL_EXPORTER_OTLP_ENDPOINT: LangSmith OTLP endpoint (https://api.smith.langchain.com/otel for LangSmith SaaS)
|
||||
- OTEL_EXPORTER_OTLP_HEADERS: LangSmith auth headers (x-api-key=your_key)
|
||||
"""
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
# Load environment variables FIRST (before any imports that need them)
|
||||
load_dotenv(override=True)
|
||||
|
||||
# Suppress numpy warnings
|
||||
warnings.filterwarnings('ignore', category=RuntimeWarning, module='faster_whisper')
|
||||
np.seterr(divide='ignore', invalid='ignore')
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.whisper.stt import WhisperSTTService
|
||||
from pipecat.services.openai import OpenAILLMService, OpenAITTSService, OpenAISTTService
|
||||
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams
|
||||
|
||||
from langsmith_processor import LangSmithSTTSpanProcessor, span_processor # noqa: F401 - registers processor
|
||||
from audio_recorder import AudioRecorder
|
||||
from turn_audio_recorder import TurnAudioRecorder
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
def validate_environment():
|
||||
"""
|
||||
Validate that all required environment variables are set.
|
||||
|
||||
Raises:
|
||||
ValueError: If any required environment variable is missing
|
||||
"""
|
||||
required_vars = {
|
||||
"OPENAI_API_KEY": "OpenAI API key for LLM and TTS services",
|
||||
"OTEL_EXPORTER_OTLP_ENDPOINT": "LangSmith OTLP endpoint (e.g., https://api.smith.langchain.com/otel)",
|
||||
"OTEL_EXPORTER_OTLP_HEADERS": "LangSmith authentication headers (e.g., x-api-key=your_key)",
|
||||
}
|
||||
|
||||
missing_vars = []
|
||||
for var_name, description in required_vars.items():
|
||||
if not os.getenv(var_name):
|
||||
missing_vars.append(f" - {var_name}: {description}")
|
||||
|
||||
if missing_vars:
|
||||
error_msg = "Missing required environment variables:\n" + "\n".join(missing_vars)
|
||||
error_msg += "\n\nPlease set these in your .env file or environment."
|
||||
logger.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
logger.info("Environment validation passed")
|
||||
|
||||
|
||||
async def main():
|
||||
"""
|
||||
Main demo function showing how to build a voice agent with Pipecat and LangSmith observability.
|
||||
Flow: Audio input -> STT -> LLM -> TTS -> Audio output
|
||||
"""
|
||||
# Validate environment variables before proceeding
|
||||
validate_environment()
|
||||
|
||||
# Generate unique conversation ID (used for grouping spans in LangSmith)
|
||||
conversation_id = str(uuid.uuid4())
|
||||
logger.info(f"Starting conversation: {conversation_id}")
|
||||
|
||||
# Setup recording directory (in same directory as this script)
|
||||
recordings_dir = Path(__file__).parent / "recordings"
|
||||
recordings_dir.mkdir(exist_ok=True)
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
recording_path = recordings_dir / f"conversation_{timestamp}.wav"
|
||||
|
||||
# Configure local audio transport with voice activity detection
|
||||
transport = LocalAudioTransport(
|
||||
LocalAudioTransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
)
|
||||
)
|
||||
|
||||
# Initialize services (API keys read from .env)
|
||||
stt = WhisperSTTService()
|
||||
llm = OpenAILLMService(model="gpt-4o-mini")
|
||||
tts = OpenAITTSService(voice="alloy")
|
||||
|
||||
# Define the system prompt and conversation context
|
||||
context = OpenAILLMContext(
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": """You are an expert French tutor coach. Your role is to help students learn and practice French by having natural conversations in French with students at their level. Assume the user is a beginner and start simple. Keep your responses short, let the user guide the conversation. Assume the user is speaking in french and if you cant understand then ask them to speak louder and slower."""
|
||||
}
|
||||
]
|
||||
)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
audio_recorder = AudioRecorder(str(recording_path))
|
||||
|
||||
# Create turn audio recorder for per-turn audio snippets
|
||||
turn_audio_recorder = TurnAudioRecorder(
|
||||
span_processor=span_processor,
|
||||
conversation_id=conversation_id,
|
||||
recordings_dir=recordings_dir,
|
||||
turn_tracker=None, # Will be set after task creation
|
||||
)
|
||||
|
||||
# Register recorders with span processor
|
||||
span_processor.register_recording(conversation_id, str(recording_path), audio_recorder=audio_recorder)
|
||||
span_processor.register_turn_audio_recorder(conversation_id, turn_audio_recorder)
|
||||
|
||||
# Build the pipeline: audio flows through each processor in order
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Capture microphone input
|
||||
stt, # Transcribe audio to text
|
||||
context_aggregator.user(), # Add user message to context
|
||||
llm, # Generate AI response
|
||||
tts, # Convert response to speech
|
||||
audio_recorder, # Record all audio (full conversation)
|
||||
turn_audio_recorder, # Record per-turn audio snippets
|
||||
transport.output(), # Play audio through speakers
|
||||
context_aggregator.assistant(), # Add assistant message to context
|
||||
])
|
||||
|
||||
# Create task with tracing enabled for LangSmith observability
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(enable_metrics=True),
|
||||
enable_tracing=True,
|
||||
enable_turn_tracking=True, # Required when tracing is enabled
|
||||
conversation_id=conversation_id,
|
||||
)
|
||||
|
||||
# Wire up turn tracker to turn audio recorder
|
||||
# The TurnTrackingObserver is created by the task when enable_turn_tracking=True
|
||||
if task.turn_tracking_observer:
|
||||
turn_audio_recorder.connect_to_turn_tracker(task.turn_tracking_observer)
|
||||
logger.info("Turn audio recorder connected to turn tracker")
|
||||
else:
|
||||
logger.warning("TurnTrackingObserver not found - turn audio recording disabled")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True)
|
||||
|
||||
try:
|
||||
await runner.run(task)
|
||||
finally:
|
||||
# Save recording before conversation span completes (order matters for LangSmith attachments)
|
||||
audio_recorder.save_recording()
|
||||
logger.info(f"Recording saved to: {recording_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,288 +0,0 @@
|
||||
"""
|
||||
Turn-aware audio recorder for capturing and saving per-turn audio snippets.
|
||||
|
||||
Records separate audio files for user speech and AI responses for each conversation turn,
|
||||
enabling fine-grained audio analysis in LangSmith.
|
||||
"""
|
||||
import wave
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from scipy import signal
|
||||
from loguru import logger
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
AudioRawFrame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
TTSAudioRawFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class TurnAudioRecorder(FrameProcessor):
|
||||
"""
|
||||
Frame processor that captures user and AI audio separately per turn.
|
||||
|
||||
Subscribes to TurnTrackingObserver events to detect turn boundaries and saves
|
||||
separate WAV files for user and AI audio when each turn ends.
|
||||
|
||||
Uses bounded buffers to prevent memory issues on very long turns.
|
||||
"""
|
||||
|
||||
# Maximum frames to buffer per turn per source (prevents memory leak on long turns)
|
||||
MAX_BUFFER_FRAMES = 1000 # ~20 seconds at typical frame rates
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
span_processor,
|
||||
conversation_id: str,
|
||||
recordings_dir: Path,
|
||||
turn_tracker=None,
|
||||
user_sample_rate: int = 16000,
|
||||
ai_sample_rate: int = 24000,
|
||||
channels: int = 1,
|
||||
):
|
||||
"""
|
||||
Initialize turn audio recorder.
|
||||
|
||||
Args:
|
||||
span_processor: LangSmithSTTSpanProcessor instance for registering recordings
|
||||
conversation_id: Unique conversation identifier
|
||||
recordings_dir: Directory to save audio files
|
||||
turn_tracker: TurnTrackingObserver instance (can be set later via connect_to_turn_tracker)
|
||||
user_sample_rate: Default sample rate for user audio (16kHz typical for mic)
|
||||
ai_sample_rate: Default sample rate for AI audio (24kHz typical for TTS)
|
||||
channels: Number of audio channels (1 for mono)
|
||||
"""
|
||||
super().__init__()
|
||||
self._span_processor = span_processor
|
||||
self._conversation_id = conversation_id
|
||||
self._recordings_dir = Path(recordings_dir)
|
||||
self._turn_tracker = turn_tracker
|
||||
self._channels = channels
|
||||
|
||||
# Current turn state
|
||||
self._current_turn_number = 0
|
||||
self._is_turn_active = False
|
||||
|
||||
# Buffers for current turn: [(audio_data, sample_rate), ...]
|
||||
self._current_user_frames = []
|
||||
self._current_ai_frames = []
|
||||
|
||||
# Track detected sample rates separately for user and AI
|
||||
self._user_detected_rates = set()
|
||||
self._ai_detected_rates = set()
|
||||
|
||||
# Default sample rates
|
||||
self._default_user_rate = user_sample_rate
|
||||
self._default_ai_rate = ai_sample_rate
|
||||
|
||||
# Map turn numbers to saved file paths
|
||||
self._turn_recordings = {} # turn_number -> {user: path, ai: path}
|
||||
|
||||
# Ensure recordings directory exists
|
||||
self._recordings_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
logger.info(
|
||||
f"TurnAudioRecorder initialized for conversation {conversation_id}"
|
||||
)
|
||||
|
||||
def connect_to_turn_tracker(self, turn_tracker):
|
||||
"""
|
||||
Connect to turn tracker and register event handlers.
|
||||
|
||||
This method should be called after the PipelineTask is created,
|
||||
as the TurnTrackingObserver is created by the task.
|
||||
|
||||
Args:
|
||||
turn_tracker: TurnTrackingObserver instance
|
||||
"""
|
||||
self._turn_tracker = turn_tracker
|
||||
|
||||
# Register event handlers using wrapper methods
|
||||
turn_tracker.add_event_handler("on_turn_started", self._on_turn_started_wrapper)
|
||||
turn_tracker.add_event_handler("on_turn_ended", self._on_turn_ended_wrapper)
|
||||
|
||||
logger.info(
|
||||
f"TurnAudioRecorder connected to TurnTrackingObserver for conversation {self._conversation_id}"
|
||||
)
|
||||
|
||||
async def _on_turn_started_wrapper(self, observer, turn_number: int):
|
||||
"""
|
||||
Wrapper for turn started event handler.
|
||||
Event handlers receive (observer, *args) - we ignore observer and delegate to handler.
|
||||
|
||||
Args:
|
||||
observer: TurnTrackingObserver instance (ignored)
|
||||
turn_number: The turn number that just started
|
||||
"""
|
||||
await self._handle_turn_started(turn_number)
|
||||
|
||||
async def _on_turn_ended_wrapper(self, observer, turn_number: int, duration: float, was_interrupted: bool):
|
||||
"""
|
||||
Wrapper for turn ended event handler.
|
||||
Event handlers receive (observer, *args) - we ignore observer and delegate to handler.
|
||||
|
||||
Args:
|
||||
observer: TurnTrackingObserver instance (ignored)
|
||||
turn_number: The turn number that just ended
|
||||
duration: Duration of the turn in seconds
|
||||
was_interrupted: Whether the turn was interrupted by user
|
||||
"""
|
||||
await self._handle_turn_ended(turn_number, duration, was_interrupted)
|
||||
|
||||
async def _handle_turn_started(self, turn_number: int):
|
||||
"""
|
||||
Handle turn started event.
|
||||
|
||||
Resets buffers and prepares for new turn audio capture.
|
||||
Buffer clearing here prevents unbounded memory growth across turns.
|
||||
|
||||
Args:
|
||||
turn_number: The turn number that just started
|
||||
"""
|
||||
self._current_turn_number = turn_number
|
||||
self._is_turn_active = True
|
||||
# Clear buffers from previous turn (prevents memory accumulation)
|
||||
self._current_user_frames = []
|
||||
self._current_ai_frames = []
|
||||
self._user_detected_rates.clear()
|
||||
self._ai_detected_rates.clear()
|
||||
|
||||
async def _handle_turn_ended(
|
||||
self, turn_number: int, duration: float, was_interrupted: bool
|
||||
):
|
||||
"""
|
||||
Handle turn ended event.
|
||||
|
||||
Args:
|
||||
turn_number: The turn number that just ended
|
||||
duration: Duration of the turn in seconds
|
||||
was_interrupted: Whether the turn was interrupted by user
|
||||
"""
|
||||
self._is_turn_active = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""
|
||||
Process audio frames and buffer them by type (user vs AI).
|
||||
|
||||
Args:
|
||||
frame: The frame to process
|
||||
direction: Direction of frame flow
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Only capture audio frames when turn is active
|
||||
if isinstance(frame, AudioRawFrame) and self._is_turn_active:
|
||||
# Pipecat uses 'sample_rate' attribute on AudioRawFrame
|
||||
frame_sample_rate = getattr(frame, 'sample_rate', None)
|
||||
|
||||
# Distinguish user audio from AI audio
|
||||
if isinstance(frame, InputAudioRawFrame):
|
||||
# User audio from microphone
|
||||
if frame_sample_rate:
|
||||
self._user_detected_rates.add(frame_sample_rate)
|
||||
|
||||
# Enforce buffer limit to prevent memory leak on very long turns
|
||||
if len(self._current_user_frames) < self.MAX_BUFFER_FRAMES:
|
||||
self._current_user_frames.append((frame.audio, frame_sample_rate))
|
||||
elif len(self._current_user_frames) == self.MAX_BUFFER_FRAMES:
|
||||
logger.warning(
|
||||
f"User audio buffer limit reached for turn {self._current_turn_number}. "
|
||||
f"Dropping additional frames to prevent memory leak."
|
||||
)
|
||||
|
||||
elif isinstance(frame, (OutputAudioRawFrame, TTSAudioRawFrame)):
|
||||
# AI/TTS audio output
|
||||
if frame_sample_rate:
|
||||
self._ai_detected_rates.add(frame_sample_rate)
|
||||
|
||||
# Enforce buffer limit to prevent memory leak on very long turns
|
||||
if len(self._current_ai_frames) < self.MAX_BUFFER_FRAMES:
|
||||
self._current_ai_frames.append((frame.audio, frame_sample_rate))
|
||||
elif len(self._current_ai_frames) == self.MAX_BUFFER_FRAMES:
|
||||
logger.warning(
|
||||
f"AI audio buffer limit reached for turn {self._current_turn_number}. "
|
||||
f"Dropping additional frames to prevent memory leak."
|
||||
)
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def save_turn_audio_sync(self, turn_number: int):
|
||||
"""
|
||||
Synchronously save audio files for the given turn number.
|
||||
Called directly by the span processor when the turn span ends.
|
||||
|
||||
Args:
|
||||
turn_number: The turn number to save
|
||||
|
||||
Returns:
|
||||
Dict with 'user' and/or 'ai' keys pointing to saved file paths, or empty dict
|
||||
"""
|
||||
saved_files = {}
|
||||
|
||||
# Only save if this is the current turn
|
||||
if turn_number != self._current_turn_number:
|
||||
return saved_files
|
||||
|
||||
try:
|
||||
# Save user audio if exists
|
||||
if self._current_user_frames:
|
||||
user_path = self._recordings_dir / f"turn_{turn_number}_user.wav"
|
||||
sample_rate = (
|
||||
max(self._user_detected_rates)
|
||||
if self._user_detected_rates
|
||||
else self._default_user_rate
|
||||
)
|
||||
self._save_wav_file(user_path, self._current_user_frames, sample_rate)
|
||||
saved_files['user'] = str(user_path)
|
||||
logger.debug(f"Saved user audio for turn {turn_number} to {user_path}")
|
||||
|
||||
# Save AI audio if exists
|
||||
if self._current_ai_frames:
|
||||
ai_path = self._recordings_dir / f"turn_{turn_number}_ai.wav"
|
||||
sample_rate = (
|
||||
max(self._ai_detected_rates)
|
||||
if self._ai_detected_rates
|
||||
else self._default_ai_rate
|
||||
)
|
||||
self._save_wav_file(ai_path, self._current_ai_frames, sample_rate)
|
||||
saved_files['ai'] = str(ai_path)
|
||||
logger.debug(f"Saved AI audio for turn {turn_number} to {ai_path}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save turn {turn_number} audio in conversation {self._conversation_id}: {e}")
|
||||
|
||||
return saved_files
|
||||
|
||||
def _save_wav_file(
|
||||
self, output_path: Path, frames: list, target_sample_rate: int
|
||||
):
|
||||
"""
|
||||
Save audio frames to a WAV file with resampling if needed.
|
||||
|
||||
Args:
|
||||
output_path: Path to save the WAV file
|
||||
frames: List of (audio_data, frame_sample_rate) tuples
|
||||
target_sample_rate: Target sample rate for the output file
|
||||
"""
|
||||
if not frames:
|
||||
return
|
||||
|
||||
with wave.open(str(output_path), 'wb') as wav_file:
|
||||
wav_file.setnchannels(self._channels)
|
||||
wav_file.setsampwidth(2) # 16-bit PCM
|
||||
wav_file.setframerate(target_sample_rate)
|
||||
|
||||
for audio_data, frame_sample_rate in frames:
|
||||
# Resample if needed
|
||||
if frame_sample_rate and frame_sample_rate != target_sample_rate:
|
||||
audio_array = np.frombuffer(audio_data, dtype=np.int16)
|
||||
num_samples = int(
|
||||
len(audio_array) * target_sample_rate / frame_sample_rate
|
||||
)
|
||||
resampled = signal.resample(audio_array, num_samples)
|
||||
audio_data = resampled.astype(np.int16).tobytes()
|
||||
|
||||
wav_file.writeframes(audio_data)
|
||||
Reference in New Issue
Block a user