From 1e0127d51a1da3a5f8bd55975bbcf5f16498df16 Mon Sep 17 00:00:00 2001 From: MickLesk Date: Mon, 9 Feb 2026 20:35:41 +0100 Subject: [PATCH] Initial commit: Telemetry service for ProxmoxVE/VED - Go microservice for anonymous telemetry collection - PocketBase integration for data storage - Rate limiting and caching support - SMTP alerts for failure rate monitoring - Built-in dashboard for visualization - Migration tool for data import - GDPR/DSGVO compliant (no personal data, no IP logging) --- .gitignore | 2 + Dockerfile | 52 ++ LICENSE | 2 +- README.md | 83 ++- alerts.go | 267 ++++++++ cache.go | 158 +++++ dashboard.go | 1487 ++++++++++++++++++++++++++++++++++++++++++ entrypoint.sh | 55 ++ go.mod | 10 + go.sum | 10 + migration/migrate.go | 366 +++++++++++ migration/migrate.sh | 67 ++ service.go | 1027 +++++++++++++++++++++++++++++ 13 files changed, 3583 insertions(+), 3 deletions(-) create mode 100644 Dockerfile create mode 100644 alerts.go create mode 100644 cache.go create mode 100644 dashboard.go create mode 100644 entrypoint.sh create mode 100644 go.mod create mode 100644 go.sum create mode 100644 migration/migrate.go create mode 100755 migration/migrate.sh create mode 100644 service.go diff --git a/.gitignore b/.gitignore index aaadf73..03cf14d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ *.dll *.so *.dylib +telemetry-service +migration/migrate # Test binary, built with `go test -c` *.test diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3d58795 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,52 @@ +FROM golang:1.25-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download 2>/dev/null || true +COPY . . +RUN go build -trimpath -ldflags "-s -w" -o /out/telemetry-service . +RUN go build -trimpath -ldflags "-s -w" -o /out/migrate ./migration/migrate.go + +FROM alpine:3.23 +RUN apk add --no-cache ca-certificates tzdata +WORKDIR /app +COPY --from=build /out/telemetry-service /app/telemetry-service +COPY --from=build /out/migrate /app/migrate +COPY entrypoint.sh /app/entrypoint.sh +RUN chmod +x /app/entrypoint.sh /app/migrate + +# Service config +ENV LISTEN_ADDR=":8080" +ENV MAX_BODY_BYTES="1024" +ENV RATE_LIMIT_RPM="60" +ENV RATE_BURST="20" +ENV UPSTREAM_TIMEOUT_MS="4000" +ENV ENABLE_REQUEST_LOGGING="false" + +# Cache config (optional) +ENV ENABLE_CACHE="true" +ENV CACHE_TTL_SECONDS="60" +ENV ENABLE_REDIS="false" +# ENV REDIS_URL="redis://localhost:6379" + +# Alert config (optional) +ENV ALERT_ENABLED="false" +# ENV SMTP_HOST="" +# ENV SMTP_PORT="587" +# ENV SMTP_USER="" +# ENV SMTP_PASSWORD="" +# ENV SMTP_FROM="telemetry@proxmoxved.local" +# ENV SMTP_TO="" +# ENV SMTP_USE_TLS="false" +ENV ALERT_FAILURE_THRESHOLD="20.0" +ENV ALERT_CHECK_INTERVAL_MIN="15" +ENV ALERT_COOLDOWN_MIN="60" + +# Migration config (optional) +ENV RUN_MIGRATION="false" +ENV MIGRATION_REQUIRED="false" +ENV MIGRATION_SOURCE_URL="https://api.htl-braunau.at/dev/data" + +EXPOSE 8080 +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s \ + CMD wget -q --spider http://localhost:8080/healthz || exit 1 +ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/LICENSE b/LICENSE index 886e535..1fae4b9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2026 Proxmox Helper Scripts (CE) +Copyright (c) 2026 Community Scripts Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index a818427..0211ab9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,81 @@ -# telemetry-service -Standalone Go service for ProxmoxVE and ProxmoxVE data, cache, and telemetry. +# Telemetry Service + +A standalone Go microservice that collects anonymous telemetry data from [ProxmoxVE](https://github.com/community-scripts/ProxmoxVE) and [ProxmoxVED](https://github.com/community-scripts/ProxmoxVED) script installations. + +## Overview + +This service acts as a telemetry ingestion layer between the bash installation scripts and a PocketBase backend. When users run scripts from the ProxmoxVE/ProxmoxVED repositories, optional anonymous usage data is sent here for aggregation and analysis. + +**What gets collected:** +- Script name and installation status (success/failed) +- Container/VM type and resource allocation (CPU, RAM, disk) +- OS type and version +- Proxmox VE version +- Anonymous session ID (randomly generated UUID) + +**What is NOT collected:** +- IP addresses (not logged, not stored) +- Hostnames or domain names +- User credentials or personal information +- Hardware identifiers (MAC addresses, serial numbers) +- Network configuration or internal IPs +- Any data that could identify a person or system + +**What this enables:** +- Understanding which scripts are most popular +- Identifying scripts with high failure rates +- Tracking resource allocation trends +- Improving script quality based on real-world data + +## Features + +- **Telemetry Ingestion** - Receives and validates telemetry data from bash scripts +- **PocketBase Integration** - Stores data in PocketBase collections +- **Rate Limiting** - Configurable per-IP rate limiting to prevent abuse +- **Caching** - In-memory or Redis-backed caching support +- **Email Alerts** - SMTP-based alerts when failure rates exceed thresholds +- **Dashboard** - Built-in HTML dashboard for telemetry visualization +- **Migration Tool** - Migrate data from external sources to PocketBase + +## Architecture + +``` +┌─────────────────┐ ┌───────────────────┐ ┌────────────┐ +│ Bash Scripts │────▶│ Telemetry Service │────▶│ PocketBase │ +│ (ProxmoxVE/VED) │ │ (this repo) │ │ Database │ +└─────────────────┘ └───────────────────┘ └────────────┘ +``` + +## Project Structure + +``` +├── service.go # Main service, HTTP handlers, rate limiting +├── cache.go # In-memory and Redis caching +├── alerts.go # SMTP alert system +├── dashboard.go # Dashboard HTML generation +├── migration/ +│ ├── migrate.go # Data migration tool +│ └── migrate.sh # Migration shell script +├── Dockerfile # Container build +├── entrypoint.sh # Container entrypoint with migration support +└── go.mod # Go module definition +``` + +## Related Projects + +- [ProxmoxVE](https://github.com/community-scripts/ProxmoxVE) - Proxmox VE Helper Scripts +- [ProxmoxVED](https://github.com/community-scripts/ProxmoxVED) - Proxmox VE Helper Scripts (Dev) + +## Privacy & Compliance + +This service is designed with privacy in mind and is **GDPR/DSGVO compliant**: + +- ✅ **No personal data** - Only anonymous technical metrics are collected +- ✅ **No IP logging** - Request logging is disabled by default, IPs are never stored +- ✅ **Transparent** - All collected fields are documented and the code is open source +- ✅ **No tracking** - Session IDs are randomly generated and cannot be linked to users +- ✅ **No third parties** - Data is only stored in our self-hosted PocketBase instance + +## License + +MIT License - see [LICENSE](LICENSE) file. diff --git a/alerts.go b/alerts.go new file mode 100644 index 0000000..dccbbd6 --- /dev/null +++ b/alerts.go @@ -0,0 +1,267 @@ +package main + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "log" + "net/smtp" + "strings" + "sync" + "time" +) + +// AlertConfig holds SMTP alert configuration +type AlertConfig struct { + Enabled bool + SMTPHost string + SMTPPort int + SMTPUser string + SMTPPassword string + SMTPFrom string + SMTPTo []string + UseTLS bool + FailureThreshold float64 // Alert when failure rate exceeds this (e.g., 20.0 = 20%) + CheckInterval time.Duration // How often to check + Cooldown time.Duration // Minimum time between alerts +} + +// Alerter handles alerting functionality +type Alerter struct { + cfg AlertConfig + lastAlertAt time.Time + mu sync.Mutex + pb *PBClient + lastStats alertStats + alertHistory []AlertEvent +} + +type alertStats struct { + successCount int + failedCount int + checkedAt time.Time +} + +// AlertEvent records an alert that was sent +type AlertEvent struct { + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Message string `json:"message"` + FailureRate float64 `json:"failure_rate,omitempty"` +} + +// NewAlerter creates a new alerter instance +func NewAlerter(cfg AlertConfig, pb *PBClient) *Alerter { + return &Alerter{ + cfg: cfg, + pb: pb, + alertHistory: make([]AlertEvent, 0), + } +} + +// Start begins the alert monitoring loop +func (a *Alerter) Start() { + if !a.cfg.Enabled { + log.Println("INFO: alerting disabled") + return + } + + if a.cfg.SMTPHost == "" || len(a.cfg.SMTPTo) == 0 { + log.Println("WARN: alerting enabled but SMTP not configured") + return + } + + go a.monitorLoop() + log.Printf("INFO: alert monitoring started (threshold: %.1f%%, interval: %v)", a.cfg.FailureThreshold, a.cfg.CheckInterval) +} + +func (a *Alerter) monitorLoop() { + ticker := time.NewTicker(a.cfg.CheckInterval) + defer ticker.Stop() + + for range ticker.C { + a.checkAndAlert() + } +} + +func (a *Alerter) checkAndAlert() { + ctx, cancel := newTimeoutContext(10 * time.Second) + defer cancel() + + // Fetch last hour's data + data, err := a.pb.FetchDashboardData(ctx, 1) + if err != nil { + log.Printf("WARN: alert check failed: %v", err) + return + } + + // Calculate current failure rate + total := data.SuccessCount + data.FailedCount + if total < 10 { + // Not enough data to determine rate + return + } + + failureRate := float64(data.FailedCount) / float64(total) * 100 + + // Check if we should alert + if failureRate >= a.cfg.FailureThreshold { + a.maybeSendAlert(failureRate, data.FailedCount, total) + } +} + +func (a *Alerter) maybeSendAlert(rate float64, failed, total int) { + a.mu.Lock() + defer a.mu.Unlock() + + // Check cooldown + if time.Since(a.lastAlertAt) < a.cfg.Cooldown { + return + } + + // Send alert + subject := fmt.Sprintf("[ProxmoxVED Alert] High Failure Rate: %.1f%%", rate) + body := fmt.Sprintf(`ProxmoxVE Helper Scripts - Telemetry Alert + +⚠️ High installation failure rate detected! + +Current Statistics (last 24h): +- Failure Rate: %.1f%% +- Failed Installations: %d +- Total Installations: %d +- Threshold: %.1f%% + +Time: %s + +Please check the dashboard for more details. + +--- +This is an automated alert from the telemetry service. +`, rate, failed, total, a.cfg.FailureThreshold, time.Now().Format(time.RFC1123)) + + if err := a.sendEmail(subject, body); err != nil { + log.Printf("ERROR: failed to send alert email: %v", err) + return + } + + a.lastAlertAt = time.Now() + a.alertHistory = append(a.alertHistory, AlertEvent{ + Timestamp: time.Now(), + Type: "high_failure_rate", + Message: fmt.Sprintf("Failure rate %.1f%% exceeded threshold %.1f%%", rate, a.cfg.FailureThreshold), + FailureRate: rate, + }) + + // Keep only last 100 alerts + if len(a.alertHistory) > 100 { + a.alertHistory = a.alertHistory[len(a.alertHistory)-100:] + } + + log.Printf("ALERT: sent high failure rate alert (%.1f%%)", rate) +} + +func (a *Alerter) sendEmail(subject, body string) error { + // Build message + var msg bytes.Buffer + msg.WriteString(fmt.Sprintf("From: %s\r\n", a.cfg.SMTPFrom)) + msg.WriteString(fmt.Sprintf("To: %s\r\n", strings.Join(a.cfg.SMTPTo, ", "))) + msg.WriteString(fmt.Sprintf("Subject: %s\r\n", subject)) + msg.WriteString("MIME-Version: 1.0\r\n") + msg.WriteString("Content-Type: text/plain; charset=UTF-8\r\n") + msg.WriteString("\r\n") + msg.WriteString(body) + + addr := fmt.Sprintf("%s:%d", a.cfg.SMTPHost, a.cfg.SMTPPort) + + var auth smtp.Auth + if a.cfg.SMTPUser != "" && a.cfg.SMTPPassword != "" { + auth = smtp.PlainAuth("", a.cfg.SMTPUser, a.cfg.SMTPPassword, a.cfg.SMTPHost) + } + + if a.cfg.UseTLS { + // TLS connection + tlsConfig := &tls.Config{ + ServerName: a.cfg.SMTPHost, + } + + conn, err := tls.Dial("tcp", addr, tlsConfig) + if err != nil { + return fmt.Errorf("TLS dial failed: %w", err) + } + defer conn.Close() + + client, err := smtp.NewClient(conn, a.cfg.SMTPHost) + if err != nil { + return fmt.Errorf("SMTP client failed: %w", err) + } + defer client.Close() + + if auth != nil { + if err := client.Auth(auth); err != nil { + return fmt.Errorf("SMTP auth failed: %w", err) + } + } + + if err := client.Mail(a.cfg.SMTPFrom); err != nil { + return fmt.Errorf("SMTP MAIL failed: %w", err) + } + + for _, to := range a.cfg.SMTPTo { + if err := client.Rcpt(to); err != nil { + return fmt.Errorf("SMTP RCPT failed: %w", err) + } + } + + w, err := client.Data() + if err != nil { + return fmt.Errorf("SMTP DATA failed: %w", err) + } + + _, err = w.Write(msg.Bytes()) + if err != nil { + return fmt.Errorf("SMTP write failed: %w", err) + } + + return w.Close() + } + + // Non-TLS (STARTTLS) + return smtp.SendMail(addr, auth, a.cfg.SMTPFrom, a.cfg.SMTPTo, msg.Bytes()) +} + +// GetAlertHistory returns recent alert events +func (a *Alerter) GetAlertHistory() []AlertEvent { + a.mu.Lock() + defer a.mu.Unlock() + result := make([]AlertEvent, len(a.alertHistory)) + copy(result, a.alertHistory) + return result +} + +// TestAlert sends a test alert email +func (a *Alerter) TestAlert() error { + if !a.cfg.Enabled || a.cfg.SMTPHost == "" { + return fmt.Errorf("alerting not configured") + } + + subject := "[ProxmoxVED] Test Alert" + body := fmt.Sprintf(`This is a test alert from ProxmoxVE Helper Scripts telemetry service. + +If you received this email, your alert configuration is working correctly. + +Time: %s +SMTP Host: %s +Recipients: %s + +--- +This is an automated test message. +`, time.Now().Format(time.RFC1123), a.cfg.SMTPHost, strings.Join(a.cfg.SMTPTo, ", ")) + + return a.sendEmail(subject, body) +} + +// Helper for timeout context +func newTimeoutContext(d time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), d) +} diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..54cc5f5 --- /dev/null +++ b/cache.go @@ -0,0 +1,158 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "sync" + "time" + + "github.com/redis/go-redis/v9" +) + +// CacheConfig holds cache configuration +type CacheConfig struct { + RedisURL string + EnableRedis bool + DefaultTTL time.Duration +} + +// Cache provides caching functionality with Redis or in-memory fallback +type Cache struct { + redis *redis.Client + useRedis bool + defaultTTL time.Duration + + // In-memory fallback + mu sync.RWMutex + memData map[string]cacheEntry +} + +type cacheEntry struct { + data []byte + expiresAt time.Time +} + +// NewCache creates a new cache instance +func NewCache(cfg CacheConfig) *Cache { + c := &Cache{ + defaultTTL: cfg.DefaultTTL, + memData: make(map[string]cacheEntry), + } + + if cfg.EnableRedis && cfg.RedisURL != "" { + opts, err := redis.ParseURL(cfg.RedisURL) + if err != nil { + log.Printf("WARN: invalid redis URL, using in-memory cache: %v", err) + return c + } + + client := redis.NewClient(opts) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + log.Printf("WARN: redis connection failed, using in-memory cache: %v", err) + return c + } + + c.redis = client + c.useRedis = true + log.Printf("INFO: connected to Redis for caching") + } + + // Start cleanup goroutine for in-memory cache + if !c.useRedis { + go c.cleanupLoop() + } + + return c +} + +func (c *Cache) cleanupLoop() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + c.mu.Lock() + now := time.Now() + for k, v := range c.memData { + if now.After(v.expiresAt) { + delete(c.memData, k) + } + } + c.mu.Unlock() + } +} + +// Get retrieves a value from cache +func (c *Cache) Get(ctx context.Context, key string, dest interface{}) bool { + if c.useRedis { + data, err := c.redis.Get(ctx, key).Bytes() + if err != nil { + return false + } + return json.Unmarshal(data, dest) == nil + } + + // In-memory fallback + c.mu.RLock() + entry, ok := c.memData[key] + c.mu.RUnlock() + + if !ok || time.Now().After(entry.expiresAt) { + return false + } + + return json.Unmarshal(entry.data, dest) == nil +} + +// Set stores a value in cache +func (c *Cache) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error { + if ttl == 0 { + ttl = c.defaultTTL + } + + data, err := json.Marshal(value) + if err != nil { + return err + } + + if c.useRedis { + return c.redis.Set(ctx, key, data, ttl).Err() + } + + // In-memory fallback + c.mu.Lock() + c.memData[key] = cacheEntry{ + data: data, + expiresAt: time.Now().Add(ttl), + } + c.mu.Unlock() + + return nil +} + +// Delete removes a key from cache +func (c *Cache) Delete(ctx context.Context, key string) error { + if c.useRedis { + return c.redis.Del(ctx, key).Err() + } + + c.mu.Lock() + delete(c.memData, key) + c.mu.Unlock() + return nil +} + +// InvalidateDashboard clears dashboard cache +func (c *Cache) InvalidateDashboard(ctx context.Context) { + // Delete all dashboard cache keys + for days := 1; days <= 365; days++ { + _ = c.Delete(ctx, dashboardCacheKey(days)) + } +} + +func dashboardCacheKey(days int) string { + return "dashboard:" + string(rune(days)) +} diff --git a/dashboard.go b/dashboard.go new file mode 100644 index 0000000..ca27901 --- /dev/null +++ b/dashboard.go @@ -0,0 +1,1487 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "time" +) + +// DashboardData holds aggregated statistics for the dashboard +type DashboardData struct { + TotalInstalls int `json:"total_installs"` + SuccessCount int `json:"success_count"` + FailedCount int `json:"failed_count"` + InstallingCount int `json:"installing_count"` + SuccessRate float64 `json:"success_rate"` + TopApps []AppCount `json:"top_apps"` + OsDistribution []OsCount `json:"os_distribution"` + MethodStats []MethodCount `json:"method_stats"` + PveVersions []PveCount `json:"pve_versions"` + TypeStats []TypeCount `json:"type_stats"` + ErrorAnalysis []ErrorGroup `json:"error_analysis"` + FailedApps []AppFailure `json:"failed_apps"` + RecentRecords []TelemetryRecord `json:"recent_records"` + DailyStats []DailyStat `json:"daily_stats"` +} + +type AppCount struct { + App string `json:"app"` + Count int `json:"count"` +} + +type OsCount struct { + Os string `json:"os"` + Count int `json:"count"` +} + +type MethodCount struct { + Method string `json:"method"` + Count int `json:"count"` +} + +type PveCount struct { + Version string `json:"version"` + Count int `json:"count"` +} + +type TypeCount struct { + Type string `json:"type"` + Count int `json:"count"` +} + +type ErrorGroup struct { + Pattern string `json:"pattern"` + Count int `json:"count"` + Apps string `json:"apps"` // Comma-separated list of affected apps +} + +type AppFailure struct { + App string `json:"app"` + TotalCount int `json:"total_count"` + FailedCount int `json:"failed_count"` + FailureRate float64 `json:"failure_rate"` +} + +type DailyStat struct { + Date string `json:"date"` + Success int `json:"success"` + Failed int `json:"failed"` +} + +// FetchDashboardData retrieves aggregated data from PocketBase +func (p *PBClient) FetchDashboardData(ctx context.Context, days int) (*DashboardData, error) { + if err := p.ensureAuth(ctx); err != nil { + return nil, err + } + + data := &DashboardData{} + + // Calculate date filter + since := time.Now().AddDate(0, 0, -days).Format("2006-01-02 00:00:00") + filter := url.QueryEscape(fmt.Sprintf("created >= '%s'", since)) + + // Fetch all records for the period + records, err := p.fetchRecords(ctx, filter, 500) + if err != nil { + return nil, err + } + + // Aggregate statistics + appCounts := make(map[string]int) + appFailures := make(map[string]int) + osCounts := make(map[string]int) + methodCounts := make(map[string]int) + pveCounts := make(map[string]int) + typeCounts := make(map[string]int) + errorPatterns := make(map[string]map[string]bool) // pattern -> set of apps + dailySuccess := make(map[string]int) + dailyFailed := make(map[string]int) + + for _, r := range records { + data.TotalInstalls++ + + switch r.Status { + case "success": + data.SuccessCount++ + case "failed": + data.FailedCount++ + // Track failed apps + if r.NSAPP != "" { + appFailures[r.NSAPP]++ + } + // Group errors by pattern + if r.Error != "" { + pattern := normalizeError(r.Error) + if errorPatterns[pattern] == nil { + errorPatterns[pattern] = make(map[string]bool) + } + if r.NSAPP != "" { + errorPatterns[pattern][r.NSAPP] = true + } + } + case "installing": + data.InstallingCount++ + } + + // Count apps + if r.NSAPP != "" { + appCounts[r.NSAPP]++ + } + + // Count OS + if r.OsType != "" { + osCounts[r.OsType]++ + } + + // Count methods + if r.Method != "" { + methodCounts[r.Method]++ + } + + // Count PVE versions + if r.PveVer != "" { + pveCounts[r.PveVer]++ + } + + // Count types (LXC vs VM) + if r.Type != "" { + typeCounts[r.Type]++ + } + + // Daily stats (use Created field if available) + if r.Created != "" { + date := r.Created[:10] // "2026-02-09" + if r.Status == "success" { + dailySuccess[date]++ + } else if r.Status == "failed" { + dailyFailed[date]++ + } + } + } + + // Calculate success rate + completed := data.SuccessCount + data.FailedCount + if completed > 0 { + data.SuccessRate = float64(data.SuccessCount) / float64(completed) * 100 + } + + // Convert maps to sorted slices (top 10) + data.TopApps = topN(appCounts, 10) + data.OsDistribution = topNOs(osCounts, 10) + data.MethodStats = topNMethod(methodCounts, 10) + data.PveVersions = topNPve(pveCounts, 10) + data.TypeStats = topNType(typeCounts, 10) + + // Error analysis + data.ErrorAnalysis = buildErrorAnalysis(errorPatterns, 10) + + // Failed apps with failure rates + data.FailedApps = buildFailedApps(appCounts, appFailures, 10) + + // Daily stats for chart + data.DailyStats = buildDailyStats(dailySuccess, dailyFailed, days) + + // Recent records (last 20) + if len(records) > 20 { + data.RecentRecords = records[:20] + } else { + data.RecentRecords = records + } + + return data, nil +} + +// TelemetryRecord includes Created timestamp +type TelemetryRecord struct { + TelemetryOut + Created string `json:"created"` +} + +func (p *PBClient) fetchRecords(ctx context.Context, filter string, limit int) ([]TelemetryRecord, error) { + var allRecords []TelemetryRecord + page := 1 + perPage := 100 + + for { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + fmt.Sprintf("%s/api/collections/%s/records?filter=%s&sort=-created&page=%d&perPage=%d", + p.baseURL, p.targetColl, filter, page, perPage), + nil, + ) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return nil, err + } + + var result struct { + Items []TelemetryRecord `json:"items"` + TotalItems int `json:"totalItems"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + resp.Body.Close() + return nil, err + } + resp.Body.Close() + + allRecords = append(allRecords, result.Items...) + + if len(allRecords) >= limit || len(allRecords) >= result.TotalItems { + break + } + page++ + } + + return allRecords, nil +} + +func topN(m map[string]int, n int) []AppCount { + result := make([]AppCount, 0, len(m)) + for k, v := range m { + result = append(result, AppCount{App: k, Count: v}) + } + // Simple bubble sort for small datasets + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNOs(m map[string]int, n int) []OsCount { + result := make([]OsCount, 0, len(m)) + for k, v := range m { + result = append(result, OsCount{Os: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNMethod(m map[string]int, n int) []MethodCount { + result := make([]MethodCount, 0, len(m)) + for k, v := range m { + result = append(result, MethodCount{Method: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNPve(m map[string]int, n int) []PveCount { + result := make([]PveCount, 0, len(m)) + for k, v := range m { + result = append(result, PveCount{Version: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNType(m map[string]int, n int) []TypeCount { + result := make([]TypeCount, 0, len(m)) + for k, v := range m { + result = append(result, TypeCount{Type: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +// normalizeError simplifies error messages into patterns for grouping +func normalizeError(err string) string { + err = strings.TrimSpace(err) + if err == "" { + return "unknown" + } + + // Normalize common patterns + err = strings.ToLower(err) + + // Remove specific numbers, IPs, paths that vary + // Keep it simple for now - just truncate and normalize + if len(err) > 60 { + err = err[:60] + } + + // Common error pattern replacements + patterns := map[string]string{ + "connection refused": "connection refused", + "timeout": "timeout", + "no space left": "disk full", + "permission denied": "permission denied", + "not found": "not found", + "failed to download": "download failed", + "apt": "apt error", + "dpkg": "dpkg error", + "curl": "network error", + "wget": "network error", + "docker": "docker error", + "systemctl": "systemd error", + "service": "service error", + } + + for pattern, label := range patterns { + if strings.Contains(err, pattern) { + return label + } + } + + // If no pattern matches, return first 40 chars + if len(err) > 40 { + return err[:40] + "..." + } + return err +} + +func buildErrorAnalysis(patterns map[string]map[string]bool, n int) []ErrorGroup { + result := make([]ErrorGroup, 0, len(patterns)) + + for pattern, apps := range patterns { + appList := make([]string, 0, len(apps)) + for app := range apps { + appList = append(appList, app) + } + + // Limit app list display + appsStr := strings.Join(appList, ", ") + if len(appsStr) > 50 { + appsStr = appsStr[:47] + "..." + } + + result = append(result, ErrorGroup{ + Pattern: pattern, + Count: len(apps), // Number of unique apps with this error + Apps: appsStr, + }) + } + + // Sort by count descending + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + + if len(result) > n { + return result[:n] + } + return result +} + +func buildFailedApps(total, failed map[string]int, n int) []AppFailure { + result := make([]AppFailure, 0) + + for app, failCount := range failed { + totalCount := total[app] + if totalCount == 0 { + continue + } + + rate := float64(failCount) / float64(totalCount) * 100 + result = append(result, AppFailure{ + App: app, + TotalCount: totalCount, + FailedCount: failCount, + FailureRate: rate, + }) + } + + // Sort by failure rate descending + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].FailureRate > result[i].FailureRate { + result[i], result[j] = result[j], result[i] + } + } + } + + if len(result) > n { + return result[:n] + } + return result +} + +func buildDailyStats(success, failed map[string]int, days int) []DailyStat { + result := make([]DailyStat, 0, days) + for i := days - 1; i >= 0; i-- { + date := time.Now().AddDate(0, 0, -i).Format("2006-01-02") + result = append(result, DailyStat{ + Date: date, + Success: success[date], + Failed: failed[date], + }) + } + return result +} + +// DashboardHTML returns the embedded dashboard HTML +func DashboardHTML() string { + return ` + + + + + Telemetry Dashboard - ProxmoxVE Helper Scripts + + + + + + +
+

+ + + + + Telemetry Dashboard +

+
+ + + + + +
+
+ + + +
+
+
Total Installations
+
-
+
+
+
Successful
+
-
+
+
+
Failed
+
-
+
+
+
In Progress
+
-
+
+
+
Success Rate
+
-
+
+
+
LXC / VM
+
-
+
+
+ +
+

Proxmox VE Versions

+
+ Loading... +
+
+ +
+
+

Installations Over Time

+
+ +
+
+
+

Status Distribution

+
+ +
+
+
+ +
+
+

Top Applications

+
+ +
+
+
+

OS Distribution

+
+ +
+
+
+

Installation Method

+
+ +
+
+
+ +
+

+ + + + + + Error Analysis +

+
+ Loading... +
+
+ +
+

+ + + + + + Apps with Highest Failure Rates +

+
+ Loading... +
+
+ +
+

Recent Installations

+
+ + + +
+ + + + + + + + + + + + + + + + +
AppStatusOSTypeMethodResourcesExit CodeError
Loading...
+ +
+ + + + + +` +} diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..67c09b2 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,55 @@ +#!/bin/sh +set -e + +echo "=============================================" +echo " ProxmoxVED Telemetry Service" +echo "=============================================" + +# Map Coolify ENV names to migration script names +# Coolify uses PB_URL, PB_TARGET_COLLECTION +export POCKETBASE_URL="${POCKETBASE_URL:-$PB_URL}" +export POCKETBASE_COLLECTION="${POCKETBASE_COLLECTION:-$PB_TARGET_COLLECTION}" + +# Run migration if enabled +if [ "$RUN_MIGRATION" = "true" ]; then + echo "" + echo "🔄 Migration mode enabled" + echo " Source: $MIGRATION_SOURCE_URL" + echo " Target: $POCKETBASE_URL" + echo " Collection: $POCKETBASE_COLLECTION" + echo "" + + # Wait for PocketBase to be ready + echo "⏳ Waiting for PocketBase to be ready..." + RETRIES=30 + until wget -q --spider "$POCKETBASE_URL/api/health" 2>/dev/null; do + RETRIES=$((RETRIES - 1)) + if [ $RETRIES -le 0 ]; then + echo "❌ PocketBase not reachable after 30 attempts" + if [ "$MIGRATION_REQUIRED" = "true" ]; then + exit 1 + fi + echo "⚠️ Continuing without migration..." + break + fi + echo " Waiting... ($RETRIES attempts left)" + sleep 2 + done + + if wget -q --spider "$POCKETBASE_URL/api/health" 2>/dev/null; then + echo "✅ PocketBase is ready" + echo "" + echo "🚀 Starting migration..." + /app/migrate || { + if [ "$MIGRATION_REQUIRED" = "true" ]; then + echo "❌ Migration failed!" + exit 1 + fi + echo "⚠️ Migration failed, but continuing..." + } + echo "" + fi +fi + +echo "🚀 Starting telemetry service..." +exec /app/telemetry-service diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7c1021a --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/community-scripts/telemetry-service + +go 1.25.5 + +require github.com/redis/go-redis/v9 v9.7.0 + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f11d99f --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= diff --git a/migration/migrate.go b/migration/migrate.go new file mode 100644 index 0000000..7212c2d --- /dev/null +++ b/migration/migrate.go @@ -0,0 +1,366 @@ +// +build ignore + +// Migration script to import data from the old API to PocketBase +// Run with: go run migrate.go +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "time" +) + +const ( + defaultSourceAPI = "https://api.htl-braunau.at/dev/data" + defaultPBURL = "http://localhost:8090" + batchSize = 100 +) + +var ( + sourceAPI string + summaryAPI string + authToken string // PocketBase auth token +) + +// OldDataModel represents the data structure from the old API +type OldDataModel struct { + ID string `json:"id"` + CtType int `json:"ct_type"` + DiskSize int `json:"disk_size"` + CoreCount int `json:"core_count"` + RamSize int `json:"ram_size"` + OsType string `json:"os_type"` + OsVersion string `json:"os_version"` + DisableIP6 string `json:"disableip6"` + NsApp string `json:"nsapp"` + Method string `json:"method"` + CreatedAt string `json:"created_at"` + PveVersion string `json:"pve_version"` + Status string `json:"status"` + RandomID string `json:"random_id"` + Type string `json:"type"` + Error string `json:"error"` +} + +// PBRecord represents the PocketBase record format +type PBRecord struct { + CtType int `json:"ct_type"` + DiskSize int `json:"disk_size"` + CoreCount int `json:"core_count"` + RamSize int `json:"ram_size"` + OsType string `json:"os_type"` + OsVersion string `json:"os_version"` + DisableIP6 string `json:"disableip6"` + NsApp string `json:"nsapp"` + Method string `json:"method"` + PveVersion string `json:"pve_version"` + Status string `json:"status"` + RandomID string `json:"random_id"` + Type string `json:"type"` + Error string `json:"error"` + // created_at will be set automatically by PocketBase +} + +type Summary struct { + TotalEntries int `json:"total_entries"` +} + +func main() { + // Setup source URLs + baseURL := os.Getenv("MIGRATION_SOURCE_URL") + if baseURL == "" { + baseURL = defaultSourceAPI + } + sourceAPI = baseURL + "/paginated" + summaryAPI = baseURL + "/summary" + + // Support both POCKETBASE_URL and PB_URL (Coolify uses PB_URL) + pbURL := os.Getenv("POCKETBASE_URL") + if pbURL == "" { + pbURL = os.Getenv("PB_URL") + } + if pbURL == "" { + pbURL = defaultPBURL + } + + // Support both POCKETBASE_COLLECTION and PB_TARGET_COLLECTION + pbCollection := os.Getenv("POCKETBASE_COLLECTION") + if pbCollection == "" { + pbCollection = os.Getenv("PB_TARGET_COLLECTION") + } + if pbCollection == "" { + pbCollection = "_dev_telemetry_data" + } + + // Auth collection + authCollection := os.Getenv("PB_AUTH_COLLECTION") + if authCollection == "" { + authCollection = "_dev_telemetry_service" + } + + // Credentials + pbIdentity := os.Getenv("PB_IDENTITY") + pbPassword := os.Getenv("PB_PASSWORD") + + fmt.Println("===========================================") + fmt.Println(" Data Migration to PocketBase") + fmt.Println("===========================================") + fmt.Printf("Source API: %s\n", baseURL) + fmt.Printf("PocketBase URL: %s\n", pbURL) + fmt.Printf("Collection: %s\n", pbCollection) + fmt.Printf("Auth Collection: %s\n", authCollection) + fmt.Println("-------------------------------------------") + + // Authenticate with PocketBase + if pbIdentity != "" && pbPassword != "" { + fmt.Println("🔐 Authenticating with PocketBase...") + err := authenticate(pbURL, authCollection, pbIdentity, pbPassword) + if err != nil { + fmt.Printf("❌ Authentication failed: %v\n", err) + os.Exit(1) + } + fmt.Println("✅ Authentication successful") + } else { + fmt.Println("⚠️ No credentials provided, trying without auth...") + } + fmt.Println("-------------------------------------------") + + // Get total count + summary, err := getSummary() + if err != nil { + fmt.Printf("❌ Failed to get summary: %v\n", err) + os.Exit(1) + } + fmt.Printf("📊 Total entries to migrate: %d\n", summary.TotalEntries) + fmt.Println("-------------------------------------------") + + // Calculate pages + totalPages := (summary.TotalEntries + batchSize - 1) / batchSize + + var totalMigrated, totalFailed, totalSkipped int + + for page := 1; page <= totalPages; page++ { + fmt.Printf("📦 Fetching page %d/%d (items %d-%d)...\n", + page, totalPages, + (page-1)*batchSize+1, + min(page*batchSize, summary.TotalEntries)) + + data, err := fetchPage(page, batchSize) + if err != nil { + fmt.Printf(" ❌ Failed to fetch page %d: %v\n", page, err) + totalFailed += batchSize + continue + } + + for i, record := range data { + err := importRecord(pbURL, pbCollection, record) + if err != nil { + if isUniqueViolation(err) { + totalSkipped++ + continue + } + fmt.Printf(" ❌ Failed to import record %d: %v\n", (page-1)*batchSize+i+1, err) + totalFailed++ + continue + } + totalMigrated++ + } + + fmt.Printf(" ✅ Page %d complete (migrated: %d, skipped: %d, failed: %d)\n", + page, len(data), totalSkipped, totalFailed) + + // Small delay to avoid overwhelming the server + time.Sleep(100 * time.Millisecond) + } + + fmt.Println("===========================================") + fmt.Println(" Migration Complete") + fmt.Println("===========================================") + fmt.Printf("✅ Successfully migrated: %d\n", totalMigrated) + fmt.Printf("⏭️ Skipped (duplicates): %d\n", totalSkipped) + fmt.Printf("❌ Failed: %d\n", totalFailed) + fmt.Println("===========================================") +} + +func getSummary() (*Summary, error) { + resp, err := http.Get(summaryAPI) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var summary Summary + if err := json.NewDecoder(resp.Body).Decode(&summary); err != nil { + return nil, err + } + + return &summary, nil +} + +func authenticate(pbURL, authCollection, identity, password string) error { + body := map[string]string{ + "identity": identity, + "password": password, + } + jsonData, _ := json.Marshal(body) + + url := fmt.Sprintf("%s/api/collections/%s/auth-with-password", pbURL, authCollection) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + var result struct { + Token string `json:"token"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return err + } + if result.Token == "" { + return fmt.Errorf("no token in response") + } + + authToken = result.Token + return nil +} + +func fetchPage(page, limit int) ([]OldDataModel, error) { + url := fmt.Sprintf("%s?page=%d&limit=%d", sourceAPI, page, limit) + + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + var data []OldDataModel + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return nil, err + } + + return data, nil +} + +func importRecord(pbURL, collection string, old OldDataModel) error { + // Map status: "done" -> "success" + status := old.Status + switch status { + case "done": + status = "success" + case "installing", "failed", "unknown", "success": + // keep as-is + default: + status = "unknown" + } + + // Ensure ct_type is not 0 (required field) + ctType := old.CtType + if ctType == 0 { + ctType = 1 // default to unprivileged + } + + // Ensure type is set + recordType := old.Type + if recordType == "" { + recordType = "lxc" + } + + record := PBRecord{ + CtType: ctType, + DiskSize: old.DiskSize, + CoreCount: old.CoreCount, + RamSize: old.RamSize, + OsType: old.OsType, + OsVersion: old.OsVersion, + DisableIP6: old.DisableIP6, + NsApp: old.NsApp, + Method: old.Method, + PveVersion: old.PveVersion, + Status: status, + RandomID: old.RandomID, + Type: recordType, + Error: old.Error, + } + + jsonData, err := json.Marshal(record) + if err != nil { + return err + } + + url := fmt.Sprintf("%s/api/collections/%s/records", pbURL, collection) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + if authToken != "" { + req.Header.Set("Authorization", "Bearer "+authToken) + } + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +func isUniqueViolation(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return contains(errStr, "UNIQUE constraint failed") || + contains(errStr, "duplicate") || + contains(errStr, "already exists") || + contains(errStr, "validation_not_unique") +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/migration/migrate.sh b/migration/migrate.sh new file mode 100755 index 0000000..1da9e25 --- /dev/null +++ b/migration/migrate.sh @@ -0,0 +1,67 @@ +#!/bin/bash +# Migration script to import data from the old API to PocketBase +# Usage: ./migrate.sh [POCKETBASE_URL] [COLLECTION_NAME] +# +# Examples: +# ./migrate.sh # Uses defaults +# ./migrate.sh http://localhost:8090 # Custom PB URL +# ./migrate.sh http://localhost:8090 my_telemetry # Custom URL and collection + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# Default values +POCKETBASE_URL="${1:-http://localhost:8090}" +POCKETBASE_COLLECTION="${2:-_dev_telemetry_data}" + +echo "=============================================" +echo " ProxmoxVED Data Migration Tool" +echo "=============================================" +echo "" +echo "This script will migrate telemetry data from:" +echo " Source: https://api.htl-braunau.at/dev/data" +echo " Target: $POCKETBASE_URL" +echo " Collection: $POCKETBASE_COLLECTION" +echo "" + +# Check if PocketBase is reachable +echo "🔍 Checking PocketBase connection..." +if ! curl -sf "$POCKETBASE_URL/api/health" > /dev/null 2>&1; then + echo "❌ Cannot reach PocketBase at $POCKETBASE_URL" + echo " Make sure PocketBase is running and the URL is correct." + exit 1 +fi +echo "✅ PocketBase is reachable" +echo "" + +# Check source API +echo "🔍 Checking source API..." +SUMMARY=$(curl -sf "https://api.htl-braunau.at/dev/data/summary" 2>/dev/null || echo "") +if [ -z "$SUMMARY" ]; then + echo "❌ Cannot reach source API" + exit 1 +fi + +TOTAL=$(echo "$SUMMARY" | grep -o '"total_entries":[0-9]*' | cut -d: -f2) +echo "✅ Source API is reachable ($TOTAL entries available)" +echo "" + +# Confirm migration +read -p "⚠️ Do you want to start the migration? [y/N] " -n 1 -r +echo "" +if [[ ! $REPLY =~ ^[Yy]$ ]]; then + echo "Migration cancelled." + exit 0 +fi + +echo "" +echo "Starting migration..." +echo "" + +# Run the Go migration script +cd "$SCRIPT_DIR" +POCKETBASE_URL="$POCKETBASE_URL" POCKETBASE_COLLECTION="$POCKETBASE_COLLECTION" go run migrate.go + +echo "" +echo "Migration complete!" diff --git a/service.go b/service.go new file mode 100644 index 0000000..5782979 --- /dev/null +++ b/service.go @@ -0,0 +1,1027 @@ +package main + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "strings" + "sync" + "time" +) + +type Config struct { + ListenAddr string + TrustedProxiesCIDR []string + + // PocketBase + PBBaseURL string + PBAuthCollection string // "_dev_telemetry_service" + PBIdentity string // email + PBPassword string + PBTargetColl string // "_dev_telemetry_data" + + // Limits + MaxBodyBytes int64 + RateLimitRPM int // requests per minute per key + RateBurst int // burst tokens + RateKeyMode string // "ip" or "header" + RateKeyHeader string // e.g. "X-Telemetry-Key" + RequestTimeout time.Duration // upstream timeout + EnableReqLogging bool // default false (GDPR-friendly) + + // Cache + RedisURL string + EnableRedis bool + CacheTTL time.Duration + CacheEnabled bool + + // Alerts (SMTP) + AlertEnabled bool + SMTPHost string + SMTPPort int + SMTPUser string + SMTPPassword string + SMTPFrom string + SMTPTo []string + SMTPUseTLS bool + AlertFailureThreshold float64 + AlertCheckInterval time.Duration + AlertCooldown time.Duration +} + +// TelemetryIn matches payload from api.func (bash client) +type TelemetryIn struct { + // Required + RandomID string `json:"random_id"` // Session UUID + Type string `json:"type"` // "lxc" or "vm" + NSAPP string `json:"nsapp"` // Application name (e.g., "jellyfin") + Status string `json:"status"` // "installing", "success", "failed", "unknown" + + // Container/VM specs + CTType int `json:"ct_type,omitempty"` // 1=unprivileged, 2=privileged/VM + DiskSize int `json:"disk_size,omitempty"` // GB + CoreCount int `json:"core_count,omitempty"` // CPU cores + RAMSize int `json:"ram_size,omitempty"` // MB + + // System info + OsType string `json:"os_type,omitempty"` // "debian", "ubuntu", "alpine", etc. + OsVersion string `json:"os_version,omitempty"` // "12", "24.04", etc. + PveVer string `json:"pve_version,omitempty"` + + // Optional + Method string `json:"method,omitempty"` // "default", "advanced" + Error string `json:"error,omitempty"` // Error description (max 120 chars) + ExitCode int `json:"exit_code,omitempty"` // 0-255 +} + +// TelemetryOut is sent to PocketBase (matches _dev_telemetry_data collection) +type TelemetryOut struct { + RandomID string `json:"random_id"` + Type string `json:"type"` + NSAPP string `json:"nsapp"` + Status string `json:"status"` + CTType int `json:"ct_type,omitempty"` + DiskSize int `json:"disk_size,omitempty"` + CoreCount int `json:"core_count,omitempty"` + RAMSize int `json:"ram_size,omitempty"` + OsType string `json:"os_type,omitempty"` + OsVersion string `json:"os_version,omitempty"` + PveVer string `json:"pve_version,omitempty"` + Method string `json:"method,omitempty"` + Error string `json:"error,omitempty"` + ExitCode int `json:"exit_code,omitempty"` +} + +// TelemetryStatusUpdate contains only fields needed for status updates +type TelemetryStatusUpdate struct { + Status string `json:"status"` + Error string `json:"error,omitempty"` + ExitCode int `json:"exit_code"` +} + +type PBClient struct { + baseURL string + authCollection string + identity string + password string + targetColl string + + mu sync.Mutex + token string + exp time.Time + http *http.Client +} + +func NewPBClient(cfg Config) *PBClient { + return &PBClient{ + baseURL: strings.TrimRight(cfg.PBBaseURL, "/"), + authCollection: cfg.PBAuthCollection, + identity: cfg.PBIdentity, + password: cfg.PBPassword, + targetColl: cfg.PBTargetColl, + http: &http.Client{ + Timeout: cfg.RequestTimeout, + }, + } +} + +func (p *PBClient) ensureAuth(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + // refresh if token missing or expiring soon + if p.token != "" && time.Until(p.exp) > 60*time.Second { + return nil + } + + body := map[string]string{ + "identity": p.identity, + "password": p.password, + } + b, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + fmt.Sprintf("%s/api/collections/%s/auth-with-password", p.baseURL, p.authCollection), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10)) + return fmt.Errorf("pocketbase auth failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + + var out struct { + Token string `json:"token"` + // record omitted + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return err + } + if out.Token == "" { + return errors.New("pocketbase auth token missing") + } + + // PocketBase JWT exp can be parsed, but keep it simple: set 50 min + p.token = out.Token + p.exp = time.Now().Add(50 * time.Minute) + return nil +} + +// FindRecordByRandomID searches for an existing record by random_id +func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (string, error) { + if err := p.ensureAuth(ctx); err != nil { + return "", err + } + + // URL encode the filter + filter := fmt.Sprintf("random_id='%s'", randomID) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + fmt.Sprintf("%s/api/collections/%s/records?filter=%s&fields=id&perPage=1", + p.baseURL, p.targetColl, filter), + nil, + ) + if err != nil { + return "", err + } + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("pocketbase search failed: %s", resp.Status) + } + + var result struct { + Items []struct { + ID string `json:"id"` + } `json:"items"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return "", err + } + + if len(result.Items) == 0 { + return "", nil // Not found + } + return result.Items[0].ID, nil +} + +// UpdateTelemetryStatus updates only status, error, and exit_code of an existing record +func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, update TelemetryStatusUpdate) error { + if err := p.ensureAuth(ctx); err != nil { + return err + } + + b, _ := json.Marshal(update) + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, + fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, p.targetColl, recordID), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10)) + return fmt.Errorf("pocketbase update failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + return nil +} + +// FetchRecordsPaginated retrieves records with pagination and optional filters +func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType string) ([]TelemetryRecord, int, error) { + if err := p.ensureAuth(ctx); err != nil { + return nil, 0, err + } + + // Build filter + var filters []string + if status != "" { + filters = append(filters, fmt.Sprintf("status='%s'", status)) + } + if app != "" { + filters = append(filters, fmt.Sprintf("nsapp~'%s'", app)) + } + if osType != "" { + filters = append(filters, fmt.Sprintf("os_type='%s'", osType)) + } + + filterStr := "" + if len(filters) > 0 { + filterStr = "&filter=" + strings.Join(filters, "&&") + } + + reqURL := fmt.Sprintf("%s/api/collections/%s/records?sort=-created&page=%d&perPage=%d%s", + p.baseURL, p.targetColl, page, limit, filterStr) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) + if err != nil { + return nil, 0, err + } + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, 0, fmt.Errorf("pocketbase fetch failed: %s", resp.Status) + } + + var result struct { + Items []TelemetryRecord `json:"items"` + TotalItems int `json:"totalItems"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, 0, err + } + + return result.Items, result.TotalItems, nil +} + +// UpsertTelemetry handles both creation and updates intelligently +// - status="installing": Always creates a new record +// - status!="installing": Updates existing record (found by random_id) with status/error/exit_code only +func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) error { + // For "installing" status, always create new record + if payload.Status == "installing" { + return p.CreateTelemetry(ctx, payload) + } + + // For status updates (success/failed/unknown), find and update existing record + recordID, err := p.FindRecordByRandomID(ctx, payload.RandomID) + if err != nil { + // Search failed, log and return error + return fmt.Errorf("cannot find record to update: %w", err) + } + + if recordID == "" { + // Record not found - this shouldn't happen normally + // Create a full record as fallback + return p.CreateTelemetry(ctx, payload) + } + + // Update only status, error, and exit_code + update := TelemetryStatusUpdate{ + Status: payload.Status, + Error: payload.Error, + ExitCode: payload.ExitCode, + } + return p.UpdateTelemetryStatus(ctx, recordID, update) +} + +func (p *PBClient) CreateTelemetry(ctx context.Context, payload TelemetryOut) error { + if err := p.ensureAuth(ctx); err != nil { + return err + } + + b, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.targetColl), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10)) + return fmt.Errorf("pocketbase create failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + return nil +} + +// -------- Rate limiter (token bucket / minute window, simple) -------- + +type bucket struct { + tokens int + reset time.Time +} + +type RateLimiter struct { + mu sync.Mutex + buckets map[string]*bucket + rpm int + burst int + window time.Duration + cleanInt time.Duration +} + +func NewRateLimiter(rpm, burst int) *RateLimiter { + rl := &RateLimiter{ + buckets: make(map[string]*bucket), + rpm: rpm, + burst: burst, + window: time.Minute, + cleanInt: 5 * time.Minute, + } + go rl.cleanupLoop() + return rl +} + +func (r *RateLimiter) cleanupLoop() { + t := time.NewTicker(r.cleanInt) + defer t.Stop() + for range t.C { + now := time.Now() + r.mu.Lock() + for k, b := range r.buckets { + if now.After(b.reset.Add(2 * r.window)) { + delete(r.buckets, k) + } + } + r.mu.Unlock() + } +} + +func (r *RateLimiter) Allow(key string) bool { + if r.rpm <= 0 { + return true + } + now := time.Now() + r.mu.Lock() + defer r.mu.Unlock() + + b, ok := r.buckets[key] + if !ok || now.After(b.reset) { + r.buckets[key] = &bucket{tokens: min(r.burst, r.rpm), reset: now.Add(r.window)} + b = r.buckets[key] + } + if b.tokens <= 0 { + return false + } + b.tokens-- + return true +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// -------- Utility: GDPR-safe key extraction -------- + +type ProxyTrust struct { + nets []*net.IPNet +} + +func NewProxyTrust(cidrs []string) (*ProxyTrust, error) { + var nets []*net.IPNet + for _, c := range cidrs { + _, n, err := net.ParseCIDR(strings.TrimSpace(c)) + if err != nil { + return nil, err + } + nets = append(nets, n) + } + return &ProxyTrust{nets: nets}, nil +} + +func (pt *ProxyTrust) isTrusted(ip net.IP) bool { + for _, n := range pt.nets { + if n.Contains(ip) { + return true + } + } + return false +} + +func getClientIP(r *http.Request, pt *ProxyTrust) net.IP { + // If behind reverse proxy, trust X-Forwarded-For only if remote is trusted proxy. + host, _, _ := net.SplitHostPort(r.RemoteAddr) + remote := net.ParseIP(host) + if remote == nil { + return nil + } + + if pt != nil && pt.isTrusted(remote) { + xff := r.Header.Get("X-Forwarded-For") + if xff != "" { + parts := strings.Split(xff, ",") + ip := net.ParseIP(strings.TrimSpace(parts[0])) + if ip != nil { + return ip + } + } + } + return remote +} + +// -------- Validation (strict allowlist) -------- + +var ( + // Allowed values for 'type' field + allowedType = map[string]bool{"lxc": true, "vm": true} + + // Allowed values for 'status' field + allowedStatus = map[string]bool{"installing": true, "success": true, "failed": true, "unknown": true} + + // Allowed values for 'os_type' field + allowedOsType = map[string]bool{ + "debian": true, "ubuntu": true, "alpine": true, "devuan": true, + "fedora": true, "rocky": true, "alma": true, "centos": true, + "opensuse": true, "gentoo": true, "openeuler": true, + } +) + +func sanitizeShort(s string, max int) string { + s = strings.TrimSpace(s) + if s == "" { + return "" + } + // remove line breaks and high-risk chars + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + if len(s) > max { + s = s[:max] + } + return s +} + +func validate(in *TelemetryIn) error { + // Sanitize all string fields + in.RandomID = sanitizeShort(in.RandomID, 64) + in.Type = sanitizeShort(in.Type, 8) + in.NSAPP = sanitizeShort(in.NSAPP, 64) + in.Status = sanitizeShort(in.Status, 16) + in.OsType = sanitizeShort(in.OsType, 32) + in.OsVersion = sanitizeShort(in.OsVersion, 32) + in.PveVer = sanitizeShort(in.PveVer, 32) + in.Method = sanitizeShort(in.Method, 32) + + // IMPORTANT: "error" must be short and not contain identifiers/logs + in.Error = sanitizeShort(in.Error, 120) + + // Required fields for all requests + if in.RandomID == "" || in.Type == "" || in.NSAPP == "" || in.Status == "" { + return errors.New("missing required fields: random_id, type, nsapp, status") + } + + // Validate enums + if !allowedType[in.Type] { + return errors.New("invalid type (must be 'lxc' or 'vm')") + } + if !allowedStatus[in.Status] { + return errors.New("invalid status") + } + + // For status updates (not installing), skip numeric field validation + // These are only required for initial creation + isUpdate := in.Status != "installing" + + // os_type is optional but if provided must be valid + if in.OsType != "" && !allowedOsType[in.OsType] { + return errors.New("invalid os_type") + } + + // method is optional and flexible - just sanitized, no strict validation + // Values like "default", "advanced", "mydefaults-global", "mydefaults-app" are all valid + + // Validate numeric ranges (only strict for new records) + if !isUpdate { + if in.CTType < 0 || in.CTType > 2 { + return errors.New("invalid ct_type (must be 0, 1, or 2)") + } + } + if in.DiskSize < 0 || in.DiskSize > 100000 { + return errors.New("invalid disk_size") + } + if in.CoreCount < 0 || in.CoreCount > 256 { + return errors.New("invalid core_count") + } + if in.RAMSize < 0 || in.RAMSize > 1048576 { + return errors.New("invalid ram_size") + } + if in.ExitCode < 0 || in.ExitCode > 255 { + return errors.New("invalid exit_code") + } + + return nil +} + +// computeHash generates a hash for deduplication (GDPR-safe, no IP) +func computeHash(out TelemetryOut) string { + key := fmt.Sprintf("%s|%s|%s|%s|%d", + out.RandomID, out.NSAPP, out.Type, out.Status, out.ExitCode, + ) + sum := sha256.Sum256([]byte(key)) + return hex.EncodeToString(sum[:]) +} + +// -------- HTTP server -------- + +func main() { + cfg := Config{ + ListenAddr: env("LISTEN_ADDR", ":8080"), + TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")), + + PBBaseURL: mustEnv("PB_URL"), + PBAuthCollection: env("PB_AUTH_COLLECTION", "_dev_telemetry_service"), + PBIdentity: mustEnv("PB_IDENTITY"), + PBPassword: mustEnv("PB_PASSWORD"), + PBTargetColl: env("PB_TARGET_COLLECTION", "_dev_telemetry_data"), + + MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024), + RateLimitRPM: envInt("RATE_LIMIT_RPM", 60), + RateBurst: envInt("RATE_BURST", 20), + RateKeyMode: env("RATE_KEY_MODE", "ip"), // "ip" or "header" + RateKeyHeader: env("RATE_KEY_HEADER", "X-Telemetry-Key"), + RequestTimeout: time.Duration(envInt("UPSTREAM_TIMEOUT_MS", 4000)) * time.Millisecond, + EnableReqLogging: envBool("ENABLE_REQUEST_LOGGING", false), + + // Cache config + RedisURL: env("REDIS_URL", ""), + EnableRedis: envBool("ENABLE_REDIS", false), + CacheTTL: time.Duration(envInt("CACHE_TTL_SECONDS", 60)) * time.Second, + CacheEnabled: envBool("ENABLE_CACHE", true), + + // Alert config + AlertEnabled: envBool("ALERT_ENABLED", false), + SMTPHost: env("SMTP_HOST", ""), + SMTPPort: envInt("SMTP_PORT", 587), + SMTPUser: env("SMTP_USER", ""), + SMTPPassword: env("SMTP_PASSWORD", ""), + SMTPFrom: env("SMTP_FROM", "telemetry@proxmoxved.local"), + SMTPTo: splitCSV(env("SMTP_TO", "")), + SMTPUseTLS: envBool("SMTP_USE_TLS", false), + AlertFailureThreshold: envFloat("ALERT_FAILURE_THRESHOLD", 20.0), + AlertCheckInterval: time.Duration(envInt("ALERT_CHECK_INTERVAL_MIN", 15)) * time.Minute, + AlertCooldown: time.Duration(envInt("ALERT_COOLDOWN_MIN", 60)) * time.Minute, + } + + var pt *ProxyTrust + if strings.TrimSpace(env("TRUSTED_PROXIES_CIDR", "")) != "" { + p, err := NewProxyTrust(cfg.TrustedProxiesCIDR) + if err != nil { + log.Fatalf("invalid TRUSTED_PROXIES_CIDR: %v", err) + } + pt = p + } + + pb := NewPBClient(cfg) + rl := NewRateLimiter(cfg.RateLimitRPM, cfg.RateBurst) + + // Initialize cache + cache := NewCache(CacheConfig{ + RedisURL: cfg.RedisURL, + EnableRedis: cfg.EnableRedis, + DefaultTTL: cfg.CacheTTL, + }) + + // Initialize alerter + alerter := NewAlerter(AlertConfig{ + Enabled: cfg.AlertEnabled, + SMTPHost: cfg.SMTPHost, + SMTPPort: cfg.SMTPPort, + SMTPUser: cfg.SMTPUser, + SMTPPassword: cfg.SMTPPassword, + SMTPFrom: cfg.SMTPFrom, + SMTPTo: cfg.SMTPTo, + UseTLS: cfg.SMTPUseTLS, + FailureThreshold: cfg.AlertFailureThreshold, + CheckInterval: cfg.AlertCheckInterval, + Cooldown: cfg.AlertCooldown, + }, pb) + alerter.Start() + + mux := http.NewServeMux() + + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + // Check PocketBase connectivity + ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second) + defer cancel() + + status := map[string]interface{}{ + "status": "ok", + "time": time.Now().UTC().Format(time.RFC3339), + } + + if err := pb.ensureAuth(ctx); err != nil { + status["status"] = "degraded" + status["pocketbase"] = "disconnected" + w.WriteHeader(503) + } else { + status["pocketbase"] = "connected" + w.WriteHeader(200) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) + }) + + // Dashboard HTML page - serve on root + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + _, _ = w.Write([]byte(DashboardHTML())) + }) + + // Redirect /dashboard to / for backwards compatibility + mux.HandleFunc("/dashboard", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/", http.StatusMovedPermanently) + }) + + // Prometheus-style metrics endpoint + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + data, err := pb.FetchDashboardData(ctx, 1) // Last 24h only for metrics + if err != nil { + http.Error(w, "failed to fetch metrics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintf(w, "# HELP telemetry_installs_total Total number of installations\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_total counter\n") + fmt.Fprintf(w, "telemetry_installs_total %d\n\n", data.TotalInstalls) + fmt.Fprintf(w, "# HELP telemetry_installs_success_total Successful installations\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_success_total counter\n") + fmt.Fprintf(w, "telemetry_installs_success_total %d\n\n", data.SuccessCount) + fmt.Fprintf(w, "# HELP telemetry_installs_failed_total Failed installations\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_failed_total counter\n") + fmt.Fprintf(w, "telemetry_installs_failed_total %d\n\n", data.FailedCount) + fmt.Fprintf(w, "# HELP telemetry_installs_pending Current installing count\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_pending gauge\n") + fmt.Fprintf(w, "telemetry_installs_pending %d\n\n", data.InstallingCount) + fmt.Fprintf(w, "# HELP telemetry_success_rate Success rate percentage\n") + fmt.Fprintf(w, "# TYPE telemetry_success_rate gauge\n") + fmt.Fprintf(w, "telemetry_success_rate %.2f\n", data.SuccessRate) + }) + + // Dashboard API endpoint (with caching) + mux.HandleFunc("/api/dashboard", func(w http.ResponseWriter, r *http.Request) { + days := 30 + if d := r.URL.Query().Get("days"); d != "" { + fmt.Sscanf(d, "%d", &days) + if days < 1 { + days = 1 + } + if days > 365 { + days = 365 + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + // Try cache first + cacheKey := fmt.Sprintf("dashboard:%d", days) + var data *DashboardData + if cfg.CacheEnabled && cache.Get(ctx, cacheKey, &data) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Cache", "HIT") + json.NewEncoder(w).Encode(data) + return + } + + data, err := pb.FetchDashboardData(ctx, days) + if err != nil { + log.Printf("dashboard fetch failed: %v", err) + http.Error(w, "failed to fetch data", http.StatusInternalServerError) + return + } + + // Cache the result + if cfg.CacheEnabled { + _ = cache.Set(ctx, cacheKey, data, cfg.CacheTTL) + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Cache", "MISS") + json.NewEncoder(w).Encode(data) + }) + + // Paginated records API + mux.HandleFunc("/api/records", func(w http.ResponseWriter, r *http.Request) { + page := 1 + limit := 50 + status := r.URL.Query().Get("status") + app := r.URL.Query().Get("app") + osType := r.URL.Query().Get("os") + + if p := r.URL.Query().Get("page"); p != "" { + fmt.Sscanf(p, "%d", &page) + if page < 1 { + page = 1 + } + } + if l := r.URL.Query().Get("limit"); l != "" { + fmt.Sscanf(l, "%d", &limit) + if limit < 1 { + limit = 1 + } + if limit > 100 { + limit = 100 + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType) + if err != nil { + log.Printf("records fetch failed: %v", err) + http.Error(w, "failed to fetch records", http.StatusInternalServerError) + return + } + + response := map[string]interface{}{ + "records": records, + "page": page, + "limit": limit, + "total": total, + "total_pages": (total + limit - 1) / limit, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + }) + + // Alert history and test endpoints + mux.HandleFunc("/api/alerts", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "enabled": cfg.AlertEnabled, + "history": alerter.GetAlertHistory(), + }) + }) + + mux.HandleFunc("/api/alerts/test", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + if err := alerter.TestAlert(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("test alert sent")) + }) + + mux.HandleFunc("/telemetry", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + // rate key: IP or header (header allows non-identifying keys, but header can be abused too) + var key string + switch cfg.RateKeyMode { + case "header": + key = strings.TrimSpace(r.Header.Get(cfg.RateKeyHeader)) + if key == "" { + key = "missing" + } + default: + ip := getClientIP(r, pt) + if ip == nil { + key = "unknown" + } else { + // GDPR: do NOT store IP anywhere permanent; use it only in-memory for RL key + key = ip.String() + } + } + if !rl.Allow(key) { + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + + r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodyBytes) + raw, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "invalid body", http.StatusBadRequest) + return + } + + // strict JSON decode (no unknown fields) + var in TelemetryIn + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + if err := dec.Decode(&in); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + if err := validate(&in); err != nil { + http.Error(w, "invalid payload", http.StatusBadRequest) + return + } + + // Map input to PocketBase schema + out := TelemetryOut{ + RandomID: in.RandomID, + Type: in.Type, + NSAPP: in.NSAPP, + Status: in.Status, + CTType: in.CTType, + DiskSize: in.DiskSize, + CoreCount: in.CoreCount, + RAMSize: in.RAMSize, + OsType: in.OsType, + OsVersion: in.OsVersion, + PveVer: in.PveVer, + Method: in.Method, + Error: in.Error, + ExitCode: in.ExitCode, + } + _ = computeHash(out) // For future deduplication + + ctx, cancel := context.WithTimeout(r.Context(), cfg.RequestTimeout) + defer cancel() + + // Upsert: Creates new record if random_id doesn't exist, updates if it does + if err := pb.UpsertTelemetry(ctx, out); err != nil { + // GDPR: don't log raw payload, don't log IPs; log only generic error + log.Printf("pocketbase write failed: %v", err) + http.Error(w, "upstream error", http.StatusBadGateway) + return + } + + if cfg.EnableReqLogging { + log.Printf("telemetry accepted nsapp=%s status=%s", out.NSAPP, out.Status) + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("accepted")) + }) + + srv := &http.Server{ + Addr: cfg.ListenAddr, + Handler: securityHeaders(mux), + ReadHeaderTimeout: 3 * time.Second, + } + + log.Printf("telemetry-ingest listening on %s", cfg.ListenAddr) + log.Fatal(srv.ListenAndServe()) +} + +func securityHeaders(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Minimal security headers (no cookies anyway) + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("X-Frame-Options", "DENY") + w.Header().Set("Referrer-Policy", "no-referrer") + next.ServeHTTP(w, r) + }) +} + +func env(k, def string) string { + v := os.Getenv(k) + if v == "" { + return def + } + return v +} +func mustEnv(k string) string { + v := os.Getenv(k) + if v == "" { + log.Fatalf("missing env %s", k) + } + return v +} +func envInt(k string, def int) int { + v := os.Getenv(k) + if v == "" { + return def + } + var i int + _, _ = fmt.Sscanf(v, "%d", &i) + if i == 0 && v != "0" { + return def + } + return i +} +func envInt64(k string, def int64) int64 { + v := os.Getenv(k) + if v == "" { + return def + } + var i int64 + _, _ = fmt.Sscanf(v, "%d", &i) + if i == 0 && v != "0" { + return def + } + return i +} +func envBool(k string, def bool) bool { + v := strings.ToLower(strings.TrimSpace(os.Getenv(k))) + if v == "" { + return def + } + return v == "1" || v == "true" || v == "yes" || v == "on" +} +func envFloat(k string, def float64) float64 { + v := os.Getenv(k) + if v == "" { + return def + } + var f float64 + _, _ = fmt.Sscanf(v, "%f", &f) + if f == 0 && v != "0" { + return def + } + return f +} +func splitCSV(s string) []string { + s = strings.TrimSpace(s) + if s == "" { + return nil + } + parts := strings.Split(s, ",") + var out []string + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +}