mirror of
https://github.com/langchain-ai/voice-agents-tracing.git
synced 2026-07-01 19:54:16 -04:00
618 lines
30 KiB
Python
618 lines
30 KiB
Python
"""
|
|
LangSmith span processor for LiveKit Agents.
|
|
|
|
Enriches OpenTelemetry spans from LiveKit Agents with LangSmith-compatible attributes
|
|
for proper conversation tracking and visualization.
|
|
"""
|
|
import json
|
|
import os
|
|
import logging
|
|
from copy import deepcopy
|
|
from typing import Optional
|
|
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
|
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
|
|
|
# Optional verbose logging for local debugging
|
|
DEBUG = os.getenv("LANGSMITH_PROCESSOR_DEBUG", "false").lower() in ("true", "1", "yes")
|
|
logger = logging.getLogger("langsmith_processor")
|
|
if DEBUG and not logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(logging.Formatter('%(asctime)s [LANGSMITH] %(levelname)s %(message)s'))
|
|
logger.addHandler(handler)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
class LangSmithSpanProcessor(SpanProcessor):
|
|
"""
|
|
Custom OpenTelemetry span processor that enriches LiveKit Agents spans with LangSmith-compatible attributes.
|
|
This enables proper conversation tracking and message visualization in LangSmith's UI.
|
|
"""
|
|
|
|
def __init__(self, downstream_processor: Optional[SpanProcessor] = None):
|
|
super().__init__()
|
|
if downstream_processor is None:
|
|
downstream_processor = BatchSpanProcessor(OTLPSpanExporter())
|
|
self.downstream = downstream_processor
|
|
# Track conversation messages across spans for proper LangSmith grouping
|
|
self.conversation_messages = {} # trace_id -> list of messages
|
|
self.trace_to_conversation_id = {} # trace_id -> conversation_id
|
|
# Hold root/job spans until conversation data is ready
|
|
self.deferred_job_spans = {} # trace_id -> ReadableSpan
|
|
|
|
def on_start(self, span: ReadableSpan, parent_context=None) -> None:
|
|
if self.downstream:
|
|
self.downstream.on_start(span, parent_context)
|
|
|
|
def on_end(self, span: ReadableSpan) -> None:
|
|
"""
|
|
Enriches spans with LangSmith-compatible attributes before they're exported.
|
|
Maps LiveKit Agents span types to LangSmith's expected format.
|
|
"""
|
|
# Always log that we're processing a span (even without DEBUG mode)
|
|
# Use print to stderr to ensure it's visible
|
|
import sys
|
|
print(f"[LANGSMITH-PROCESSOR] Processing span: {span.name}", file=sys.stderr, flush=True)
|
|
|
|
# Track each conversation as a thread in LangSmith
|
|
trace_id = format(span.context.trace_id, '032x')
|
|
span._attributes["langsmith.metadata.thread_id"] = trace_id
|
|
|
|
# Link all spans to their conversation for proper grouping in LangSmith
|
|
if trace_id in self.trace_to_conversation_id:
|
|
conversation_id = self.trace_to_conversation_id[trace_id]
|
|
span._attributes["conversation.id"] = conversation_id
|
|
span._attributes["langsmith.parent_span_id"] = "conversation"
|
|
|
|
span_name = span.name.lower()
|
|
|
|
# STT span: audio input -> transcribed text
|
|
if "stt" in span_name or "speech_to_text" in span_name or "transcription" in span_name:
|
|
span._attributes["langsmith.span.kind"] = "llm"
|
|
transcript = span.attributes.get("transcript") or span.attributes.get("text") or span.attributes.get("output", "")
|
|
self._set_prompt_attributes(span, [{"role": "user", "content": "audio_segment"}])
|
|
if transcript:
|
|
self._set_completion_attributes(span, [{"role": "assistant", "content": str(transcript)}])
|
|
|
|
# LLM span: conversation messages -> AI response
|
|
elif "llm" in span_name or "chat" in span_name or "completion" in span_name or "openai" in span_name:
|
|
span._attributes["langsmith.span.kind"] = "llm"
|
|
messages = self._extract_llm_messages(span)
|
|
if not messages:
|
|
messages = self._fallback_messages(span, span_name)
|
|
self._set_prompt_attributes(span, messages)
|
|
|
|
output_data = self._extract_llm_output(span)
|
|
if output_data:
|
|
completion = [{"role": "assistant", "content": str(output_data)}]
|
|
self._set_completion_attributes(span, completion)
|
|
self._track_messages(self.conversation_messages, trace_id, messages, str(output_data))
|
|
|
|
# TTS span: text -> audio
|
|
elif "tts" in span_name or "text_to_speech" in span_name or "synthesis" in span_name:
|
|
span._attributes["langsmith.span.kind"] = "llm"
|
|
|
|
# Debug TTS spans - always print attributes to see what LiveKit uses
|
|
import sys
|
|
print(f"\n[LANGSMITH-PROCESSOR] 🔊 TTS SPAN: {span.name}", file=sys.stderr, flush=True)
|
|
print(f" 📋 All attributes for {span.name} ({len(span.attributes)} total):", file=sys.stderr, flush=True)
|
|
for key, value in sorted(span.attributes.items()):
|
|
value_str = str(value)
|
|
if len(value_str) > 500:
|
|
value_str = value_str[:500] + "... (truncated)"
|
|
print(f" • {key} = {value_str}", file=sys.stderr, flush=True)
|
|
|
|
# Try LiveKit-specific attributes first
|
|
text = (
|
|
span.attributes.get("lk.input_text") or
|
|
span.attributes.get("lk.request.text") or
|
|
span.attributes.get("lk.text") or
|
|
span.attributes.get("text") or
|
|
span.attributes.get("input") or
|
|
span.attributes.get("prompt") or
|
|
""
|
|
)
|
|
|
|
# Extract voice/model from lk.tts_metrics or other attributes
|
|
voice_id = "unknown"
|
|
tts_metrics = span.attributes.get("lk.tts_metrics")
|
|
if tts_metrics:
|
|
try:
|
|
if isinstance(tts_metrics, str):
|
|
metrics_data = json.loads(tts_metrics)
|
|
else:
|
|
metrics_data = tts_metrics
|
|
if isinstance(metrics_data, dict):
|
|
metadata = metrics_data.get("metadata", {})
|
|
model_name = metadata.get("model_name") or metrics_data.get("model_name")
|
|
if model_name:
|
|
voice_id = str(model_name)
|
|
except (json.JSONDecodeError, TypeError, KeyError):
|
|
pass
|
|
|
|
# Fallback to other voice attributes
|
|
if voice_id == "unknown":
|
|
voice_id = (
|
|
span.attributes.get("lk.voice") or
|
|
span.attributes.get("voice") or
|
|
span.attributes.get("voice_id") or
|
|
"unknown"
|
|
)
|
|
|
|
print(f" ✅ Extracted text: length={len(str(text))}, voice={voice_id}", file=sys.stderr, flush=True)
|
|
|
|
self._set_prompt_attributes(span, [
|
|
{"role": "system", "content": f"Convert to speech with voice: {voice_id}"},
|
|
{"role": "user", "content": str(text) if text else "text_to_speech"}
|
|
])
|
|
self._set_completion_attributes(span, [{"role": "assistant", "content": f"Generated audio for: {text}"}])
|
|
|
|
# Agent/Chain/Job spans: aggregate conversation
|
|
elif (
|
|
"agent" in span_name
|
|
or "session" in span_name
|
|
or "conversation" in span_name
|
|
or "job" in span_name
|
|
):
|
|
span._attributes["langsmith.span.kind"] = "chain"
|
|
is_job_span = "job" in span_name
|
|
|
|
# Try to extract conversation ID
|
|
conversation_id = (
|
|
span.attributes.get("conversation.id") or
|
|
span.attributes.get("conversation_id") or
|
|
span.attributes.get("session_id") or
|
|
(span.attributes.get("lk.job_id") if is_job_span else "") or
|
|
""
|
|
)
|
|
if conversation_id:
|
|
self.trace_to_conversation_id[trace_id] = str(conversation_id)
|
|
span._attributes["conversation.id"] = str(conversation_id)
|
|
span._attributes["langsmith.root_span"] = True
|
|
elif is_job_span:
|
|
# Ensure the root job span is treated as the LangSmith conversation root
|
|
span._attributes["conversation.id"] = trace_id
|
|
span._attributes["langsmith.root_span"] = True
|
|
|
|
# Aggregate messages from conversation
|
|
conv_msgs = self.conversation_messages.get(trace_id, [])
|
|
if conv_msgs:
|
|
system_msg, first_user_msg, remaining_msgs = self._split_conversation_messages(conv_msgs)
|
|
|
|
# Add input (first user message only, exclude system message)
|
|
# System message is only shown in LLM call spans, not in job entrypoint
|
|
prompt_msgs = []
|
|
if first_user_msg:
|
|
prompt_msgs.append(first_user_msg)
|
|
if prompt_msgs:
|
|
self._set_prompt_attributes(span, prompt_msgs)
|
|
|
|
# Add output (remaining conversation)
|
|
if remaining_msgs:
|
|
self._set_completion_attributes(span, remaining_msgs)
|
|
self._release_job_span_if_waiting(trace_id, prompt_msgs, remaining_msgs)
|
|
elif is_job_span:
|
|
# Defer export until conversation data becomes available
|
|
self._defer_job_span(trace_id, span)
|
|
return
|
|
|
|
# Cleanup
|
|
should_cleanup_trace = is_job_span or span.parent is None
|
|
if should_cleanup_trace:
|
|
if trace_id in self.conversation_messages:
|
|
del self.conversation_messages[trace_id]
|
|
if trace_id in self.trace_to_conversation_id:
|
|
del self.trace_to_conversation_id[trace_id]
|
|
|
|
# Default: mark as chain if no specific type detected
|
|
else:
|
|
# Check if it has LLM-like attributes
|
|
if span.attributes.get("input") or span.attributes.get("output"):
|
|
span._attributes["langsmith.span.kind"] = "llm"
|
|
input_val = span.attributes.get("input", "")
|
|
output_val = span.attributes.get("output", "")
|
|
if input_val:
|
|
self._set_prompt_attributes(span, [{"role": "user", "content": str(input_val)}])
|
|
if output_val:
|
|
self._set_completion_attributes(span, [{"role": "assistant", "content": str(output_val)}])
|
|
else:
|
|
span._attributes["langsmith.span.kind"] = "chain"
|
|
|
|
# Export span downstream (unless it was deferred earlier)
|
|
self._export_span(span)
|
|
|
|
def _set_prompt_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0, log: bool = False):
|
|
"""Set gen_ai.prompt.* attributes from a list of messages."""
|
|
import sys
|
|
for i, msg in enumerate(messages):
|
|
idx = start_idx + i
|
|
if isinstance(msg, dict):
|
|
role = msg.get("role", "user")
|
|
content = str(msg.get("content", ""))
|
|
span._attributes[f"gen_ai.prompt.{idx}.role"] = role
|
|
span._attributes[f"gen_ai.prompt.{idx}.content"] = content
|
|
if log:
|
|
content_preview = content[:100] + "..." if len(content) > 100 else content
|
|
print(f" Set gen_ai.prompt.{idx}.role = '{role}', gen_ai.prompt.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
|
else:
|
|
# Handle string messages
|
|
content = str(msg)
|
|
span._attributes[f"gen_ai.prompt.{idx}.role"] = "user"
|
|
span._attributes[f"gen_ai.prompt.{idx}.content"] = content
|
|
if log:
|
|
content_preview = content[:100] + "..." if len(content) > 100 else content
|
|
print(f" Set gen_ai.prompt.{idx}.role = 'user', gen_ai.prompt.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
|
|
|
def _set_completion_attributes(self, span: ReadableSpan, messages: list, start_idx: int = 0, log: bool = False):
|
|
"""Set gen_ai.completion.* attributes from a list of messages."""
|
|
import sys
|
|
for i, msg in enumerate(messages):
|
|
idx = start_idx + i
|
|
if isinstance(msg, dict):
|
|
role = msg.get("role", "assistant")
|
|
content = str(msg.get("content", ""))
|
|
span._attributes[f"gen_ai.completion.{idx}.role"] = role
|
|
span._attributes[f"gen_ai.completion.{idx}.content"] = content
|
|
if log:
|
|
content_preview = content[:200] + "..." if len(content) > 200 else content
|
|
print(f" Set gen_ai.completion.{idx}.role = '{role}', gen_ai.completion.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
|
else:
|
|
# Handle string messages
|
|
content = str(msg)
|
|
span._attributes[f"gen_ai.completion.{idx}.role"] = "assistant"
|
|
span._attributes[f"gen_ai.completion.{idx}.content"] = content
|
|
if log:
|
|
content_preview = content[:200] + "..." if len(content) > 200 else content
|
|
print(f" Set gen_ai.completion.{idx}.role = 'assistant', gen_ai.completion.{idx}.content = '{content_preview}' (length: {len(content)})", file=sys.stderr, flush=True)
|
|
|
|
def _fallback_messages(self, span: ReadableSpan, span_name: str) -> list:
|
|
"""Use system/user attributes or span name when no chat context is available."""
|
|
system_prompt = span.attributes.get("gen_ai.system") or span.attributes.get("system") or ""
|
|
user_prompt = (
|
|
span.attributes.get("gen_ai.user")
|
|
or span.attributes.get("user")
|
|
or span.attributes.get("input")
|
|
or ""
|
|
)
|
|
fallback = []
|
|
if system_prompt:
|
|
fallback.append({"role": "system", "content": str(system_prompt)})
|
|
if user_prompt:
|
|
fallback.append({"role": "user", "content": str(user_prompt)})
|
|
if not fallback:
|
|
fallback.append({"role": "user", "content": f"LLM request: {span_name}"})
|
|
return fallback
|
|
|
|
def _split_conversation_messages(self, messages: list) -> tuple:
|
|
"""
|
|
Split conversation messages into system, first user, and remaining messages.
|
|
Returns: (system_msg, first_user_msg, remaining_msgs)
|
|
"""
|
|
system_msg = None
|
|
first_user_msg = None
|
|
remaining_msgs = []
|
|
first_user_found = False
|
|
|
|
for msg in messages:
|
|
role = msg.get("role", "") if isinstance(msg, dict) else "user"
|
|
if role == "system" and system_msg is None:
|
|
system_msg = msg
|
|
elif role == "user" and not first_user_found:
|
|
first_user_msg = msg
|
|
first_user_found = True
|
|
elif first_user_found:
|
|
remaining_msgs.append(msg)
|
|
|
|
return (system_msg, first_user_msg, remaining_msgs)
|
|
|
|
def _extract_llm_messages(self, span: ReadableSpan) -> list:
|
|
"""
|
|
Extract LLM input messages from span attributes using multiple strategies.
|
|
Returns a list of message dicts with 'role' and 'content' keys.
|
|
"""
|
|
import sys
|
|
print(f" 🔍 Strategy 1: Checking lk.chat_ctx...", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 1: LiveKit-specific attribute: lk.chat_ctx
|
|
chat_ctx = span.attributes.get("lk.chat_ctx")
|
|
if chat_ctx:
|
|
print(f" ✓ Found lk.chat_ctx, type={type(chat_ctx)}, length={len(str(chat_ctx)) if isinstance(chat_ctx, str) else 'N/A'}", file=sys.stderr, flush=True)
|
|
try:
|
|
if isinstance(chat_ctx, str):
|
|
ctx_data = json.loads(chat_ctx)
|
|
else:
|
|
ctx_data = chat_ctx
|
|
|
|
# Extract messages from items array
|
|
if isinstance(ctx_data, dict) and "items" in ctx_data:
|
|
messages = []
|
|
for item in ctx_data["items"]:
|
|
if isinstance(item, dict) and item.get("type") == "message":
|
|
role = item.get("role", "user")
|
|
content = item.get("content", "")
|
|
# Content might be a list of strings or a single string
|
|
if isinstance(content, list):
|
|
content = " ".join(str(c) for c in content)
|
|
if content:
|
|
messages.append({"role": str(role), "content": str(content)})
|
|
|
|
if messages:
|
|
print(f" ✅ Strategy 1 SUCCESS: Found {len(messages)} messages from lk.chat_ctx", file=sys.stderr, flush=True)
|
|
return messages
|
|
except (json.JSONDecodeError, TypeError, KeyError, AttributeError) as e:
|
|
print(f" ✗ Strategy 1 FAILED: {type(e).__name__}: {e}", file=sys.stderr, flush=True)
|
|
else:
|
|
print(f" ✗ lk.chat_ctx not found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 2: Check for OpenTelemetry semantic convention attributes
|
|
# gen_ai.request.prompt.* or gen_ai.prompt.*
|
|
print(f" 🔍 Strategy 2: Checking gen_ai.request.prompt.*...", file=sys.stderr, flush=True)
|
|
messages = []
|
|
idx = 0
|
|
while True:
|
|
role_key = f"gen_ai.request.prompt.{idx}.role"
|
|
content_key = f"gen_ai.request.prompt.{idx}.content"
|
|
if role_key in span.attributes or content_key in span.attributes:
|
|
role = span.attributes.get(role_key, "user")
|
|
content = span.attributes.get(content_key, "")
|
|
if content:
|
|
messages.append({"role": str(role), "content": str(content)})
|
|
idx += 1
|
|
else:
|
|
break
|
|
|
|
if messages:
|
|
print(f" ✅ Strategy 2 SUCCESS: Found {len(messages)} messages from gen_ai.request.prompt.*", file=sys.stderr, flush=True)
|
|
return messages
|
|
else:
|
|
print(f" ✗ No gen_ai.request.prompt.* attributes found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 2b: Check for gen_ai.prompt.* (alternative format)
|
|
print(f" 🔍 Strategy 2b: Checking gen_ai.prompt.*...", file=sys.stderr, flush=True)
|
|
idx = 0
|
|
while True:
|
|
role_key = f"gen_ai.prompt.{idx}.role"
|
|
content_key = f"gen_ai.prompt.{idx}.content"
|
|
if role_key in span.attributes or content_key in span.attributes:
|
|
role = span.attributes.get(role_key, "user")
|
|
content = span.attributes.get(content_key, "")
|
|
if content:
|
|
messages.append({"role": str(role), "content": str(content)})
|
|
idx += 1
|
|
else:
|
|
break
|
|
|
|
if messages:
|
|
print(f" ✅ Strategy 2b SUCCESS: Found {len(messages)} messages from gen_ai.prompt.*", file=sys.stderr, flush=True)
|
|
return messages
|
|
else:
|
|
print(f" ✗ No gen_ai.prompt.* attributes found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 3: Check for messages attribute (JSON string or list)
|
|
print(f" 🔍 Strategy 3: Checking messages/llm.messages/input attributes...", file=sys.stderr, flush=True)
|
|
messages_attr = span.attributes.get("messages") or span.attributes.get("llm.messages") or span.attributes.get("input")
|
|
print(f" Checking: messages={bool(span.attributes.get('messages'))}, llm.messages={bool(span.attributes.get('llm.messages'))}, input={bool(span.attributes.get('input'))}", file=sys.stderr, flush=True)
|
|
if messages_attr:
|
|
try:
|
|
if isinstance(messages_attr, str):
|
|
if DEBUG:
|
|
logger.debug(f" Parsing JSON string, length={len(messages_attr)}")
|
|
parsed = json.loads(messages_attr)
|
|
if isinstance(parsed, list):
|
|
# Validate and normalize message format
|
|
normalized = []
|
|
for msg in parsed:
|
|
if isinstance(msg, dict) and "content" in msg:
|
|
normalized.append({
|
|
"role": msg.get("role", "user"),
|
|
"content": str(msg.get("content", ""))
|
|
})
|
|
if normalized:
|
|
print(f" ✅ Strategy 3 SUCCESS: Found {len(normalized)} messages from JSON string", file=sys.stderr, flush=True)
|
|
return normalized
|
|
elif isinstance(messages_attr, list):
|
|
print(f" Found list type, length={len(messages_attr)}", file=sys.stderr, flush=True)
|
|
# Validate and normalize message format
|
|
normalized = []
|
|
for msg in messages_attr:
|
|
if isinstance(msg, dict) and "content" in msg:
|
|
normalized.append({
|
|
"role": msg.get("role", "user"),
|
|
"content": str(msg.get("content", ""))
|
|
})
|
|
if normalized:
|
|
print(f" ✅ Strategy 3 SUCCESS: Found {len(normalized)} messages from list", file=sys.stderr, flush=True)
|
|
return normalized
|
|
except (json.JSONDecodeError, TypeError, AttributeError) as e:
|
|
print(f" ✗ Strategy 3 FAILED: {type(e).__name__}: {e}", file=sys.stderr, flush=True)
|
|
else:
|
|
print(f" ✗ No messages attribute found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 4: Check for individual system/user/assistant attributes
|
|
print(f" 🔍 Strategy 4: Checking individual system/user/assistant attributes...", file=sys.stderr, flush=True)
|
|
system = span.attributes.get("gen_ai.system") or span.attributes.get("system") or span.attributes.get("system_prompt")
|
|
user = span.attributes.get("gen_ai.user") or span.attributes.get("user") or span.attributes.get("user_input")
|
|
assistant = span.attributes.get("gen_ai.assistant") or span.attributes.get("assistant")
|
|
|
|
print(f" system={bool(system)}, user={bool(user)}, assistant={bool(assistant)}", file=sys.stderr, flush=True)
|
|
|
|
if system or user or assistant:
|
|
result = []
|
|
if system:
|
|
result.append({"role": "system", "content": str(system)})
|
|
if user:
|
|
result.append({"role": "user", "content": str(user)})
|
|
if assistant:
|
|
result.append({"role": "assistant", "content": str(assistant)})
|
|
if result:
|
|
print(f" ✅ Strategy 4 SUCCESS: Found {len(result)} messages from individual attributes", file=sys.stderr, flush=True)
|
|
return result
|
|
else:
|
|
print(f" ✗ No individual attributes found", file=sys.stderr, flush=True)
|
|
|
|
print(f" ⚠️ All strategies failed - no messages extracted", file=sys.stderr, flush=True)
|
|
return []
|
|
|
|
def _extract_llm_output(self, span: ReadableSpan) -> str:
|
|
"""
|
|
Extract LLM output/completion from span attributes using multiple strategies.
|
|
Returns the output as a string.
|
|
"""
|
|
import sys
|
|
print(f" 🔍 EXTRACTING LLM OUTPUT:", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 1: LiveKit-specific attribute: lk.response.text
|
|
print(f" Strategy 1: Checking lk.response.text...", file=sys.stderr, flush=True)
|
|
output = span.attributes.get("lk.response.text")
|
|
if output:
|
|
print(f" ✅ Strategy 1 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
|
|
return str(output)
|
|
else:
|
|
print(f" ✗ lk.response.text not found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 2: OpenTelemetry semantic convention
|
|
print(f" Strategy 2: Checking gen_ai.response.text / gen_ai.completion.text...", file=sys.stderr, flush=True)
|
|
output = span.attributes.get("gen_ai.response.text") or span.attributes.get("gen_ai.completion.text")
|
|
if output:
|
|
print(f" ✅ Strategy 2 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
|
|
return str(output)
|
|
else:
|
|
print(f" ✗ gen_ai.response.text and gen_ai.completion.text not found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 3: Common attribute names
|
|
print(f" Strategy 3: Checking common attribute names...", file=sys.stderr, flush=True)
|
|
output = (
|
|
span.attributes.get("gen_ai.response") or
|
|
span.attributes.get("gen_ai.completion") or
|
|
span.attributes.get("output") or
|
|
span.attributes.get("response") or
|
|
span.attributes.get("completion") or
|
|
span.attributes.get("llm.output") or
|
|
span.attributes.get("llm.response") or
|
|
span.attributes.get("text") or
|
|
""
|
|
)
|
|
|
|
if output:
|
|
print(f" ✅ Strategy 3 SUCCESS: Found output, length={len(str(output))}", file=sys.stderr, flush=True)
|
|
return str(output)
|
|
else:
|
|
print(f" ✗ No common output attributes found", file=sys.stderr, flush=True)
|
|
|
|
# Strategy 4: Check for completion.* attributes
|
|
print(f" Strategy 4: Checking gen_ai.completion.* attributes...", file=sys.stderr, flush=True)
|
|
idx = 0
|
|
completion_parts = []
|
|
while True:
|
|
content_key = f"gen_ai.completion.{idx}.content"
|
|
if content_key in span.attributes:
|
|
completion_parts.append(str(span.attributes[content_key]))
|
|
idx += 1
|
|
else:
|
|
break
|
|
|
|
if completion_parts:
|
|
print(f" ✅ Strategy 4 SUCCESS: Found {len(completion_parts)} completion parts", file=sys.stderr, flush=True)
|
|
return "\n".join(completion_parts)
|
|
else:
|
|
print(f" ✗ No gen_ai.completion.* attributes found", file=sys.stderr, flush=True)
|
|
|
|
print(f" ⚠️ All strategies failed - no output extracted", file=sys.stderr, flush=True)
|
|
return ""
|
|
|
|
def _get_messages_from_attributes(self, span: ReadableSpan) -> list:
|
|
"""Extract messages from span attributes as fallback."""
|
|
messages = []
|
|
system = span.attributes.get("gen_ai.system") or span.attributes.get("system")
|
|
user = span.attributes.get("gen_ai.user") or span.attributes.get("user") or span.attributes.get("input")
|
|
|
|
if system:
|
|
messages.append({"role": "system", "content": str(system)})
|
|
if user:
|
|
messages.append({"role": "user", "content": str(user)})
|
|
|
|
return messages
|
|
|
|
def _track_messages(self, target_dict: dict, key: str, messages: list, output_data: str):
|
|
"""
|
|
Track messages in target_dict, avoiding duplicates.
|
|
Preserves deduplication logic: case-insensitive content comparison.
|
|
"""
|
|
if key not in target_dict:
|
|
target_dict[key] = []
|
|
# Add system prompt once at the start
|
|
for msg in messages:
|
|
if isinstance(msg, dict) and msg.get("role") == "system":
|
|
target_dict[key].append(msg)
|
|
break
|
|
|
|
# Add the latest user message if it's new
|
|
last_user_msg = next(
|
|
(msg for msg in reversed(messages) if isinstance(msg, dict) and msg.get("role") == "user"),
|
|
None
|
|
)
|
|
if last_user_msg:
|
|
new_content = str(last_user_msg.get("content", "")).strip().lower()
|
|
existing_contents = [
|
|
str(m.get("content", "")).strip().lower()
|
|
for m in target_dict[key]
|
|
if isinstance(m, dict) and m.get("role") == "user"
|
|
]
|
|
if new_content and new_content not in existing_contents:
|
|
target_dict[key].append(last_user_msg)
|
|
|
|
# Add the assistant response if it's new
|
|
if output_data:
|
|
new_assistant_content = str(output_data).strip().lower()
|
|
existing_assistant_contents = [
|
|
str(m.get("content", "")).strip().lower()
|
|
for m in target_dict[key]
|
|
if isinstance(m, dict) and m.get("role") == "assistant"
|
|
]
|
|
if new_assistant_content not in existing_assistant_contents:
|
|
target_dict[key].append({"role": "assistant", "content": output_data})
|
|
|
|
def shutdown(self) -> None:
|
|
self._flush_deferred_job_spans()
|
|
if self.downstream:
|
|
self.downstream.shutdown()
|
|
|
|
def force_flush(self, timeout_millis: int = 30000) -> bool:
|
|
self._flush_deferred_job_spans()
|
|
if self.downstream:
|
|
return self.downstream.force_flush(timeout_millis)
|
|
return True
|
|
|
|
def _defer_job_span(self, trace_id: str, span: ReadableSpan):
|
|
import sys
|
|
self.deferred_job_spans[trace_id] = span
|
|
print(f"⏸️ Deferring export of job span for trace {trace_id}", file=sys.stderr, flush=True)
|
|
|
|
def _release_job_span_if_waiting(self, trace_id: str, prompt_msgs: list, completion_msgs: list):
|
|
job_span = self.deferred_job_spans.pop(trace_id, None)
|
|
if not job_span:
|
|
return
|
|
import sys
|
|
print(f"🧩 Releasing deferred job span for trace {trace_id}", file=sys.stderr, flush=True)
|
|
if prompt_msgs:
|
|
self._set_prompt_attributes(job_span, deepcopy(prompt_msgs))
|
|
if completion_msgs:
|
|
self._set_completion_attributes(job_span, deepcopy(completion_msgs))
|
|
self._export_span(job_span)
|
|
|
|
def _flush_deferred_job_spans(self):
|
|
if not self.deferred_job_spans:
|
|
return
|
|
import sys
|
|
print(f"⚠️ Flushing {len(self.deferred_job_spans)} deferred job span(s) without conversation data", file=sys.stderr, flush=True)
|
|
for trace_id, span in list(self.deferred_job_spans.items()):
|
|
self._set_prompt_attributes(span, [{"role": "system", "content": "Conversation not captured"}])
|
|
self._set_completion_attributes(span, [{"role": "assistant", "content": "No conversation turns recorded."}])
|
|
self._export_span(span)
|
|
del self.deferred_job_spans[trace_id]
|
|
|
|
def _export_span(self, span: ReadableSpan):
|
|
if self.downstream:
|
|
self.downstream.on_end(span)
|
|
|