feat: extra metrics to livestream, bump to go1.24.6 (#36414)

This commit is contained in:
Paweł Szczur
2025-08-08 23:26:41 +02:00
committed by GitHub
parent 3175d1063d
commit 6a964438b4
6 changed files with 45 additions and 4 deletions

View File

@@ -16,7 +16,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2
with:
go-version: 1.22
go-version: 1.24
- name: Run tests
run: cd livestream && go test -v

View File

@@ -1,4 +1,4 @@
FROM golang:1.24.4 AS builder
FROM golang:1.24.6 AS builder
WORKDIR /code
COPY go.sum go.mod ./
RUN go mod download -x

View File

@@ -185,3 +185,7 @@ func (c *PostHogKafkaConsumer) Close() {
}
close(c.incoming)
}
func (c *PostHogKafkaConsumer) IncomingRatio() float64 {
return float64(len(c.incoming)) / float64(cap(c.incoming))
}

View File

@@ -5,6 +5,7 @@ import (
"net/http"
"strings"
"sync/atomic"
"time"
"github.com/labstack/echo/v4"
"github.com/posthog/posthog/livestream/auth"
@@ -111,9 +112,12 @@ func StreamEventsHandler(log echo.Logger, subChan chan events.Subscription, filt
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
timeout := time.After(30 * time.Minute)
for {
select {
case <-timeout:
log.Debug("SSE connection to be terminated after timeout")
return nil
case <-c.Request().Context().Done():
log.Debugf("SSE client disconnected, ip: %v", c.RealIP())
return nil

View File

@@ -13,6 +13,7 @@ import (
"github.com/posthog/posthog/livestream/events"
"github.com/posthog/posthog/livestream/geo"
"github.com/posthog/posthog/livestream/handlers"
"github.com/posthog/posthog/livestream/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
@@ -53,6 +54,17 @@ func main() {
defer consumer.Close()
go consumer.Consume()
go func() {
for {
metrics.IncomingQueue.Set(consumer.IncomingRatio())
metrics.EventQueue.Set(float64(len(phEventChan)) / float64(cap(phEventChan)))
metrics.StatsQueue.Set(float64(len(statsChan)) / float64(cap(statsChan)))
metrics.SubQueue.Set(float64(len(subChan)) / float64(cap(subChan)))
metrics.UnSubQueue.Set(float64(len(unSubChan)) / float64(cap(unSubChan)))
time.Sleep(7127 * time.Millisecond)
}
}()
filter := events.NewFilter(subChan, unSubChan, phEventChan)
go filter.Run()
@@ -63,7 +75,7 @@ func main() {
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{
Level: 9, // Set compression level to maximum
Level: 9, // Set the compression level to maximum
}))
e.Use(echoprometheus.NewMiddlewareWithConfig(
echoprometheus.MiddlewareConfig{DoNotUseRequestPathFor404: true, Subsystem: "livestream"}))

View File

@@ -25,4 +25,25 @@ var (
Name: "livestream_ph_events_total",
Help: "The total number of handled PostHog events, less than or equal to consumed",
})
IncomingQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "livestream_incoming_queue_use_ratio",
Help: "How much of incoming queue is used",
})
EventQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "livestream_event_queue_use_ratio",
Help: "How much of parsed event queue is used",
})
StatsQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "livestream_stats_queue_use_ratio",
Help: "How much of stats queue is used",
})
SubQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "livestream_sub_queue_use_ratio",
Help: "How much of sub queue is used (a connection create subscription)",
})
UnSubQueue = promauto.NewGauge(prometheus.GaugeOpts{
Name: "livestream_unsub_queue_use_ratio",
Help: "How much of unsub queue is used (disconnecting removes subscription)",
})
)