This commit is contained in:
Tanushree Sharma
2025-12-07 21:28:48 -08:00
parent 2dbeca0244
commit 264ae6b59f
7 changed files with 1746 additions and 2 deletions
+2 -2
View File
@@ -51,8 +51,8 @@ env/
# Project-specific: ignore everything except pipecat/ and livekit/
# Ignore root-level files
/*
!/pipecat-test/
!/livekit-test/
!/pipecat/
!/livekit/
!README.md
!.gitignore
+617
View File
@@ -0,0 +1,617 @@
"""
LangSmith span processor for LiveKit Agents.
Enriches OpenTelemetry spans from LiveKit Agents with LangSmith-compatible attributes
for proper conversation tracking and visualization.
"""
import json
import os
import logging
from copy import deepcopy
from typing import Optional
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# Opt-in verbose logging for local debugging: enabled when the
# LANGSMITH_PROCESSOR_DEBUG environment variable is "true", "1" or "yes"
# (case-insensitive).
DEBUG = os.environ.get("LANGSMITH_PROCESSOR_DEBUG", "false").lower() in ("true", "1", "yes")
logger = logging.getLogger("langsmith_processor")
if DEBUG:
    # Skip the setup when a handler is already attached to this logger.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter('%(asctime)s [LANGSMITH] %(levelname)s %(message)s')
        )
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
class LangSmithSpanProcessor(SpanProcessor):
"""
Custom OpenTelemetry span processor that enriches LiveKit Agents spans with LangSmith-compatible attributes.
This enables proper conversation tracking and message visualization in LangSmith's UI.
"""
def __init__(self, downstream_processor: Optional[SpanProcessor] = None):
    """
    Create the processor.

    Args:
        downstream_processor: Processor that receives the enriched spans.
            When omitted, a ``BatchSpanProcessor`` wrapping a default
            ``OTLPSpanExporter`` is created.
    """
    super().__init__()

    # Per-trace bookkeeping, all keyed by the 32-character hex trace id.
    # Track conversation messages across spans for proper LangSmith grouping
    self.conversation_messages = {}  # trace_id -> list of messages
    self.trace_to_conversation_id = {}  # trace_id -> conversation_id
    # Hold root/job spans until conversation data is ready
    self.deferred_job_spans = {}  # trace_id -> ReadableSpan

    self.downstream = (
        downstream_processor
        if downstream_processor is not None
        else BatchSpanProcessor(OTLPSpanExporter())
    )
def on_start(self, span: ReadableSpan, parent_context=None) -> None:
    """Pass the span-start notification through to the downstream processor, if one is set."""
    downstream = self.downstream
    if not downstream:
        return
    downstream.on_start(span, parent_context)
def on_end(self, span: ReadableSpan) -> None:
    """
    Enrich a finished span with LangSmith-compatible attributes, then export it.

    Every span gets ``langsmith.metadata.thread_id`` set to its hex trace id and,
    when a conversation id is already known for the trace, ``conversation.id``
    and ``langsmith.parent_span_id``. The span is then classified by substrings
    of its lower-cased name (first match wins):

    * STT (``stt`` / ``speech_to_text`` / ``transcription``): kind ``llm``,
      a placeholder prompt and the transcript as completion.
    * LLM (``llm`` / ``chat`` / ``completion`` / ``openai``): kind ``llm``,
      extracted (or fallback) prompt messages, the extracted output as
      completion, and the exchange is recorded for the trace.
    * TTS (``tts`` / ``text_to_speech`` / ``synthesis``): kind ``llm``,
      the text to speak as prompt and a summary line as completion.
    * Agent/session/conversation/job: kind ``chain``, with the tracked
      conversation attached. A job span that ends before any conversation
      was tracked is held back (not exported) until data arrives or the
      processor is flushed.
    * Anything else: ``llm`` when it has ``input``/``output`` attributes,
      otherwise ``chain``.

    Attributes are written through ``span._attributes`` because the span has
    already ended when this hook runs.
    """
    # Always log that we're processing a span (even without DEBUG mode)
    # Use print to stderr to ensure it's visible
    import sys
    print(f"[LANGSMITH-PROCESSOR] Processing span: {span.name}", file=sys.stderr, flush=True)

    # Track each conversation as a thread in LangSmith
    trace_id = format(span.context.trace_id, '032x')
    span._attributes["langsmith.metadata.thread_id"] = trace_id

    # Link all spans to their conversation for proper grouping in LangSmith
    if trace_id in self.trace_to_conversation_id:
        conversation_id = self.trace_to_conversation_id[trace_id]
        span._attributes["conversation.id"] = conversation_id
        span._attributes["langsmith.parent_span_id"] = "conversation"

    span_name = span.name.lower()

    # STT span: audio input -> transcribed text
    if "stt" in span_name or "speech_to_text" in span_name or "transcription" in span_name:
        span._attributes["langsmith.span.kind"] = "llm"
        transcript = span.attributes.get("transcript") or span.attributes.get("text") or span.attributes.get("output", "")
        self._set_prompt_attributes(span, [{"role": "user", "content": "audio_segment"}])
        if transcript:
            self._set_completion_attributes(span, [{"role": "assistant", "content": str(transcript)}])

    # LLM span: conversation messages -> AI response
    elif "llm" in span_name or "chat" in span_name or "completion" in span_name or "openai" in span_name:
        span._attributes["langsmith.span.kind"] = "llm"
        messages = self._extract_llm_messages(span)
        if not messages:
            messages = self._fallback_messages(span, span_name)
        self._set_prompt_attributes(span, messages)
        output_data = self._extract_llm_output(span)
        if output_data:
            completion = [{"role": "assistant", "content": str(output_data)}]
            self._set_completion_attributes(span, completion)
            self._track_messages(self.conversation_messages, trace_id, messages, str(output_data))

    # TTS span: text -> audio
    elif "tts" in span_name or "text_to_speech" in span_name or "synthesis" in span_name:
        span._attributes["langsmith.span.kind"] = "llm"

        # Debug TTS spans - always print attributes to see what LiveKit uses
        print(f"\n[LANGSMITH-PROCESSOR] 🔊 TTS SPAN: {span.name}", file=sys.stderr, flush=True)
        print(f" 📋 All attributes for {span.name} ({len(span.attributes)} total):", file=sys.stderr, flush=True)
        for key, value in sorted(span.attributes.items()):
            value_str = str(value)
            if len(value_str) > 500:
                value_str = value_str[:500] + "... (truncated)"
            print(f"{key} = {value_str}", file=sys.stderr, flush=True)

        # Try LiveKit-specific attributes first
        text = (
            span.attributes.get("lk.input_text") or
            span.attributes.get("lk.request.text") or
            span.attributes.get("lk.text") or
            span.attributes.get("text") or
            span.attributes.get("input") or
            span.attributes.get("prompt") or
            ""
        )

        # Extract voice/model from lk.tts_metrics or other attributes
        voice_id = "unknown"
        tts_metrics = span.attributes.get("lk.tts_metrics")
        if tts_metrics:
            try:
                if isinstance(tts_metrics, str):
                    metrics_data = json.loads(tts_metrics)
                else:
                    metrics_data = tts_metrics
                if isinstance(metrics_data, dict):
                    # "metadata" can be present but null or not a mapping;
                    # treat that as "no metadata" instead of crashing.
                    metadata = metrics_data.get("metadata")
                    if not isinstance(metadata, dict):
                        metadata = {}
                    model_name = metadata.get("model_name") or metrics_data.get("model_name")
                    if model_name:
                        voice_id = str(model_name)
            except (json.JSONDecodeError, TypeError, KeyError):
                # Best effort: malformed metrics just leave the voice unknown.
                pass

        # Fallback to other voice attributes
        if voice_id == "unknown":
            voice_id = (
                span.attributes.get("lk.voice") or
                span.attributes.get("voice") or
                span.attributes.get("voice_id") or
                "unknown"
            )

        print(f" ✅ Extracted text: length={len(str(text))}, voice={voice_id}", file=sys.stderr, flush=True)
        self._set_prompt_attributes(span, [
            {"role": "system", "content": f"Convert to speech with voice: {voice_id}"},
            {"role": "user", "content": str(text) if text else "text_to_speech"}
        ])
        self._set_completion_attributes(span, [{"role": "assistant", "content": f"Generated audio for: {text}"}])

    # Agent/Chain/Job spans: aggregate conversation
    elif (
        "agent" in span_name
        or "session" in span_name
        or "conversation" in span_name
        or "job" in span_name
    ):
        span._attributes["langsmith.span.kind"] = "chain"
        is_job_span = "job" in span_name

        # Try to extract conversation ID
        conversation_id = (
            span.attributes.get("conversation.id") or
            span.attributes.get("conversation_id") or
            span.attributes.get("session_id") or
            (span.attributes.get("lk.job_id") if is_job_span else "") or
            ""
        )
        if conversation_id:
            self.trace_to_conversation_id[trace_id] = str(conversation_id)
            span._attributes["conversation.id"] = str(conversation_id)
            span._attributes["langsmith.root_span"] = True
        elif is_job_span:
            # Ensure the root job span is treated as the LangSmith conversation root
            span._attributes["conversation.id"] = trace_id
            span._attributes["langsmith.root_span"] = True

        # Aggregate messages from conversation
        conv_msgs = self.conversation_messages.get(trace_id, [])
        if conv_msgs:
            system_msg, first_user_msg, remaining_msgs = self._split_conversation_messages(conv_msgs)

            # Add input (first user message only, exclude system message)
            # System message is only shown in LLM call spans, not in job entrypoint
            prompt_msgs = []
            if first_user_msg:
                prompt_msgs.append(first_user_msg)
            if prompt_msgs:
                self._set_prompt_attributes(span, prompt_msgs)

            # Add output (remaining conversation)
            if remaining_msgs:
                self._set_completion_attributes(span, remaining_msgs)

            # A job span held back earlier for this trace can now be exported.
            self._release_job_span_if_waiting(trace_id, prompt_msgs, remaining_msgs)
        elif is_job_span:
            # Defer export until conversation data becomes available
            self._defer_job_span(trace_id, span)
            return

        # Cleanup
        should_cleanup_trace = is_job_span or span.parent is None
        if should_cleanup_trace:
            if trace_id in self.conversation_messages:
                del self.conversation_messages[trace_id]
            if trace_id in self.trace_to_conversation_id:
                del self.trace_to_conversation_id[trace_id]

    # Default: mark as chain if no specific type detected
    else:
        # Check if it has LLM-like attributes
        if span.attributes.get("input") or span.attributes.get("output"):
            span._attributes["langsmith.span.kind"] = "llm"
            input_val = span.attributes.get("input", "")
            output_val = span.attributes.get("output", "")
            if input_val:
                self._set_prompt_attributes(span, [{"role": "user", "content": str(input_val)}])
            if output_val:
                self._set_completion_attributes(span, [{"role": "assistant", "content": str(output_val)}])
        else:
            span._attributes["langsmith.span.kind"] = "chain"

    # Export span downstream (unless it was deferred earlier)
    self._export_span(span)
def _set_prompt_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0, log: bool = False):
"""Set gen_ai.prompt.* attributes from a list of messages."""
import sys
for i, msg in enumerate(messages):
idx = start_idx + i
if isinstance(msg, dict):
role = msg.get("role", "user")
content = str(msg.get("content", ""))
span._attributes[f"gen_ai.prompt.{idx}.role"] = role
span._attributes[f"gen_ai.prompt.{idx}.content"] = content
if log:
content_preview = content[:100] + "..." if len(content) > 100 else content
print(f" Set gen_ai.prompt.{idx}.role = '{role}', gen_ai.prompt.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
else:
# Handle string messages
content = str(msg)
span._attributes[f"gen_ai.prompt.{idx}.role"] = "user"
span._attributes[f"gen_ai.prompt.{idx}.content"] = content
if log:
content_preview = content[:100] + "..." if len(content) > 100 else content
print(f" Set gen_ai.prompt.{idx}.role = 'user', gen_ai.prompt.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
def _set_completion_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0, log: bool = False):
"""Set gen_ai.completion.* attributes from a list of messages."""
import sys
for i, msg in enumerate(messages):
idx = start_idx + i
if isinstance(msg, dict):
role = msg.get("role", "assistant")
content = str(msg.get("content", ""))
span._attributes[f"gen_ai.completion.{idx}.role"] = role
span._attributes[f"gen_ai.completion.{idx}.content"] = content
if log:
content_preview = content[:200] + "..." if len(content) > 200 else content
print(f" Set gen_ai.completion.{idx}.role = '{role}', gen_ai.completion.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
else:
# Handle string messages
content = str(msg)
span._attributes[f"gen_ai.completion.{idx}.role"] = "assistant"
span._attributes[f"gen_ai.completion.{idx}.content"] = content
if log:
content_preview = content[:200] + "..." if len(content) > 200 else content
print(f" Set gen_ai.completion.{idx}.role = 'assistant', gen_ai.completion.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
def _fallback_messages(self, span: ReadableSpan, span_name: str) -> list:
"""Use system/user attributes or span name when no chat context is available."""
system_prompt = span.attributes.get("gen_ai.system") or span.attributes.get("system") or ""
user_prompt = (
span.attributes.get("gen_ai.user")
or span.attributes.get("user")
or span.attributes.get("input")
or ""
)
fallback = []
if system_prompt:
fallback.append({"role": "system", "content": str(system_prompt)})
if user_prompt:
fallback.append({"role": "user", "content": str(user_prompt)})
if not fallback:
fallback.append({"role": "user", "content": f"LLM request: {span_name}"})
return fallback
def _split_conversation_messages(self, messages: list) -> tuple:
"""
Split conversation messages into system, first user, and remaining messages.
Returns: (system_msg, first_user_msg, remaining_msgs)
"""
system_msg = None
first_user_msg = None
remaining_msgs = []
first_user_found = False
for msg in messages:
role = msg.get("role", "") if isinstance(msg, dict) else "user"
if role == "system" and system_msg is None:
system_msg = msg
elif role == "user" and not first_user_found:
first_user_msg = msg
first_user_found = True
elif first_user_found:
remaining_msgs.append(msg)
return (system_msg, first_user_msg, remaining_msgs)
def _extract_llm_messages(self, span: ReadableSpan) -> list:
"""
Extract LLM input messages from span attributes using multiple strategies.
Returns a list of message dicts with 'role' and 'content' keys.
"""
import sys
print(f" 🔍 Strategy 1: Checking lk.chat_ctx...", file=sys.stderr, flush=True)
# Strategy 1: LiveKit-specific attribute: lk.chat_ctx
chat_ctx = span.attributes.get("lk.chat_ctx")
if chat_ctx:
print(f" ✓ Found lk.chat_ctx, type={type(chat_ctx)}, length={len(str(chat_ctx)) if isinstance(chat_ctx, str) else 'N/A'}", file=sys.stderr, flush=True)
try:
if isinstance(chat_ctx, str):
ctx_data = json.loads(chat_ctx)
else:
ctx_data = chat_ctx
# Extract messages from items array
if isinstance(ctx_data, dict) and "items" in ctx_data:
messages = []
for item in ctx_data["items"]:
if isinstance(item, dict) and item.get("type") == "message":
role = item.get("role", "user")
content = item.get("content", "")
# Content might be a list of strings or a single string
if isinstance(content, list):
content = " ".join(str(c) for c in content)
if content:
messages.append({"role": str(role), "content": str(content)})
if messages:
print(f" ✅ Strategy 1 SUCCESS: Found {len(messages)} messages from lk.chat_ctx", file=sys.stderr, flush=True)
return messages
except (json.JSONDecodeError, TypeError, KeyError, AttributeError) as e:
print(f" ✗ Strategy 1 FAILED: {type(e).__name__}: {e}", file=sys.stderr, flush=True)
else:
print(f" ✗ lk.chat_ctx not found", file=sys.stderr, flush=True)
# Strategy 2: Check for OpenTelemetry semantic convention attributes
# gen_ai.request.prompt.* or gen_ai.prompt.*
print(f" 🔍 Strategy 2: Checking gen_ai.request.prompt.*...", file=sys.stderr, flush=True)
messages = []
idx = 0
while True:
role_key = f"gen_ai.request.prompt.{idx}.role"
content_key = f"gen_ai.request.prompt.{idx}.content"
if role_key in span.attributes or content_key in span.attributes:
role = span.attributes.get(role_key, "user")
content = span.attributes.get(content_key, "")
if content:
messages.append({"role": str(role), "content": str(content)})
idx += 1
else:
break
if messages:
print(f" ✅ Strategy 2 SUCCESS: Found {len(messages)} messages from gen_ai.request.prompt.*", file=sys.stderr, flush=True)
return messages
else:
print(f" ✗ No gen_ai.request.prompt.* attributes found", file=sys.stderr, flush=True)
# Strategy 2b: Check for gen_ai.prompt.* (alternative format)
print(f" 🔍 Strategy 2b: Checking gen_ai.prompt.*...", file=sys.stderr, flush=True)
idx = 0
while True:
role_key = f"gen_ai.prompt.{idx}.role"
content_key = f"gen_ai.prompt.{idx}.content"
if role_key in span.attributes or content_key in span.attributes:
role = span.attributes.get(role_key, "user")
content = span.attributes.get(content_key, "")
if content:
messages.append({"role": str(role), "content": str(content)})
idx += 1
else:
break
if messages:
print(f" ✅ Strategy 2b SUCCESS: Found {len(messages)} messages from gen_ai.prompt.*", file=sys.stderr, flush=True)
return messages
else:
print(f" ✗ No gen_ai.prompt.* attributes found", file=sys.stderr, flush=True)
# Strategy 3: Check for messages attribute (JSON string or list)
print(f" 🔍 Strategy 3: Checking messages/llm.messages/input attributes...", file=sys.stderr, flush=True)
messages_attr = span.attributes.get("messages") or span.attributes.get("llm.messages") or span.attributes.get("input")
print(f" Checking: messages={bool(span.attributes.get('messages'))}, llm.messages={bool(span.attributes.get('llm.messages'))}, input={bool(span.attributes.get('input'))}", file=sys.stderr, flush=True)
if messages_attr:
try:
if isinstance(messages_attr, str):
if DEBUG:
logger.debug(f" Parsing JSON string, length={len(messages_attr)}")
parsed = json.loads(messages_attr)
if isinstance(parsed, list):
# Validate and normalize message format
normalized = []
for msg in parsed:
if isinstance(msg, dict) and "content" in msg:
normalized.append({
"role": msg.get("role", "user"),
"content": str(msg.get("content", ""))
})
if normalized:
print(f" ✅ Strategy 3 SUCCESS: Found {len(normalized)} messages from JSON string", file=sys.stderr, flush=True)
return normalized
elif isinstance(messages_attr, list):
print(f" Found list type, length={len(messages_attr)}", file=sys.stderr, flush=True)
# Validate and normalize message format
normalized = []
for msg in messages_attr:
if isinstance(msg, dict) and "content" in msg:
normalized.append({
"role": msg.get("role", "user"),
"content": str(msg.get("content", ""))
})
if normalized:
print(f" ✅ Strategy 3 SUCCESS: Found {len(normalized)} messages from list", file=sys.stderr, flush=True)
return normalized
except (json.JSONDecodeError, TypeError, AttributeError) as e:
print(f" ✗ Strategy 3 FAILED: {type(e).__name__}: {e}", file=sys.stderr, flush=True)
else:
print(f" ✗ No messages attribute found", file=sys.stderr, flush=True)
# Strategy 4: Check for individual system/user/assistant attributes
print(f" 🔍 Strategy 4: Checking individual system/user/assistant attributes...", file=sys.stderr, flush=True)
system = span.attributes.get("gen_ai.system") or span.attributes.get("system") or span.attributes.get("system_prompt")
user = span.attributes.get("gen_ai.user") or span.attributes.get("user") or span.attributes.get("user_input")
assistant = span.attributes.get("gen_ai.assistant") or span.attributes.get("assistant")
print(f" system={bool(system)}, user={bool(user)}, assistant={bool(assistant)}", file=sys.stderr, flush=True)
if system or user or assistant:
result = []
if system:
result.append({"role": "system", "content": str(system)})
if user:
result.append({"role": "user", "content": str(user)})
if assistant:
result.append({"role": "assistant", "content": str(assistant)})
if result:
print(f" ✅ Strategy 4 SUCCESS: Found {len(result)} messages from individual attributes", file=sys.stderr, flush=True)
return result
else:
print(f" ✗ No individual attributes found", file=sys.stderr, flush=True)
print(f" ⚠️ All strategies failed - no messages extracted", file=sys.stderr, flush=True)
return []
def _extract_llm_output(self, span: ReadableSpan) -> str:
"""
Extract LLM output/completion from span attributes using multiple strategies.
Returns the output as a string.
"""
import sys
print(f" 🔍 EXTRACTING LLM OUTPUT:", file=sys.stderr, flush=True)
# Strategy 1: LiveKit-specific attribute: lk.response.text
print(f" Strategy 1: Checking lk.response.text...", file=sys.stderr, flush=True)
output = span.attributes.get("lk.response.text")
if output:
print(f" ✅ Strategy 1 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
return str(output)
else:
print(f" ✗ lk.response.text not found", file=sys.stderr, flush=True)
# Strategy 2: OpenTelemetry semantic convention
print(f" Strategy 2: Checking gen_ai.response.text / gen_ai.completion.text...", file=sys.stderr, flush=True)
output = span.attributes.get("gen_ai.response.text") or span.attributes.get("gen_ai.completion.text")
if output:
print(f" ✅ Strategy 2 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
return str(output)
else:
print(f" ✗ gen_ai.response.text and gen_ai.completion.text not found", file=sys.stderr, flush=True)
# Strategy 3: Common attribute names
print(f" Strategy 3: Checking common attribute names...", file=sys.stderr, flush=True)
output = (
span.attributes.get("gen_ai.response") or
span.attributes.get("gen_ai.completion") or
span.attributes.get("output") or
span.attributes.get("response") or
span.attributes.get("completion") or
span.attributes.get("llm.output") or
span.attributes.get("llm.response") or
span.attributes.get("text") or
""
)
if output:
print(f" ✅ Strategy 3 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
return str(output)
else:
print(f" ✗ No common output attributes found", file=sys.stderr, flush=True)
# Strategy 4: Check for completion.* attributes
print(f" Strategy 4: Checking gen_ai.completion.* attributes...", file=sys.stderr, flush=True)
idx = 0
completion_parts = []
while True:
content_key = f"gen_ai.completion.{idx}.content"
if content_key in span.attributes:
completion_parts.append(str(span.attributes[content_key]))
idx += 1
else:
break
if completion_parts:
print(f" ✅ Strategy 4 SUCCESS: Found {len(completion_parts)} completion parts", file=sys.stderr, flush=True)
return "\n".join(completion_parts)
else:
print(f" ✗ No gen_ai.completion.* attributes found", file=sys.stderr, flush=True)
print(f" ⚠️ All strategies failed - no output extracted", file=sys.stderr, flush=True)
return ""
def _get_messages_from_attributes(self, span: ReadableSpan) -> list:
"""Extract messages from span attributes as fallback."""
messages = []
system = span.attributes.get("gen_ai.system") or span.attributes.get("system")
user = span.attributes.get("gen_ai.user") or span.attributes.get("user") or span.attributes.get("input")
if system:
messages.append({"role": "system", "content": str(system)})
if user:
messages.append({"role": "user", "content": str(user)})
return messages
def _track_messages(self, target_dict: dict, key: str, messages: list, output_data: str):
"""
Track messages in target_dict, avoiding duplicates.
Preserves deduplication logic: case-insensitive content comparison.
"""
if key not in target_dict:
target_dict[key] = []
# Add system prompt once at the start
for msg in messages:
if isinstance(msg, dict) and msg.get("role") == "system":
target_dict[key].append(msg)
break
# Add the latest user message if it's new
last_user_msg = next(
(msg for msg in reversed(messages) if isinstance(msg, dict) and msg.get("role") == "user"),
None
)
if last_user_msg:
new_content = str(last_user_msg.get("content", "")).strip().lower()
existing_contents = [
str(m.get("content", "")).strip().lower()
for m in target_dict[key]
if isinstance(m, dict) and m.get("role") == "user"
]
if new_content and new_content not in existing_contents:
target_dict[key].append(last_user_msg)
# Add the assistant response if it's new
if output_data:
new_assistant_content = str(output_data).strip().lower()
existing_assistant_contents = [
str(m.get("content", "")).strip().lower()
for m in target_dict[key]
if isinstance(m, dict) and m.get("role") == "assistant"
]
if new_assistant_content not in existing_assistant_contents:
target_dict[key].append({"role": "assistant", "content": output_data})
def shutdown(self) -> None:
    """Export any job spans still being held back, then shut down the downstream processor."""
    self._flush_deferred_job_spans()
    downstream = self.downstream
    if not downstream:
        return
    downstream.shutdown()
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """
    Export any held-back job spans, then flush the downstream processor.

    Returns the downstream processor's result, or ``True`` when there is no
    downstream processor.
    """
    self._flush_deferred_job_spans()
    return self.downstream.force_flush(timeout_millis) if self.downstream else True
def _defer_job_span(self, trace_id: str, span: ReadableSpan):
import sys
self.deferred_job_spans[trace_id] = span
print(f"⏸️ Deferring export of job span for trace {trace_id}", file=sys.stderr, flush=True)
def _release_job_span_if_waiting(self, trace_id: str, prompt_msgs: list, completion_msgs: list):
job_span = self.deferred_job_spans.pop(trace_id, None)
if not job_span:
return
import sys
print(f"🧩 Releasing deferred job span for trace {trace_id}", file=sys.stderr, flush=True)
if prompt_msgs:
self._set_prompt_attributes(job_span, deepcopy(prompt_msgs))
if completion_msgs:
self._set_completion_attributes(job_span, deepcopy(completion_msgs))
self._export_span(job_span)
def _flush_deferred_job_spans(self):
if not self.deferred_job_spans:
return
import sys
print(f"⚠️ Flushing {len(self.deferred_job_spans)} deferred job span(s) without conversation data", file=sys.stderr, flush=True)
for trace_id, span in list(self.deferred_job_spans.items()):
self._set_prompt_attributes(span, [{"role": "system", "content": "Conversation not captured"}])
self._set_completion_attributes(span, [{"role": "assistant", "content": "No conversation turns recorded."}])
self._export_span(span)
del self.deferred_job_spans[trace_id]
def _export_span(self, span: ReadableSpan):
if self.downstream:
self.downstream.on_end(span)
+138
View File
@@ -0,0 +1,138 @@
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from livekit import agents, rtc
from livekit.agents import AgentServer, AgentSession, Agent, room_io
from livekit.agents.telemetry import set_tracer_provider
from livekit.plugins import noise_cancellation, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from opentelemetry.sdk.trace import TracerProvider
from langsmith_processor import LangSmithSpanProcessor
# Load environment variables from the first dotenv file that exists and loads
# cleanly. Candidates are tried in priority order: .env.local then .env next to
# this script, then the same two names in the parent directory. A file that
# fails to load is reported on stderr and the next candidate is tried.
script_dir = Path(__file__).parent
env_loaded = False
env_files_tried = []  # existing dotenv files a load was attempted on, in order

for env_path in (
    script_dir / ".env.local",
    script_dir / ".env",
    script_dir.parent / ".env.local",
    script_dir.parent / ".env",
):
    if not env_path.exists():
        continue
    env_files_tried.append(str(env_path))
    try:
        load_dotenv(env_path, override=True)
    except Exception as e:
        print(
            f"⚠️ Warning: Error loading {env_path}: {e}\n"
            f" Continuing without loading this file...",
            file=sys.stderr
        )
        continue
    env_loaded = True
    break

# Fallback: try loading from current directory without specifying file
if not env_loaded:
    try:
        load_dotenv(override=True)
    except Exception as e:
        print(
            f"⚠️ Warning: Error loading .env file: {e}\n"
            f" Continuing without environment file...",
            file=sys.stderr
        )
def setup_langsmith():
    """
    Configure OpenTelemetry tracing so spans are exported to LangSmith.

    Requires both OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS.
    When either is missing, a warning is printed to stderr and tracing is left
    disabled. Otherwise a TracerProvider with a LangSmithSpanProcessor is
    installed through ``set_tracer_provider``.
    """
    otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
    otlp_headers = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")

    if otlp_endpoint and otlp_headers:
        provider = TracerProvider()
        # Register LangSmith processor (handles enrichment + forwarding to OTLP)
        provider.add_span_processor(LangSmithSpanProcessor())
        set_tracer_provider(provider)
        print("✅ LangSmith tracing enabled", file=sys.stderr)
        return

    warning = (
        "⚠️ Warning: OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS not set. "
        "LangSmith tracing will be disabled. To enable, set these environment variables:\n"
        " - OTEL_EXPORTER_OTLP_ENDPOINT=https://api.smith.langchain.com/otel\n"
        " - OTEL_EXPORTER_OTLP_HEADERS=x-api-key=your_langsmith_api_key\n"
        "You can set them in a .env.local or .env file in this directory."
    )
    print(warning, file=sys.stderr)
# Set up LangSmith tracing at import time, before the AgentServer below is created.
# setup_langsmith() only prints a warning and returns when the OTLP environment
# variables are not set, so tracing is optional.
setup_langsmith()
class Assistant(Agent):
    """Voice assistant agent whose only customisation is its instruction prompt."""

    def __init__(self) -> None:
        # The instructions string is passed to the Agent base class as-is.
        super().__init__(
            instructions="""You are a helpful voice AI assistant.
You eagerly assist users with their questions by providing information from your extensive knowledge.
Your responses are concise, to the point, and without any complex formatting or punctuation including emojis, asterisks, or other symbols.
You are curious, friendly, and have a sense of humor.""",
        )
server = AgentServer()


@server.rtc_session()
async def my_agent(ctx: agents.JobContext):
    """
    Session entrypoint registered on ``server``.

    Builds an AgentSession (AssemblyAI STT, OpenAI LLM, Cartesia TTS, Silero
    VAD, multilingual turn detection) and starts it in the job's room with the
    Assistant agent and noise cancellation on the audio input.
    """

    def _pick_noise_cancellation(params):
        # SIP participants get the telephony-specific filter; everyone else
        # gets the standard one.
        if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            return noise_cancellation.BVCTelephony()
        return noise_cancellation.BVC()

    session = AgentSession(
        stt="assemblyai/universal-streaming:en",
        llm="openai/gpt-4.1-mini",
        tts="cartesia/sonic-3:9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )

    audio_input = room_io.AudioInputOptions(noise_cancellation=_pick_noise_cancellation)
    await session.start(
        room=ctx.room,
        agent=Assistant(),
        room_options=room_io.RoomOptions(audio_input=audio_input),
    )
if __name__ == "__main__":
    # Run in console mode (local terminal demo).
    # sys.argv is replaced with [program, "console"], so any command-line
    # arguments supplied by the user are discarded before the CLI runs.
    sys.argv = [sys.argv[0], "console"]
    agents.cli.run_app(server)
+127
View File
@@ -0,0 +1,127 @@
"""
Audio recorder for capturing and saving conversation audio.
Handles sample rate mismatches between input (microphone) and output (TTS) audio
by automatically resampling to the highest detected rate.
Uses a streaming approach with a bounded buffer to avoid memory leaks on long conversations.
"""
import wave
import numpy as np
from scipy import signal
from loguru import logger
from pipecat.frames.frames import Frame, AudioRawFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class AudioRecorder(FrameProcessor):
"""
Custom frame processor that captures all audio (user input + TTS output) and saves to WAV.
Handles sample rate mismatches by resampling to the highest detected rate.
Uses a bounded buffer to prevent memory leaks - flushes to disk periodically.
"""
# Maximum frames to buffer before flushing to disk (prevents memory leak)
MAX_BUFFER_FRAMES = 500 # ~10 seconds at typical frame rates
def __init__(self, output_path: str, sample_rate: int = 24000, channels: int = 1):
super().__init__()
self.output_path = output_path
self._default_sample_rate = sample_rate # Store default, never mutate
self._target_sample_rate = None # Will be set during initialization
self.channels = channels
self.audio_frames = [] # (audio_data, frame_sample_rate) tuples
self.detected_sample_rates = set()
self._wav_file = None
self._is_initialized = False
def _initialize_wav_file(self):
"""Initialize the WAV file with headers when first frame arrives."""
if self._is_initialized:
return
try:
# Determine target sample rate: use highest detected, or fall back to default
# Store in separate variable to avoid mutating the default
self._target_sample_rate = (
max(self.detected_sample_rates)
if self.detected_sample_rates
else self._default_sample_rate
)
self._wav_file = wave.open(self.output_path, 'wb')
self._wav_file.setnchannels(self.channels)
self._wav_file.setsampwidth(2) # 16-bit PCM
self._wav_file.setframerate(self._target_sample_rate)
self._is_initialized = True
logger.debug(f"Initialized WAV file at {self.output_path} with sample rate {self._target_sample_rate}")
except Exception as e:
logger.error(f"Failed to initialize WAV file {self.output_path}: {e}")
def _flush_buffer(self):
"""Flush buffered frames to disk and clear buffer."""
if not self.audio_frames or not self._wav_file:
return
try:
for audio_data, frame_sample_rate in self.audio_frames:
# Resample if needed
if frame_sample_rate and frame_sample_rate != self._target_sample_rate:
audio_array = np.frombuffer(audio_data, dtype=np.int16)
num_samples = int(len(audio_array) * self._target_sample_rate / frame_sample_rate)
resampled = signal.resample(audio_array, num_samples)
audio_data = resampled.astype(np.int16).tobytes()
self._wav_file.writeframes(audio_data)
# Clear buffer after writing to disk
self.audio_frames.clear()
logger.debug(f"Flushed audio buffer to {self.output_path}")
except Exception as e:
logger.error(f"Failed to flush audio buffer to {self.output_path}: {e}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Capture all audio frames
if isinstance(frame, AudioRawFrame):
# Pipecat uses 'sample_rate' attribute on AudioRawFrame
frame_sample_rate = getattr(frame, 'sample_rate', None)
if frame_sample_rate:
self.detected_sample_rates.add(frame_sample_rate)
# Initialize file on first frame
if not self._is_initialized:
self._initialize_wav_file()
# Buffer the frame
self.audio_frames.append((frame.audio, frame_sample_rate))
# Flush to disk if buffer is getting too large (prevents memory leak)
if len(self.audio_frames) >= self.MAX_BUFFER_FRAMES:
self._flush_buffer()
await self.push_frame(frame, direction)
def save_recording(self):
    """
    Flush any remaining buffered audio and close the WAV file.

    Safe to call more than once: a repeated call after a successful save is a
    quiet no-op, while a call made before any audio was ever recorded still
    logs a warning. Errors are logged rather than raised.
    """
    if not self._is_initialized:
        # `_recording_saved` is only ever set below, hence the getattr default.
        if getattr(self, "_recording_saved", False):
            logger.debug(f"Recording already saved to: {self.output_path}")
        else:
            logger.warning("No audio frames to save - WAV file was never initialized")
        return

    try:
        # Flush any remaining buffered frames
        self._flush_buffer()

        # Close the WAV file; drop the handle even if close() itself fails
        if self._wav_file:
            try:
                self._wav_file.close()
            finally:
                self._wav_file = None

        self._recording_saved = True
        logger.info(f"Recording saved to: {self.output_path}")
    except Exception as e:
        logger.error(f"Failed to save recording to {self.output_path}: {e}")
    finally:
        self._is_initialized = False
+400
View File
@@ -0,0 +1,400 @@
"""
LangSmith span processor for Pipecat.
Enriches OpenTelemetry spans from Pipecat with LangSmith-compatible attributes
for proper conversation tracking and visualization.
"""
import base64
import json
from pathlib import Path
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan, TracerProvider
from opentelemetry import trace
from loguru import logger
from pipecat.utils.tracing.setup import setup_tracing
class LangSmithSTTSpanProcessor(SpanProcessor):
"""
Custom OpenTelemetry span processor that enriches Pipecat spans with LangSmith-compatible attributes.
This enables proper conversation tracking and message visualization in LangSmith's UI.
"""
def __init__(self):
    """Create the processor with empty bookkeeping tables for messages and audio."""
    super().__init__()

    # Audio registries, filled through the register_* methods
    self.conversation_recordings: dict = {}   # conversation_id -> recording_path
    self.conversation_recorders: dict = {}    # conversation_id -> AudioRecorder instance
    self.turn_recordings: dict = {}           # conversation_id -> {turn_number: {user: path, ai: path}}
    self.turn_audio_recorders: dict = {}      # conversation_id -> TurnAudioRecorder instance

    # Message tracking across spans, used for proper LangSmith grouping
    self.conversation_messages: dict = {}     # trace_id -> list of messages
    self.turn_messages: dict = {}             # parent_span_id -> list of messages
    self.trace_to_conversation_id: dict = {}  # trace_id -> conversation_id
def on_start(self, span: ReadableSpan, parent_context=None) -> None:
    """Do nothing when a span starts; all enrichment happens in on_end()."""
    return None
def register_recording(self, conversation_id: str, recording_path: str, audio_recorder=None):
    """
    Remember where a conversation's recording lives so it can be attached to the root span.

    Args:
        conversation_id: The conversation ID
        recording_path: Path of the conversation's WAV file
        audio_recorder: Optional recorder instance; stored only when truthy
    """
    self.conversation_recordings[conversation_id] = recording_path
    if not audio_recorder:
        return
    self.conversation_recorders[conversation_id] = audio_recorder
def register_turn_audio_recorder(self, conversation_id: str, turn_audio_recorder):
    """
    Associate a TurnAudioRecorder with a conversation.

    on_end() looks the recorder up by conversation ID and calls its
    save_turn_audio_sync() when a turn span ends.

    Args:
        conversation_id: The conversation ID
        turn_audio_recorder: TurnAudioRecorder instance
    """
    self.turn_audio_recorders.update({conversation_id: turn_audio_recorder})
def register_turn_recording(self, conversation_id: str, turn_number: int, recording_paths: dict):
    """
    Store the audio file paths recorded for one turn of a conversation.

    Args:
        conversation_id: The conversation ID
        turn_number: The turn number
        recording_paths: Dict with 'user' and/or 'ai' keys pointing to WAV file paths
    """
    per_turn = self.turn_recordings.setdefault(conversation_id, {})
    per_turn[turn_number] = recording_paths
def on_end(self, span: ReadableSpan) -> None:
    """
    Enrich a finished span with LangSmith-compatible attributes before it is exported.

    Every span is tagged with a thread id derived from its trace id. Spans named
    "stt", "llm", "tts", "turn" or "conversation" are then handed to a dedicated
    helper; spans with any other name receive only the common attributes.
    Attributes are written through the span's private ``_attributes`` mapping.

    Args:
        span: The span that just ended.
    """
    # Track each conversation as a thread in LangSmith
    trace_id = format(span.context.trace_id, '032x')
    span._attributes["langsmith.metadata.thread_id"] = trace_id

    # Link all spans to their conversation for proper grouping in LangSmith
    if trace_id in self.trace_to_conversation_id:
        span._attributes["conversation.id"] = self.trace_to_conversation_id[trace_id]
        span._attributes["langsmith.parent_span_id"] = "conversation"

    handlers = {
        "stt": self._enrich_stt_span,
        "llm": self._enrich_llm_span,
        "tts": self._enrich_tts_span,
        "turn": self._enrich_turn_span,
        "conversation": self._enrich_conversation_span,
    }
    handler = handlers.get(span.name)
    if handler is not None:
        handler(span, trace_id)

def _enrich_stt_span(self, span: ReadableSpan, trace_id: str) -> None:
    """STT span: audio input -> transcribed text."""
    transcript = span.attributes.get("transcript", "")
    span._attributes["langsmith.span.kind"] = "llm"
    self._set_prompt_attributes(span, [{"role": "user", "content": "audio_segment"}])
    self._set_completion_attributes(span, [{"role": "assistant", "content": transcript}])

def _parse_llm_messages(self, input_data) -> list:
    """Decode an LLM span's "input" attribute into a list of message dicts ([] if unusable)."""
    if not input_data:
        return []
    try:
        parsed = json.loads(input_data)
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError; TypeError covers non-string input
        logger.debug(f"Could not parse LLM span input as JSON: {e}")
        return []
    if not isinstance(parsed, list):
        logger.debug("LLM span input is valid JSON but not a list of messages; ignoring it")
        return []
    # Anything that is not a dict cannot be read with .get() further down
    return [msg for msg in parsed if isinstance(msg, dict)]

def _enrich_llm_span(self, span: ReadableSpan, trace_id: str) -> None:
    """LLM span: conversation messages -> AI response; also records the messages for aggregation."""
    input_data = span.attributes.get("input", "")
    output_data = span.attributes.get("output", "")
    span._attributes["langsmith.span.kind"] = "llm"

    # Parse and add input messages (an empty list sets nothing)
    messages = self._parse_llm_messages(input_data)
    self._set_prompt_attributes(span, messages)

    # Add LLM output
    if output_data:
        self._set_completion_attributes(span, [{"role": "assistant", "content": output_data}])

    # Track messages for aggregating into turn and conversation spans
    parent_span_id = format(span.parent.span_id, '016x') if span.parent else None
    self._track_messages(self.conversation_messages, trace_id, messages, output_data)
    if parent_span_id:
        self._track_messages(self.turn_messages, parent_span_id, messages, output_data)

def _enrich_tts_span(self, span: ReadableSpan, trace_id: str) -> None:
    """TTS span: text -> audio."""
    text = span.attributes.get("text", "")
    voice_id = span.attributes.get("voice_id", "")
    span._attributes["langsmith.span.kind"] = "llm"
    self._set_prompt_attributes(span, [
        {"role": "system", "content": f"Convert to speech with voice: {voice_id}"},
        {"role": "user", "content": text}
    ])
    self._set_completion_attributes(span, [{"role": "assistant", "content": f"Generated audio for: {text}"}])

def _enrich_turn_span(self, span: ReadableSpan, trace_id: str) -> None:
    """Turn span: a single user-assistant interaction, with its per-turn audio attached."""
    turn_number = span.attributes.get("turn.number", 0)
    was_interrupted = span.attributes.get("turn.was_interrupted", False)
    span._attributes["langsmith.span.kind"] = "chain"

    # Aggregate messages recorded by this turn's child LLM spans
    span_id = format(span.context.span_id, '016x')
    turn_msgs = self.turn_messages.get(span_id, [])
    user_msgs = self._get_messages_by_role(turn_msgs, "user")
    assistant_msgs = self._get_messages_by_role(turn_msgs, "assistant")

    # Add user input(s)
    if user_msgs:
        self._set_prompt_attributes(span, user_msgs)
    else:
        self._set_prompt_attributes(span, [{"role": "user", "content": f"Turn {turn_number}"}])

    # Add assistant response(s)
    if assistant_msgs:
        self._set_completion_attributes(span, assistant_msgs)
    else:
        status = "interrupted" if was_interrupted else "no response"
        self._set_completion_attributes(span, [{"role": "assistant", "content": status}])

    try:
        self._attach_turn_audio(span, trace_id, turn_number)
    finally:
        # Cleanup: this turn's messages are no longer needed
        self.turn_messages.pop(span_id, None)

def _attach_turn_audio(self, span: ReadableSpan, trace_id: str, turn_number) -> None:
    """Save the turn's audio now (before the span is exported) and attach it as base64 WAV."""
    conversation_id = (
        span.attributes.get("conversation.id", "")
        or self.trace_to_conversation_id.get(trace_id, "")
    )
    if not conversation_id or conversation_id not in self.turn_audio_recorders:
        return

    # Save files synchronously before span is exported
    turn_files = self.turn_audio_recorders[conversation_id].save_turn_audio_sync(turn_number)
    if not turn_files:
        return

    attachments = []
    for source in ("user", "ai"):
        if source not in turn_files:
            continue
        encoded_audio = self._load_audio_file(turn_files[source])
        if encoded_audio:
            attachments.append({
                "name": f"turn_{turn_number}_{source}.wav",
                "content": encoded_audio,
                "mime_type": "audio/wav"
            })
    if attachments:
        span._attributes["langsmith.attachments"] = json.dumps(attachments)

def _enrich_conversation_span(self, span: ReadableSpan, trace_id: str) -> None:
    """Conversation span: the entire session; aggregates messages, attaches the recording, frees state."""
    conversation_id = span.attributes.get("conversation.id", "")
    conversation_type = span.attributes.get("conversation.type", "voice")
    # Try the alternative conversation_id key if the standard one is empty
    if not conversation_id:
        conversation_id = span.attributes.get("conversation_id", "")

    # NOTE(review): this mapping is written here and removed again in the
    # `finally` below, so spans that end before or after the conversation span
    # never observe it. Kept as-is to preserve the existing behavior.
    self.trace_to_conversation_id[trace_id] = conversation_id

    try:
        span._attributes["langsmith.span.kind"] = "chain"
        span._attributes["langsmith.root_span"] = True
        self._set_conversation_messages(span, trace_id, conversation_type)
        self._attach_conversation_recording(span, conversation_id)
    finally:
        # Cleanup runs even if enrichment failed, so per-conversation state cannot leak
        self.conversation_messages.pop(trace_id, None)
        self.trace_to_conversation_id.pop(trace_id, None)
        for registry in (
            self.conversation_recordings,
            self.conversation_recorders,
            self.turn_recordings,
            self.turn_audio_recorders,
        ):
            registry.pop(conversation_id, None)

def _set_conversation_messages(self, span: ReadableSpan, trace_id: str, conversation_type) -> None:
    """Summarise the tracked conversation: first user message as input, the rest as output."""
    conv_msgs = self.conversation_messages.get(trace_id, [])
    if not conv_msgs:
        self._set_prompt_attributes(span, [{"role": "system", "content": f"Starting {conversation_type} conversation"}])
        self._set_completion_attributes(span, [{"role": "assistant", "content": "No messages"}])
        return

    _system_msg, first_user_msg, remaining_msgs = self._split_conversation_messages(conv_msgs)

    # Add input (first user message only, exclude system message)
    if first_user_msg:
        self._set_prompt_attributes(span, [first_user_msg])

    # Add output (remaining conversation)
    if remaining_msgs:
        self._set_completion_attributes(span, remaining_msgs)
    else:
        self._set_completion_attributes(span, [{"role": "assistant", "content": "No responses yet"}])

def _attach_conversation_recording(self, span: ReadableSpan, conversation_id) -> None:
    """Finalize the conversation recording if a recorder is registered, then attach the WAV file."""
    # First, try to save the recording if we have the audio recorder instance
    if conversation_id and conversation_id in self.conversation_recorders:
        try:
            self.conversation_recorders[conversation_id].save_recording()
        except Exception as e:
            logger.warning(f"Failed to save recording for conversation {conversation_id}: {e}")

    # Find the recording by conversation_id, or fall back to the only registered one
    recording_path_str = None
    if conversation_id and conversation_id in self.conversation_recordings:
        recording_path_str = self.conversation_recordings[conversation_id]
    elif len(self.conversation_recordings) == 1:
        # If there's only one recording, use it (likely this conversation)
        recording_path_str = next(iter(self.conversation_recordings.values()))
    if not recording_path_str:
        return

    # Read the file once - don't block the telemetry pipeline with retries
    recording_path = Path(recording_path_str)
    if not recording_path.exists():
        logger.warning(f"Recording file {recording_path} does not exist when attaching to span")
        return
    try:
        with open(recording_path, 'rb') as f:
            audio_data = f.read()
        if not audio_data:
            logger.warning(f"Recording file {recording_path} exists but is empty")
            return
        audio_base64 = base64.b64encode(audio_data).decode('utf-8')
        attachments = [{
            "name": recording_path.name,
            "content": audio_base64,
            "mime_type": "audio/wav"
        }]
        span._attributes["langsmith.attachments"] = json.dumps(attachments)
        logger.debug(f"Attached recording {recording_path.name} to conversation span")
    except Exception as e:
        logger.warning(f"Failed to read recording file {recording_path}: {e}")
def _set_prompt_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0):
"""Set gen_ai.prompt.* attributes from a list of messages."""
for i, msg in enumerate(messages):
idx = start_idx + i
span._attributes[f"gen_ai.prompt.{idx}.role"] = msg.get("role", "")
span._attributes[f"gen_ai.prompt.{idx}.content"] = msg.get("content", "")
def _set_completion_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0):
"""Set gen_ai.completion.* attributes from a list of messages."""
for i, msg in enumerate(messages):
idx = start_idx + i
span._attributes[f"gen_ai.completion.{idx}.role"] = msg.get("role", "")
span._attributes[f"gen_ai.completion.{idx}.content"] = msg.get("content", "")
def _get_messages_by_role(self, messages: list, role: str) -> list:
"""Filter messages by role."""
return [msg for msg in messages if msg.get("role") == role]
def _split_conversation_messages(self, messages: list) -> tuple:
"""
Split conversation messages into system, first user, and remaining messages.
Returns: (system_msg, first_user_msg, remaining_msgs)
"""
system_msg = None
first_user_msg = None
remaining_msgs = []
first_user_found = False
for msg in messages:
role = msg.get("role", "")
if role == "system" and system_msg is None:
system_msg = msg
elif role == "user" and not first_user_found:
first_user_msg = msg
first_user_found = True
elif first_user_found:
remaining_msgs.append(msg)
return (system_msg, first_user_msg, remaining_msgs)
def _track_messages(self, target_dict: dict, key: str, messages: list, output_data: str):
"""
Track messages in target_dict, avoiding duplicates.
Preserves all deduplication logic: case-insensitive content comparison,
system prompt handling, and duplicate detection.
"""
if key not in target_dict:
target_dict[key] = []
# Add system prompt once at the start
for msg in messages:
if msg.get("role") == "system":
target_dict[key].append(msg)
break
# Add the latest user message if it's new
last_user_msg = next((msg for msg in reversed(messages) if msg.get("role") == "user"), None)
if last_user_msg:
new_content = last_user_msg.get("content", "").strip().lower()
existing_contents = [m.get("content", "").strip().lower()
for m in target_dict[key] if m.get("role") == "user"]
if new_content and new_content not in existing_contents:
target_dict[key].append(last_user_msg)
# Add the assistant response if it's new
if output_data:
new_assistant_content = output_data.strip().lower()
existing_assistant_contents = [m.get("content", "").strip().lower()
for m in target_dict[key] if m.get("role") == "assistant"]
if new_assistant_content not in existing_assistant_contents:
target_dict[key].append({"role": "assistant", "content": output_data})
def _load_audio_file(self, file_path: str):
"""
Load and base64 encode an audio file.
Args:
file_path: Path to the audio file
Returns:
Base64-encoded audio data or None if file doesn't exist/can't be read
"""
try:
recording_path = Path(file_path)
if recording_path.exists():
with open(recording_path, 'rb') as f:
audio_data = f.read()
if audio_data:
return base64.b64encode(audio_data).decode('utf-8')
except Exception as e:
logger.warning(f"Failed to load audio file {file_path}: {e}")
return None
def shutdown(self) -> None:
    """Do nothing on shutdown; this processor owns no exporter or background worker."""
    return None
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """
    Report a successful flush.

    Spans are enriched in place inside on_end(), so nothing is queued here and
    timeout_millis is not used.

    Returns:
        Always True.
    """
    return True
# Setup OpenTelemetry tracing with LangSmith
# Configure OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS in your .env
setup_tracing(
    service_name="pipecat-langsmith-demo",
    exporter=OTLPSpanExporter(),
    console_export=False,  # Disable console export to avoid base64 audio spam
)

# Module-level processor instance; other modules import it to register recordings.
span_processor = LangSmithSTTSpanProcessor()

# Register our custom span processor to enrich Pipecat spans for LangSmith
tracer_provider = trace.get_tracer_provider()
if isinstance(tracer_provider, TracerProvider):
    tracer_provider.add_span_processor(span_processor)
else:
    # The processor still exists so importers keep working, but without an SDK
    # TracerProvider it never receives spans - say so instead of failing silently.
    logger.warning(
        f"Tracer provider is {type(tracer_provider).__name__}, not an SDK TracerProvider; "
        "LangSmith span processor was not registered and spans will not be enriched"
    )
+174
View File
@@ -0,0 +1,174 @@
"""
Pipecat + LangSmith Demo: Voice Agent with Full Observability
This demo shows how to build a voice agent using Pipecat and send telemetry to LangSmith
for observability. It uses a local Whisper model that takes in microphone input for Speech To Text (STT), OpenAI for the LLM call and for Text To Speech (TTS), and records conversations.
Core flow: Audio input -> STT -> LLM -> TTS -> Audio output
Setup: Configure these environment variables in your .env file:
- OPENAI_API_KEY: Your OpenAI API key
- OTEL_EXPORTER_OTLP_ENDPOINT: LangSmith OTLP endpoint (https://api.smith.langchain.com/otel for LangSmith SaaS)
- OTEL_EXPORTER_OTLP_HEADERS: LangSmith auth headers (x-api-key=your_key)
"""
import asyncio
import os
import sys
import uuid
import warnings
from datetime import datetime
from pathlib import Path
import numpy as np
from dotenv import load_dotenv
from loguru import logger
# Load environment variables FIRST (before any imports that need them).
# This call must stay above the pipecat / project imports that follow it.
load_dotenv(override=True)
# Silence RuntimeWarnings attributed to the faster_whisper module, and make numpy
# ignore divide-by-zero / invalid-value floating-point conditions instead of warning.
warnings.filterwarnings('ignore', category=RuntimeWarning, module='faster_whisper')
np.seterr(divide='ignore', invalid='ignore')
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.whisper.stt import WhisperSTTService
from pipecat.services.openai import OpenAILLMService, OpenAITTSService, OpenAISTTService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams
from langsmith_processor import LangSmithSTTSpanProcessor, span_processor # noqa: F401 - registers processor
from audio_recorder import AudioRecorder
from turn_audio_recorder import TurnAudioRecorder
# Swap the logging sink: drop handler id 0 (presumably loguru's pre-configured
# default handler - confirm) and log to stderr at DEBUG level instead.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
def validate_environment():
    """
    Check that every environment variable the demo needs is set and non-empty.

    All missing variables are reported together in a single error message.

    Raises:
        ValueError: If any required environment variable is missing
    """
    required_vars = {
        "OPENAI_API_KEY": "OpenAI API key for LLM and TTS services",
        "OTEL_EXPORTER_OTLP_ENDPOINT": "LangSmith OTLP endpoint (e.g., https://api.smith.langchain.com/otel)",
        "OTEL_EXPORTER_OTLP_HEADERS": "LangSmith authentication headers (e.g., x-api-key=your_key)",
    }

    missing_vars = [
        f" - {var_name}: {description}"
        for var_name, description in required_vars.items()
        if not os.getenv(var_name)
    ]

    if not missing_vars:
        logger.info("Environment validation passed")
        return

    error_msg = (
        "Missing required environment variables:\n"
        + "\n".join(missing_vars)
        + "\n\nPlease set these in your .env file or environment."
    )
    logger.error(error_msg)
    raise ValueError(error_msg)
async def main():
    """
    Run the voice-agent demo: build the Pipecat pipeline, trace it, and record the audio.

    Flow: Audio input -> STT -> LLM -> TTS -> Audio output

    A fresh conversation ID is generated per run; the full conversation is
    recorded to recordings/conversation_<timestamp>.wav next to this script and
    both recorders are registered with the span processor for attachment.

    Raises:
        ValueError: If a required environment variable is missing
            (raised by validate_environment()).
    """
    # Validate environment variables before proceeding
    validate_environment()

    # Generate unique conversation ID (used for grouping spans in LangSmith)
    conversation_id = str(uuid.uuid4())
    logger.info(f"Starting conversation: {conversation_id}")

    # Setup recording directory (in same directory as this script)
    recordings_dir = Path(__file__).parent / "recordings"
    recordings_dir.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    recording_path = recordings_dir / f"conversation_{timestamp}.wav"

    # Configure local audio transport with voice activity detection
    transport = LocalAudioTransport(
        LocalAudioTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        )
    )

    # Initialize services (API keys read from .env)
    stt = WhisperSTTService()
    llm = OpenAILLMService(model="gpt-4o-mini")
    tts = OpenAITTSService(voice="alloy")

    # Define the system prompt and conversation context
    context = OpenAILLMContext(
        messages=[
            {
                "role": "system",
                "content": """You are an expert French tutor coach. Your role is to help students learn and practice French by having natural conversations in French with students at their level. Assume the user is a beginner and start simple. Keep your responses short, let the user guide the conversation. Assume the user is speaking in french and if you cant understand then ask them to speak louder and slower."""
            }
        ]
    )
    context_aggregator = llm.create_context_aggregator(context)

    audio_recorder = AudioRecorder(str(recording_path))

    # Create turn audio recorder for per-turn audio snippets
    turn_audio_recorder = TurnAudioRecorder(
        span_processor=span_processor,
        conversation_id=conversation_id,
        recordings_dir=recordings_dir,
        turn_tracker=None,  # Will be set after task creation
    )

    # Register recorders with span processor
    span_processor.register_recording(conversation_id, str(recording_path), audio_recorder=audio_recorder)
    span_processor.register_turn_audio_recorder(conversation_id, turn_audio_recorder)

    # Build the pipeline: audio flows through each processor in order
    pipeline = Pipeline([
        transport.input(),               # Capture microphone input
        stt,                             # Transcribe audio to text
        context_aggregator.user(),       # Add user message to context
        llm,                             # Generate AI response
        tts,                             # Convert response to speech
        audio_recorder,                  # Record all audio (full conversation)
        turn_audio_recorder,             # Record per-turn audio snippets
        transport.output(),              # Play audio through speakers
        context_aggregator.assistant(),  # Add assistant message to context
    ])

    # Create task with tracing enabled for LangSmith observability
    task = PipelineTask(
        pipeline,
        params=PipelineParams(enable_metrics=True),
        enable_tracing=True,
        enable_turn_tracking=True,  # Required when tracing is enabled
        conversation_id=conversation_id,
    )

    # Wire up turn tracker to turn audio recorder
    # The TurnTrackingObserver is created by the task when enable_turn_tracking=True
    if task.turn_tracking_observer:
        turn_audio_recorder.connect_to_turn_tracker(task.turn_tracking_observer)
        logger.info("Turn audio recorder connected to turn tracker")
    else:
        logger.warning("TurnTrackingObserver not found - turn audio recording disabled")

    # SIGINT handling is left to the runner everywhere except on Windows
    runner = PipelineRunner(handle_sigint=sys.platform != "win32")

    try:
        await runner.run(task)
    finally:
        # Make sure the recording is flushed and closed even if the run failed.
        # save_recording() logs its own outcome, so no unconditional "saved"
        # message is emitted here (it would be wrong when nothing was recorded).
        audio_recorder.save_recording()
if __name__ == "__main__":
    # Script entry point: run the async demo to completion.
    asyncio.run(main())
+288
View File
@@ -0,0 +1,288 @@
"""
Turn-aware audio recorder for capturing and saving per-turn audio snippets.
Records separate audio files for user speech and AI responses for each conversation turn,
enabling fine-grained audio analysis in LangSmith.
"""
import wave
import numpy as np
from pathlib import Path
from typing import Optional
from scipy import signal
from loguru import logger
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
InputAudioRawFrame,
OutputAudioRawFrame,
TTSAudioRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class TurnAudioRecorder(FrameProcessor):
"""
Frame processor that captures user and AI audio separately per turn.
Subscribes to TurnTrackingObserver events to detect turn boundaries and saves
separate WAV files for user and AI audio when each turn ends.
Uses bounded buffers to prevent memory issues on very long turns.
"""
# Maximum frames to buffer per turn per source (prevents memory leak on long turns)
MAX_BUFFER_FRAMES = 1000 # ~20 seconds at typical frame rates
def __init__(
    self,
    span_processor,
    conversation_id: str,
    recordings_dir: Path,
    turn_tracker=None,
    user_sample_rate: int = 16000,
    ai_sample_rate: int = 24000,
    channels: int = 1,
):
    """
    Set up an idle recorder and make sure the recordings directory exists.

    Args:
        span_processor: LangSmithSTTSpanProcessor instance for registering recordings
        conversation_id: Unique conversation identifier
        recordings_dir: Directory to save audio files (created if missing)
        turn_tracker: TurnTrackingObserver instance (can be set later via connect_to_turn_tracker)
        user_sample_rate: Fallback sample rate for user audio when none is detected
        ai_sample_rate: Fallback sample rate for AI audio when none is detected
        channels: Number of audio channels written to the WAV files (1 for mono)
    """
    super().__init__()

    # Collaborators and identity
    self._span_processor = span_processor
    self._turn_tracker = turn_tracker
    self._conversation_id = conversation_id

    # Output configuration
    self._recordings_dir = Path(recordings_dir)
    self._channels = channels
    self._default_user_rate = user_sample_rate
    self._default_ai_rate = ai_sample_rate

    # State of the turn currently being captured
    self._current_turn_number = 0
    self._is_turn_active = False

    # Per-turn buffers: [(audio_data, sample_rate), ...]
    self._current_user_frames, self._current_ai_frames = [], []

    # Sample rates seen during the current turn, tracked per source
    self._user_detected_rates, self._ai_detected_rates = set(), set()

    # turn_number -> {user: path, ai: path}
    self._turn_recordings = {}

    # Ensure recordings directory exists
    self._recordings_dir.mkdir(parents=True, exist_ok=True)

    logger.info(
        f"TurnAudioRecorder initialized for conversation {conversation_id}"
    )
def connect_to_turn_tracker(self, turn_tracker):
    """
    Store the turn tracker and subscribe to its turn start / end events.

    Call this after the PipelineTask has been created, because the task is what
    creates the TurnTrackingObserver.

    Args:
        turn_tracker: TurnTrackingObserver instance
    """
    self._turn_tracker = turn_tracker

    # Event name -> wrapper method that adapts the (observer, *args) callback signature
    subscriptions = (
        ("on_turn_started", self._on_turn_started_wrapper),
        ("on_turn_ended", self._on_turn_ended_wrapper),
    )
    for event_name, handler in subscriptions:
        turn_tracker.add_event_handler(event_name, handler)

    logger.info(
        f"TurnAudioRecorder connected to TurnTrackingObserver for conversation {self._conversation_id}"
    )
async def _on_turn_started_wrapper(self, observer, turn_number: int):
"""
Wrapper for turn started event handler.
Event handlers receive (observer, *args) - we ignore observer and delegate to handler.
Args:
observer: TurnTrackingObserver instance (ignored)
turn_number: The turn number that just started
"""
await self._handle_turn_started(turn_number)
async def _on_turn_ended_wrapper(self, observer, turn_number: int, duration: float, was_interrupted: bool):
"""
Wrapper for turn ended event handler.
Event handlers receive (observer, *args) - we ignore observer and delegate to handler.
Args:
observer: TurnTrackingObserver instance (ignored)
turn_number: The turn number that just ended
duration: Duration of the turn in seconds
was_interrupted: Whether the turn was interrupted by user
"""
await self._handle_turn_ended(turn_number, duration, was_interrupted)
async def _handle_turn_started(self, turn_number: int):
"""
Handle turn started event.
Resets buffers and prepares for new turn audio capture.
Buffer clearing here prevents unbounded memory growth across turns.
Args:
turn_number: The turn number that just started
"""
self._current_turn_number = turn_number
self._is_turn_active = True
# Clear buffers from previous turn (prevents memory accumulation)
self._current_user_frames = []
self._current_ai_frames = []
self._user_detected_rates.clear()
self._ai_detected_rates.clear()
async def _handle_turn_ended(
self, turn_number: int, duration: float, was_interrupted: bool
):
"""
Handle turn ended event.
Args:
turn_number: The turn number that just ended
duration: Duration of the turn in seconds
was_interrupted: Whether the turn was interrupted by user
"""
self._is_turn_active = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """
    Buffer audio frames of the active turn by source (user vs AI) and forward every frame.

    Audio is captured only while a turn is active. Each source keeps at most
    MAX_BUFFER_FRAMES frames per turn; further frames are dropped and a single
    warning is logged per source and turn.

    Args:
        frame: The frame to process
        direction: Direction of frame flow
    """
    await super().process_frame(frame, direction)

    # Only capture audio frames when turn is active
    if isinstance(frame, AudioRawFrame) and self._is_turn_active:
        # Pipecat uses 'sample_rate' attribute on AudioRawFrame
        frame_sample_rate = getattr(frame, 'sample_rate', None)

        # Distinguish user audio from AI audio
        if isinstance(frame, InputAudioRawFrame):
            # User audio from microphone
            self._buffer_turn_frame(
                "User", self._current_user_frames, self._user_detected_rates,
                frame.audio, frame_sample_rate,
            )
        elif isinstance(frame, (OutputAudioRawFrame, TTSAudioRawFrame)):
            # AI/TTS audio output
            self._buffer_turn_frame(
                "AI", self._current_ai_frames, self._ai_detected_rates,
                frame.audio, frame_sample_rate,
            )

    await self.push_frame(frame, direction)

def _buffer_turn_frame(self, source: str, buffer: list, detected_rates: set, audio, sample_rate):
    """Record one frame for source ("User" or "AI"), enforcing the per-turn buffer limit."""
    if sample_rate:
        detected_rates.add(sample_rate)

    if len(buffer) < self.MAX_BUFFER_FRAMES:
        buffer.append((audio, sample_rate))
        return

    # Buffer is full: drop the frame. The buffer stays full for the rest of the
    # turn, so remember which turn was already reported to avoid one warning per frame.
    warned_turns = getattr(self, "_overflow_warned_turns", None)
    if warned_turns is None:
        warned_turns = self._overflow_warned_turns = {}
    if warned_turns.get(source) != self._current_turn_number:
        warned_turns[source] = self._current_turn_number
        logger.warning(
            f"{source} audio buffer limit reached for turn {self._current_turn_number}. "
            f"Dropping additional frames to prevent memory leak."
        )
def save_turn_audio_sync(self, turn_number: int):
    """
    Synchronously write the buffered audio of the given turn to WAV files.

    Called directly by the span processor when the turn span ends. User and AI
    audio are saved independently, so a failure writing one does not prevent
    the other from being saved. Each file uses the highest sample rate detected
    for its source during the turn, or the configured default if none was seen.

    Args:
        turn_number: The turn number to save

    Returns:
        Dict with 'user' and/or 'ai' keys pointing to saved file paths; empty
        when turn_number is not the current turn or nothing could be saved.
    """
    saved_files = {}

    # Only save if this is the current turn
    if turn_number != self._current_turn_number:
        return saved_files

    # (result key / file suffix, label used in logs, frames, detected rates, default rate)
    sources = (
        ("user", "user", self._current_user_frames, self._user_detected_rates, self._default_user_rate),
        ("ai", "AI", self._current_ai_frames, self._ai_detected_rates, self._default_ai_rate),
    )
    for key, label, frames, detected_rates, default_rate in sources:
        if not frames:
            continue
        output_path = self._recordings_dir / f"turn_{turn_number}_{key}.wav"
        sample_rate = max(detected_rates) if detected_rates else default_rate
        try:
            self._save_wav_file(output_path, frames, sample_rate)
        except Exception as e:
            logger.error(f"Failed to save turn {turn_number} audio in conversation {self._conversation_id}: {e}")
            continue
        saved_files[key] = str(output_path)
        logger.debug(f"Saved {label} audio for turn {turn_number} to {output_path}")

    return saved_files
def _save_wav_file(
self, output_path: Path, frames: list, target_sample_rate: int
):
"""
Save audio frames to a WAV file with resampling if needed.
Args:
output_path: Path to save the WAV file
frames: List of (audio_data, frame_sample_rate) tuples
target_sample_rate: Target sample rate for the output file
"""
if not frames:
return
with wave.open(str(output_path), 'wb') as wav_file:
wav_file.setnchannels(self._channels)
wav_file.setsampwidth(2) # 16-bit PCM
wav_file.setframerate(target_sample_rate)
for audio_data, frame_sample_rate in frames:
# Resample if needed
if frame_sample_rate and frame_sample_rate != target_sample_rate:
audio_array = np.frombuffer(audio_data, dtype=np.int16)
num_samples = int(
len(audio_array) * target_sample_rate / frame_sample_rate
)
resampled = signal.resample(audio_array, num_samples)
audio_data = resampled.astype(np.int16).tobytes()
wav_file.writeframes(audio_data)