Merge pull request #19 from langchain-ai/rlm/debugging

fix: multiple bug fixes and improvements to trace fetching
This commit is contained in:
Lance Martin
2025-12-11 14:35:35 -08:00
committed by GitHub
5 changed files with 439 additions and 66 deletions
+49 -7
View File
@@ -302,6 +302,18 @@ def trace(trace_id, format_type, output_file):
type=click.Choice(["raw", "json", "pretty"]),
help="Output format: raw (compact JSON), json (pretty JSON), pretty (human-readable panels)",
)
@click.option(
"--no-progress",
is_flag=True,
default=False,
help="Disable progress bar display during fetch",
)
@click.option(
"--max-concurrent",
type=int,
default=5,
help="Maximum concurrent thread fetches (default: 5, max recommended: 10)",
)
def threads(
output_dir,
project_uuid,
@@ -310,6 +322,8 @@ def threads(
since,
filename_pattern,
format_type,
no_progress,
max_concurrent,
):
"""Fetch recent threads from LangSmith BY CHRONOLOGICAL TIME.
@@ -430,6 +444,8 @@ def threads(
limit,
last_n_minutes=last_n_minutes,
since=since,
max_workers=max_concurrent,
show_progress=not no_progress,
)
except ValueError as e:
click.echo(f"Error: {e}", err=True)
@@ -478,6 +494,8 @@ def threads(
limit,
last_n_minutes=last_n_minutes,
since=since,
max_workers=max_concurrent,
show_progress=not no_progress,
)
if not threads_data:
@@ -557,6 +575,18 @@ def threads(
default=5,
help="Maximum concurrent trace fetches (default: 5, max recommended: 10)",
)
@click.option(
"--include-metadata",
is_flag=True,
default=False,
help="Include run metadata (status, timing, tokens, costs) in output",
)
@click.option(
"--include-feedback",
is_flag=True,
default=False,
help="Include feedback data in output (requires extra API call)",
)
def traces(
output_dir,
project_uuid,
@@ -568,6 +598,8 @@ def traces(
output_file,
no_progress,
max_concurrent,
include_metadata,
include_feedback,
):
"""Fetch recent traces from LangSmith BY CHRONOLOGICAL TIME.
@@ -689,6 +721,8 @@ def traces(
max_workers=max_concurrent,
show_progress=not no_progress,
return_timing=True,
include_metadata=include_metadata,
include_feedback=include_feedback,
)
except ValueError as e:
click.echo(f"Error: {e}", err=True)
@@ -724,12 +758,18 @@ def traces(
json.dump(trace_data, f, indent=2, default=str)
# Show summary of saved data
messages_count = len(trace_data.get("messages", []))
feedback_count = len(trace_data.get("feedback", []))
status = trace_data.get("metadata", {}).get("status", "unknown")
summary = f"{messages_count} messages, status: {status}"
if feedback_count > 0:
summary += f", {feedback_count} feedback"
# Handle both list (include_metadata=False) and dict (include_metadata=True) cases
if isinstance(trace_data, dict):
messages_count = len(trace_data.get("messages", []))
feedback_count = len(trace_data.get("feedback", []))
status = trace_data.get("metadata", {}).get("status", "unknown")
summary = f"{messages_count} messages, status: {status}"
if feedback_count > 0:
summary += f", {feedback_count} feedback"
else:
# trace_data is a list of messages
messages_count = len(trace_data)
summary = f"{messages_count} messages"
click.echo(f" ✓ Saved {trace_id} to {safe_filename} ({summary})")
@@ -744,7 +784,7 @@ def traces(
format_type = config.get_default_format()
try:
# Fetch traces with metadata and feedback
# Fetch traces
traces_data = fetchers.fetch_recent_traces(
api_key=api_key,
base_url=base_url,
@@ -755,6 +795,8 @@ def traces(
max_workers=max_concurrent,
show_progress=not no_progress,
return_timing=False,
include_metadata=include_metadata,
include_feedback=include_feedback,
)
# For limit=1, output single trace directly
+22 -7
View File
@@ -70,16 +70,24 @@ def set_config_value(key: str, value: str):
Set a configuration value.
Args:
key: Configuration key to set
key: Configuration key to set (will be normalized to hyphen format)
value: Value to set
"""
config = load_config()
config[key] = value
# Normalize key to hyphen format
normalized_key = key.replace("_", "-")
config[normalized_key] = value
# Clean up old underscore format if different from normalized
if key != normalized_key and key in config:
del config[key]
save_config(config)
# If manually setting project_uuid, clear in-memory cache
# If manually setting project-uuid, clear in-memory cache
# to force re-validation on next lookup
if key in ("project-uuid", "project_uuid"):
if normalized_key == "project-uuid":
_project_uuid_cache.clear()
@@ -95,6 +103,13 @@ def _update_project_config(project_name: str, project_uuid: str):
config = load_config()
config["project-name"] = project_name
config["project-uuid"] = project_uuid
# Clean up old underscore format if it exists
if "project_uuid" in config:
del config["project_uuid"]
if "project_name" in config:
del config["project_name"]
save_config(config)
@@ -184,9 +199,9 @@ def get_project_uuid() -> str | None:
# Get current project name from env var
env_project_name = os.environ.get("LANGSMITH_PROJECT")
# Load config values
config_project_uuid = get_config_value("project_uuid")
config_project_name = get_config_value("project_name")
# Load config values (use hyphen format as canonical)
config_project_uuid = get_config_value("project-uuid")
config_project_name = get_config_value("project-name")
# Case 1: No env var set - use config as default
if not env_project_name:
+49 -49
View File
@@ -94,9 +94,11 @@ def fetch_recent_threads(
limit: int = 10,
last_n_minutes: int | None = None,
since: str | None = None,
max_workers: int = 5,
show_progress: bool = True,
) -> list[tuple[str, list[dict[str, Any]]]]:
"""
Fetch recent threads for a project.
Fetch recent threads for a project with concurrent fetching.
Args:
project_uuid: LangSmith project UUID (session_id)
@@ -107,6 +109,8 @@ def fetch_recent_threads(
from the last N minutes. Mutually exclusive with `since`.
since: Optional ISO timestamp string (e.g., "2025-12-09T10:00:00Z").
Only returns threads since this time. Mutually exclusive with `last_n_minutes`.
max_workers: Maximum concurrent thread fetches (default: 5)
show_progress: Whether to show progress bar (default: True)
Returns:
List of tuples (thread_id, messages) for each thread
@@ -164,18 +168,52 @@ def fetch_recent_threads(
if len(thread_info) >= limit:
break
# Fetch messages for each thread
results = []
for thread_id in thread_info.keys():
# Fetch messages for each thread with concurrent fetching and progress bar
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
import sys
def _fetch_thread_safe(thread_id: str) -> tuple[str, list[dict[str, Any]] | None]:
"""Safe wrapper for fetch_thread that returns (thread_id, messages or None)."""
try:
messages = fetch_thread(
thread_id, project_uuid, base_url=base_url, api_key=api_key
)
results.append((thread_id, messages))
return (thread_id, messages)
except Exception as e:
# Log error but continue with other threads
print(f"Warning: Failed to fetch thread {thread_id}: {e}")
continue
print(f"Warning: Failed to fetch thread {thread_id}: {e}", file=sys.stderr)
return (thread_id, None)
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all fetch tasks
future_to_thread = {
executor.submit(_fetch_thread_safe, thread_id): thread_id
for thread_id in thread_info.keys()
}
# Use progress bar if requested
if show_progress:
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]Fetching {task.completed}/{task.total} threads..."),
BarColumn(),
TaskProgressColumn(),
) as progress:
task = progress.add_task("fetch", total=len(future_to_thread))
for future in as_completed(future_to_thread):
thread_id, messages = future.result()
if messages is not None:
results.append((thread_id, messages))
progress.update(task, advance=1)
else:
# No progress bar - just collect results
for future in as_completed(future_to_thread):
thread_id, messages = future.result()
if messages is not None:
results.append((thread_id, messages))
return results
@@ -304,45 +342,7 @@ def _fetch_traces_concurrent(
if include_feedback and _sdk_run_has_feedback(run):
runs_with_feedback.append(trace_id)
# For single trace, use simple sequential fetch (no progress overhead)
if len(runs) == 1:
trace_id = str(runs[0].id)
try:
start = perf_counter()
messages = fetch_trace(trace_id, base_url=base_url, api_key=api_key)
duration = perf_counter() - start
if include_metadata:
trace_data = {
"trace_id": trace_id,
"messages": messages,
"metadata": run_metadata_map[trace_id],
"feedback": [],
}
results.append((trace_id, trace_data))
else:
results.append((trace_id, messages))
timing_info["traces_succeeded"] = 1
timing_info["individual_timings"] = [duration]
except Exception as e:
print(f"Warning: Failed to fetch trace {trace_id}: {e}", file=sys.stderr)
timing_info["traces_failed"] = 1
timing_info["individual_timings"] = []
timing_info["fetch_duration"] = perf_counter() - timing_info["fetch_start"]
# Fetch feedback for single trace if needed
if include_metadata and include_feedback and runs_with_feedback and results:
feedback_map = _fetch_feedback_batch(runs_with_feedback, api_key, max_workers=1)
if results:
trace_id, trace_data = results[0]
if trace_id in feedback_map:
trace_data["feedback"] = feedback_map[trace_id]
return results, timing_info
# Concurrent fetching with progress for multiple traces
# Concurrent fetching with progress (for all traces, including single)
individual_timings = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -429,8 +429,8 @@ def fetch_recent_traces(
max_workers: int = 5,
show_progress: bool = True,
return_timing: bool = False,
include_metadata: bool = True,
include_feedback: bool = True,
include_metadata: bool = False,
include_feedback: bool = False,
) -> list[tuple[str, list[dict[str, Any]] | dict[str, Any]]] | tuple[list[tuple[str, list[dict[str, Any]] | dict[str, Any]]], dict]:
"""Fetch multiple recent traces from LangSmith with concurrent fetching.
+7 -3
View File
@@ -332,9 +332,13 @@ def print_formatted_trace(
syntax = Syntax(json_str, "json", theme="monokai", line_numbers=False)
console.print(syntax)
elif format_type == "pretty":
# Print formatted output directly (already has Rich-like formatting via text)
formatted = format_trace_data(data, "pretty")
console.print(formatted)
# For messages-only (list), use Rich panels
if isinstance(data, list):
print_formatted(data, format_type, None)
# For trace data with metadata (dict), use plain text format
else:
formatted = format_trace_data(data, "pretty")
console.print(formatted)
else:
# Raw format
console.print(format_trace_data(data, "raw"))
+312
View File
@@ -490,3 +490,315 @@ class TestThreadsCommand:
# Check that files were created with custom pattern
assert (output_dir / "thread_001.json").exists()
assert (output_dir / "thread_002.json").exists()
class TestTracesCommand:
"""Tests for traces command."""
@responses.activate
def test_traces_default_no_metadata(
self, sample_trace_response, mock_env_api_key, temp_config_dir, tmp_path
):
"""Test traces command with directory output and default (no metadata)."""
# Mock langsmith import
with patch("langsmith_cli.fetchers.HAS_LANGSMITH", True):
# Mock the /info endpoint (called by Client initialization)
responses.add(
responses.GET,
f"{TEST_BASE_URL}/info",
json={"version": "1.0"},
status=200,
)
# Mock the runs query endpoint (called by Client.list_runs)
responses.add(
responses.POST,
f"{TEST_BASE_URL}/runs/query",
json={
"runs": [
{
"id": "3b0b15fe-1e3a-4aef-afa8-48df15879cfe",
"name": "test_run",
"start_time": "2024-01-01T00:00:00Z",
"run_type": "chain",
"trace_id": "3b0b15fe-1e3a-4aef-afa8-48df15879cfe",
}
]
},
status=200,
)
# Mock the trace fetch endpoint
trace_id = "3b0b15fe-1e3a-4aef-afa8-48df15879cfe"
responses.add(
responses.GET,
f"{TEST_BASE_URL}/runs/{trace_id}",
json=sample_trace_response,
status=200,
)
runner = CliRunner()
output_dir = tmp_path / "traces"
result = runner.invoke(main, ["traces", str(output_dir), "--limit", "1"])
assert result.exit_code == 0
assert "Found 1 trace(s)" in result.output
assert "Successfully saved 1 trace(s)" in result.output
assert "3 messages" in result.output # Should show message count
# Check that file was created and contains list (not dict)
import json
trace_file = output_dir / f"{trace_id}.json"
assert trace_file.exists()
with open(trace_file) as f:
data = json.load(f)
assert isinstance(data, list) # Should be list when no metadata
@responses.activate
def test_traces_with_metadata(
self, sample_trace_response, mock_env_api_key, temp_config_dir, tmp_path
):
"""Test traces command with --include-metadata flag."""
with patch("langsmith_cli.fetchers.HAS_LANGSMITH", True):
# Mock the /info endpoint
responses.add(
responses.GET,
f"{TEST_BASE_URL}/info",
json={"version": "1.0"},
status=200,
)
# Mock the runs query endpoint with metadata fields
trace_id = "3b0b15fe-1e3a-4aef-afa8-48df15879cfe"
responses.add(
responses.POST,
f"{TEST_BASE_URL}/runs/query",
json={
"runs": [
{
"id": trace_id,
"name": "test_run",
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-01T00:01:00Z",
"run_type": "chain",
"trace_id": trace_id,
"status": "success",
}
]
},
status=200,
)
responses.add(
responses.GET,
f"{TEST_BASE_URL}/runs/{trace_id}",
json=sample_trace_response,
status=200,
)
runner = CliRunner()
output_dir = tmp_path / "traces"
result = runner.invoke(
main, ["traces", str(output_dir), "--limit", "1", "--include-metadata"]
)
assert result.exit_code == 0
assert "Found 1 trace(s)" in result.output
assert "3 messages, status:" in result.output # Should show status
# Check that file contains dict with metadata
import json
trace_file = output_dir / f"{trace_id}.json"
assert trace_file.exists()
with open(trace_file) as f:
data = json.load(f)
assert isinstance(data, dict)
assert "messages" in data
assert "metadata" in data
assert "feedback" in data
assert len(data["messages"]) == 3
@responses.activate
def test_traces_custom_limit(
self, sample_trace_response, mock_env_api_key, temp_config_dir, tmp_path
):
"""Test traces command with custom limit."""
with patch("langsmith_cli.fetchers.HAS_LANGSMITH", True):
# Mock the /info endpoint
responses.add(
responses.GET,
f"{TEST_BASE_URL}/info",
json={"version": "1.0"},
status=200,
)
# Mock the runs query endpoint
trace_ids = [
"3b0b15fe-1e3a-4aef-afa8-48df15879cf1",
"3b0b15fe-1e3a-4aef-afa8-48df15879cf2",
"3b0b15fe-1e3a-4aef-afa8-48df15879cf3",
]
responses.add(
responses.POST,
f"{TEST_BASE_URL}/runs/query",
json={
"runs": [
{
"id": tid,
"name": f"test_run_{i}",
"start_time": "2024-01-01T00:00:00Z",
"run_type": "chain",
"trace_id": tid,
}
for i, tid in enumerate(trace_ids, 1)
]
},
status=200,
)
# Mock trace fetch endpoints
for tid in trace_ids:
responses.add(
responses.GET,
f"{TEST_BASE_URL}/runs/{tid}",
json=sample_trace_response,
status=200,
)
runner = CliRunner()
output_dir = tmp_path / "traces"
result = runner.invoke(main, ["traces", str(output_dir), "--limit", "3"])
assert result.exit_code == 0
assert "Found 3 trace(s)" in result.output
assert "Successfully saved 3 trace(s)" in result.output
# Check that all files were created
for tid in trace_ids:
assert (output_dir / f"{tid}.json").exists()
@responses.activate
def test_traces_custom_filename_pattern(
self, sample_trace_response, mock_env_api_key, temp_config_dir, tmp_path
):
"""Test traces command with custom filename pattern."""
with patch("langsmith_cli.fetchers.HAS_LANGSMITH", True):
# Mock the /info endpoint
responses.add(
responses.GET,
f"{TEST_BASE_URL}/info",
json={"version": "1.0"},
status=200,
)
# Mock the runs query endpoint
trace_ids = [
"3b0b15fe-1e3a-4aef-afa8-48df15879cf1",
"3b0b15fe-1e3a-4aef-afa8-48df15879cf2",
]
responses.add(
responses.POST,
f"{TEST_BASE_URL}/runs/query",
json={
"runs": [
{
"id": tid,
"name": f"test_run_{i}",
"start_time": "2024-01-01T00:00:00Z",
"run_type": "chain",
"trace_id": tid,
}
for i, tid in enumerate(trace_ids, 1)
]
},
status=200,
)
# Mock trace fetch endpoints
for tid in trace_ids:
responses.add(
responses.GET,
f"{TEST_BASE_URL}/runs/{tid}",
json=sample_trace_response,
status=200,
)
runner = CliRunner()
output_dir = tmp_path / "traces"
result = runner.invoke(
main,
[
"traces",
str(output_dir),
"--limit",
"2",
"--filename-pattern",
"trace_{index:03d}.json",
],
)
assert result.exit_code == 0
assert "Found 2 trace(s)" in result.output
# Check that files were created with custom pattern
assert (output_dir / "trace_001.json").exists()
assert (output_dir / "trace_002.json").exists()
@responses.activate
def test_traces_with_project_uuid(
self, sample_trace_response, mock_env_api_key, temp_config_dir, tmp_path
):
"""Test traces command with --project-uuid filter."""
with patch("langsmith_cli.fetchers.HAS_LANGSMITH", True):
# Mock the /info endpoint
responses.add(
responses.GET,
f"{TEST_BASE_URL}/info",
json={"version": "1.0"},
status=200,
)
# Mock the runs query endpoint
trace_id = "3b0b15fe-1e3a-4aef-afa8-48df15879cfe"
responses.add(
responses.POST,
f"{TEST_BASE_URL}/runs/query",
json={
"runs": [
{
"id": trace_id,
"name": "test_run",
"start_time": "2024-01-01T00:00:00Z",
"run_type": "chain",
"trace_id": trace_id,
}
]
},
status=200,
)
responses.add(
responses.GET,
f"{TEST_BASE_URL}/runs/{trace_id}",
json=sample_trace_response,
status=200,
)
runner = CliRunner()
output_dir = tmp_path / "traces"
result = runner.invoke(
main,
[
"traces",
str(output_dir),
"--limit",
"1",
"--project-uuid",
TEST_PROJECT_UUID,
],
)
assert result.exit_code == 0
assert "Found 1 trace(s)" in result.output