fix: Flush trace data on shutdown (#9)

* feat: Make span ttl configurable

* add test

* add testing to ci

* update github action runner

* update github action runner
This commit is contained in:
angus-langchain
2025-07-24 16:38:45 -07:00
committed by GitHub
parent 9f775d0708
commit 69fae23ef0
8 changed files with 1006 additions and 6 deletions
+159
View File
@@ -0,0 +1,159 @@
name: 'CI: Go Tests'

on:
  push:
    branches: [main]
  pull_request:
  workflow_dispatch:

# If another push to the same PR or branch happens while this workflow is still running,
# cancel the earlier run in favor of the next run.
#
# There's no point in testing an outdated version of the code. GitHub only allows
# a limited number of job runners to be active at the same time,
# so it's better to cancel pointless jobs early so that more useful jobs can run sooner.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

# Caching note: actions/setup-go@v4 restores and saves the Go module cache and the
# Go build cache on its own, keyed on the file named by cache-dependency-path.
# A separate actions/cache step over ~/.cache/go-build and ~/go/pkg/mod would
# restore and save the same directories a second time, so none is used here.
jobs:
  test:
    name: Test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version-file: go.mod
          cache: true
          cache-dependency-path: go.sum

      - name: Download dependencies
        run: make deps

      - name: Run tests
        run: make test

      - name: Run go vet
        run: make vet

  test-coverage:
    name: Test Coverage
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version-file: go.mod
          cache: true
          cache-dependency-path: go.sum

      - name: Download dependencies
        run: make deps

      - name: Run tests with coverage
        run: make test-coverage

      # Coverage upload is best-effort: a Codecov outage must not fail the build.
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.out
          flags: unittests
          name: codecov-umbrella
          fail_ci_if_error: false

  lint:
    name: Lint
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version-file: go.mod
          cache: true
          cache-dependency-path: go.sum

      - name: Download dependencies
        run: make deps

      # dev-setup installs golangci-lint when it is not already on PATH.
      - name: Install golangci-lint
        run: make dev-setup

      - name: Run linter
        run: make lint

  build:
    name: Build
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version-file: go.mod
          cache: true
          cache-dependency-path: go.sum

      - name: Download dependencies
        run: make deps

      - name: Build collector
        run: make build

      - name: Upload build artifacts
        uses: actions/upload-artifact@v4
        with:
          name: collector-binary
          path: bin/collector
          retention-days: 7
+94
View File
@@ -0,0 +1,94 @@
# Makefile for langsmith-otel-proxy

# Every target below is a command, not a file to be produced. Declaring all of
# them phony keeps a stray file or directory with the same name (for example
# "run" or "tidy") from silently turning the target into a no-op.
.PHONY: help build test test-verbose test-coverage test-race clean lint fmt vet \
	deps tidy check-deps dev-setup run ci-test ci-test-race ci-lint check-tools

# Default target
help: ## Show this help message
	@echo "Available targets:"
	@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target>\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)

##@ Build

build: ## Build the collector binary
	@echo "Building collector..."
	go build -o bin/collector ./cmd/collector

##@ Testing

test: ## Run all tests
	@echo "Running tests..."
	go test ./...

test-verbose: ## Run all tests with verbose output
	@echo "Running tests with verbose output..."
	go test -v ./...

test-coverage: ## Run tests with coverage report
	@echo "Running tests with coverage..."
	go test -coverprofile=coverage.out ./...
	go tool cover -html=coverage.out -o coverage.html
	@echo "Coverage report generated: coverage.html"

test-race: ## Run tests with race detection
	@echo "Running tests with race detection..."
	go test -race ./...

##@ Code Quality

lint: ## Run golangci-lint
	@echo "Running linter..."
	golangci-lint run

fmt: ## Format Go code
	@echo "Formatting code..."
	go fmt ./...

vet: ## Run go vet
	@echo "Running go vet..."
	go vet ./...

##@ Dependencies

deps: ## Download dependencies
	@echo "Downloading dependencies..."
	go mod download

tidy: ## Tidy up dependencies
	@echo "Tidying dependencies..."
	go mod tidy

check-deps: ## Verify dependencies
	@echo "Verifying dependencies..."
	go mod verify

##@ Cleanup

clean: ## Clean build artifacts
	@echo "Cleaning up..."
	rm -rf bin/
	rm -f coverage.out coverage.html

##@ Development

# golangci-lint is installed only when it is not already on PATH.
dev-setup: deps ## Set up development environment
	@echo "Setting up development environment..."
	@if ! command -v golangci-lint >/dev/null 2>&1; then \
		echo "Installing golangci-lint..."; \
		go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest; \
	fi

run: build ## Build and run the collector
	@echo "Running collector..."
	./bin/collector

##@ CI/CD

ci-test: deps test vet ## Run CI tests (deps, test, vet)
	@echo "CI tests completed successfully"

ci-test-race: deps test-race vet ## Run CI tests with race detection (for local development)
	@echo "CI tests with race detection completed successfully"

ci-lint: deps lint ## Run CI linting
	@echo "CI linting completed successfully"

# Check if required tools are installed.
# A missing Go toolchain is fatal; a missing golangci-lint only prints a warning.
check-tools: ## Check if required development tools are installed
	@echo "Checking required tools..."
	@command -v go >/dev/null 2>&1 || { echo "Go is not installed"; exit 1; }
	@echo "✓ Go is installed"
	@command -v golangci-lint >/dev/null 2>&1 || echo "⚠ golangci-lint is not installed (run 'make dev-setup')"
	@echo "Tool check completed"
+16 -3
View File
@@ -62,10 +62,23 @@ func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
log.Printf("shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Printf("graceful shutdown failed: %v", err)
log.Printf("HTTP server shutdown failed: %v", err)
}
close(ch)
if err := agg.Flush(ctx); err != nil {
log.Printf("failed to flush traces: %v", err)
}
agg.Stop()
log.Printf("shutdown complete")
}
+18 -3
View File
@@ -86,12 +86,13 @@ type Config struct {
// Aggregator bundles the run channel, configuration and uploader handed to New
// together with the state its methods share.
type Aggregator struct {
ch chan *model.Run
cfg Config
// up is the uploader; Flush calls its WaitForCompletion method.
up *uploader.Uploader
up uploader.UploaderInterface
cancel context.CancelFunc
// filteredIDs is a concurrent map; the tests look up IDs of filtered-out runs in it.
filteredIDs sync.Map
// flushCh carries on-demand flush requests from Flush to worker.
// New creates it with a buffer of one.
flushCh chan struct{}
}
func New(up *uploader.Uploader, cfg Config, ch chan *model.Run) *Aggregator {
func New(up uploader.UploaderInterface, cfg Config, ch chan *model.Run) *Aggregator {
if cfg.GCInterval == 0 {
cfg.GCInterval = 2 * time.Minute
}
@@ -107,7 +108,7 @@ func New(up *uploader.Uploader, cfg Config, ch chan *model.Run) *Aggregator {
if cfg.FlushInterval == 0 {
cfg.FlushInterval = 1 * time.Second
}
return &Aggregator{up: up, cfg: cfg, ch: ch}
return &Aggregator{up: up, cfg: cfg, ch: ch, flushCh: make(chan struct{}, 1)}
}
func (a *Aggregator) Start() {
@@ -123,6 +124,17 @@ func (a *Aggregator) Stop() {
}
}
// Flush requests a flush of all pending runs and then waits on the uploader.
//
// The request is a non-blocking send on flushCh: when the channel's buffer is
// already occupied the send is skipped, since a flush is already pending.
// Flush then returns whatever a.up.WaitForCompletion(ctx) returns.
//
// NOTE(review): nothing in this method waits for the worker to receive the
// request, so WaitForCompletion can be entered before the worker's flush has
// handed anything to the uploader. Confirm callers tolerate that, or add an
// acknowledgement from the worker.
func (a *Aggregator) Flush(ctx context.Context) error {
select {
case a.flushCh <- struct{}{}:
default:
// Channel is full, flush already pending
}
return a.up.WaitForCompletion(ctx)
}
func (a *Aggregator) worker(ctx context.Context, ch <-chan *model.Run) {
sc := serializer.NewStreamingCompressor()
var scMu sync.Mutex
@@ -307,6 +319,9 @@ func (a *Aggregator) worker(ctx context.Context, ch <-chan *model.Run) {
case <-flushTicker.C:
flush()
case <-a.flushCh:
flush()
case <-gcTicker.C:
gc()
}
+456
View File
@@ -0,0 +1,456 @@
package aggregator
import (
"context"
"sync"
"testing"
"time"
"github.com/langchain-ai/langsmith-collector-proxy/internal/model"
"github.com/langchain-ai/langsmith-collector-proxy/internal/uploader"
"github.com/langchain-ai/langsmith-collector-proxy/internal/util"
)
// TestUploader is an in-memory test double that records every batch passed to Send.
type TestUploader struct {
// mu guards batches.
mu sync.Mutex
// batches holds the recorded batches in the order Send appended them.
batches []uploader.Batch
}
// NewTestUploader returns a TestUploader whose batch list is empty but non-nil.
func NewTestUploader() *TestUploader {
	tu := new(TestUploader)
	tu.batches = make([]uploader.Batch, 0)
	return tu
}
// Send appends b to the recorded batches while holding the mutex.
// The context is accepted for signature compatibility and is not inspected.
func (tu *TestUploader) Send(ctx context.Context, b uploader.Batch) {
	tu.mu.Lock()
	tu.batches = append(tu.batches, b)
	tu.mu.Unlock()
}
// WaitForCompletion always returns nil immediately; Send records batches
// synchronously, so there is never anything in flight to wait for.
func (tu *TestUploader) WaitForCompletion(ctx context.Context) error {
return nil
}
// GetBatchCount reports how many batches have been recorded so far.
func (tu *TestUploader) GetBatchCount() int {
	tu.mu.Lock()
	n := len(tu.batches)
	tu.mu.Unlock()
	return n
}
// Clear discards all recorded batches by truncating the slice to length zero,
// which keeps its backing storage for later appends.
func (tu *TestUploader) Clear() {
	tu.mu.Lock()
	tu.batches = tu.batches[:0]
	tu.mu.Unlock()
}
func TestBasicRunProcessing(t *testing.T) {
cfg := Config{
BatchSize: 1, // Process immediately
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send a simple root run
root := &model.Run{
ID: util.StringPtr("root"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("root_span"),
}
ch <- root
time.Sleep(100 * time.Millisecond)
// Should have sent one batch
if testUploader.GetBatchCount() != 1 {
t.Fatalf("Expected 1 batch, got %d", testUploader.GetBatchCount())
}
}
func TestParentChildOrdering(t *testing.T) {
cfg := Config{
BatchSize: 1,
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send parent first
parent := &model.Run{
ID: util.StringPtr("parent"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("parent_span"),
}
child := &model.Run{
ID: util.StringPtr("child"),
TraceID: util.StringPtr("trace1"),
ParentRunID: util.StringPtr("parent"),
Name: util.StringPtr("child_span"),
}
ch <- parent
time.Sleep(50 * time.Millisecond)
ch <- child
time.Sleep(50 * time.Millisecond)
// Should have sent two batches
if testUploader.GetBatchCount() != 2 {
t.Fatalf("Expected 2 batches, got %d", testUploader.GetBatchCount())
}
}
func TestChildBeforeParent(t *testing.T) {
cfg := Config{
BatchSize: 1, // Process immediately when parent arrives
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send child first
child := &model.Run{
ID: util.StringPtr("child"),
TraceID: util.StringPtr("trace1"),
ParentRunID: util.StringPtr("parent"),
Name: util.StringPtr("child_span"),
}
ch <- child
time.Sleep(50 * time.Millisecond)
// No batches should be sent yet (child waiting for parent)
if testUploader.GetBatchCount() != 0 {
t.Errorf("Expected 0 batches before parent, got %d", testUploader.GetBatchCount())
}
// Send parent
parent := &model.Run{
ID: util.StringPtr("parent"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("parent_span"),
}
ch <- parent
time.Sleep(100 * time.Millisecond)
// Now both should be processed
if testUploader.GetBatchCount() != 2 {
t.Fatalf("Expected 2 batches after parent, got %d", testUploader.GetBatchCount())
}
}
func TestFilteringBasic(t *testing.T) {
cfg := Config{
BatchSize: 1,
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: true},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send GenAI run (should be kept)
genaiRun := &model.Run{
ID: util.StringPtr("genai"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("gen_ai.completion"),
}
// Send HTTP run (should be filtered)
httpRun := &model.Run{
ID: util.StringPtr("http"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("http.request"),
}
ch <- genaiRun
ch <- httpRun
time.Sleep(100 * time.Millisecond)
// Only GenAI run should be processed
if testUploader.GetBatchCount() != 1 {
t.Fatalf("Expected 1 batch (GenAI only), got %d", testUploader.GetBatchCount())
}
// Verify filtered run is tracked
if _, exists := agg.filteredIDs.Load("http"); !exists {
t.Error("Expected HTTP run to be tracked as filtered")
}
// Verify GenAI run is not tracked as filtered
if _, exists := agg.filteredIDs.Load("genai"); exists {
t.Error("Expected GenAI run not to be tracked as filtered")
}
}
func TestFilteringWithReparenting(t *testing.T) {
cfg := Config{
BatchSize: 1,
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: true},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Create hierarchy: genai_root -> http_filtered -> genai_child
root := &model.Run{
ID: util.StringPtr("root"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("gen_ai.root"),
}
filtered := &model.Run{
ID: util.StringPtr("filtered"),
TraceID: util.StringPtr("trace1"),
ParentRunID: util.StringPtr("root"),
Name: util.StringPtr("http.request"), // Will be filtered
}
child := &model.Run{
ID: util.StringPtr("child"),
TraceID: util.StringPtr("trace1"),
ParentRunID: util.StringPtr("filtered"),
Name: util.StringPtr("gen_ai.child"),
}
ch <- root
time.Sleep(50 * time.Millisecond)
ch <- filtered
time.Sleep(50 * time.Millisecond)
ch <- child
time.Sleep(100 * time.Millisecond)
// Should have 2 batches (root and child, filtered one excluded)
if testUploader.GetBatchCount() != 2 {
t.Fatalf("Expected 2 batches after filtering, got %d", testUploader.GetBatchCount())
}
// Verify filtered run is tracked
if _, exists := agg.filteredIDs.Load("filtered"); !exists {
t.Error("Expected filtered run to be tracked")
}
}
func TestGarbageCollection(t *testing.T) {
cfg := Config{
BatchSize: 10,
FlushInterval: 10 * time.Second,
GCInterval: 50 * time.Millisecond, // Fast GC for testing
EntryTTL: 100 * time.Millisecond, // Short TTL for testing
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send orphan child (parent never arrives)
orphan := &model.Run{
ID: util.StringPtr("orphan"),
TraceID: util.StringPtr("trace1"),
ParentRunID: util.StringPtr("missing_parent"),
Name: util.StringPtr("orphan_span"),
}
ch <- orphan
time.Sleep(50 * time.Millisecond)
// No batches yet (waiting for parent)
if testUploader.GetBatchCount() != 0 {
t.Errorf("Expected 0 batches before GC, got %d", testUploader.GetBatchCount())
}
// Wait for GC to process orphan
time.Sleep(200 * time.Millisecond)
// Orphan should be processed after GC
if testUploader.GetBatchCount() != 1 {
t.Fatalf("Expected 1 batch after GC, got %d", testUploader.GetBatchCount())
}
}
func TestFlushFunctionality(t *testing.T) {
cfg := Config{
BatchSize: 10, // Large batch size to prevent auto-flushing
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send a run
run := &model.Run{
ID: util.StringPtr("test"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("test_span"),
}
ch <- run
time.Sleep(100 * time.Millisecond) // Give more time for processing
// No batches yet (batch size not reached)
initialCount := testUploader.GetBatchCount()
t.Logf("Batches before flush: %d", initialCount)
// Force flush
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := agg.Flush(ctx); err != nil {
t.Fatalf("Flush failed: %v", err)
}
time.Sleep(100 * time.Millisecond) // Give time for flush to complete
// Should have at least one batch after flush
finalCount := testUploader.GetBatchCount()
t.Logf("Batches after flush: %d", finalCount)
if finalCount == 0 {
t.Fatal("Expected at least 1 batch after flush, got 0")
}
}
func TestNilRunHandling(t *testing.T) {
cfg := Config{
BatchSize: 1,
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Send nil run - should be ignored
ch <- nil
time.Sleep(50 * time.Millisecond)
// Send valid run
validRun := &model.Run{
ID: util.StringPtr("valid"),
TraceID: util.StringPtr("trace1"),
Name: util.StringPtr("valid_span"),
}
ch <- validRun
time.Sleep(50 * time.Millisecond)
// Should have 1 batch (nil run ignored)
if testUploader.GetBatchCount() != 1 {
t.Fatalf("Expected 1 batch (nil should be ignored), got %d", testUploader.GetBatchCount())
}
}
func TestMultipleTracesInterleaved(t *testing.T) {
cfg := Config{
BatchSize: 1,
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{FilterNonGenAI: false},
}
ch := make(chan *model.Run, 20)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Create runs for multiple traces
trace1Runs := []*model.Run{
{ID: util.StringPtr("t1_root"), TraceID: util.StringPtr("trace1"), Name: util.StringPtr("t1_root")},
{ID: util.StringPtr("t1_child"), TraceID: util.StringPtr("trace1"), ParentRunID: util.StringPtr("t1_root"), Name: util.StringPtr("t1_child")},
}
trace2Runs := []*model.Run{
{ID: util.StringPtr("t2_root"), TraceID: util.StringPtr("trace2"), Name: util.StringPtr("t2_root")},
{ID: util.StringPtr("t2_child"), TraceID: util.StringPtr("trace2"), ParentRunID: util.StringPtr("t2_root"), Name: util.StringPtr("t2_child")},
}
// Interleave runs from different traces
ch <- trace1Runs[0]
ch <- trace2Runs[0]
ch <- trace1Runs[1]
ch <- trace2Runs[1]
time.Sleep(200 * time.Millisecond)
// Should have 4 batches (one for each run)
if testUploader.GetBatchCount() != 4 {
t.Fatalf("Expected 4 batches from 2 traces, got %d", testUploader.GetBatchCount())
}
}
func TestComplexFilteringScenario(t *testing.T) {
cfg := Config{
BatchSize: 1,
FlushInterval: 10 * time.Second,
FilterConfig: FilterConfig{
FilterNonGenAI: false,
CustomFilter: func(run *model.Run) bool {
// Keep only runs with "keep" in their name
return run.Name != nil && (*run.Name == "keep_root" || *run.Name == "keep_child")
},
},
}
ch := make(chan *model.Run, 10)
testUploader := NewTestUploader()
agg := New(testUploader, cfg, ch)
agg.Start()
defer agg.Stop()
// Create hierarchy where middle span is filtered
// keep_root -> filter_middle -> keep_child
runs := []*model.Run{
{ID: util.StringPtr("root"), TraceID: util.StringPtr("trace1"), Name: util.StringPtr("keep_root")},
{ID: util.StringPtr("middle"), TraceID: util.StringPtr("trace1"), ParentRunID: util.StringPtr("root"), Name: util.StringPtr("filter_middle")},
{ID: util.StringPtr("child"), TraceID: util.StringPtr("trace1"), ParentRunID: util.StringPtr("middle"), Name: util.StringPtr("keep_child")},
}
for _, run := range runs {
ch <- run
time.Sleep(20 * time.Millisecond)
}
time.Sleep(100 * time.Millisecond)
// Should have 2 batches (root and child, middle filtered)
if testUploader.GetBatchCount() != 2 {
t.Fatalf("Expected 2 batches after custom filtering, got %d", testUploader.GetBatchCount())
}
// Verify filtered run is tracked
if _, exists := agg.filteredIDs.Load("middle"); !exists {
t.Error("Expected middle run to be tracked as filtered")
}
}
+169
View File
@@ -0,0 +1,169 @@
package aggregator
import (
"context"
"sync"
"testing"
"time"
"github.com/langchain-ai/langsmith-collector-proxy/internal/model"
"github.com/langchain-ai/langsmith-collector-proxy/internal/uploader"
"github.com/langchain-ai/langsmith-collector-proxy/internal/util"
)
// MockUploader is a test double whose Send records batches from a background
// goroutine, optionally after a delay, and whose WaitForCompletion waits for
// those goroutines to finish.
type MockUploader struct {
// mu guards batches and is held by SetSendDelay while it writes sendDelay.
mu sync.Mutex
// batches holds every batch recorded so far.
batches []uploader.Batch
// sendDelay is how long Send's goroutine waits before recording; zero means no wait.
sendDelay time.Duration
// wg counts Send goroutines that have not finished yet.
wg sync.WaitGroup
}
// NewMockUploader returns a MockUploader with an empty, non-nil batch list
// and no send delay.
func NewMockUploader() *MockUploader {
	m := new(MockUploader)
	m.batches = make([]uploader.Batch, 0)
	return m
}
// Send records b from a background goroutine and returns immediately.
//
// The goroutine is registered with the wait group before it starts, so a
// later WaitForCompletion call observes it. When a send delay is configured
// the goroutine waits that long first; if ctx is done before the delay
// elapses the batch is dropped without being recorded.
func (m *MockUploader) Send(ctx context.Context, b uploader.Batch) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()

		// SetSendDelay writes sendDelay while holding mu, so the read has to
		// take the same lock; an unguarded read would race with that write.
		m.mu.Lock()
		delay := m.sendDelay
		m.mu.Unlock()

		// Simulate network delay if configured
		if delay > 0 {
			select {
			case <-time.After(delay):
			case <-ctx.Done():
				return
			}
		}

		m.mu.Lock()
		defer m.mu.Unlock()
		m.batches = append(m.batches, b)
	}()
}
// WaitForCompletion blocks until every goroutine started by Send has
// finished, returning nil, or until ctx is done, returning ctx.Err().
func (m *MockUploader) WaitForCompletion(ctx context.Context) error {
	// sync.WaitGroup has no context-aware wait, so a helper goroutine turns
	// its completion into a channel close that select can observe.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		m.wg.Wait()
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-finished:
		return nil
	}
}
// GetBatches returns a copy of the batches recorded so far, so callers can
// inspect it without holding the mutex.
func (m *MockUploader) GetBatches() []uploader.Batch {
	m.mu.Lock()
	defer m.mu.Unlock()

	snapshot := make([]uploader.Batch, 0, len(m.batches))
	return append(snapshot, m.batches...)
}
// SetSendDelay sets how long Send's goroutine waits before recording a batch.
func (m *MockUploader) SetSendDelay(delay time.Duration) {
	m.mu.Lock()
	m.sendDelay = delay
	m.mu.Unlock()
}
// TestGracefulShutdownFlushesData sends one root run with a batch size and
// flush interval chosen so that neither triggers on its own, calls Flush,
// and expects at least one batch to have reached the mock uploader.
func TestGracefulShutdownFlushesData(t *testing.T) {
	mock := NewMockUploader()
	runs := make(chan *model.Run, 10)
	agg := New(mock, Config{
		// High batch size to prevent automatic flushing
		BatchSize: 10,
		// Long interval to prevent automatic flushing
		FlushInterval: 10 * time.Second,
		FilterConfig: FilterConfig{
			// Don't filter anything
			FilterNonGenAI: false,
		},
	}, runs)
	agg.Start()
	defer agg.Stop()

	// Root run: ParentRunID is left nil.
	runs <- &model.Run{
		ID:          util.StringPtr("run1"),
		TraceID:     util.StringPtr("trace1"),
		ParentRunID: nil,
		Name:        util.StringPtr("test.run1"),
	}

	time.Sleep(200 * time.Millisecond)

	// The pre-flush count is only logged, not asserted.
	before := len(mock.GetBatches())
	t.Logf("Batches before flush: %d", before)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := agg.Flush(ctx)
	if err != nil {
		t.Fatalf("Flush failed: %v", err)
	}

	time.Sleep(100 * time.Millisecond)

	after := len(mock.GetBatches())
	t.Logf("Batches after flush: %d", after)
	if after == 0 {
		t.Error("Expected at least 1 batch after flush, got 0")
	}
}
// TestGracefulShutdownWithSlowUploader gives the mock uploader a 300ms send
// delay and checks that Flush takes at least 200ms, i.e. that it waits for
// the slow upload, and that the batch has been recorded when it returns.
func TestGracefulShutdownWithSlowUploader(t *testing.T) {
	mock := NewMockUploader()
	mock.SetSendDelay(300 * time.Millisecond)

	runs := make(chan *model.Run, 10)
	agg := New(mock, Config{
		BatchSize:     1,
		FlushInterval: 10 * time.Second,
		FilterConfig: FilterConfig{
			FilterNonGenAI: false,
		},
	}, runs)
	agg.Start()
	defer agg.Stop()

	// Root run: ParentRunID is left nil.
	runs <- &model.Run{
		ID:          util.StringPtr("run1"),
		TraceID:     util.StringPtr("trace1"),
		ParentRunID: nil,
		Name:        util.StringPtr("test.run1"),
	}

	time.Sleep(50 * time.Millisecond)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	began := time.Now()
	err := agg.Flush(ctx)
	if err != nil {
		t.Fatalf("Flush failed: %v", err)
	}
	elapsed := time.Since(began)
	t.Logf("Flush took %v", elapsed)

	if elapsed < 200*time.Millisecond {
		t.Errorf("Flush completed too quickly (%v), expected to wait for slow upload", elapsed)
	}

	if recorded := mock.GetBatches(); len(recorded) == 0 {
		t.Error("Expected at least 1 batch after flush, got 0")
	}
}
+15
View File
@@ -39,6 +39,11 @@ type Uploader struct {
client *http.Client
}
// UploaderInterface is the set of uploader methods the aggregator depends on.
// *Uploader provides both methods; tests substitute their own implementations.
type UploaderInterface interface {
// Send hands batch b to the uploader. It returns nothing, so delivery
// failures are not reported to the caller.
Send(ctx context.Context, b Batch)
// WaitForCompletion waits for in-flight uploads to complete and returns a
// non-nil error if it stops waiting before that, e.g. because ctx is done.
WaitForCompletion(ctx context.Context) error
}
func New(cfg Config) *Uploader {
return &Uploader{
cfg: cfg,
@@ -60,6 +65,16 @@ func (u *Uploader) Send(ctx context.Context, b Batch) {
}()
}
// WaitForCompletion waits for all in-flight uploads to complete.
//
// It does so by acquiring every permit of the in-flight semaphore at once,
// which only succeeds when no permit is held elsewhere, and then handing the
// permits straight back. If ctx is done first, the error from Acquire is
// returned and nothing is released.
//
// NOTE(review): this assumes each upload holds a permit for as long as it is
// in flight; confirm against Send.
func (u *Uploader) WaitForCompletion(ctx context.Context) error {
	permits := int64(u.cfg.InFlight)

	err := u.sem.Acquire(ctx, permits)
	if err != nil {
		return err
	}

	// The permits were taken only to observe that none were in use.
	u.sem.Release(permits)
	return nil
}
func (u *Uploader) send(ctx context.Context, b Batch) {
url := u.cfg.BaseURL + "/runs/multipart"
var attempt int
+79
View File
@@ -0,0 +1,79 @@
package uploader
import (
	"context"
	"errors"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)
// TestWaitForCompletion sends two batches to a server that takes 200ms per
// request and checks that WaitForCompletion does not return before at least
// 150ms have passed.
func TestWaitForCompletion(t *testing.T) {
	// Test server that responds slowly.
	slow := func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(200 * time.Millisecond)
		w.WriteHeader(http.StatusOK)
	}
	server := httptest.NewServer(http.HandlerFunc(slow))
	defer server.Close()

	up := New(Config{
		BaseURL:        server.URL,
		APIKey:         "test-key",
		MaxAttempts:    1,
		BackoffInitial: 100 * time.Millisecond,
		BackoffMax:     1 * time.Second,
		InFlight:       2,
	})

	ctx := context.Background()
	pending := []Batch{
		{Data: []byte("test1"), Boundary: "boundary1"},
		{Data: []byte("test2"), Boundary: "boundary2"},
	}
	for _, b := range pending {
		up.Send(ctx, b)
	}

	began := time.Now()
	err := up.WaitForCompletion(ctx)
	if err != nil {
		t.Fatalf("WaitForCompletion failed: %v", err)
	}

	// Both uploads should have been waited for.
	if elapsed := time.Since(began); elapsed < 150*time.Millisecond {
		t.Errorf("WaitForCompletion completed too quickly (%v), expected to wait for uploads", elapsed)
	}
}
// TestWaitForCompletionWithTimeout sends one batch to a server that takes a
// full second to answer and waits with a 200ms deadline. It expects
// WaitForCompletion to give up with context.DeadlineExceeded.
func TestWaitForCompletionWithTimeout(t *testing.T) {
	// Create a test server that responds very slowly
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(1 * time.Second)
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()

	up := New(Config{
		BaseURL:        server.URL,
		APIKey:         "test-key",
		MaxAttempts:    1,
		BackoffInitial: 100 * time.Millisecond,
		BackoffMax:     1 * time.Second,
		InFlight:       1,
	})

	// Send a batch
	up.Send(context.Background(), Batch{Data: []byte("test"), Boundary: "boundary"})

	// Wait for completion with a short timeout
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err := up.WaitForCompletion(ctx)

	// Exactly one failure is reported per outcome. errors.Is is used so the
	// check keeps passing if the deadline error is ever returned wrapped.
	switch {
	case err == nil:
		t.Error("Expected WaitForCompletion to timeout, but it succeeded")
	case !errors.Is(err, context.DeadlineExceeded):
		t.Errorf("Expected context.DeadlineExceeded, got %v", err)
	}
}