From af1f6dcd219910330e3263aff3778830a03cd62c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Szczur?= Date: Tue, 29 Apr 2025 12:05:18 +0200 Subject: [PATCH] feat: upgrade kafka-go, give some more buffers to reading, remove sentry (#31692) --- livestream/Dockerfile | 2 +- livestream/configs.go | 3 +-- livestream/db.go | 3 +-- livestream/go.mod | 2 +- livestream/go.sum | 2 ++ livestream/kafka.go | 25 +++++++++++++++---------- livestream/main.go | 14 +++++++------- livestream/metrics.go | 11 +++++++---- livestream/served.go | 3 +-- 9 files changed, 36 insertions(+), 29 deletions(-) diff --git a/livestream/Dockerfile b/livestream/Dockerfile index f5455917ea..6b04b63536 100644 --- a/livestream/Dockerfile +++ b/livestream/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.24.1 AS builder +FROM golang:1.24.2 AS builder WORKDIR /code COPY go.sum go.mod ./ RUN go mod download -x diff --git a/livestream/configs.go b/livestream/configs.go index be1f806101..08ecdb6d9a 100644 --- a/livestream/configs.go +++ b/livestream/configs.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/fsnotify/fsnotify" - "github.com/getsentry/sentry-go" "github.com/spf13/viper" ) @@ -19,7 +18,7 @@ func loadConfigs() { err := viper.ReadInConfig() if err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog log.Fatalf("fatal error config file: %v", err) } diff --git a/livestream/db.go b/livestream/db.go index f66e5610d8..00366f6529 100644 --- a/livestream/db.go +++ b/livestream/db.go @@ -3,7 +3,6 @@ package main import ( "context" - "github.com/getsentry/sentry-go" "github.com/jackc/pgx/v5" "github.com/spf13/viper" ) @@ -12,7 +11,7 @@ func getPGConn() (*pgx.Conn, error) { url := viper.GetString("postgres.url") conn, err := pgx.Connect(context.Background(), url) if err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog return nil, err } return conn, nil diff --git a/livestream/go.mod b/livestream/go.mod index 9141745fce..29f557ae33 100644 --- a/livestream/go.mod +++ b/livestream/go.mod @@ -3,7 +3,7 @@ module github.com/posthog/posthog/livestream go 1.24 require ( - github.com/confluentinc/confluent-kafka-go/v2 v2.8.0 + github.com/confluentinc/confluent-kafka-go/v2 v2.10.0 github.com/fsnotify/fsnotify v1.9.0 github.com/getsentry/sentry-go v0.32.0 github.com/gofrs/uuid/v5 v5.3.2 diff --git a/livestream/go.sum b/livestream/go.sum index 7347307b80..5d16d7a967 100644 --- a/livestream/go.sum +++ b/livestream/go.sum @@ -52,6 +52,8 @@ github.com/compose-spec/compose-go/v2 v2.1.3 h1:bD67uqLuL/XgkAK6ir3xZvNLFPxPScEi github.com/compose-spec/compose-go/v2 v2.1.3/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc= github.com/confluentinc/confluent-kafka-go/v2 v2.8.0 h1:0HlcSNWg4LpLA9nIjzUMIqWHI+w0S68UN7alXAc3TeA= github.com/confluentinc/confluent-kafka-go/v2 v2.8.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves= +github.com/confluentinc/confluent-kafka-go/v2 v2.10.0 h1:TK5CH5RbIj/aVfmJFEsDUT6vD2izac2zmA5BUfAOxC0= +github.com/confluentinc/confluent-kafka-go/v2 v2.10.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves= github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= github.com/containerd/containerd v1.7.18 h1:jqjZTQNfXGoEaZdW1WwPU0RqSn1Bm2Ay/KJPUuO8nao= diff --git a/livestream/kafka.go b/livestream/kafka.go index f9832d789d..cb599f1e88 100644 --- a/livestream/kafka.go +++ b/livestream/kafka.go @@ -3,11 +3,12 @@ package main import ( "encoding/json" "errors" + "github.com/prometheus/client_golang/prometheus" "log" + "strconv" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/getsentry/sentry-go" ) type PostHogEventWrapper struct { @@ -49,11 +50,14 @@ func NewPostHogKafkaConsumer( outgoingChan chan PostHogEvent, statsChan chan CountEvent) (*PostHogKafkaConsumer, error) { config := &kafka.ConfigMap{ - "bootstrap.servers": brokers, - "group.id": groupID, - "auto.offset.reset": "latest", - "enable.auto.commit": false, - "security.protocol": securityProtocol, + "bootstrap.servers": brokers, + "group.id": groupID, + "auto.offset.reset": "latest", + "enable.auto.commit": false, + "security.protocol": securityProtocol, + "fetch.message.max.bytes": 1_000_000_000, + "fetch.max.bytes": 1_000_000_000, + "queued.max.messages.kbytes": 1_000_000, } consumer, err := kafka.NewConsumer(config) @@ -72,7 +76,7 @@ func NewPostHogKafkaConsumer( func (c *PostHogKafkaConsumer) Consume() { if err := c.consumer.SubscribeTopics([]string{c.topic}, nil); err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog log.Fatalf("Failed to subscribe to topic: %v", err) } @@ -89,11 +93,11 @@ func (c *PostHogKafkaConsumer) Consume() { } } log.Printf("Error consuming message: %v", err) - sentry.CaptureException(err) + // TODO capture error to PostHog continue } - msgConsumed.Inc() + msgConsumed.With(prometheus.Labels{"partition": strconv.Itoa(int(msg.TopicPartition.Partition))}).Inc() phEvent := parse(c.geolocator, msg.Value) c.outgoingChan <- phEvent @@ -145,7 +149,8 @@ func parse(geolocator GeoLocator, kafkaMessage []byte) PostHogEvent { var err error phEvent.Lat, phEvent.Lng, err = geolocator.Lookup(ipStr) if err != nil && err.Error() != "invalid IP address" { // An invalid IP address is not an error on our side - sentry.CaptureException(err) + // TODO capture error to PostHog + _ = err } } diff --git a/livestream/main.go b/livestream/main.go index a3e6618103..2e8acc93d6 100644 --- a/livestream/main.go +++ b/livestream/main.go @@ -26,7 +26,7 @@ func main() { AttachStacktrace: true, }) if err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog log.Fatalf("sentry.Init: %s", err) } // Flush buffered events before the program terminates. @@ -56,16 +56,16 @@ func main() { geolocator, err := NewMaxMindGeoLocator(mmdb) if err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog log.Fatalf("Failed to open MMDB: %v", err) } stats := newStatsKeeper() - phEventChan := make(chan PostHogEvent, 1000) - statsChan := make(chan CountEvent, 1000) - subChan := make(chan Subscription, 1000) - unSubChan := make(chan Subscription, 1000) + phEventChan := make(chan PostHogEvent, 10000) + statsChan := make(chan CountEvent, 10000) + subChan := make(chan Subscription, 10000) + unSubChan := make(chan Subscription, 10000) go stats.keepStats(statsChan) @@ -75,7 +75,7 @@ func main() { } consumer, err := NewPostHogKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan) if err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog log.Fatalf("Failed to create Kafka consumer: %v", err) } defer consumer.Close() diff --git a/livestream/metrics.go b/livestream/metrics.go index 817c65c29e..6a8a5082a0 100644 --- a/livestream/metrics.go +++ b/livestream/metrics.go @@ -6,10 +6,13 @@ import ( ) var ( - msgConsumed = promauto.NewCounter(prometheus.CounterOpts{ - Name: "livestream_kafka_consumed_total", - Help: "The total number of processed events", - }) + msgConsumed = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "livestream_kafka_consumed_total", + Help: "The total number of processed events", + }, + []string{"partition"}, + ) timeoutConsume = promauto.NewCounter(prometheus.CounterOpts{ Name: "livestream_kafka_timeout_total", Help: "The total number of timeout consume", diff --git a/livestream/served.go b/livestream/served.go index ef6e095975..93c3d4b1e4 100644 --- a/livestream/served.go +++ b/livestream/served.go @@ -6,7 +6,6 @@ import ( "strings" "sync/atomic" - "github.com/getsentry/sentry-go" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/labstack/echo/v4" ) @@ -117,7 +116,7 @@ func streamEventsHandler(log echo.Logger, subChan chan Subscription, filter *Fil case payload := <-subscription.EventChan: jsonData, err := json.Marshal(payload) if err != nil { - sentry.CaptureException(err) + // TODO capture error to PostHog log.Errorf("Error marshalling payload: %w", err) continue }