Merge pull request #43 from nishanthp/add_support_for_parquet

Add Parquet export support to OtelTracesSqlEngine
This commit is contained in:
Clelia (Astra) Bertelli
2026-03-02 09:58:20 +01:00
committed by GitHub
3 changed files with 218 additions and 0 deletions
+40
View File
@@ -309,6 +309,46 @@ if file_input is not None:
except Exception as e:
st.error(f"Error generating podcast: {str(e)}")
st.markdown("---")
st.markdown("## Export Traces")
with st.expander("Export Traces to Parquet"):
export_path = st.text_input(
"Output Path",
value="traces.parquet",
help="Path to save the Parquet file or directory",
)
compression = st.selectbox(
"Compression",
["snappy", "gzip", "brotli", "lz4", "zstd"],
index=0,
)
partition_cols_input = st.text_input(
"Partition Columns (comma separated)",
value="service_name,date",
help="Example: service_name,date",
)
if st.button("Export Parquet"):
try:
partition_cols = (
[col.strip() for col in partition_cols_input.split(",")]
if partition_cols_input.strip()
else None
)
sql_engine.to_parquet(
output_path=export_path,
compression=compression,
partition_cols=partition_cols,
)
st.success(f"Parquet exported to {export_path}")
except Exception as e:
st.error(f"Export failed: {str(e)}")
else:
st.info("Please upload a PDF file to get started.")
+46
View File
@@ -139,6 +139,52 @@ class OtelTracesSqlEngine:
self._connect()
return pd.read_sql_table(table_name=self.table_name, con=self._connection)
def to_parquet(
self,
output_path: str,
compression: Literal["snappy", "gzip", "brotli", "lz4", "zstd"] = "snappy",
partition_cols: Optional[List[str]] = None,
) -> None:
"""
Export traces to Parquet format for efficient storage and querying.
Args:
output_path: Path to save parquet file/directory
compression: Compression algorithm (default: snappy)
partition_cols: Columns to partition by (e.g., ['service_name', 'date'])
"""
try:
df = self.to_pandas()
except ValueError:
df = pd.DataFrame()
# Ensure start_time is datetime for better downstream usability
if "start_time" in df.columns:
df["start_time"] = pd.to_datetime(df["start_time"], unit="us")
# Handle partition columns
if partition_cols:
# Derive date column only if explicitly requested
if "date" in partition_cols:
if "start_time" not in df.columns:
raise ValueError("Cannot derive 'date' column without 'start_time'")
df["date"] = df["start_time"].dt.date
# Validate partition columns
missing_cols = [col for col in partition_cols if col not in df.columns]
if missing_cols:
raise ValueError(
f"Invalid partition columns: {missing_cols}. "
f"Available columns: {list(df.columns)}"
)
df.to_parquet(
output_path,
compression=compression,
partition_cols=partition_cols,
index=False,
)
def disconnect(self) -> None:
if not self._connection:
raise ValueError("Engine was never connected!")
+132
View File
@@ -0,0 +1,132 @@
import pandas as pd
import pytest
from src.notebookllama.instrumentation import OtelTracesSqlEngine
def _create_sample_df():
return pd.DataFrame(
{
"trace_id": ["abc"],
"span_id": ["span1"],
"parent_span_id": [None],
"operation_name": ["op"],
"start_time": [1700000000000000], # microseconds
"duration": [100],
"status_code": ["OK"],
"service_name": ["svc"],
}
)
def test_to_parquet_creates_file(tmp_path):
engine = OtelTracesSqlEngine(engine_url="sqlite:///:memory:")
engine._connect()
df = _create_sample_df()
engine._to_sql(df, if_exists_policy="replace")
output_file = tmp_path / "traces.parquet"
engine.to_parquet(str(output_file))
assert output_file.exists()
def test_to_parquet_writes_correct_content(tmp_path):
engine = OtelTracesSqlEngine(engine_url="sqlite:///:memory:")
engine._connect()
df = _create_sample_df()
engine._to_sql(df, if_exists_policy="replace")
output_file = tmp_path / "traces.parquet"
engine.to_parquet(str(output_file))
exported_df = pd.read_parquet(output_file)
assert len(exported_df) == 1
assert exported_df["trace_id"].iloc[0] == "abc"
assert exported_df["service_name"].iloc[0] == "svc"
def test_to_parquet_with_partitioning(tmp_path):
engine = OtelTracesSqlEngine(engine_url="sqlite:///:memory:")
engine._connect()
df = _create_sample_df()
engine._to_sql(df, if_exists_policy="replace")
output_dir = tmp_path / "partitioned"
engine.to_parquet(
str(output_dir),
partition_cols=["service_name", "date"],
)
# derive expected date from start_time
expected_date = pd.to_datetime(1700000000000000, unit="us").date()
partition_path = output_dir / "service_name=svc" / f"date={expected_date}"
assert partition_path.exists()
assert any(partition_path.glob("*.parquet"))
def test_to_parquet_empty_table(tmp_path):
engine = OtelTracesSqlEngine(engine_url="sqlite:///:memory:")
engine._connect()
# Create empty table explicitly
empty_df = pd.DataFrame(
columns=[
"trace_id",
"span_id",
"parent_span_id",
"operation_name",
"start_time",
"duration",
"status_code",
"service_name",
]
)
engine._to_sql(empty_df, if_exists_policy="replace")
output_file = tmp_path / "empty.parquet"
engine.to_parquet(str(output_file))
assert output_file.exists()
df = pd.read_parquet(output_file)
assert df.empty
def test_to_parquet_invalid_partition_column(tmp_path):
engine = OtelTracesSqlEngine(engine_url="sqlite:///:memory:")
engine._connect()
df = _create_sample_df()
engine._to_sql(df, if_exists_policy="replace")
with pytest.raises(ValueError):
engine.to_parquet(
str(tmp_path / "bad.parquet"),
partition_cols=["does_not_exist"],
)
def test_to_parquet_with_compression(tmp_path):
engine = OtelTracesSqlEngine(engine_url="sqlite:///:memory:")
engine._connect()
df = _create_sample_df()
engine._to_sql(df, if_exists_policy="replace")
output_file = tmp_path / "compressed.parquet"
engine.to_parquet(
str(output_file),
compression="gzip",
)
assert output_file.exists()