From 8d429524752b530e55da8a899745fb28600ecabc Mon Sep 17 00:00:00 2001 From: Robert Xu Date: Thu, 2 Oct 2025 23:38:55 -0400 Subject: [PATCH] Migration scripts for LangFuse, tracing is still WIP --- .env.example | 14 + .gitignore | 4 + config.py | 30 ++ migrate.py | 57 +++ providers/langfuse/data/datasets.py | 80 ++++ providers/langfuse/data/prompts.py | 187 ++++++++ providers/langfuse/data/traces.py | 445 ++++++++++++++++++ providers/langfuse/main.py | 23 + providers/langfuse/resources/traces.py | 443 ++++++++++++++++++ providers/langfuse/runbook.ipynb | 596 +++++++++++++++++++++++++ requirements.txt | 10 + utils/langfuse.py | 135 ++++++ utils/langsmith.py | 90 ++++ 13 files changed, 2114 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 config.py create mode 100644 migrate.py create mode 100644 providers/langfuse/data/datasets.py create mode 100644 providers/langfuse/data/prompts.py create mode 100644 providers/langfuse/data/traces.py create mode 100644 providers/langfuse/main.py create mode 100644 providers/langfuse/resources/traces.py create mode 100644 providers/langfuse/runbook.ipynb create mode 100644 requirements.txt create mode 100644 utils/langfuse.py create mode 100644 utils/langsmith.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ad72108 --- /dev/null +++ b/.env.example @@ -0,0 +1,14 @@ +LANGFUSE_SECRET_KEY="" +LANGFUSE_PUBLIC_KEY="" +LANGFUSE_HOST="https://us.cloud.langfuse.com" + +LANGSMITH_API_KEY="" +LANGSMITH_ENDPOINT="https://api.smith.langchain.com" +LANGSMITH_TRACING=true +LANGSMITH_ORGANIZATION_ID="" + +OPENAI_API_KEY="" +ANTHROPIC_API_KEY="" + +INCLUDE_MODEL_IN_PROMPTS=true +NUM_TRACES_TO_REPLAY=0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cbf9ae1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +.env +.venv +__pycache__ diff --git a/config.py b/config.py new file mode 100644 index 0000000..64a3533 --- /dev/null +++ b/config.py 
@@ -0,0 +1,30 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +LF_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY") +LF_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY") +LF_BASE = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com") + +LS_API_KEY = os.getenv("LANGSMITH_API_KEY") +LS_ORG_ID = os.environ["LANGSMITH_ORGANIZATION_ID"] +LS_BASE = os.getenv("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com") + +LF_HEADERS = { + "Content-Type": "application/json", +} + +LS_HEADERS = { + "Content-Type": "application/json", + "X-API-Key": LS_API_KEY, + "X-Organization-Id": LS_ORG_ID, +} + +# Migration settings +INCLUDE_MODEL_IN_PROMPTS = ( + os.getenv("INCLUDE_MODEL_IN_PROMPTS", "true").lower() == "true" +) +NUM_TRACES_TO_REPLAY = ( + int(os.getenv("NUM_TRACES_TO_REPLAY", "0")) +) diff --git a/migrate.py b/migrate.py new file mode 100644 index 0000000..69de698 --- /dev/null +++ b/migrate.py @@ -0,0 +1,57 @@ +from providers.langfuse.main import migrate_langfuse +from config import INCLUDE_MODEL_IN_PROMPTS, NUM_TRACES_TO_REPLAY +from utils.langfuse import lf_get_projects + +## ------------------------------------------------------------ +## Helpers +## ------------------------------------------------------------ +def display_config(name: str, resource_name: str): + print("╭─────────────────────────────────────────────────────────────╮") + print("│ MIGRATION SETTINGS │") + print("╰─────────────────────────────────────────────────────────────╯") + print(f"- Migrate {name} {resource_name}s") + print( + f"- Include models in prompts: {'+ Yes' if INCLUDE_MODEL_IN_PROMPTS else '- No'}" + f"- Number of traces to replay: {NUM_TRACES_TO_REPLAY}" + ) + print("─" * 63) + + +def capture_user_selection(name: str,resource_name: str): + # Get user selection + while True: + choice = input( + f"Migrate the {resource_name} associated with your {name} API key? 
(y/n): " + ).strip().lower() + + if choice == 'no' or choice == 'n': + print("Migration cancelled.") + return False + elif choice == 'yes' or choice == 'y': + break + else: + print("Invalid input. Please enter 'yes or 'no'") + continue + return True + + +## ------------------------------------------------------------ +## Main Migration Function +## ------------------------------------------------------------ +def migrate(provider: str): + if provider == "langfuse": + display_config("Langfuse", "project") + migrate = capture_user_selection("Langfuse", "project") + projects = lf_get_projects() + if not migrate: + return + if not projects: + print("No project found for the configured API keys.") + return + migrate_langfuse(projects) + else: + print("Invalid provider.\nSupported providers are: 'langfuse'") + + +if __name__ == "__main__": + migrate("langfuse") \ No newline at end of file diff --git a/providers/langfuse/data/datasets.py b/providers/langfuse/data/datasets.py new file mode 100644 index 0000000..8fb0412 --- /dev/null +++ b/providers/langfuse/data/datasets.py @@ -0,0 +1,80 @@ +import traceback +from utils.langfuse import lf_get_project_datasets +from utils.langsmith import ls_create_dataset, ls_upload_examples + + +def _extract_expected_value(record: dict) -> str: + """Extract expected/reference output from a record using common keys. + + Checks both snake_case and camelCase at top-level and one nested dict. 
+ """ + keys = [ + "expected", + "expected_output", + "expectedOutput", + "reference_output", + "referenceOutput", + ] + + # Direct keys + for k in keys: + if k in record and record.get(k) not in (None, ""): + return record.get(k) + + # One-level nested dict + for v in record.values(): + if isinstance(v, dict): + for k in keys: + if k in v and v.get(k) not in (None, ""): + return v.get(k) + + return "" + + +def langfuse_example_conversion(records: list[dict]) -> list[dict]: + """Convert Langfuse dataset records to LangSmith format.""" + examples = [] + for r in records: + # Inputs: prefer dict, fallback to scalar under "input" + inputs = ( + r.get("inputs") + or r.get("input") + or {"input": r.get("prompt", "")} + ) + if not isinstance(inputs, dict): + inputs = {"input": inputs} + + # Move any accidental metadata fields from inputs into example metadata + meta = r.get("metadata") or r.get("meta") or {} + for mk in ("metadata", "meta"): + if mk in inputs: + val = inputs.pop(mk) + if isinstance(val, dict): + meta = {**meta, **val} + + # Expected/reference output + expected = _extract_expected_value(r) + + ex = { + "inputs": inputs, + "outputs": {"reference_output": expected}, + } + if isinstance(meta, dict) and meta: + ex["metadata"] = meta + examples.append(ex) + return examples + + +def migrate_datasets(workspace_id: str, project_id: str): + ds_map = lf_get_project_datasets(project_id) + for ds_name, records in ds_map.items(): + print(f" - migrating dataset: {ds_name}") + try: + ds_id = ls_create_dataset(workspace_id, ds_name) + if records: + examples = langfuse_example_conversion(records) + ls_upload_examples(workspace_id, ds_id, examples) + print(f" • uploaded {len(examples)} examples") + except Exception as e: + print(f" x dataset '{ds_name}' failed: {e}") + traceback.print_exc() \ No newline at end of file diff --git a/providers/langfuse/data/prompts.py b/providers/langfuse/data/prompts.py new file mode 100644 index 0000000..2756bad --- /dev/null +++ 
b/providers/langfuse/data/prompts.py @@ -0,0 +1,187 @@ +import re +import json +import traceback +from langchain_core.prompts import ChatPromptTemplate +from langchain_openai import ChatOpenAI +from langchain_anthropic import ChatAnthropic +from langchain_core.runnables import RunnableSequence +from langsmith.utils import LangSmithConflictError + +from utils.langfuse import lf_get_project_prompts +from utils.langsmith import ls_push_prompt +from config import INCLUDE_MODEL_IN_PROMPTS + + + +def string_to_chat_template(template: str) -> ChatPromptTemplate: + """Detect 'role: content' lines and build a ChatPromptTemplate; fall back.""" + lines = [l.strip() for l in template.splitlines() if l.strip()] + pairs: list[tuple[str, str]] = [] + for line in lines: + m = re.match(r"^(system|user|assistant|tool):\s*(.*)$", line, re.I) + if m: + pairs.append((m.group(1).lower(), m.group(2))) + if pairs: + return ChatPromptTemplate.from_messages(pairs) + return ChatPromptTemplate.from_template(template) + + +def detect_model_provider(model_name: str) -> str: + """Detect if a model is from OpenAI, Anthropic, or other provider.""" + if not model_name: + return "openai" # default fallback + + model_lower = model_name.lower() + + # Anthropic models + if any(x in model_lower for x in ["claude", "anthropic"]): + return "anthropic" + + # OpenAI models + if any( + x in model_lower + for x in [ + "gpt", + "openai", + "o1", + "text-davinci", + "text-curie", + "text-babbage", + "text-ada", + ] + ): + return "openai" + + # Default to OpenAI for unknown models + return "openai" + + +def get_model_instance(model_name: str, model_params: dict = None): + """Get the appropriate LangChain model instance based on the model name.""" + provider = detect_model_provider(model_name) + # Sanitize params to avoid conflicts/unexpected kwargs + params = dict(model_params or {}) + # Remove values that would duplicate or are unsupported by LangChain clients + params.pop("model", None) + 
params.pop("supported_languages", None) + + if provider == "anthropic": + # Anthropic models + return ChatAnthropic(model=model_name, **params) + else: + # OpenAI models (default) + # Note: ChatOpenAI uses 'model' parameter, not 'model_name' + return ChatOpenAI(model=model_name, **params) + + +def langfuse_prompt_conversion(lf_prompt: dict) -> dict: + """ + Map Langfuse prompt JSON → dict ready for push_langsmith_prompt(). + """ + # Prefer direct fields from v2 detail; fallback to nested versions if present + pv = lf_prompt.get("latest_version", {}) or lf_prompt.get("current_version", {}) + + # Description + description = ( + lf_prompt.get("description") + or pv.get("description") + or "" + ) + + # Model and params (if provided). In v2, config may live top-level. + config = (lf_prompt.get("config") or pv.get("config") or {}) or {} + model = config.get("model") or pv.get("model") or "" + model_params = config if isinstance(config, dict) else {} + + # Build template string + prompt_template = "" + # v2: top-level prompt can be a list of messages + if isinstance(lf_prompt.get("prompt"), list): + msgs = [f"{m.get('role','user')}: {m.get('content','')}" for m in lf_prompt.get("prompt", [])] + prompt_template = "\n\n".join(msgs) + elif isinstance(lf_prompt.get("prompt"), str): + prompt_template = lf_prompt["prompt"] + elif isinstance(pv.get("messages"), list): + msgs = [f"{m.get('role','user')}: {m.get('content','')}" for m in pv.get("messages", [])] + prompt_template = "\n\n".join(msgs) + elif isinstance(pv.get("prompt"), str): + prompt_template = pv.get("prompt") + else: + # Last resort: dump object for visibility + prompt_template = json.dumps(lf_prompt) + + # Normalize mustache variants to {var} + tpl = prompt_template + tpl = re.sub(r"\{\{\{(\w+)\}\}\}", r"{\1}", tpl) + tpl = re.sub(r"\{\{(\w+)\}\}", r"{\1}", tpl) + + # Detect variables after normalization + var_pat = re.findall(r"\{(\w+)\}", tpl) + + # Name → lowercase, spaces to hyphens, robust to missing keys + 
raw_name = lf_prompt.get("name") or lf_prompt.get("slug") or f"prompt-{lf_prompt.get('id','')}" + name = str(raw_name).lower().replace(" ", "-") + + out: dict = { + "name": name, + "description": description, + "prompt_template": tpl, + "input_variables": list(sorted(set(var_pat))), + "metadata": { + "langfuse_id": lf_prompt.get("id"), + "langfuse_labels": lf_prompt.get("labels", []), + "langfuse_tags": lf_prompt.get("tags", []), + "created": lf_prompt.get("createdAt") or lf_prompt.get("created_at") or lf_prompt.get("created"), + "type": lf_prompt.get("type") or pv.get("type"), + "version": lf_prompt.get("version") or pv.get("version"), + "model": model, + "model_params": model_params, + "original_source": "langfuse", + }, + } + return out + + +def prompt_dict_to_obj(prompt_dict: dict, include_model: bool = True) -> object: + chat_prompt = string_to_chat_template(prompt_dict["prompt_template"]) + model_name = prompt_dict["metadata"].get("model") + model_params = prompt_dict["metadata"].get("model_params", {}) + + if model_name and include_model: + try: + model = get_model_instance(model_name, model_params) + obj = RunnableSequence(chat_prompt, model) + provider = detect_model_provider(model_name) + print(f" • using {provider} model: {model_name}") + except Exception as e: + print( + f" ! 
failed to create model {model_name}, using prompt only: {e}" + ) + obj = chat_prompt + else: + if model_name and not include_model: + print(f" • prompt only (model {model_name} excluded by flag)") + obj = chat_prompt + return obj + + +def migrate_prompts(workspace_id: str, project_id: str): + prompts = lf_get_project_prompts(project_id) + if prompts: + print(f" - migrating {len(prompts)} prompt(s)…") + for lf_p in prompts: + try: + ls_p_dict = langfuse_prompt_conversion(lf_p) + ls_p_obj = prompt_dict_to_obj(ls_p_dict, include_model=INCLUDE_MODEL_IN_PROMPTS) + url = ls_push_prompt(ls_p_dict["name"], ls_p_dict["description"], ls_p_obj, workspace_id) + pname_disp = lf_p.get("name") or lf_p.get("slug") or lf_p.get("id") + print(f" • {pname_disp} → {url}") + except Exception as e: + pname_disp = lf_p.get("name") or lf_p.get("slug") or lf_p.get("id") + if isinstance(e, LangSmithConflictError): + print(f" • prompt '{pname_disp}' already exists, skipping...") + continue + print(f" x prompt '{pname_disp}' failed: {e}") + traceback.print_exc() + else: + print(" (no prompts)") \ No newline at end of file diff --git a/providers/langfuse/data/traces.py b/providers/langfuse/data/traces.py new file mode 100644 index 0000000..40699f7 --- /dev/null +++ b/providers/langfuse/data/traces.py @@ -0,0 +1,445 @@ +import os +import sys +import uuid +import datetime as dt +import time +import json +from types import SimpleNamespace +# Corrected location for MapValue: +from langfuse.api.resources.commons.types import MapValue +# Ingestion types: +from langfuse.api.resources.ingestion.types import ( + TraceBody, + CreateSpanBody, + CreateGenerationBody, + CreateEventBody, + ScoreBody, + IngestionEvent_TraceCreate, + IngestionEvent_SpanCreate, + IngestionEvent_GenerationCreate, + IngestionEvent_EventCreate, + IngestionEvent_ScoreCreate, + IngestionUsage, +) +# Other common types: +from langfuse.api.resources.commons.types import ObservationLevel, ScoreSource, Usage +from 
langfuse.api.resources.commons.types.score import Score_Numeric, Score_Categorical, Score_Boolean +from dotenv import load_dotenv +from config import NUM_TRACES_TO_REPLAY +from utils.langsmith import ls_upload_runs +from utils.langfuse import lf_get + +load_dotenv() + +# --- Helper Function for Robust Datetime Formatting --- +def safe_isoformat(dt_obj): + """Safely formats datetime object to ISO 8601 string, handling None.""" + if dt_obj is None: + return None + if not isinstance(dt_obj, dt.datetime): + if isinstance(dt_obj, str): # Allow pre-formatted strings + try: + dt.datetime.fromisoformat(dt_obj.replace('Z', '+00:00')) + return dt_obj + except ValueError: + return None + return None + try: + if dt_obj.tzinfo is None: + dt_obj = dt_obj.replace(tzinfo=dt.timezone.utc) + iso_str = dt_obj.isoformat(timespec='milliseconds') + if iso_str.endswith('+00:00'): + iso_str = iso_str[:-6] + 'Z' + return iso_str + except Exception: + return None + + +def map_langfuse_to_langsmith(source_trace): + """ + Maps Langfuse trace data to LangSmith-compatible format. + Returns a list of LangSmith run objects. 
+ """ + langsmith_runs = [] + + # Map trace to LangSmith run + trace_run = { + "id": source_trace.id, + "name": getattr(source_trace, 'name', None) or "Trace", + "run_type": "chain", + "session_id": getattr(source_trace, 'session_id', None), + "session_name": None, + "tags": getattr(source_trace, 'tags', []) or [], + "metadata": source_trace.metadata if isinstance(getattr(source_trace, 'metadata', {}), dict) else {}, + "inputs": getattr(source_trace, 'input', None), + "outputs": getattr(source_trace, 'output', None), + "start_time": safe_isoformat(getattr(source_trace, 'timestamp', None)), + "end_time": None, + "status": "completed", + "error": None, + "invocation_params": {}, + "usage_metadata": {}, + "child_runs": [] + } + + # Process observations to create child runs + observations = getattr(source_trace, 'observations', []) or [] + sorted_observations = sorted(observations, key=lambda o: getattr(o, 'start_time', None) or '') + observation_runs = {} + + for obs in sorted_observations: + run_id = str(uuid.uuid4()) + observation_runs[getattr(obs, 'id', run_id)] = run_id + + # Determine run type based on observation type + run_type = "chain" + if getattr(obs, 'type', '').upper() == "GENERATION": + run_type = "llm" + elif getattr(obs, 'type', '').upper() == "EVENT": + run_type = "tool" + + # Map model information + invocation_params = {} + if getattr(obs, 'model', None): + invocation_params["model"] = obs.model + if isinstance(getattr(obs, 'model_parameters', None), dict): + param_mapping = { + "temperature": "temperature", + "top_p": "top_p", + "max_tokens": "max_tokens", + "frequency_penalty": "frequency_penalty", + "presence_penalty": "presence_penalty", + "seed": "seed", + "stop": "stop_sequences", + "top_k": "top_k" + } + for langfuse_key, langsmith_key in param_mapping.items(): + if langfuse_key in obs.model_parameters: + invocation_params[langsmith_key] = obs.model_parameters[langfuse_key] + + # Map usage information + usage_metadata = {} + usage = 
getattr(obs, 'usage', None) + if isinstance(usage, dict): + if usage.get('input') is not None: + usage_metadata['input_tokens'] = usage.get('input') + if usage.get('output') is not None: + usage_metadata['output_tokens'] = usage.get('output') + if usage.get('total') is not None: + usage_metadata['total_tokens'] = usage.get('total') + + inputs = getattr(obs, 'input', None) + outputs = getattr(obs, 'output', None) + + if getattr(obs, 'type', '').upper() == "GENERATION" and getattr(obs, 'model', None): + if isinstance(inputs, dict) and "messages" in inputs: + inputs = inputs + elif isinstance(inputs, str): + inputs = {"prompt": inputs} + if isinstance(outputs, dict) and "messages" in outputs: + outputs = outputs + elif isinstance(outputs, str): + outputs = {"completion": outputs} + + run = { + "id": run_id, + "name": getattr(obs, 'name', None) or f"{str(getattr(obs, 'type', 'obs')).lower()}_{getattr(obs, 'id', '')}", + "run_type": run_type, + "parent_run_id": observation_runs.get(getattr(obs, 'parent_observation_id', None)), + "session_id": getattr(source_trace, 'session_id', None), + "tags": [], + "metadata": obs.metadata if isinstance(getattr(obs, 'metadata', {}), dict) else {}, + "inputs": inputs, + "outputs": outputs, + "start_time": safe_isoformat(getattr(obs, 'start_time', None)), + "end_time": safe_isoformat(getattr(obs, 'end_time', None)) if getattr(obs, 'end_time', None) else None, + "status": "completed", + "error": getattr(obs, 'status_message', None) or None, + "invocation_params": invocation_params, + "usage_metadata": usage_metadata + } + + if getattr(obs, 'type', '').upper() == "GENERATION" and getattr(obs, 'model', None): + model_lower = obs.model.lower() + if "openai" in model_lower: + run["metadata"]["ls_provider"] = "openai" + elif "anthropic" in model_lower: + run["metadata"]["ls_provider"] = "anthropic" + elif "google" in model_lower: + run["metadata"]["ls_provider"] = "google" + else: + run["metadata"]["ls_provider"] = "unknown" + + 
langsmith_runs.append(run) + + # Add scores as feedback + scores = getattr(source_trace, 'scores', []) or [] + for score in scores: + score_obs_id = getattr(score, 'observation_id', None) + feedback = { + "id": str(uuid.uuid4()), + "run_id": observation_runs.get(score_obs_id, source_trace.id), + "key": getattr(score, 'name', None), + "score": getattr(score, 'value', None), + "comment": getattr(score, 'comment', None), + "metadata": score.metadata if isinstance(getattr(score, 'metadata', {}), dict) else {}, + "source": getattr(score, 'source', None), + "timestamp": safe_isoformat(getattr(score, 'timestamp', None)) + } + langsmith_runs.append({"type": "feedback", **feedback}) + + return langsmith_runs + + +def _wrap(obj): + """Recursively wrap dicts into SimpleNamespace for attribute access.""" + if isinstance(obj, dict): + return SimpleNamespace(**{k: _wrap(v) for k, v in obj.items()}) + if isinstance(obj, list): + return [ _wrap(v) for v in obj ] + return obj + + +def transform_trace_to_ingestion_batch(source_trace): + """ + Transforms a fetched TraceWithFullDetails object into a list of + IngestionEvent objects suitable for the batch ingestion endpoint. + Uses the ORIGINAL source trace ID for the new trace. + Generates new IDs for observations/scores within the trace. + Maps parent/child relationships using new observation IDs. + """ + ingestion_events = [] + preserved_trace_id = source_trace.id + obs_id_map = {} + + # 1. 
Create Trace Event + trace_metadata = source_trace.metadata if isinstance(source_trace.metadata, dict) else {} + trace_body = TraceBody( + id=preserved_trace_id, + timestamp=source_trace.timestamp, + name=source_trace.name, + user_id=source_trace.user_id, + input=source_trace.input, + output=source_trace.output, + session_id=source_trace.session_id, + release=source_trace.release, + version=source_trace.version, + metadata=trace_metadata or None, + tags=source_trace.tags if source_trace.tags is not None else [], + public=source_trace.public, + environment=source_trace.environment if source_trace.environment else "default", + ) + event_timestamp_str = safe_isoformat(dt.datetime.now(dt.timezone.utc)) + if not event_timestamp_str: + print("Error: Could not format timestamp for trace event. Skipping trace.") + return [] + trace_event_id = str(uuid.uuid4()) + ingestion_events.append( + IngestionEvent_TraceCreate(id=trace_event_id, timestamp=event_timestamp_str, body=trace_body) + ) + + # 2. Create Observation Events + sorted_observations = sorted(source_trace.observations, key=lambda o: o.start_time) + for source_obs in sorted_observations: + new_obs_id = str(uuid.uuid4()) + obs_id_map[source_obs.id] = new_obs_id + new_parent_observation_id = obs_id_map.get(source_obs.parent_observation_id) if source_obs.parent_observation_id else None + obs_metadata = source_obs.metadata if isinstance(source_obs.metadata, dict) else {} + + model_params_mapped = None + if isinstance(source_obs.model_parameters, dict): model_params_mapped = source_obs.model_parameters + elif source_obs.model_parameters is not None: print(f"Warning: Obs {source_obs.id} model_parameters type {type(source_obs.model_parameters)}, skipping.") + + common_body_args = { + "id": new_obs_id, "trace_id": preserved_trace_id, "name": source_obs.name, + "start_time": source_obs.start_time, "metadata": obs_metadata or None, + "input": source_obs.input, "output": source_obs.output, "level": source_obs.level, + 
"status_message": source_obs.status_message, "parent_observation_id": new_parent_observation_id, + "version": source_obs.version, "environment": source_obs.environment if source_obs.environment else "default", + } + + event_body = None; ingestion_event_type = None + event_specific_timestamp = safe_isoformat(dt.datetime.now(dt.timezone.utc)) + if not event_specific_timestamp: print(f"Error: Could not format timestamp for obs {new_obs_id}. Skipping."); continue + + try: + if source_obs.type == "SPAN": + event_body = CreateSpanBody(**common_body_args, end_time=source_obs.end_time) + ingestion_event_type = IngestionEvent_SpanCreate + elif source_obs.type == "EVENT": + event_body = CreateEventBody(**common_body_args) + ingestion_event_type = IngestionEvent_EventCreate + elif source_obs.type == "GENERATION": + usage_to_pass = None + if isinstance(source_obs.usage, Usage): + usage_data = {k: getattr(source_obs.usage, k, None) for k in ['input', 'output', 'total', 'unit', 'input_cost', 'output_cost', 'total_cost']} + filtered_usage_data = {k: v for k, v in usage_data.items() if v is not None} + if filtered_usage_data: usage_to_pass = Usage(**filtered_usage_data) + elif source_obs.usage is not None: print(f"Warning: Obs {source_obs.id} has usage type {type(source_obs.usage)}. Skipping.") + + event_body = CreateGenerationBody( + **common_body_args, end_time=source_obs.end_time, + completion_start_time=source_obs.completion_start_time, + model=source_obs.model, model_parameters=model_params_mapped, + usage=usage_to_pass, cost_details=source_obs.cost_details, + usage_details=source_obs.usage_details, + prompt_name=getattr(source_obs, 'prompt_name', None), + prompt_version=getattr(source_obs, 'prompt_version', None), + ) + ingestion_event_type = IngestionEvent_GenerationCreate + else: print(f"Warning: Unknown obs type '{source_obs.type}' for ID {source_obs.id}. 
Skipping."); continue + + if event_body and ingestion_event_type: + event_envelope_id = str(uuid.uuid4()) + ingestion_events.append( + ingestion_event_type(id=event_envelope_id, timestamp=event_specific_timestamp, body=event_body) + ) + except Exception as e: print(f"Error creating obs body for {source_obs.id} (type: {source_obs.type}): {e}"); continue + + # 3. Create Score Events + for source_score in source_trace.scores: + new_score_id = str(uuid.uuid4()) + new_observation_id = obs_id_map.get(source_score.observation_id) if source_score.observation_id else None + score_metadata = source_score.metadata if isinstance(source_score.metadata, dict) else {} + + score_body_value = None + if source_score.data_type == "CATEGORICAL": + # For categorical, use the string_value field from the source + if hasattr(source_score, 'string_value') and isinstance(getattr(source_score, 'string_value', None), str): + score_body_value = source_score.string_value + else: + # Fallback or warning if string_value is missing for categorical + print(f" Warning: Categorical score {source_score.id} is missing string_value. Attempting to use numeric value '{source_score.value}' as string.") + score_body_value = str(source_score.value) if source_score.value is not None else None + + elif source_score.data_type in ["NUMERIC", "BOOLEAN"]: + # For numeric/boolean, use the numeric value field + score_body_value = source_score.value # Already float or None + else: + print(f" Warning: Unknown score dataType '{source_score.data_type}' for score {source_score.id}. Attempting numeric value.") + score_body_value = source_score.value + + # If after all checks, value is still None, skip score + if score_body_value is None: + print(f" Warning: Could not determine valid value for score {source_score.id} (dataType: {source_score.data_type}). 
Skipping score.") + continue + + try: + score_body = ScoreBody( + id=new_score_id, + trace_id=preserved_trace_id, + name=source_score.name, + # Pass the correctly typed value + value=score_body_value, + # string_value field might not be needed if value holds the category string + # string_value=string_value if source_score.data_type == "CATEGORICAL" else None, # Optional: maybe pass string_value only for categorical? + source=source_score.source, + comment=source_score.comment, + observation_id=new_observation_id, + timestamp=source_score.timestamp, + config_id=source_score.config_id, + metadata=score_metadata or None, + data_type=source_score.data_type, + environment=source_score.environment if source_score.environment else "default", + ) + event_timestamp_str = safe_isoformat(dt.datetime.now(dt.timezone.utc)) + if not event_timestamp_str: print(f"Error: Could not format timestamp for score {new_score_id}. Skipping."); continue + event_envelope_id = str(uuid.uuid4()) + ingestion_events.append( + IngestionEvent_ScoreCreate(id=event_envelope_id, timestamp=event_timestamp_str, body=score_body) + ) + except Exception as e: print(f"Error creating score body for {source_score.id}: {e}"); continue + + return ingestion_events + + +def fetch_and_transform_traces(workspace_id: str, sleep_between_gets=0.7, max_retries=4): + """ + Fetch most recent traces using Public API and transform them into ingestion events. + Enforces NUM_TRACES_TO_REPLAY as a hard cap. 
+ """ + try: + mt = int(NUM_TRACES_TO_REPLAY) + max_traces = mt if mt > 0 else None + except Exception: + max_traces = None + + page = 1 + limit = 50 + total_processed = 0 + total_failed_fetch = 0 + total_failed_transform = 0 + + accumulated_runs: list[dict] = [] + while True: + try: + listing = lf_get("/api/public/traces", page=page, limit=limit) + except Exception as e: + print(f"Error fetching trace list page {page}: {e}") + break + + objs = ( + listing.get("objects") if isinstance(listing, dict) and "objects" in listing + else listing.get("data") if isinstance(listing, dict) else listing + ) or [] + if not objs: + break + + for item in objs: + if max_traces is not None and total_processed >= max_traces: + break + source_trace_id = item.get("id") + if not source_trace_id: + continue + + # Fetch full trace details with retry (Public API) + source_detail = None + fetch_detail_success = False + detail_retries = 0 + while not fetch_detail_success and detail_retries < max_retries: + time.sleep(sleep_between_gets * (2 ** detail_retries)) + try: + source_detail = lf_get(f"/api/public/traces/{source_trace_id}") + fetch_detail_success = True + except Exception: + detail_retries += 1 + if detail_retries >= max_retries: + total_failed_fetch += 1 + break + + if not fetch_detail_success or not isinstance(source_detail, dict): + continue + + try: + source_obj = _wrap(source_detail) + runs_batch = map_langfuse_to_langsmith(source_obj) + if not runs_batch: + total_failed_transform += 1 + continue + # Append to runs accumulator + accumulated_runs.extend(runs_batch) + total_processed += 1 + if max_traces is not None and total_processed >= max_traces: + break + except Exception: + total_failed_transform += 1 + continue + + if max_traces is not None and total_processed >= max_traces: + break + page += 1 + + # Upload accumulated runs to LangSmith workspace + if accumulated_runs: + try: + ls_upload_runs(workspace_id, accumulated_runs) + except Exception as e: + print(f"Error 
uploading runs to LangSmith: {e}") + + print(f" • Processed traces: {total_processed}") + print(f" • Failed fetching details (after retries): {total_failed_fetch}") + print(f" • Failed transforming data (incl. skipping): {total_failed_transform}") + +def migrate_traces(workspace_id: str, project_id: str): + print(f" - migrating recent traces…") + fetch_and_transform_traces(workspace_id) \ No newline at end of file diff --git a/providers/langfuse/main.py b/providers/langfuse/main.py new file mode 100644 index 0000000..e71b989 --- /dev/null +++ b/providers/langfuse/main.py @@ -0,0 +1,23 @@ +from utils.langsmith import ls_get_or_create_workspace + +from providers.langfuse.data.prompts import migrate_prompts +from providers.langfuse.data.datasets import migrate_datasets +from providers.langfuse.data.traces import migrate_traces + + +def migrate_langfuse(projects: list[dict]): + # Migrate selected projects + for proj in projects: + pname = proj.get("name") or proj.get("display_name") or proj.get("slug") or str(proj.get("id")) + pid = proj.get("id") or proj.get("project_id") or proj.get("uuid") or "" + print(f"\n- Project: {pname}") + + ws = ls_get_or_create_workspace(pname) + ws_id = ws["id"] + print(f" + workspace id: {ws_id}") + + migrate_prompts(ws_id, pid) + migrate_datasets(ws_id, pid) + migrate_traces(ws_id, pid) + + print("\n+ Migration complete.") \ No newline at end of file diff --git a/providers/langfuse/resources/traces.py b/providers/langfuse/resources/traces.py new file mode 100644 index 0000000..43d8189 --- /dev/null +++ b/providers/langfuse/resources/traces.py @@ -0,0 +1,443 @@ +import os +import sys +import uuid +import datetime as dt +import time +import json +from types import SimpleNamespace +# Corrected location for MapValue: +from langfuse.api.resources.commons.types import MapValue +# Ingestion types: +from langfuse.api.resources.ingestion.types import ( + TraceBody, + CreateSpanBody, + CreateGenerationBody, + CreateEventBody, + ScoreBody, + 
IngestionEvent_TraceCreate, + IngestionEvent_SpanCreate, + IngestionEvent_GenerationCreate, + IngestionEvent_EventCreate, + IngestionEvent_ScoreCreate, + IngestionUsage, +) +# Other common types: +from langfuse.api.resources.commons.types import ObservationLevel, ScoreSource, Usage +from langfuse.api.resources.commons.types.score import Score_Numeric, Score_Categorical, Score_Boolean +from dotenv import load_dotenv +from config import NUM_TRACES_TO_REPLAY +from utils.langsmith import ls_upload_runs +from utils.langfuse import lf_get + +load_dotenv() + +# --- Helper Function for Robust Datetime Formatting --- +def safe_isoformat(dt_obj): + """Safely formats datetime object to ISO 8601 string, handling None.""" + if dt_obj is None: + return None + if not isinstance(dt_obj, dt.datetime): + if isinstance(dt_obj, str): # Allow pre-formatted strings + try: + dt.datetime.fromisoformat(dt_obj.replace('Z', '+00:00')) + return dt_obj + except ValueError: + return None + return None + try: + if dt_obj.tzinfo is None: + dt_obj = dt_obj.replace(tzinfo=dt.timezone.utc) + iso_str = dt_obj.isoformat(timespec='milliseconds') + if iso_str.endswith('+00:00'): + iso_str = iso_str[:-6] + 'Z' + return iso_str + except Exception: + return None + + +def map_langfuse_to_langsmith(source_trace): + """ + Maps Langfuse trace data to LangSmith-compatible format. + Returns a list of LangSmith run objects. 
+ """ + langsmith_runs = [] + + # Map trace to LangSmith run + trace_run = { + "id": source_trace.id, + "name": getattr(source_trace, 'name', None) or "Trace", + "run_type": "chain", + "session_id": getattr(source_trace, 'session_id', None), + "session_name": None, + "tags": getattr(source_trace, 'tags', []) or [], + "metadata": source_trace.metadata if isinstance(getattr(source_trace, 'metadata', {}), dict) else {}, + "inputs": getattr(source_trace, 'input', None), + "outputs": getattr(source_trace, 'output', None), + "start_time": safe_isoformat(getattr(source_trace, 'timestamp', None)), + "end_time": None, + "status": "completed", + "error": None, + "invocation_params": {}, + "usage_metadata": {}, + "child_runs": [] + } + + # Process observations to create child runs + observations = getattr(source_trace, 'observations', []) or [] + sorted_observations = sorted(observations, key=lambda o: getattr(o, 'start_time', None) or '') + observation_runs = {} + + for obs in sorted_observations: + run_id = str(uuid.uuid4()) + observation_runs[getattr(obs, 'id', run_id)] = run_id + + # Determine run type based on observation type + run_type = "chain" + if getattr(obs, 'type', '').upper() == "GENERATION": + run_type = "llm" + elif getattr(obs, 'type', '').upper() == "EVENT": + run_type = "tool" + + # Map model information + invocation_params = {} + if getattr(obs, 'model', None): + invocation_params["model"] = obs.model + if isinstance(getattr(obs, 'model_parameters', None), dict): + param_mapping = { + "temperature": "temperature", + "top_p": "top_p", + "max_tokens": "max_tokens", + "frequency_penalty": "frequency_penalty", + "presence_penalty": "presence_penalty", + "seed": "seed", + "stop": "stop_sequences", + "top_k": "top_k" + } + for langfuse_key, langsmith_key in param_mapping.items(): + if langfuse_key in obs.model_parameters: + invocation_params[langsmith_key] = obs.model_parameters[langfuse_key] + + # Map usage information + usage_metadata = {} + usage = 
getattr(obs, 'usage', None) + if isinstance(usage, dict): + if usage.get('input') is not None: + usage_metadata['input_tokens'] = usage.get('input') + if usage.get('output') is not None: + usage_metadata['output_tokens'] = usage.get('output') + if usage.get('total') is not None: + usage_metadata['total_tokens'] = usage.get('total') + + inputs = getattr(obs, 'input', None) + outputs = getattr(obs, 'output', None) + + if getattr(obs, 'type', '').upper() == "GENERATION" and getattr(obs, 'model', None): + if isinstance(inputs, dict) and "messages" in inputs: + inputs = inputs + elif isinstance(inputs, str): + inputs = {"prompt": inputs} + if isinstance(outputs, dict) and "messages" in outputs: + outputs = outputs + elif isinstance(outputs, str): + outputs = {"completion": outputs} + + run = { + "id": run_id, + "name": getattr(obs, 'name', None) or f"{str(getattr(obs, 'type', 'obs')).lower()}_{getattr(obs, 'id', '')}", + "run_type": run_type, + "parent_run_id": observation_runs.get(getattr(obs, 'parent_observation_id', None)), + "session_id": getattr(source_trace, 'session_id', None), + "tags": [], + "metadata": obs.metadata if isinstance(getattr(obs, 'metadata', {}), dict) else {}, + "inputs": inputs, + "outputs": outputs, + "start_time": safe_isoformat(getattr(obs, 'start_time', None)), + "end_time": safe_isoformat(getattr(obs, 'end_time', None)) if getattr(obs, 'end_time', None) else None, + "status": "completed", + "error": getattr(obs, 'status_message', None) or None, + "invocation_params": invocation_params, + "usage_metadata": usage_metadata + } + + if getattr(obs, 'type', '').upper() == "GENERATION" and getattr(obs, 'model', None): + model_lower = obs.model.lower() + if "openai" in model_lower: + run["metadata"]["ls_provider"] = "openai" + elif "anthropic" in model_lower: + run["metadata"]["ls_provider"] = "anthropic" + elif "google" in model_lower: + run["metadata"]["ls_provider"] = "google" + else: + run["metadata"]["ls_provider"] = "unknown" + + 
langsmith_runs.append(run) + + # Add scores as feedback + scores = getattr(source_trace, 'scores', []) or [] + for score in scores: + score_obs_id = getattr(score, 'observation_id', None) + feedback = { + "id": str(uuid.uuid4()), + "run_id": observation_runs.get(score_obs_id, source_trace.id), + "key": getattr(score, 'name', None), + "score": getattr(score, 'value', None), + "comment": getattr(score, 'comment', None), + "metadata": score.metadata if isinstance(getattr(score, 'metadata', {}), dict) else {}, + "source": getattr(score, 'source', None), + "timestamp": safe_isoformat(getattr(score, 'timestamp', None)) + } + langsmith_runs.append({"type": "feedback", **feedback}) + + return langsmith_runs + + +def _wrap(obj): + """Recursively wrap dicts into SimpleNamespace for attribute access.""" + if isinstance(obj, dict): + return SimpleNamespace(**{k: _wrap(v) for k, v in obj.items()}) + if isinstance(obj, list): + return [ _wrap(v) for v in obj ] + return obj + + +def transform_trace_to_ingestion_batch(source_trace): + """ + Transforms a fetched TraceWithFullDetails object into a list of + IngestionEvent objects suitable for the batch ingestion endpoint. + Uses the ORIGINAL source trace ID for the new trace. + Generates new IDs for observations/scores within the trace. + Maps parent/child relationships using new observation IDs. + """ + ingestion_events = [] + preserved_trace_id = source_trace.id + obs_id_map = {} + + # 1. 
Create Trace Event + trace_metadata = source_trace.metadata if isinstance(source_trace.metadata, dict) else {} + trace_body = TraceBody( + id=preserved_trace_id, + timestamp=source_trace.timestamp, + name=source_trace.name, + user_id=source_trace.user_id, + input=source_trace.input, + output=source_trace.output, + session_id=source_trace.session_id, + release=source_trace.release, + version=source_trace.version, + metadata=trace_metadata or None, + tags=source_trace.tags if source_trace.tags is not None else [], + public=source_trace.public, + environment=source_trace.environment if source_trace.environment else "default", + ) + event_timestamp_str = safe_isoformat(dt.datetime.now(dt.timezone.utc)) + if not event_timestamp_str: + print("Error: Could not format timestamp for trace event. Skipping trace.") + return [] + trace_event_id = str(uuid.uuid4()) + ingestion_events.append( + IngestionEvent_TraceCreate(id=trace_event_id, timestamp=event_timestamp_str, body=trace_body) + ) + + # 2. Create Observation Events + sorted_observations = sorted(source_trace.observations, key=lambda o: o.start_time) + for source_obs in sorted_observations: + new_obs_id = str(uuid.uuid4()) + obs_id_map[source_obs.id] = new_obs_id + new_parent_observation_id = obs_id_map.get(source_obs.parent_observation_id) if source_obs.parent_observation_id else None + obs_metadata = source_obs.metadata if isinstance(source_obs.metadata, dict) else {} + + model_params_mapped = None + if isinstance(source_obs.model_parameters, dict): model_params_mapped = source_obs.model_parameters + elif source_obs.model_parameters is not None: print(f"Warning: Obs {source_obs.id} model_parameters type {type(source_obs.model_parameters)}, skipping.") + + common_body_args = { + "id": new_obs_id, "trace_id": preserved_trace_id, "name": source_obs.name, + "start_time": source_obs.start_time, "metadata": obs_metadata or None, + "input": source_obs.input, "output": source_obs.output, "level": source_obs.level, + 
"status_message": source_obs.status_message, "parent_observation_id": new_parent_observation_id, + "version": source_obs.version, "environment": source_obs.environment if source_obs.environment else "default", + } + + event_body = None; ingestion_event_type = None + event_specific_timestamp = safe_isoformat(dt.datetime.now(dt.timezone.utc)) + if not event_specific_timestamp: print(f"Error: Could not format timestamp for obs {new_obs_id}. Skipping."); continue + + try: + if source_obs.type == "SPAN": + event_body = CreateSpanBody(**common_body_args, end_time=source_obs.end_time) + ingestion_event_type = IngestionEvent_SpanCreate + elif source_obs.type == "EVENT": + event_body = CreateEventBody(**common_body_args) + ingestion_event_type = IngestionEvent_EventCreate + elif source_obs.type == "GENERATION": + usage_to_pass = None + if isinstance(source_obs.usage, Usage): + usage_data = {k: getattr(source_obs.usage, k, None) for k in ['input', 'output', 'total', 'unit', 'input_cost', 'output_cost', 'total_cost']} + filtered_usage_data = {k: v for k, v in usage_data.items() if v is not None} + if filtered_usage_data: usage_to_pass = Usage(**filtered_usage_data) + elif source_obs.usage is not None: print(f"Warning: Obs {source_obs.id} has usage type {type(source_obs.usage)}. Skipping.") + + event_body = CreateGenerationBody( + **common_body_args, end_time=source_obs.end_time, + completion_start_time=source_obs.completion_start_time, + model=source_obs.model, model_parameters=model_params_mapped, + usage=usage_to_pass, cost_details=source_obs.cost_details, + usage_details=source_obs.usage_details, + prompt_name=getattr(source_obs, 'prompt_name', None), + prompt_version=getattr(source_obs, 'prompt_version', None), + ) + ingestion_event_type = IngestionEvent_GenerationCreate + else: print(f"Warning: Unknown obs type '{source_obs.type}' for ID {source_obs.id}. 
Skipping."); continue + + if event_body and ingestion_event_type: + event_envelope_id = str(uuid.uuid4()) + ingestion_events.append( + ingestion_event_type(id=event_envelope_id, timestamp=event_specific_timestamp, body=event_body) + ) + except Exception as e: print(f"Error creating obs body for {source_obs.id} (type: {source_obs.type}): {e}"); continue + + # 3. Create Score Events + for source_score in source_trace.scores: + new_score_id = str(uuid.uuid4()) + new_observation_id = obs_id_map.get(source_score.observation_id) if source_score.observation_id else None + score_metadata = source_score.metadata if isinstance(source_score.metadata, dict) else {} + + score_body_value = None + if source_score.data_type == "CATEGORICAL": + # For categorical, use the string_value field from the source + if hasattr(source_score, 'string_value') and isinstance(getattr(source_score, 'string_value', None), str): + score_body_value = source_score.string_value + else: + # Fallback or warning if string_value is missing for categorical + print(f" Warning: Categorical score {source_score.id} is missing string_value. Attempting to use numeric value '{source_score.value}' as string.") + score_body_value = str(source_score.value) if source_score.value is not None else None + + elif source_score.data_type in ["NUMERIC", "BOOLEAN"]: + # For numeric/boolean, use the numeric value field + score_body_value = source_score.value # Already float or None + else: + print(f" Warning: Unknown score dataType '{source_score.data_type}' for score {source_score.id}. Attempting numeric value.") + score_body_value = source_score.value + + # If after all checks, value is still None, skip score + if score_body_value is None: + print(f" Warning: Could not determine valid value for score {source_score.id} (dataType: {source_score.data_type}). 
Skipping score.") + continue + + try: + score_body = ScoreBody( + id=new_score_id, + trace_id=preserved_trace_id, + name=source_score.name, + # Pass the correctly typed value + value=score_body_value, + # string_value field might not be needed if value holds the category string + # string_value=string_value if source_score.data_type == "CATEGORICAL" else None, # Optional: maybe pass string_value only for categorical? + source=source_score.source, + comment=source_score.comment, + observation_id=new_observation_id, + timestamp=source_score.timestamp, + config_id=source_score.config_id, + metadata=score_metadata or None, + data_type=source_score.data_type, + environment=source_score.environment if source_score.environment else "default", + ) + event_timestamp_str = safe_isoformat(dt.datetime.now(dt.timezone.utc)) + if not event_timestamp_str: print(f"Error: Could not format timestamp for score {new_score_id}. Skipping."); continue + event_envelope_id = str(uuid.uuid4()) + ingestion_events.append( + IngestionEvent_ScoreCreate(id=event_envelope_id, timestamp=event_timestamp_str, body=score_body) + ) + except Exception as e: print(f"Error creating score body for {source_score.id}: {e}"); continue + + return ingestion_events + + +def fetch_and_transform_traces(workspace_id: str, sleep_between_gets=0.7, max_retries=4): + """ + Fetch most recent traces using Public API and transform them into ingestion events. + Enforces NUM_TRACES_TO_REPLAY as a hard cap. 
+ """ + try: + max_traces = int(NUM_TRACES_TO_REPLAY) + except Exception: + max_traces = None + + page = 1 + limit = 50 + total_processed = 0 + total_failed_fetch = 0 + total_failed_transform = 0 + + accumulated_runs: list[dict] = [] + while True: + try: + listing = lf_get("/api/public/traces", page=page, limit=limit) + except Exception as e: + print(f"Error fetching trace list page {page}: {e}") + break + + objs = ( + listing.get("objects") if isinstance(listing, dict) and "objects" in listing + else listing.get("data") if isinstance(listing, dict) else listing + ) or [] + if not objs: + break + + for item in objs: + if max_traces is not None and total_processed >= max_traces: + break + source_trace_id = item.get("id") + if not source_trace_id: + continue + + # Fetch full trace details with retry (Public API) + source_detail = None + fetch_detail_success = False + detail_retries = 0 + while not fetch_detail_success and detail_retries < max_retries: + time.sleep(sleep_between_gets * (2 ** detail_retries)) + try: + source_detail = lf_get(f"/api/public/traces/{source_trace_id}") + fetch_detail_success = True + except Exception: + detail_retries += 1 + if detail_retries >= max_retries: + total_failed_fetch += 1 + break + + if not fetch_detail_success or not isinstance(source_detail, dict): + continue + + try: + source_obj = _wrap(source_detail) + ingestion_batch = transform_trace_to_ingestion_batch(source_obj) + if not ingestion_batch: + total_failed_transform += 1 + continue + # Append to runs accumulator + accumulated_runs.extend(ingestion_batch) + total_processed += 1 + if max_traces is not None and total_processed >= max_traces: + break + except Exception: + total_failed_transform += 1 + continue + + if max_traces is not None and total_processed >= max_traces: + break + page += 1 + + # Upload accumulated runs to LangSmith workspace + if accumulated_runs: + try: + ls_upload_runs(workspace_id, accumulated_runs) + except Exception as e: + print(f"Error uploading runs 
to LangSmith: {e}") + + print(f" • Processed traces: {total_processed}") + print(f" • Failed fetching details (after retries): {total_failed_fetch}") + print(f" • Failed transforming data (incl. skipping): {total_failed_transform}") + +def migrate_traces(workspace_id: str, project_id: str): + fetch_and_transform_traces(workspace_id) \ No newline at end of file diff --git a/providers/langfuse/runbook.ipynb b/providers/langfuse/runbook.ipynb new file mode 100644 index 0000000..111158e --- /dev/null +++ b/providers/langfuse/runbook.ipynb @@ -0,0 +1,596 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b97cd937", + "metadata": {}, + "source": [ + "# Migrating from Langfuse to LangSmith" + ] + }, + { + "cell_type": "markdown", + "id": "4c191ad3", + "metadata": {}, + "source": [ + "\n", + "## Migrating Resources\n", + "\n", + "Contained in this repo are scripts to migrate your resources from Langfuse to LangSmith.\n", + "\n", + "This includes:\n", + "- Datasets\n", + "- Prompts\n", + "- Recent Traces\n", + "\n", + "To migrate your resources over, refer to ```providers/langfuse/main.py```. Specific scripts for each are provided in the ```providers/langfuse/data``` directory.\n" + ] + }, + { + "cell_type": "markdown", + "id": "b54ef944", + "metadata": {}, + "source": [ + "## Updating Code\n", + "\n", + "In the process of migrating to LangSmith, you will also need to update your instrumentation code as well. \n", + "\n", + "In the following sections we break down some common patterns used in LangFuse, and their equivalent implementation in LangSmith. 
Not all features are shared, but common constructs are available across both frameworks.\n", + "\n", + "First, let's load in our environment variables.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "60bb23c6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import os\n", + "os.environ[\"LANGSMITH_PROJECT\"] = \"default\"\n", + "\n", + "from dotenv import load_dotenv\n", + "load_dotenv(\"../../.env\", override=True)" + ] + }, + { + "cell_type": "markdown", + "id": "5e64cd8c", + "metadata": {}, + "source": [ + "### **Tracing**\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "3834eeca", + "metadata": {}, + "source": [ + "#### Observe decorator\n", + "\n", + "If you're using the ```@observe``` decorator, the equivalent in LangSmith is the ```@traceable``` decorator" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "57800622", + "metadata": {}, + "outputs": [], + "source": [ + "from langfuse import observe, get_client\n", + " \n", + "@observe\n", + "def my_function():\n", + " return \"Hello, world!\" # Input/output and timings are automatically captured\n", + " \n", + "my_function()\n", + " \n", + "# Flush events in short-lived applications\n", + "langfuse = get_client()\n", + "langfuse.flush()" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "7c162af1", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'Hello, world!'" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from langsmith import traceable\n", + "\n", + "@traceable\n", + "def my_function():\n", + " return \"Hello, world!\" # Input/output and timings are automatically captured\n", + "\n", + "my_function()" + ] + }, + { + "cell_type": "markdown", + "id": "54802e1c", + "metadata": {}, + "source": [ + "#### Context 
Managers\n", + "\n", + "If you're using context managers in LangFuse, LangSmith has an equivalent ls.trace() context manager." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "49740681", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/var/folders/jj/2fvdkyfj0856p6_6sdvv74rw0000gn/T/ipykernel_39379/1446926794.py:11: DeprecationWarning: start_as_current_generation is deprecated and will be removed in a future version. Use start_as_current_observation(as_type='generation') instead.\n", + " with langfuse.start_as_current_generation(name=\"llm-response\", model=\"gpt-3.5-turbo\") as generation:\n" + ] + } + ], + "source": [ + "from langfuse import get_client\n", + " \n", + "langfuse = get_client()\n", + " \n", + "# Create a span using a context manager\n", + "with langfuse.start_as_current_span(name=\"process-request\") as span:\n", + " # Your processing logic here\n", + " span.update(output=\"Processing complete\")\n", + " \n", + " # Create a nested generation for an LLM call\n", + " with langfuse.start_as_current_generation(name=\"llm-response\", model=\"gpt-3.5-turbo\") as generation:\n", + " # Your LLM call logic here\n", + " generation.update(output=\"Generated response\")\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "0f83a06c", + "metadata": {}, + "outputs": [], + "source": [ + "import langsmith as ls\n", + "\n", + "# Create a trace using the context manager\n", + "with ls.trace(name=\"process-request\") as rt:\n", + " # Your processing logic here\n", + " # Create a nested generation for an LLM call\n", + " with ls.trace(name=\"llm-response\", run_type=\"llm\", metadata={\"model\": \"gpt-3.5-turbo\"}) as generation:\n", + " # Your LLM call logic here\n", + " generation.end(outputs={\"output\": \"Generated response\"})\n", + "\n", + " rt.end(outputs={\"output\": \"Processing complete\"})" + ] + }, + { + "cell_type": "markdown", + "id": "8e8bb9fc", + 
"metadata": {}, + "source": [ + "#### OpenTelemetry" + ] + }, + { + "cell_type": "markdown", + "id": "5de23d6e", + "metadata": {}, + "source": [ + "If you're using OpenTelemetry, LangSmith supports [OTel tracing natively.](https://docs.langchain.com/langsmith/trace-with-opentelemetry#trace-with-opentelemetry)\n", + "\n", + "You'll likely be switching out the exporter endpoints you had [set with Langfuse](https://langfuse.com/integrations/native/opentelemetry)" + ] + }, + { + "cell_type": "markdown", + "id": "ce21a7df", + "metadata": {}, + "source": [ + "### **Evaluations**" + ] + }, + { + "cell_type": "markdown", + "id": "822c0bb5", + "metadata": {}, + "source": [ + "#### Datasets\n", + "\n", + "In offline evaluations, a dataset is often used to run evaluations over. LangFuse allows you to create a dataset and add examples using the SDK." + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "adc3109e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DatasetItem(id='3d528ef2-d906-46bc-97b9-38628ce64b5c', status=, input={'text': 'What is the capital of Germany?'}, expected_output={'text': 'Berlin'}, metadata=None, source_trace_id=None, source_observation_id=None, dataset_id='cmga8r3ll02fuad06uf23cwmf', dataset_name='basic', created_at=datetime.datetime(2025, 10, 3, 2, 44, 50, 788000, tzinfo=datetime.timezone.utc), updated_at=datetime.datetime(2025, 10, 3, 2, 44, 50, 788000, tzinfo=datetime.timezone.utc))" + ] + }, + "execution_count": 29, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "langfuse.create_dataset(\n", + " name=\"basic\",\n", + " # optional description\n", + " description=\"Basic dataset\",\n", + " # optional metadata\n", + " metadata={\n", + " \"type\": \"benchmark\"\n", + " }\n", + ")\n", + "\n", + "langfuse.create_dataset_item(\n", + " dataset_name=\"basic\",\n", + " # any python object or value, optional\n", + " input={\n", + " \"text\": \"What is the capital of France?\"\n", + " 
},\n", + " # any python object or value, optional\n", + " expected_output={\n", + " \"text\": \"Paris\"\n", + " },\n", + ")\n", + "\n", + "langfuse.create_dataset_item(\n", + " dataset_name=\"basic\",\n", + " input={\n", + " \"text\": \"What is the capital of Germany?\"\n", + " },\n", + " expected_output={\n", + " \"text\": \"Berlin\"\n", + " },\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "0de09470", + "metadata": {}, + "source": [ + "LangSmith allows you to create datasets using the LangSmith SDK as well" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b291f3ef", + "metadata": {}, + "outputs": [], + "source": [ + "from langsmith import Client\n", + "\n", + "client = Client()\n", + "# Create a dataset\n", + "examples = [\n", + " {\n", + " \"input\": \"What is the capital of France?\",\n", + " \"expected_output\": \"Paris\"\n", + " },\n", + " {\n", + " \"input\": \"What is the capital of Germany?\",\n", + " \"expected_output\": \"Berlin\"\n", + " }\n", + "]\n", + "\n", + "dataset_name = \"basic\"\n", + "\n", + "if not client.has_dataset(dataset_name=dataset_name):\n", + " langsmith_dataset = client.create_dataset(dataset_name=dataset_name)\n", + " client.create_examples(\n", + " inputs=[{\"input\": ex[\"input\"]} for ex in examples],\n", + " outputs=[{\"expected_output\": ex[\"expected_output\"]} for ex in examples],\n", + " dataset_id=langsmith_dataset.id\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "558750e3", + "metadata": {}, + "source": [ + "#### Experiments" + ] + }, + { + "cell_type": "markdown", + "id": "b510021b", + "metadata": {}, + "source": [ + "Running experiments with LangFuse in the SDK is done through ```run_experiment```" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "id": "283098a7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Individual Results: Hidden (2 items)\\n💡 Set include_item_results=True to view 
them\\n\\n──────────────────────────────────────────────────\\n🧪 Experiment: Multi-metric Evaluation\n", + "📋 Run name: Multi-metric Evaluation - 2025-10-03T03:12:26.355573Z\\n2 items\\nEvaluations:\\n • response_length\\n • accuracy\\n\\nAverage Scores:\\n • response_length: 36.000\\n • accuracy: 1.000\\n\\n🔗 Dataset Run:\\n https://us.cloud.langfuse.com/project/cmg9xbp62008had07ab7us47z/datasets/cmga8r3ll02fuad06uf23cwmf/runs/dbd7a5a7-2a7a-42b4-a01c-606bd5e9eb16\n" + ] + } + ], + "source": [ + "from langfuse import Evaluation\n", + "from langfuse.openai import OpenAI\n", + "\n", + "# Define your task function\n", + "def my_task(*, item, **kwargs):\n", + " question = item.input[\"text\"]\n", + " print(question)\n", + " response = OpenAI().chat.completions.create(\n", + " model=\"gpt-4.1\", messages=[{\"role\": \"user\", \"content\": question}]\n", + " )\n", + " return response.choices[0].message.content\n", + " \n", + "\n", + "# Define evaluation functions\n", + "def accuracy_evaluator(*, input, output, expected_output, **kwargs):\n", + " if expected_output and expected_output[\"text\"].lower() in output.lower():\n", + " return Evaluation(name=\"accuracy\", value=1.0, comment=\"Correct answer found\")\n", + " return Evaluation(name=\"accuracy\", value=0.0, comment=\"Incorrect answer\")\n", + " \n", + "def length_evaluator(*, input, output, **kwargs):\n", + " return Evaluation(name=\"response_length\", value=len(output), comment=f\"Response has {len(output)} characters\")\n", + " \n", + "# Use multiple evaluators\n", + "dataset = langfuse.get_dataset(\"basic\")\n", + "result = langfuse.run_experiment(\n", + " name=\"Multi-metric Evaluation\",\n", + " data=dataset.items,\n", + " task=my_task,\n", + " evaluators=[accuracy_evaluator, length_evaluator]\n", + ")\n", + " \n", + "print(result.format())" + ] + }, + { + "cell_type": "markdown", + "id": "0668553f", + "metadata": {}, + "source": [ + "The equivalent in LangSmith is using ```evaluate()```" + ] + }, + { + 
"cell_type": "code", + "execution_count": 36, + "id": "c0f37aec", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/robertxu/Desktop/Projects/experiments/tunnels/tunnel/.venv/lib/python3.13/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "View the evaluation results for experiment: 'multi-metric-eval-89657962' at:\n", + "https://smith.langchain.com/o/4015447c-43ab-4414-8539-633d4cb47217/datasets/335a7b19-f7bf-426f-a288-d5b39a5402fb/compare?selectedSessions=4c83a493-5938-487f-9929-1f26fd0c157f\n", + "\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2it [00:01, 1.27it/s]\n" + ] + } + ], + "source": [ + "from langsmith import Client, trace\n", + "from langsmith.wrappers import wrap_openai\n", + "from openai import OpenAI\n", + "\n", + "client = Client()\n", + "\n", + "dataset = client.read_dataset(dataset_name=\"basic\")\n", + "\n", + "# Wrap OpenAI client for tracing\n", + "openai_client = wrap_openai(OpenAI())\n", + "\n", + "# Define your task function\n", + "def my_task(inputs: dict) -> dict:\n", + " question = inputs[\"input\"]\n", + " response = openai_client.chat.completions.create(\n", + " model=\"gpt-4o-mini\", # Use a LangSmith-supported model name\n", + " messages=[{\"role\": \"user\", \"content\": question}],\n", + " )\n", + " return {\"output\": response.choices[0].message.content}\n", + "\n", + "\n", + "# Define evaluation functions\n", + "def accuracy_evaluator(inputs: dict, outputs: dict, reference_outputs: dict) -> dict:\n", + " output = outputs.get(\"output\", \"\")\n", + " expected = reference_outputs.get(\"expected_output\", \"\")\n", + " if expected and expected.lower() in output.lower():\n", + " return 
{\"key\": \"accuracy\", \"score\": 1.0, \"comment\": \"Correct answer found\"}\n", + " return {\"key\": \"accuracy\", \"score\": 0.0, \"comment\": \"Incorrect answer\"}\n", + "\n", + "def length_evaluator(inputs: dict, outputs: dict) -> dict:\n", + " output = outputs.get(\"output\", \"\")\n", + " return {\"key\": \"response_length\", \"score\": len(output), \"comment\": f\"Response has {len(output)} characters\"}\n", + "\n", + "# Run experiment\n", + "result = client.evaluate(\n", + " my_task,\n", + " data=dataset.id,\n", + " evaluators=[accuracy_evaluator, length_evaluator],\n", + " experiment_prefix=\"multi-metric-eval\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "97dafab6", + "metadata": {}, + "source": [ + "Both LangFuse and LangSmith support flexible evaluation types, including summary evaluators being defined in experiments." + ] + }, + { + "cell_type": "markdown", + "id": "ed82d900", + "metadata": {}, + "source": [ + "### **Prompts**" + ] + }, + { + "cell_type": "markdown", + "id": "ceb4e61a", + "metadata": {}, + "source": [ + "LangFuse and LangSmith both have prompting interfaces in the UI and the SDK.\n", + "\n", + "LangFuse uses the ```create_prompt``` method, shown below" + ] + }, + { + "cell_type": "code", + "execution_count": 60, + "id": "381a92fc", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 60, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "langfuse.create_prompt(\n", + " name=\"movie-critic-chat\",\n", + " type=\"chat\",\n", + " prompt=[\n", + " { \"role\": \"system\", \"content\": \"You are an {{criticlevel}} movie critic\" },\n", + " { \"role\": \"user\", \"content\": \"Do you like {{movie}}?\" },\n", + " ],\n", + " labels=[\"production\"], # directly promote to production\n", + " config={\n", + " \"model\": \"gpt-4o\",\n", + " \"temperature\": 0.7,\n", + " \"supported_languages\": [\"en\", \"fr\"],\n", + " }, # optionally, add configs (e.g. 
model parameters or model tools) or tags\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "36f8c750", + "metadata": {}, + "source": [ + "LangSmith has a comparable ```push_prompt``` function in the SDK, which automatically detects which model you're using and includes your exact configuration in the metadata." + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "id": "ec804c5e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'https://smith.langchain.com/prompts/movie-critic-chat/4cb87751?organizationId=4015447c-43ab-4414-8539-633d4cb47217'" + ] + }, + "execution_count": 64, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from langsmith import Client\n", + "from langchain_core.prompts import ChatPromptTemplate\n", + "from langchain_openai import ChatOpenAI\n", + "\n", + "client = Client()\n", + "\n", + "model = ChatOpenAI(model=\"gpt-4o-mini\")\n", + "prompt = ChatPromptTemplate([\n", + " (\"system\", \"You are an {criticlevel} movie critic\"),\n", + " (\"human\", \"Do you like {movie}?\")\n", + "])\n", + "chain = prompt | model\n", + "client.push_prompt(\"movie-critic-chat\", object=chain)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ac58d4f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +openai +python-dotenv +langfuse +langsmith +langchain +langchain-core +langchain-openai +langchain-anthropic +requests +tqdm \ No newline at end of file diff --git a/utils/langfuse.py b/utils/langfuse.py new file mode 100644 index 
# ---------------------------------------------------------------------------
# utils/langfuse.py  (new file added by this patch)
# ---------------------------------------------------------------------------
import requests
from requests.auth import HTTPBasicAuth
from tqdm import tqdm
from urllib.parse import quote

from config import LF_BASE, LF_HEADERS, LF_PUBLIC_KEY, LF_SECRET_KEY

# Seconds to wait on a stalled Langfuse connection/read before giving up.
# Without a timeout `requests` waits forever and the migration hangs.
_LF_TIMEOUT = 60


def lf_get(path: str, **params):
    """GET ``LF_BASE + path`` from Langfuse with HTTP Basic auth.

    Args:
        path: API path appended verbatim to ``LF_BASE``.
        **params: Sent as the query string.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: On a 4xx/5xx response (via ``raise_for_status``).
        requests.RequestException: On connection problems or timeout.
    """
    resp = requests.get(
        f"{LF_BASE}{path}",
        headers=LF_HEADERS,
        params=params,
        # Missing keys are sent as empty strings so the request is still
        # well-formed and the server answers with an auth error.
        auth=HTTPBasicAuth(LF_PUBLIC_KEY or "", LF_SECRET_KEY or ""),
        timeout=_LF_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.json()


def _lf_items(payload) -> list:
    """Unwrap the item list from a Langfuse listing response.

    Accepts an ``{"objects": [...]}`` envelope, a ``{"data": [...]}``
    envelope, or a bare list; anything empty or missing becomes ``[]``.
    """
    if isinstance(payload, dict):
        key = "objects" if "objects" in payload else "data"
        return payload.get(key) or []
    return payload or []


def _lf_pages(path: str, **params):
    """Yield each non-empty page of items from a paginated Langfuse listing.

    Pages are requested with ``page=1, 2, ...`` and iteration stops at the
    first empty page.
    """
    page = 1
    while True:
        batch = _lf_items(lf_get(path, page=page, **params))
        if not batch:
            return
        yield batch
        page += 1


def lf_get_projects() -> list[dict]:
    """List the Langfuse projects returned by ``/api/public/projects``.

    Returns:
        The project dicts from the response, or an empty list when the
        response contains none (never ``None``).

    Raises:
        requests.RequestException: Propagated unchanged from ``lf_get``.
    """
    data = lf_get("/api/public/projects")
    if isinstance(data, dict):
        # Tolerate the different envelope keys the response might use.
        items = data.get("data") or data.get("projects") or data.get("objects")
    else:
        items = data
    return items or []


def lf_get_project_prompts(project_id: str) -> list[dict]:
    """Return the prompts of a Langfuse project, one detail dict per prompt.

    Prompt names are collected from the paginated v2 listing, then each
    prompt is fetched individually by name.

    Args:
        project_id: Forwarded to the API as the ``project_id`` query parameter.

    Returns:
        A list with one entry per listed prompt. When fetching a prompt's
        detail fails, a warning is printed and the entry is the stub
        ``{"name": name}``.
    """
    names: list[str] = []
    for batch in _lf_pages("/api/public/v2/prompts", project_id=project_id):
        for p in batch:
            name = p.get("name") or p.get("slug") or p.get("id")
            if name:
                names.append(str(name))

    detailed: list[dict] = []
    for name in names:
        try:
            # Names may contain "/" etc., so escape every reserved character.
            p_detail = lf_get(
                f"/api/public/v2/prompts/{quote(name, safe='')}",
                project_id=project_id,
            )
        except Exception as e:
            # Best-effort: keep migrating the other prompts, but say so.
            print(f" ! couldn't fetch prompt '{name}': {e}")
            p_detail = {"name": name}
        detailed.append(p_detail)
    return detailed


def _lf_dataset_items(ds_id, ds_name=None) -> list[dict]:
    """Fetch every item of one Langfuse dataset, enriched with its detail.

    The listing is filtered with ``datasetName`` (the filter described in the
    Langfuse API reference) in addition to the original ``datasetId``
    parameter, and items that are explicitly labelled with a different
    ``datasetId`` are skipped, so a dataset never receives another dataset's
    items.
    """
    filters = {"datasetId": ds_id, "limit": 100}
    if ds_name:
        filters["datasetName"] = ds_name

    items: list[dict] = []
    for batch in _lf_pages("/api/public/dataset-items", **filters):
        for it in batch:
            owner = it.get("datasetId")
            if ds_id and owner and owner != ds_id:
                continue  # belongs to another dataset
            item_id = it.get("id") or it.get("item_id") or it.get("uuid")
            if not item_id:
                items.append(it)
                continue
            try:
                detail = lf_get(
                    f"/api/public/dataset-items/{quote(str(item_id), safe='')}"
                )
                items.append(detail or it)
            except Exception:
                # Best-effort: fall back to the listing entry itself.
                items.append(it)
    return items


def lf_get_project_datasets(
    project_id: str
) -> dict[str, list[dict]]:
    """Fetch datasets and their records from Langfuse for a project.

    Args:
        project_id: Forwarded to the API as the ``project_id`` query parameter.

    Returns:
        A mapping of dataset name → list of record dicts. A dataset without a
        name is keyed ``dataset-<id>``. A dataset whose items cannot be
        fetched is reported on stdout and mapped to an empty list.
    """
    datasets: list[dict] = []
    for batch in _lf_pages(
        "/api/public/v2/datasets", project_id=project_id, limit=100
    ):
        datasets.extend(batch)

    out: dict[str, list[dict]] = {}
    for ds in datasets:
        ds_id = ds.get("id")
        ds_name = ds.get("name") or f"dataset-{ds_id}"
        try:
            items = _lf_dataset_items(ds_id, ds.get("name"))
            print(f" > fetched {len(items)} items from '{ds_name}'")
        except Exception as e:
            print(f" ! couldn't fetch '{ds_name}': {e}")
            items = []
        out[ds_name] = items
    return out


# ---------------------------------------------------------------------------
# utils/langsmith.py  (new file added by this patch)
# ---------------------------------------------------------------------------
import requests
from typing import Optional

from langsmith import Client
from config import LS_BASE, LS_HEADERS, LS_API_KEY

# Seconds to wait on a stalled LangSmith connection/read before giving up.
_LS_TIMEOUT = 60

# Keys of a run dict that are forwarded to POST /api/v1/runs; everything else
# is dropped. Built once instead of on every loop iteration.
_LS_RUN_FIELDS = frozenset({
    "id", "name", "run_type", "inputs", "outputs", "start_time", "end_time",
    "session_id", "parent_run_id", "tags", "metadata", "error",
    "invocation_params", "usage_metadata",
})


def ls_get_or_create_workspace(ws_name: str) -> dict:
    """Create a LangSmith workspace, or return the existing one of that name.

    Args:
        ws_name: The workspace ``display_name``.

    Returns:
        The workspace dict returned by the API.

    Raises:
        requests.HTTPError: If creation fails and no existing workspace has
            this display name (the creation error is re-raised), or if
            listing the workspaces fails.
    """
    created = requests.post(
        f"{LS_BASE}/api/v1/workspaces",
        headers=LS_HEADERS,
        json={"display_name": ws_name},
        timeout=_LS_TIMEOUT,
    )
    try:
        created.raise_for_status()
    except requests.HTTPError:
        # Creation failed (e.g. it already exists): look it up by name.
        listing = requests.get(
            f"{LS_BASE}/api/v1/workspaces",
            headers=LS_HEADERS,
            timeout=_LS_TIMEOUT,
        )
        listing.raise_for_status()
        for ws in listing.json():
            if isinstance(ws, dict) and ws.get("display_name") == ws_name:
                return ws
        raise  # nothing to fall back to: surface the creation error
    return created.json()


def ls_create_dataset(workspace_id: str, name: str) -> str:
    """Create a dataset in a workspace and return its id.

    If the API answers 409 (already exists), the existing dataset with the
    same name is looked up and its id returned instead.

    Args:
        workspace_id: Sent as the ``X-Tenant-Id`` header.
        name: Dataset name.

    Raises:
        requests.HTTPError: On any other error response, or when a 409 was
            reported but no dataset with that name can be found.
    """
    hdrs = LS_HEADERS | {"X-Tenant-Id": workspace_id}
    url = f"{LS_BASE}/api/v1/datasets"
    created = requests.post(
        url, headers=hdrs, json={"name": name}, timeout=_LS_TIMEOUT
    )
    if created.status_code == 409:  # already there
        # Filter by name server-side so the match is not lost beyond the
        # first page of an unfiltered listing.
        existing = requests.get(
            url, headers=hdrs, params={"name": name}, timeout=_LS_TIMEOUT
        )
        existing.raise_for_status()
        for ds in existing.json():
            if isinstance(ds, dict) and ds.get("name") == name:
                return ds["id"]
        # Not found: fall through so the 409 itself is raised below.
    created.raise_for_status()
    return created.json()["id"]


def ls_upload_examples(workspace_id: str, dataset_id: str, examples: list[dict]):
    """POST each example to ``/api/v1/examples`` for the given dataset.

    Args:
        workspace_id: Sent as the ``X-Tenant-Id`` header.
        dataset_id: Added to every payload as ``dataset_id``; a ``dataset_id``
            key inside an example overrides it.
        examples: Example payload dicts.

    Raises:
        requests.HTTPError: On the first failing upload; later examples are
            not sent.
    """
    hdrs = LS_HEADERS | {"X-Tenant-Id": workspace_id}
    url = f"{LS_BASE}/api/v1/examples"
    # One session so the connection is reused across examples.
    with requests.Session() as session:
        for ex in examples:
            payload = {"dataset_id": dataset_id} | ex
            session.post(
                url, headers=hdrs, json=payload, timeout=_LS_TIMEOUT
            ).raise_for_status()


def ls_push_prompt(
    name: str,
    description: str,
    prompt_obj: object,
    workspace_id: str,
    pat: Optional[str] = None,
) -> str:
    """Create/Update a prompt (+model) in the given workspace; return URL.

    Args:
        name: Prompt identifier passed to ``Client.push_prompt``.
        description: Prompt description.
        prompt_obj: Object to push (passed as ``object=``).
        workspace_id: Sent as the ``X-Tenant-Id`` header on the SDK's session.
        pat: API key to use; defaults to ``LS_API_KEY``.

    Returns:
        Whatever ``Client.push_prompt`` returns (the prompt URL).
    """
    pat = pat or LS_API_KEY
    # The workspace is selected through a header on a dedicated session.
    session = requests.Session()
    session.headers.update({"X-Tenant-Id": workspace_id})
    client = Client(api_key=pat, api_url=LS_BASE, session=session)

    url = client.push_prompt(
        name,
        object=prompt_obj,
        description=description,
        # metadata & input vars not yet supported in SDK call → store via tags
    )
    return url


def ls_upload_runs(workspace_id: str, runs: list[dict]):
    """Upload runs and feedback to LangSmith workspace via REST API.

    Expects run dicts shaped similarly to LangSmith runs. Feedback items can be
    passed as dicts with key 'type' == 'feedback'; they are posted to
    ``/api/v1/feedback``. Every other item is posted to ``/api/v1/runs`` after
    being reduced to the whitelisted run fields.

    Args:
        workspace_id: Sent as the ``X-Tenant-Id`` header.
        runs: Run and feedback dicts, uploaded in the order given.

    Raises:
        requests.HTTPError: On the first failing upload; later items are not
            sent.
    """
    hdrs = LS_HEADERS | {"X-Tenant-Id": workspace_id}
    # One session so the connection is reused across items.
    with requests.Session() as session:
        for item in runs:
            if isinstance(item, dict) and item.get("type") == "feedback":
                payload = {
                    "run_id": item.get("run_id"),
                    "key": item.get("key"),
                    "score": item.get("score"),
                    "comment": item.get("comment"),
                    "source": item.get("source"),
                    "metadata": item.get("metadata", {}),
                    "timestamp": item.get("timestamp"),
                }
                session.post(
                    f"{LS_BASE}/api/v1/feedback",
                    headers=hdrs,
                    json=payload,
                    timeout=_LS_TIMEOUT,
                ).raise_for_status()
                continue

            # Normal run
            run_payload = {k: v for k, v in item.items() if k in _LS_RUN_FIELDS}
            session.post(
                f"{LS_BASE}/api/v1/runs",
                headers=hdrs,
                json=run_payload,
                timeout=_LS_TIMEOUT,
            ).raise_for_status()