feat: add metrics to livestream server (#27953)

This commit is contained in:
Paweł Szczur
2025-01-28 14:04:34 +01:00
committed by GitHub
parent cb81e9dbb3
commit a0945c093b
11 changed files with 212 additions and 137 deletions

View File

@@ -4,8 +4,26 @@
The start of something fresh.
Hog 3000 powers live event stream on PostHog: https://us.posthog.com/project/0/activity/live
## Endpoints
- `/` - dummy placeholder
- `/served` - total number of events and users recorded
- `/stats` - number of unique users (distinct id) on a page
- `/events` - stream consumed events to the requester, it's a done through
[Server Side Event](sse-moz), it supports extra query params adding filters:
- `eventType` - event type name,
- `distinctId` - only events with a given distinctId,
- `geo` - return only coordinates guessed based on IP,
- `/debug` - dummy html for SSE testing,
- `/debug/sse/` - backend for `/debug` generating a server side events,
- `/metrics` - exposes metrcis in Prometheus format
## Installing
One needs a IP -> (lat,lng) database:
```bash
curl https://mmdbcdn.posthog.net/ | brotli -d > mmdb.db
```
@@ -17,3 +35,5 @@ Run it!
```bash
go run .
```
[sse-moz]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

View File

@@ -71,20 +71,15 @@ func convertToResponsePostHogEvent(event PostHogEvent, teamId int) *ResponsePost
}
}
var personUUIDV5Namespace *uuid.UUID
var personUUIDV5Namespace = uuid.Must(uuid.FromString("932979b4-65c3-4424-8467-0b66ec27bc22"))
func uuidFromDistinctId(teamId int, distinctId string) string {
if teamId == 0 || distinctId == "" {
return ""
}
if personUUIDV5Namespace == nil {
uuid, _ := uuid.FromString("932979b4-65c3-4424-8467-0b66ec27bc22")
personUUIDV5Namespace = &uuid
}
input := fmt.Sprintf("%d:%s", teamId, distinctId)
return uuid.NewV5(*personUUIDV5Namespace, input).String()
return uuid.NewV5(personUUIDV5Namespace, input).String()
}
func removeSubscription(clientId string, subs []Subscription) []Subscription {

View File

@@ -10,14 +10,18 @@ require (
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jackc/pgx/v5 v5.7.2
github.com/labstack/echo-contrib v0.17.2
github.com/labstack/echo/v4 v4.13.3
github.com/oschwald/maxminddb-golang v1.13.1
github.com/prometheus/client_golang v1.20.5
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/docker-credential-helpers v0.8.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.2 // indirect
@@ -29,15 +33,18 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/magiconair/properties v1.8.9 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
@@ -56,6 +63,7 @@ require (
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.36.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect

View File

@@ -191,12 +191,16 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo-contrib v0.17.2 h1:K1zivqmtcC70X9VdBFdLomjPDEVHlrcAObqmuFj1c6w=
github.com/labstack/echo-contrib v0.17.2/go.mod h1:NeDh3PX7j/u+jR4iuDt1zHmWZSCz9c/p9mxXcDpyS8E=
github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaafY=
github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
@@ -272,14 +276,14 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.46.0 h1:doXzt5ybi1HBKpsZOL0sSkaNHJJqkyfEWZGGqqScV0Y=
github.com/prometheus/common v0.46.0/go.mod h1:Tp0qkxpb9Jsg54QMe+EAmqXkSV7Evdy1BTn+g2pa/hQ=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ=
github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
@@ -417,8 +421,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576/go.mod h1:1R3kvZ1dtP3+4p4d3G8uJ8rFk/fWlScl38vanWACI08=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8=
google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=

View File

@@ -7,7 +7,7 @@
<script>
// Example taken from: https://www.w3schools.com/html/html5_serversentevents.asp
if (typeof EventSource !== "undefined") {
const source = new EventSource("/sse");
const source = new EventSource("/debug/sse");
source.onmessage = function (event) {
console.log(event);
document.getElementById("result").innerHTML += event.data + "<br>";

View File

@@ -2,6 +2,7 @@ package main
import (
"encoding/json"
"errors"
"log"
"time"
@@ -72,20 +73,30 @@ func NewPostHogKafkaConsumer(brokers string, securityProtocol string, groupID st
}
func (c *PostHogKafkaConsumer) Consume() {
err := c.consumer.SubscribeTopics([]string{c.topic}, nil)
if err != nil {
if err := c.consumer.SubscribeTopics([]string{c.topic}, nil); err != nil {
sentry.CaptureException(err)
log.Fatalf("Failed to subscribe to topic: %v", err)
}
for {
msg, err := c.consumer.ReadMessage(-1)
msg, err := c.consumer.ReadMessage(15 * time.Second)
if err != nil {
var inErr kafka.Error
if errors.As(err, &inErr) {
if inErr.Code() == kafka.ErrTransport {
connectFailure.Inc()
} else if inErr.IsTimeout() {
timeoutConsume.Inc()
continue
}
}
log.Printf("Error consuming message: %v", err)
sentry.CaptureException(err)
continue
}
msgConsumed.Inc()
var wrapperMessage PostHogEventWrapper
err = json.Unmarshal(msg.Value, &wrapperMessage)
if err != nil {

View File

@@ -36,5 +36,6 @@ func (ts *Stats) keepStats(statsChan chan PostHogEvent) {
}
ts.Store[token].Add(event.DistinctId, "1")
ts.GlobalStore.Add(event.DistinctId, "1")
handledEvents.Inc()
}
}

View File

@@ -1,19 +1,16 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/getsentry/sentry-go"
"github.com/labstack/echo-contrib/echoprometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper"
)
@@ -24,7 +21,7 @@ func main() {
err := sentry.Init(sentry.ClientOptions{
Dsn: viper.GetString("sentry.dsn"),
Debug: isProd,
Debug: !isProd,
AttachStacktrace: true,
})
if err != nil {
@@ -96,102 +93,23 @@ func main() {
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{
Level: 9, // Set compression level to maximum
}))
e.Use(echoprometheus.NewMiddleware("livestream"))
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
AllowOrigins: []string{"*"},
AllowMethods: []string{http.MethodGet, http.MethodHead},
}))
e.File("/", "./index.html")
// Routes
e.GET("/", index)
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
e.GET("/served", servedHandler(stats))
e.GET("/stats", statsHandler(stats))
e.GET("/events", func(c echo.Context) error {
e.Logger.Printf("SSE client connected, ip: %v", c.RealIP())
var teamId string
eventType := c.QueryParam("eventType")
distinctId := c.QueryParam("distinctId")
geo := c.QueryParam("geo")
teamIdInt := 0
token := ""
geoOnly := false
if strings.ToLower(geo) == "true" || geo == "1" {
geoOnly = true
} else {
teamId = ""
authHeader := c.Request().Header.Get("Authorization")
if authHeader == "" {
return errors.New("authorization header is required")
}
claims, err := decodeAuthToken(authHeader)
if err != nil {
return err
}
teamId = strconv.Itoa(int(claims["team_id"].(float64)))
token = fmt.Sprint(claims["api_token"])
if teamId == "" {
return errors.New("teamId is required unless geo=true")
}
}
eventTypes := []string{}
if eventType != "" {
eventTypes = strings.Split(eventType, ",")
}
subscription := Subscription{
TeamId: teamIdInt,
Token: token,
ClientId: c.Response().Header().Get(echo.HeaderXRequestID),
DistinctId: distinctId,
Geo: geoOnly,
EventTypes: eventTypes,
EventChan: make(chan interface{}, 100),
ShouldClose: &atomic.Bool{},
}
subChan <- subscription
w := c.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
for {
select {
case <-c.Request().Context().Done():
e.Logger.Printf("SSE client disconnected, ip: %v", c.RealIP())
filter.unSubChan <- subscription
subscription.ShouldClose.Store(true)
return nil
case payload := <-subscription.EventChan:
jsonData, err := json.Marshal(payload)
if err != nil {
sentry.CaptureException(err)
log.Println("Error marshalling payload", err)
continue
}
event := Event{
Data: jsonData,
}
if err := event.WriteTo(w); err != nil {
return err
}
w.Flush()
}
}
})
e.GET("/events", streamEventsHandler(e.Logger, subChan, filter))
e.GET("/jwt", func(c echo.Context) error {
authHeader := c.Request().Header.Get("Authorization")
@@ -207,32 +125,35 @@ func main() {
return c.JSON(http.StatusOK, claims)
})
e.GET("/sse", func(c echo.Context) error {
e.Logger.Printf("Map client connected, ip: %v", c.RealIP())
if !isProd {
e.File("/debug", "./index.html")
e.GET("/debug/sse", func(c echo.Context) error {
e.Logger.Printf("Map client connected, ip: %v", c.RealIP())
w := c.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w := c.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Request().Context().Done():
e.Logger.Printf("SSE client disconnected, ip: %v", c.RealIP())
return nil
case <-ticker.C:
event := Event{
Data: []byte("ping: " + time.Now().Format(time.RFC3339Nano)),
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Request().Context().Done():
e.Logger.Printf("SSE client disconnected, ip: %v", c.RealIP())
return nil
case <-ticker.C:
event := Event{
Data: []byte("ping: " + time.Now().Format(time.RFC3339Nano)),
}
if err := event.WriteTo(w); err != nil {
return err
}
w.Flush()
}
if err := event.WriteTo(w); err != nil {
return err
}
w.Flush()
}
}
})
})
}
e.Logger.Fatal(e.Start(":8080"))
}

View File

@@ -1,4 +1,4 @@
// go:generate mockery
//go:generate mockery
package main
import (

25
livestream/metrics.go Normal file
View File

@@ -0,0 +1,25 @@
package main
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
msgConsumed = promauto.NewCounter(prometheus.CounterOpts{
Name: "livestream_kafka_consumed_total",
Help: "The total number of processed events",
})
timeoutConsume = promauto.NewCounter(prometheus.CounterOpts{
Name: "livestream_kafka_timeout_total",
Help: "The total number of timeout consume",
})
connectFailure = promauto.NewCounter(prometheus.CounterOpts{
Name: "livestream_kafka_connect_failure_total",
Help: "The total number of failed connect attempts",
})
handledEvents = promauto.NewCounter(prometheus.CounterOpts{
Name: "livestream_ph_events_total",
Help: "The total number of handled PostHog events, less or equal than consumed",
})
)

View File

@@ -1,10 +1,15 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync/atomic"
"github.com/getsentry/sentry-go"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/labstack/echo/v4"
)
@@ -60,3 +65,88 @@ func statsHandler(stats *Stats) func(c echo.Context) error {
return c.JSON(http.StatusOK, siteStats)
}
}
func streamEventsHandler(log echo.Logger, subChan chan Subscription, filter *Filter) func(c echo.Context) error {
return func(c echo.Context) error {
log.Debugf("SSE client connected, ip: %v", c.RealIP())
var teamId string
eventType := c.QueryParam("eventType")
distinctId := c.QueryParam("distinctId")
geo := c.QueryParam("geo")
teamIdInt := 0
token := ""
geoOnly := false
if strings.ToLower(geo) == "true" || geo == "1" {
geoOnly = true
} else {
authHeader := c.Request().Header.Get("Authorization")
if authHeader == "" {
return errors.New("authorization header is required")
}
claims, err := decodeAuthToken(authHeader)
if err != nil {
return err
}
teamId = strconv.Itoa(int(claims["team_id"].(float64)))
token = fmt.Sprint(claims["api_token"])
if teamId == "" {
return errors.New("teamId is required unless geo=true")
}
}
var eventTypes []string
if eventType != "" {
eventTypes = strings.Split(eventType, ",")
}
subscription := Subscription{
TeamId: teamIdInt,
Token: token,
ClientId: c.Response().Header().Get(echo.HeaderXRequestID),
DistinctId: distinctId,
Geo: geoOnly,
EventTypes: eventTypes,
EventChan: make(chan interface{}, 100),
ShouldClose: &atomic.Bool{},
}
subChan <- subscription
defer func() {
subscription.ShouldClose.Store(true)
filter.unSubChan <- subscription
}()
w := c.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
for {
select {
case <-c.Request().Context().Done():
log.Debugf("SSE client disconnected, ip: %v", c.RealIP())
return nil
case payload := <-subscription.EventChan:
jsonData, err := json.Marshal(payload)
if err != nil {
sentry.CaptureException(err)
log.Errorf("Error marshalling payload: %w", err)
continue
}
event := Event{
Data: jsonData,
}
if err := event.WriteTo(w); err != nil {
return err
}
w.Flush()
}
}
}
}