feat: upgrade confluent-kafka-go, increase consumer read buffer sizes, remove sentry calls (#31692)

This commit is contained in:
Paweł Szczur
2025-04-29 12:05:18 +02:00
committed by GitHub
parent 826f4705dc
commit af1f6dcd21
9 changed files with 36 additions and 29 deletions

View File

@@ -1,4 +1,4 @@
FROM golang:1.24.1 AS builder
FROM golang:1.24.2 AS builder
WORKDIR /code
COPY go.sum go.mod ./
RUN go mod download -x

View File

@@ -6,7 +6,6 @@ import (
"strings"
"github.com/fsnotify/fsnotify"
"github.com/getsentry/sentry-go"
"github.com/spf13/viper"
)
@@ -19,7 +18,7 @@ func loadConfigs() {
err := viper.ReadInConfig()
if err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
log.Fatalf("fatal error config file: %v", err)
}

View File

@@ -3,7 +3,6 @@ package main
import (
"context"
"github.com/getsentry/sentry-go"
"github.com/jackc/pgx/v5"
"github.com/spf13/viper"
)
@@ -12,7 +11,7 @@ func getPGConn() (*pgx.Conn, error) {
url := viper.GetString("postgres.url")
conn, err := pgx.Connect(context.Background(), url)
if err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
return nil, err
}
return conn, nil

View File

@@ -3,7 +3,7 @@ module github.com/posthog/posthog/livestream
go 1.24
require (
github.com/confluentinc/confluent-kafka-go/v2 v2.8.0
github.com/confluentinc/confluent-kafka-go/v2 v2.10.0
github.com/fsnotify/fsnotify v1.9.0
github.com/getsentry/sentry-go v0.32.0
github.com/gofrs/uuid/v5 v5.3.2

View File

@@ -52,6 +52,8 @@ github.com/compose-spec/compose-go/v2 v2.1.3 h1:bD67uqLuL/XgkAK6ir3xZvNLFPxPScEi
github.com/compose-spec/compose-go/v2 v2.1.3/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/confluentinc/confluent-kafka-go/v2 v2.8.0 h1:0HlcSNWg4LpLA9nIjzUMIqWHI+w0S68UN7alXAc3TeA=
github.com/confluentinc/confluent-kafka-go/v2 v2.8.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves=
github.com/confluentinc/confluent-kafka-go/v2 v2.10.0 h1:TK5CH5RbIj/aVfmJFEsDUT6vD2izac2zmA5BUfAOxC0=
github.com/confluentinc/confluent-kafka-go/v2 v2.10.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves=
github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro=
github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/containerd/containerd v1.7.18 h1:jqjZTQNfXGoEaZdW1WwPU0RqSn1Bm2Ay/KJPUuO8nao=

View File

@@ -3,11 +3,12 @@ package main
import (
"encoding/json"
"errors"
"github.com/prometheus/client_golang/prometheus"
"log"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/getsentry/sentry-go"
)
type PostHogEventWrapper struct {
@@ -49,11 +50,14 @@ func NewPostHogKafkaConsumer(
outgoingChan chan PostHogEvent, statsChan chan CountEvent) (*PostHogKafkaConsumer, error) {
config := &kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": groupID,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
"security.protocol": securityProtocol,
"bootstrap.servers": brokers,
"group.id": groupID,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
"security.protocol": securityProtocol,
"fetch.message.max.bytes": 1_000_000_000,
"fetch.max.bytes": 1_000_000_000,
"queued.max.messages.kbytes": 1_000_000,
}
consumer, err := kafka.NewConsumer(config)
@@ -72,7 +76,7 @@ func NewPostHogKafkaConsumer(
func (c *PostHogKafkaConsumer) Consume() {
if err := c.consumer.SubscribeTopics([]string{c.topic}, nil); err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
log.Fatalf("Failed to subscribe to topic: %v", err)
}
@@ -89,11 +93,11 @@ func (c *PostHogKafkaConsumer) Consume() {
}
}
log.Printf("Error consuming message: %v", err)
sentry.CaptureException(err)
// TODO capture error to PostHog
continue
}
msgConsumed.Inc()
msgConsumed.With(prometheus.Labels{"partition": strconv.Itoa(int(msg.TopicPartition.Partition))}).Inc()
phEvent := parse(c.geolocator, msg.Value)
c.outgoingChan <- phEvent
@@ -145,7 +149,8 @@ func parse(geolocator GeoLocator, kafkaMessage []byte) PostHogEvent {
var err error
phEvent.Lat, phEvent.Lng, err = geolocator.Lookup(ipStr)
if err != nil && err.Error() != "invalid IP address" { // An invalid IP address is not an error on our side
sentry.CaptureException(err)
// TODO capture error to PostHog
_ = err
}
}

View File

@@ -26,7 +26,7 @@ func main() {
AttachStacktrace: true,
})
if err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
log.Fatalf("sentry.Init: %s", err)
}
// Flush buffered events before the program terminates.
@@ -56,16 +56,16 @@ func main() {
geolocator, err := NewMaxMindGeoLocator(mmdb)
if err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
log.Fatalf("Failed to open MMDB: %v", err)
}
stats := newStatsKeeper()
phEventChan := make(chan PostHogEvent, 1000)
statsChan := make(chan CountEvent, 1000)
subChan := make(chan Subscription, 1000)
unSubChan := make(chan Subscription, 1000)
phEventChan := make(chan PostHogEvent, 10000)
statsChan := make(chan CountEvent, 10000)
subChan := make(chan Subscription, 10000)
unSubChan := make(chan Subscription, 10000)
go stats.keepStats(statsChan)
@@ -75,7 +75,7 @@ func main() {
}
consumer, err := NewPostHogKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan)
if err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
log.Fatalf("Failed to create Kafka consumer: %v", err)
}
defer consumer.Close()

View File

@@ -6,10 +6,13 @@ import (
)
var (
msgConsumed = promauto.NewCounter(prometheus.CounterOpts{
Name: "livestream_kafka_consumed_total",
Help: "The total number of processed events",
})
msgConsumed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "livestream_kafka_consumed_total",
Help: "The total number of processed events",
},
[]string{"partition"},
)
timeoutConsume = promauto.NewCounter(prometheus.CounterOpts{
Name: "livestream_kafka_timeout_total",
Help: "The total number of timeout consume",

View File

@@ -6,7 +6,6 @@ import (
"strings"
"sync/atomic"
"github.com/getsentry/sentry-go"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/labstack/echo/v4"
)
@@ -117,7 +116,7 @@ func streamEventsHandler(log echo.Logger, subChan chan Subscription, filter *Fil
case payload := <-subscription.EventChan:
jsonData, err := json.Marshal(payload)
if err != nil {
sentry.CaptureException(err)
// TODO capture error to PostHog
log.Errorf("Error marshalling payload: %w", err)
continue
}