cleanup arize and prompt migration

This commit is contained in:
christineastoria
2025-12-03 20:38:36 -05:00
parent 390e3a3d58
commit 21089b4061
9 changed files with 43 additions and 603 deletions
+7
View File
@@ -2,6 +2,13 @@ LANGFUSE_SECRET_KEY="<redacted>"
LANGFUSE_PUBLIC_KEY="<redacted>"
LANGFUSE_HOST="https://us.cloud.langfuse.com"
INCLUDE_MODEL_IN_PROMPTS=true
NUM_TRACES_TO_REPLAY=10
PHOENIX_API_KEY="<redacted>"
PHOENIX_SPACE="<redacted>"
LANGSMITH_API_KEY="<redacted>"
LANGSMITH_ENDPOINT="https://api.smith.langchain.com"
LANGSMITH_TRACING=true
+1 -1
View File
@@ -29,7 +29,7 @@ Each provider has a runbook notebook to illustrate how to migrate your existing
Each observability provider in this repo has a runbook under its corresponding directory.
- Langfuse runbook: `providers/langfuse/runbook.ipynb`
- Arize runbook: `providers/arize/runbook.ipynb` (coming soon)
- Arize Phoenix runbook: `providers/phoenix/runbook.ipynb` (coming soon)
Usage:
1. Open the runbook in your IDE, or launch it by running `jupyter notebook` from the root directory.
+2 -7
View File
@@ -8,15 +8,10 @@ LF_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY")
LF_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY")
LF_BASE = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
# Arize configuration
ARIZE_API_KEY = os.getenv("ARIZE_API_KEY")
ARIZE_SPACE_ID = os.getenv("ARIZE_SPACE_ID")
ARIZE_PROJECT_NAMES = [p.strip() for p in os.getenv("ARIZE_PROJECT_NAMES", "").split(",") if p.strip()]
# Arize: Phoenix configuration
# Arize Phoenix configuration
PHOENIX_API_KEY = os.getenv("PHOENIX_API_KEY")
PHOENIX_HOST = os.getenv("PHOENIX_HOST", "https://app.phoenix.arize.com")
PHOENIX_SPACE = os.getenv("PHOENIX_SPACE", "") # e.g., "christine"
PHOENIX_SPACE = os.getenv("PHOENIX_SPACE", "")
# LangSmith configuration
LS_API_KEY = os.getenv("LANGSMITH_API_KEY")
+5 -18
View File
@@ -1,15 +1,12 @@
from providers.langfuse.main import migrate_langfuse
from providers.arize.main import migrate_arize
from providers.phoenix.main import migrate_phoenix
from config import INCLUDE_MODEL_IN_PROMPTS, NUM_TRACES_TO_REPLAY
from utils.langfuse import lf_get_projects
from utils.arize import arize_get_projects
from utils.phoenix import phoenix_get_projects
AVAILABLE_PROVIDERS = [
"langfuse",
"arize",
"phoenix",
"arize-phoenix",
]
## ------------------------------------------------------------
@@ -60,23 +57,13 @@ def migrate(provider: str):
print("No project found for the configured API keys.")
return
migrate_langfuse(projects)
elif provider == "arize":
display_config("Arize", "project")
projects = arize_get_projects()
if not projects:
print("No projects found. Set ARIZE_PROJECT_NAMES in .env (comma-separated).")
return
migrate = capture_user_selection("Arize", "projects")
if not migrate:
return
migrate_arize(projects)
elif provider == "phoenix":
display_config("Phoenix", "project")
elif provider == "arize-phoenix":
display_config("Arize Phoenix", "project")
projects = phoenix_get_projects()
if not projects:
print("No projects found in Phoenix.")
print("No projects found in Arize Phoenix.")
return
migrate = capture_user_selection("Phoenix", "projects")
migrate = capture_user_selection("Arize Phoenix", "projects")
if not migrate:
return
migrate_phoenix(projects)
-121
View File
@@ -1,121 +0,0 @@
import traceback
import pandas as pd
from utils.arize import arize_get_datasets, arize_get_dataset_examples
from utils.langsmith import ls_create_dataset, ls_upload_examples
# Keys probed, in priority order, when looking for a record's reference output.
_EXPECTED_KEYS = (
    "expected",
    "expected_output",
    "expectedOutput",
    "reference_output",
    "referenceOutput",
    "output",
    "output.value",
)


def _is_blank(value) -> bool:
    """Return True for None, the empty string, and float NaN.

    NaN is included because the records handed to this module are produced by
    ``DataFrame.to_dict`` elsewhere in the file, where a missing cell shows up
    as NaN rather than None.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return value == ""
    # NaN is the only float that does not compare equal to itself.
    return isinstance(value, float) and value != value


def _strip_prefix(key: str, prefixes: tuple):
    """Return ``key`` without its leading prefix, or None if no prefix matches."""
    for prefix in prefixes:
        if key.startswith(prefix):
            return key[len(prefix):]
    return None


def _extract_expected_value(record: dict):
    """Return the first non-blank reference output found in ``record``.

    Top-level keys are checked first (in ``_EXPECTED_KEYS`` order), then the
    same keys inside any dict-valued field. Falsy-but-real values such as
    ``0`` or ``False`` are returned as-is. Returns ``""`` when nothing is found.
    """
    for key in _EXPECTED_KEYS:
        value = record.get(key)
        if not _is_blank(value):
            return value
    for nested in record.values():
        if isinstance(nested, dict):
            for key in _EXPECTED_KEYS:
                value = nested.get(key)
                if not _is_blank(value):
                    return value
    return ""


def arize_example_conversion(records: list[dict]) -> list[dict]:
    """Convert Arize dataset records to LangSmith format.

    Each record becomes ``{"inputs": ..., "outputs": {"reference_output": ...}}``
    plus an optional ``"metadata"`` dict. The input records are never mutated.

    Note: Dataset versioning and experiment associations are not migrated.

    Args:
        records: Flat dicts, one per dataset example.

    Returns:
        A list of example dicts in the same order as ``records``.
    """
    examples = []
    for r in records:
        # Collect flattened "input.<name>" / "input_<name>" columns and a plain "input".
        inputs = {}
        for key, value in r.items():
            clean_key = _strip_prefix(key, ("input.", "input_"))
            if clean_key is not None:
                # Only the leading prefix is removed, so a key such as
                # "input.user_input_text" keeps its inner "input_".
                inputs[clean_key] = value
            elif key == "input":
                if isinstance(value, dict):
                    inputs.update(value)
                else:
                    inputs["input"] = value

        if not inputs:
            fallback = (
                r.get("inputs")
                or r.get("input")
                or {"input": r.get("prompt", r.get("question", ""))}
            )
            # Copy dict fallbacks: metadata keys are popped from `inputs` below
            # and that must not alter the caller's record.
            inputs = dict(fallback) if isinstance(fallback, dict) else {"input": fallback}

        # Start from a copy of the record's metadata; a non-dict value (for
        # example a NaN cell) is ignored instead of crashing on item assignment.
        raw_meta = r.get("metadata") or r.get("meta") or {}
        meta = dict(raw_meta) if isinstance(raw_meta, dict) else {}
        for key, value in r.items():
            clean_key = _strip_prefix(key, ("metadata.", "meta."))
            if clean_key is not None:
                meta[clean_key] = value

        # Metadata nested inside the inputs belongs in the example metadata.
        for mk in ("metadata", "meta"):
            if mk in inputs:
                val = inputs.pop(mk)
                if isinstance(val, dict):
                    meta = {**meta, **val}

        # The helper already probes "output" and "output.value", so a blank
        # result here means there is no usable reference output at all.
        expected = _extract_expected_value(r)

        ex = {
            "inputs": inputs,
            "outputs": {"reference_output": expected},
        }
        if meta:
            ex["metadata"] = meta
        examples.append(ex)
    return examples
def migrate_datasets(workspace_id: str):
    """Copy every Arize dataset into the given LangSmith workspace.

    Failures are reported on stdout (with a traceback) and never raised: a
    failure to list datasets aborts the whole step, a failure on one dataset
    only skips that dataset.

    Args:
        workspace_id: Identifier passed through to the LangSmith helpers.
    """
    try:
        listing = arize_get_datasets()
    except Exception as err:
        print(f" x failed to fetch Arize datasets: {err}")
        traceback.print_exc()
        return

    if listing is None or listing.empty:
        print(" - no datasets found in Arize space")
        return

    for _, entry in listing.iterrows():
        ds_id = entry.get("id") or entry.get("dataset_id")
        ds_name = entry.get("name") or f"arize-dataset-{ds_id}"
        print(f" - migrating dataset: {ds_name}")
        try:
            target_id = ls_create_dataset(workspace_id, ds_name)
            if target_id is None:
                # The helper signals "nothing created" with None.
                print(" • skipped (already exists)")
                continue

            rows_df = arize_get_dataset_examples(dataset_id=ds_id)
            if rows_df is None or rows_df.empty:
                print(" • no examples found")
                continue

            converted = arize_example_conversion(rows_df.to_dict(orient="records"))
            ls_upload_examples(workspace_id, target_id, converted)
            print(f" • uploaded {len(converted)} examples")
        except Exception as err:
            print(f" x dataset '{ds_name}' failed: {err}")
            traceback.print_exc()
-314
View File
@@ -1,314 +0,0 @@
from __future__ import annotations
import traceback
import uuid
import datetime as dt
from config import NUM_TRACES_TO_REPLAY
from utils.arize import arize_export_traces
from utils.langsmith import ls_replay_runs_sdk
def safe_isoformat(dt_obj):
    """Render a timestamp as a millisecond-precision ISO-8601 string.

    Datetimes without tzinfo are treated as UTC, and a UTC offset is written
    as a trailing ``Z``. Strings are returned untouched; ``None`` and any
    other type yield ``None``.
    """
    if isinstance(dt_obj, dt.datetime):
        aware = dt_obj if dt_obj.tzinfo is not None else dt_obj.replace(tzinfo=dt.timezone.utc)
        text = aware.isoformat(timespec='milliseconds')
        if text.endswith('+00:00'):
            return text[:-6] + 'Z'
        return text
    # Not a datetime: only strings are passed through.
    return dt_obj if isinstance(dt_obj, str) else None
def _compact_ts(ts_val):
    """Format a timestamp as ``YYYYMMDDTHHMMSS`` + 6-digit microseconds + ``Z``.

    Accepts a datetime or an ISO-8601 string (a trailing ``Z`` is read as
    UTC). Returns ``""`` for ``None`` or for a string that cannot be parsed.
    Values without tzinfo are treated as UTC.
    """
    if ts_val is None:
        return ""

    moment = ts_val
    if isinstance(ts_val, str):
        try:
            # fromisoformat on older Pythons does not accept the "Z" suffix.
            iso_text = ts_val[:-1] + '+00:00' if ts_val.endswith('Z') else ts_val
            moment = dt.datetime.fromisoformat(iso_text)
        except Exception:
            return ""

    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=dt.timezone.utc)
    stamp = moment.strftime('%Y%m%dT%H%M%S')
    return stamp + f"{moment.microsecond:06d}" + 'Z'
def _span_kind_to_run_type(span_kind: str) -> str:
    """Translate a span kind into a run type (case-insensitive).

    ``llm`` and ``embedding`` map to ``"llm"``; ``tool`` and ``retriever`` map
    to themselves; everything else, including an empty or missing kind, maps
    to ``"chain"``.
    """
    if not span_kind:
        return "chain"

    kind = str(span_kind).lower()
    if kind in ("llm", "embedding"):
        return "llm"
    if kind in ("tool", "retriever"):
        return kind
    # chain, agent, reranker, guardrail and unknown kinds all become chains.
    return "chain"
def _ensure_end_times(runs: list[dict]):
    """Give every run without an ``end_time`` its own ``start_time`` instead.

    Mutates the run dicts in place; non-dict entries are ignored.
    """
    unfinished = (r for r in runs if isinstance(r, dict) and r.get("end_time") is None)
    for run in unfinished:
        run["end_time"] = run.get("start_time")
def _children_map(runs: list[dict]) -> dict:
    """Map each parent run id to the ids of its children, ordered by start time.

    Runs with a falsy ``parent_run_id`` are treated as roots and get no entry
    of their own. Children missing a ``start_time`` sort as the empty string.
    Non-dict entries in ``runs`` are ignored.
    """
    dict_runs = [r for r in runs if isinstance(r, dict)]
    start_by_id = {r["id"]: r.get('start_time') or '' for r in dict_runs}

    children: dict[str, list[str]] = {}
    for run in dict_runs:
        parent = run.get("parent_run_id")
        if parent:
            children.setdefault(parent, []).append(run["id"])

    for kids in children.values():
        kids.sort(key=lambda kid: start_by_id.get(kid, ''))
    return children
def _assign_dotted_order(runs: list[dict], root_id: str):
    """Write a ``dotted_order`` key onto every run reachable from ``root_id``.

    Each run's segment is its compact timestamp followed by its id; a child's
    value is the parent's value, a dot, then the child's own segment. The
    timestamp falls back from the run's ``start_time`` to its ``end_time`` and
    finally to the root's ``start_time``. Runs are mutated in place.
    """
    by_id = {r["id"]: r for r in runs if isinstance(r, dict)}
    kids_of = _children_map(runs)

    def walk(current_id, prefix):
        node = by_id.get(current_id)
        if not node:
            return
        stamp = (
            node.get('start_time')
            or node.get('end_time')
            or by_id.get(root_id, {}).get('start_time')
        )
        segment = _compact_ts(stamp) + current_id
        path = f"{prefix}.{segment}" if prefix else segment
        node["dotted_order"] = path
        for child_id in kids_of.get(current_id, []):
            walk(child_id, path)

    walk(root_id, None)
def _get_col(row, *col_names, default=None):
    """Return the first usable value among ``col_names`` in a pandas row.

    A value is skipped when it is None, an empty list/array, something
    ``pd.isna`` reports as missing, or a blank/whitespace-only string.
    Non-empty lists and arrays are returned without any further checks.
    ``default`` is returned when no column yields a usable value.
    """
    import pandas as pd
    import numpy as np

    def usable(candidate) -> bool:
        if candidate is None:
            return False
        if isinstance(candidate, (list, np.ndarray)):
            return len(candidate) != 0
        try:
            if pd.isna(candidate):
                return False
        except (ValueError, TypeError):
            # pd.isna gave a non-scalar answer; treat the value as present.
            pass
        if isinstance(candidate, float) and str(candidate) == 'nan':
            return False
        return not (isinstance(candidate, str) and candidate.strip() == '')

    for name in col_names:
        if name in row.index:
            candidate = row[name]
            if usable(candidate):
                return candidate
    return default
def map_arize_spans_to_langsmith(traces_df) -> list[dict]:
    """Transform Arize trace DataFrame to LangSmith runs format.

    Spans are grouped by trace id, every span gets a fresh UUID as its run id,
    and parent links are rewritten to those new ids. The first parentless span
    of a trace (in sort order) is its root; all spans that have a parent carry
    the root's id as ``trace_id``, even when they sort ahead of the root.

    Note: The following Arize data is not explicitly migrated, but may be preserved in metadata:
    - Span events and exceptions (only status is preserved)
    - Evaluation scores and annotations
    - Session/conversation groupings
    - Cost tracking data
    - Retrieval document contents (only query is preserved)

    Args:
        traces_df: DataFrame of exported spans, one row per span.

    Returns:
        A list of run dicts; empty when the frame is missing, empty, or has no
        recognisable trace-id column.
    """
    if traces_df is None or traces_df.empty:
        return []

    trace_id_col = next(
        (c for c in ('context.trace_id', 'trace_id', 'context_trace_id') if c in traces_df.columns),
        None,
    )
    if not trace_id_col:
        print(" ! Could not find trace_id column in Arize export")
        return []

    metadata_prefixes = ('attributes.metadata.', 'metadata.')
    runs = []
    span_id_mapping = {}

    for _, trace_group in traces_df.groupby(trace_id_col):
        sort_cols = [c for c in ['start_time', 'context.span_id', 'span_id'] if c in trace_group.columns][:1]
        trace_spans = trace_group.sort_values(by=sort_cols)

        # First pass: allocate new ids so parents can be resolved regardless of row order.
        for _, row in trace_spans.iterrows():
            orig_span_id = _get_col(row, 'context.span_id', 'span_id', 'context_span_id')
            if orig_span_id:
                span_id_mapping[str(orig_span_id)] = str(uuid.uuid4())

        root_run_id = None
        trace_runs = []
        for _, row in trace_spans.iterrows():
            orig_span_id = _get_col(row, 'context.span_id', 'span_id', 'context_span_id')
            run_id = span_id_mapping.get(str(orig_span_id), str(uuid.uuid4()))

            orig_parent_id = _get_col(row, 'parent_id', 'parent_span_id')
            parent_run_id = span_id_mapping.get(str(orig_parent_id)) if orig_parent_id else None

            if parent_run_id is None and root_run_id is None:
                root_run_id = run_id

            span_kind = _get_col(row, 'span_kind', 'openinference.span.kind', 'attributes.openinference.span.kind')
            run_type = _span_kind_to_run_type(span_kind)
            span_name = _get_col(row, 'name', 'span_name') or "span"

            inputs = {}
            input_value = _get_col(row, 'attributes.input.value', 'input.value')
            if input_value:
                inputs["input"] = input_value
            llm_input = _get_col(row, 'attributes.llm.input_messages', 'llm.input_messages')
            if llm_input:
                inputs["messages"] = llm_input
            llm_prompt = _get_col(row, 'attributes.llm.prompt')
            if llm_prompt and "input" not in inputs:
                inputs["input"] = llm_prompt
            tool_params = _get_col(row, 'attributes.tool.parameters', 'attributes.tool.arguments')
            if tool_params:
                inputs["tool_parameters"] = tool_params
            tool_name = _get_col(row, 'attributes.tool.name')
            if tool_name:
                inputs["tool_name"] = tool_name
            retrieval_query = _get_col(row, 'attributes.retrieval.query')
            if retrieval_query:
                inputs["query"] = retrieval_query

            outputs = {}
            output_value = _get_col(row, 'attributes.output.value', 'output.value')
            if output_value:
                outputs["output"] = output_value
            llm_output = _get_col(row, 'attributes.llm.output_messages', 'llm.output_messages')
            if llm_output:
                outputs["messages"] = llm_output
            llm_response = _get_col(row, 'attributes.llm.response', 'attributes.output.response')
            if llm_response and "output" not in outputs:
                outputs["output"] = llm_response
            tool_result = _get_col(row, 'attributes.tool.result')
            if tool_result:
                outputs["tool_result"] = tool_result

            metadata = {}
            model_name = _get_col(row, 'attributes.llm.model_name', 'attributes.llm.model')
            if model_name:
                metadata["ls_model_name"] = model_name
                model_lower = str(model_name).lower()
                if "gpt" in model_lower or "openai" in model_lower:
                    metadata["ls_provider"] = "openai"
                elif "claude" in model_lower or "anthropic" in model_lower:
                    metadata["ls_provider"] = "anthropic"

            prompt_tokens = _get_col(row, 'attributes.llm.token_count.prompt')
            completion_tokens = _get_col(row, 'attributes.llm.token_count.completion')
            total_tokens = _get_col(row, 'attributes.llm.token_count.total')
            if prompt_tokens or completion_tokens or total_tokens:
                metadata["token_usage"] = {}
                if prompt_tokens:
                    metadata["token_usage"]["prompt_tokens"] = int(prompt_tokens)
                if completion_tokens:
                    metadata["token_usage"]["completion_tokens"] = int(completion_tokens)
                if total_tokens:
                    metadata["token_usage"]["total_tokens"] = int(total_tokens)

            invocation_params = _get_col(row, 'attributes.llm.invocation_parameters')
            if invocation_params:
                metadata["invocation_params"] = invocation_params

            for col in row.index:
                if not isinstance(col, str):
                    continue
                for prefix in metadata_prefixes:
                    if col.startswith(prefix):
                        val = _get_col(row, col)
                        if val is not None:
                            # Strip only the leading prefix so that inner
                            # "metadata." fragments of the key survive.
                            metadata[col[len(prefix):]] = val
                        break

            start_time = _get_col(row, 'start_time')
            end_time = _get_col(row, 'end_time')

            trace_runs.append({
                "id": run_id,
                # Provisional for children seen before the root; fixed up below.
                "trace_id": run_id if parent_run_id is None else root_run_id,
                "name": span_name,
                "run_type": run_type,
                "parent_run_id": parent_run_id,
                "inputs": inputs,
                "outputs": outputs,
                "start_time": safe_isoformat(start_time) if start_time else None,
                "end_time": safe_isoformat(end_time) if end_time else None,
                "metadata": metadata,
                "tags": [],
            })

        if root_run_id:
            # A child that sorted ahead of the root was built while the root
            # was still unknown; point every parented run at the final root.
            for run in trace_runs:
                if run["parent_run_id"] is not None:
                    run["trace_id"] = root_run_id

            # NOTE(review): additional parentless spans keep their own id as
            # trace_id and are therefore left out of end-time filling and
            # dotted ordering, as before — confirm that is acceptable upstream.
            rooted = [r for r in trace_runs if r["trace_id"] == root_run_id]
            _ensure_end_times(rooted)
            _assign_dotted_order(rooted, root_run_id)

        runs.extend(trace_runs)

    return runs
def migrate_traces(workspace_id: str, project_name: str, days_back: int = 7):
    """Export a project's traces from Arize and replay them into LangSmith.

    When ``NUM_TRACES_TO_REPLAY`` is a positive number it both narrows the
    export window (one day per 100 traces, minimum one day — this overrides
    ``days_back``) and caps how many traces are uploaded. Traces are kept in
    first-seen order, so the same export always yields the same selection.

    Errors are printed with a traceback and never raised.

    Args:
        workspace_id: Passed through to the LangSmith replay helper.
        project_name: Arize project to export; also the target project name.
        days_back: Export window in days, used when no trace cap is configured.
    """
    try:
        limit = NUM_TRACES_TO_REPLAY if (NUM_TRACES_TO_REPLAY and NUM_TRACES_TO_REPLAY > 0) else 0
        if limit:
            days_back = max(1, limit // 100)
        # Log after resolving the window so the message matches what is exported.
        print(f" - migrating traces (last {days_back} days)...")
        traces_df = arize_export_traces(project_name, days_back=days_back)
    except Exception as e:
        print(f" x failed to export traces from Arize: {e}")
        traceback.print_exc()
        return

    if traces_df is None or traces_df.empty:
        print(" • no traces found")
        return
    print(f" • exported {len(traces_df)} spans from Arize")

    try:
        runs = map_arize_spans_to_langsmith(traces_df)
        if not runs:
            print(" • no runs to upload after transformation")
            return

        if limit:
            # dict.fromkeys de-duplicates while preserving first-seen order.
            ordered_ids = list(dict.fromkeys(r["trace_id"] for r in runs))
            if len(ordered_ids) > limit:
                keep = set(ordered_ids[:limit])
                runs = [r for r in runs if r["trace_id"] in keep]

        ls_replay_runs_sdk(workspace_id, runs, project_name=project_name)
        trace_count = len({r["trace_id"] for r in runs})
        print(f" • uploaded {len(runs)} spans ({trace_count} traces) to project '{project_name}'")
    except Exception as e:
        print(f" x failed to transform/upload traces: {e}")
        traceback.print_exc()
-26
View File
@@ -1,26 +0,0 @@
from utils.langsmith import ls_get_or_create_workspace
from config import LS_WORKSPACE_ID
from providers.arize.data.datasets import migrate_datasets
from providers.arize.data.traces import migrate_traces
def migrate_arize(projects: list[dict]):
    """Migrate data from Arize to LangSmith.

    For each project dict (only its ``"name"`` is read) a workspace id is
    resolved — the configured ``LS_WORKSPACE_ID`` when set, otherwise one
    obtained from ``ls_get_or_create_workspace`` — and then datasets and
    traces are migrated into it.
    """
    for project in projects:
        name = project.get("name")
        print(f"\n- Project: {name}")

        if not LS_WORKSPACE_ID:
            workspace = ls_get_or_create_workspace(name)
            ws_id = workspace["id"]
            print(f" + workspace id: {ws_id}")
        else:
            ws_id = LS_WORKSPACE_ID
            print(f" + using workspace: {ws_id}")

        migrate_datasets(ws_id)
        migrate_traces(ws_id, name)

    print("\n+ Migration complete.")
+28 -40
View File
@@ -25,52 +25,37 @@ def string_to_chat_template(template: str) -> ChatPromptTemplate:
return ChatPromptTemplate.from_template(template)
def detect_model_provider(model_name: str) -> str:
    """Classify a model name as ``"anthropic"`` or ``"openai"``.

    Names containing "claude" or "anthropic" (case-insensitive) are Anthropic;
    every other name, including an empty or missing one, is reported as OpenAI.
    """
    lowered = (model_name or "").lower()
    if "claude" in lowered or "anthropic" in lowered:
        return "anthropic"
    # OpenAI is both the explicit match and the fallback.
    return "openai"
def get_model_instance(model_name: str, model_provider: str = None, model_params: dict = None):
    """Get the appropriate LangChain model instance based on model name and provider.

    ``model_params`` may use the nested invocation-parameter layout, e.g.
    ``{'type': 'openai', 'openai': {'temperature': 1.0}}``. The nested dict is
    looked up under the lower-cased provider and, as a fallback, under the
    value of ``'type'``, then merged into the top-level parameters. The
    caller's dict is never modified.

    Args:
        model_name: Model identifier handed to the LangChain class.
        model_provider: Provider label (case-insensitive); may be empty.
        model_params: Invocation parameters, flat or nested as described above.

    Returns:
        A ``ChatAnthropic`` or ``ChatOpenAI`` instance, or None for unsupported
        providers (prompt-only mode).
    """
    params = dict(model_params or {})

    # 'type' names the key holding the real parameters; keep it as a fallback
    # for when no provider was supplied instead of discarding it.
    declared_type = params.pop("type", None)
    nested_keys = [(model_provider or "").lower()]
    if isinstance(declared_type, str):
        nested_keys.append(declared_type.lower())
    for key in nested_keys:
        if key and key in params:
            nested = params.pop(key)
            if isinstance(nested, dict):
                params.update(nested)

    # Remove keys the model constructors must not receive as keyword arguments.
    params.pop("model", None)
    params.pop("supported_languages", None)

    provider = (model_provider or "").upper()
    name_lower = (model_name or "").lower()

    # Only OpenAI and Anthropic are supported here.
    if provider == "ANTHROPIC" or "claude" in name_lower:
        return ChatAnthropic(model=model_name, **params)
    if provider == "OPENAI" or any(tag in name_lower for tag in ("gpt", "o1")):
        return ChatOpenAI(model=model_name, **params)
    # Unsupported provider (DeepSeek, etc.) - caller falls back to prompt-only mode.
    return None
def phoenix_prompt_conversion(phoenix_prompt, prompt_info: dict = None) -> dict:
@@ -166,14 +151,17 @@ def phoenix_prompt_conversion(phoenix_prompt, prompt_info: dict = None) -> dict:
def prompt_dict_to_obj(prompt_dict: dict, include_model: bool = True) -> object:
chat_prompt = string_to_chat_template(prompt_dict["prompt_template"])
model_name = prompt_dict["metadata"].get("model")
model_provider = prompt_dict["metadata"].get("model_provider", "")
model_params = prompt_dict["metadata"].get("model_params", {})
if model_name and include_model:
try:
model = get_model_instance(model_name, model_params)
obj = RunnableSequence(chat_prompt, model)
provider = detect_model_provider(model_name)
print(f" ... using {provider} model: {model_name}")
model = get_model_instance(model_name, model_provider, model_params)
if model is not None:
obj = RunnableSequence(chat_prompt, model)
print(f" ... using {model_provider or 'detected'} model: {model_name}")
else:
obj = chat_prompt
except Exception as e:
print(f" ! failed to create model {model_name}, using prompt only: {e}")
obj = chat_prompt
-76
View File
@@ -1,76 +0,0 @@
import pandas as pd
from datetime import datetime, timedelta, timezone
from arize.experimental.datasets import ArizeDatasetsClient
from arize.exporter import ArizeExportClient
from arize.utils.types import Environments
from config import ARIZE_API_KEY, ARIZE_SPACE_ID, ARIZE_PROJECT_NAMES
def get_arize_client() -> ArizeDatasetsClient:
    """Build an Arize Datasets client using the configured ``ARIZE_API_KEY``."""
    client = ArizeDatasetsClient(api_key=ARIZE_API_KEY)
    return client
def arize_get_projects() -> list[dict]:
    """Return one ``{"name": ...}`` dict per entry in ``ARIZE_PROJECT_NAMES``."""
    projects = [dict(name=project_name) for project_name in ARIZE_PROJECT_NAMES]
    return projects
def arize_get_datasets() -> pd.DataFrame:
    """Fetch the dataset listing for the configured Arize space.

    Returns whatever the datasets client's ``list_datasets`` call yields for
    ``ARIZE_SPACE_ID`` (a DataFrame of dataset metadata such as id and name).
    """
    return get_arize_client().list_datasets(space_id=ARIZE_SPACE_ID)
def arize_get_dataset_examples(dataset_id: str = None, dataset_name: str = None) -> pd.DataFrame:
    """Fetch all examples from an Arize dataset.

    Args:
        dataset_id: The dataset ID. Required if dataset_name is not provided.
        dataset_name: The dataset name. Required if dataset_id is not provided.

    Returns:
        A DataFrame with the dataset examples.

    Raises:
        ValueError: If neither ``dataset_id`` nor ``dataset_name`` is given.
    """
    # Fail fast with a clear message before building a client or calling out.
    if not dataset_id and not dataset_name:
        raise ValueError("Either dataset_id or dataset_name must be provided")

    client = get_arize_client()
    examples_df = client.get_dataset(
        space_id=ARIZE_SPACE_ID,
        dataset_id=dataset_id,
        dataset_name=dataset_name,
    )
    return examples_df
def get_arize_export_client() -> ArizeExportClient:
    """Get an authenticated Arize Export client for traces.

    The API key from config is passed explicitly, mirroring
    ``get_arize_client``. When ``ARIZE_API_KEY`` is unset the value is None
    and the client is constructed exactly as it would be with no arguments.
    """
    return ArizeExportClient(api_key=ARIZE_API_KEY)
def arize_export_traces(project_name: str, days_back: int = 7) -> pd.DataFrame:
    """Export a project's tracing data from Arize as a DataFrame.

    The export window ends at the current UTC time and starts ``days_back``
    days earlier.

    Args:
        project_name: The Arize project name, sent as the export's ``model_id``.
        days_back: Length of the export window in days (default: 7).

    Returns:
        The DataFrame returned by the export client's ``export_model_to_df``.
    """
    exporter = get_arize_export_client()

    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days_back)

    return exporter.export_model_to_df(
        space_id=ARIZE_SPACE_ID,
        model_id=project_name,
        environment=Environments.TRACING,
        start_time=window_start,
        end_time=window_end,
    )