diff --git a/.gitignore b/.gitignore index aaadf73..03cf14d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ *.dll *.so *.dylib +telemetry-service +migration/migrate # Test binary, built with `go test -c` *.test diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3d58795 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,52 @@ +FROM golang:1.25-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download 2>/dev/null || true +COPY . . +RUN go build -trimpath -ldflags "-s -w" -o /out/telemetry-service . +RUN go build -trimpath -ldflags "-s -w" -o /out/migrate ./migration/migrate.go + +FROM alpine:3.23 +RUN apk add --no-cache ca-certificates tzdata +WORKDIR /app +COPY --from=build /out/telemetry-service /app/telemetry-service +COPY --from=build /out/migrate /app/migrate +COPY entrypoint.sh /app/entrypoint.sh +RUN chmod +x /app/entrypoint.sh /app/migrate + +# Service config +ENV LISTEN_ADDR=":8080" +ENV MAX_BODY_BYTES="1024" +ENV RATE_LIMIT_RPM="60" +ENV RATE_BURST="20" +ENV UPSTREAM_TIMEOUT_MS="4000" +ENV ENABLE_REQUEST_LOGGING="false" + +# Cache config (optional) +ENV ENABLE_CACHE="true" +ENV CACHE_TTL_SECONDS="60" +ENV ENABLE_REDIS="false" +# ENV REDIS_URL="redis://localhost:6379" + +# Alert config (optional) +ENV ALERT_ENABLED="false" +# ENV SMTP_HOST="" +# ENV SMTP_PORT="587" +# ENV SMTP_USER="" +# ENV SMTP_PASSWORD="" +# ENV SMTP_FROM="telemetry@proxmoxved.local" +# ENV SMTP_TO="" +# ENV SMTP_USE_TLS="false" +ENV ALERT_FAILURE_THRESHOLD="20.0" +ENV ALERT_CHECK_INTERVAL_MIN="15" +ENV ALERT_COOLDOWN_MIN="60" + +# Migration config (optional) +ENV RUN_MIGRATION="false" +ENV MIGRATION_REQUIRED="false" +ENV MIGRATION_SOURCE_URL="https://api.htl-braunau.at/dev/data" + +EXPOSE 8080 +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s \ + CMD wget -q --spider http://localhost:8080/healthz || exit 1 +ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/LICENSE b/LICENSE index 886e535..1fae4b9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2026 Proxmox Helper Scripts (CE) +Copyright (c) 2026 Community Scripts Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index a818427..0211ab9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,81 @@ -# telemetry-service -Standalone Go service for ProxmoxVE and ProxmoxVE data, cache, and telemetry. +# Telemetry Service + +A standalone Go microservice that collects anonymous telemetry data from [ProxmoxVE](https://github.com/community-scripts/ProxmoxVE) and [ProxmoxVED](https://github.com/community-scripts/ProxmoxVED) script installations. + +## Overview + +This service acts as a telemetry ingestion layer between the bash installation scripts and a PocketBase backend. When users run scripts from the ProxmoxVE/ProxmoxVED repositories, optional anonymous usage data is sent here for aggregation and analysis. + +**What gets collected:** +- Script name and installation status (success/failed) +- Container/VM type and resource allocation (CPU, RAM, disk) +- OS type and version +- Proxmox VE version +- Anonymous session ID (randomly generated UUID) + +**What is NOT collected:** +- IP addresses (not logged, not stored) +- Hostnames or domain names +- User credentials or personal information +- Hardware identifiers (MAC addresses, serial numbers) +- Network configuration or internal IPs +- Any data that could identify a person or system + +**What this enables:** +- Understanding which scripts are most popular +- Identifying scripts with high failure rates +- Tracking resource allocation trends +- Improving script quality based on real-world data + +## Features + +- **Telemetry Ingestion** - Receives and validates telemetry data from bash scripts +- **PocketBase Integration** - Stores data in PocketBase collections +- **Rate Limiting** - Configurable per-IP rate limiting to prevent abuse +- **Caching** - In-memory or Redis-backed caching support +- **Email Alerts** - SMTP-based alerts when failure rates exceed thresholds +- **Dashboard** - Built-in HTML dashboard for telemetry visualization +- **Migration Tool** - Migrate data from external sources to PocketBase + +## Architecture + +``` +┌─────────────────┐ ┌───────────────────┐ ┌────────────┐ +│ Bash Scripts │────▶│ Telemetry Service │────▶│ PocketBase │ +│ (ProxmoxVE/VED) │ │ (this repo) │ │ Database │ +└─────────────────┘ └───────────────────┘ └────────────┘ +``` + +## Project Structure + +``` +├── service.go # Main service, HTTP handlers, rate limiting +├── cache.go # In-memory and Redis caching +├── alerts.go # SMTP alert system +├── dashboard.go # Dashboard HTML generation +├── migration/ +│ ├── migrate.go # Data migration tool +│ └── migrate.sh # Migration shell script +├── Dockerfile # Container build +├── entrypoint.sh # Container entrypoint with migration support +└── go.mod # Go module definition +``` + +## Related Projects + +- [ProxmoxVE](https://github.com/community-scripts/ProxmoxVE) - Proxmox VE Helper Scripts +- [ProxmoxVED](https://github.com/community-scripts/ProxmoxVED) - Proxmox VE Helper Scripts (Dev) + +## Privacy & Compliance + +This service is designed with privacy in mind and is **GDPR/DSGVO compliant**: + +- ✅ **No personal data** - Only anonymous technical metrics are collected +- ✅ **No IP logging** - Request logging is disabled by default, IPs are never stored +- ✅ **Transparent** - All collected fields are documented and the code is open source +- ✅ **No tracking** - Session IDs are randomly generated and cannot be linked to users +- ✅ **No third parties** - Data is only stored in our self-hosted PocketBase instance + +## License + +MIT License - see [LICENSE](LICENSE) file. diff --git a/alerts.go b/alerts.go new file mode 100644 index 0000000..dccbbd6 --- /dev/null +++ b/alerts.go @@ -0,0 +1,267 @@ +package main + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "log" + "net/smtp" + "strings" + "sync" + "time" +) + +// AlertConfig holds SMTP alert configuration +type AlertConfig struct { + Enabled bool + SMTPHost string + SMTPPort int + SMTPUser string + SMTPPassword string + SMTPFrom string + SMTPTo []string + UseTLS bool + FailureThreshold float64 // Alert when failure rate exceeds this (e.g., 20.0 = 20%) + CheckInterval time.Duration // How often to check + Cooldown time.Duration // Minimum time between alerts +} + +// Alerter handles alerting functionality +type Alerter struct { + cfg AlertConfig + lastAlertAt time.Time + mu sync.Mutex + pb *PBClient + lastStats alertStats + alertHistory []AlertEvent +} + +type alertStats struct { + successCount int + failedCount int + checkedAt time.Time +} + +// AlertEvent records an alert that was sent +type AlertEvent struct { + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Message string `json:"message"` + FailureRate float64 `json:"failure_rate,omitempty"` +} + +// NewAlerter creates a new alerter instance +func NewAlerter(cfg AlertConfig, pb *PBClient) *Alerter { + return &Alerter{ + cfg: cfg, + pb: pb, + alertHistory: make([]AlertEvent, 0), + } +} + +// Start begins the alert monitoring loop +func (a *Alerter) Start() { + if !a.cfg.Enabled { + log.Println("INFO: alerting disabled") + return + } + + if a.cfg.SMTPHost == "" || len(a.cfg.SMTPTo) == 0 { + log.Println("WARN: alerting enabled but SMTP not configured") + return + } + + go a.monitorLoop() + log.Printf("INFO: alert monitoring started (threshold: %.1f%%, interval: %v)", a.cfg.FailureThreshold, a.cfg.CheckInterval) +} + +func (a *Alerter) monitorLoop() { + ticker := time.NewTicker(a.cfg.CheckInterval) + defer ticker.Stop() + + for range ticker.C { + a.checkAndAlert() + } +} + +func (a *Alerter) checkAndAlert() { + ctx, cancel := newTimeoutContext(10 * time.Second) + defer cancel() + + // Fetch last hour's data + data, err := a.pb.FetchDashboardData(ctx, 1) + if err != nil { + log.Printf("WARN: alert check failed: %v", err) + return + } + + // Calculate current failure rate + total := data.SuccessCount + data.FailedCount + if total < 10 { + // Not enough data to determine rate + return + } + + failureRate := float64(data.FailedCount) / float64(total) * 100 + + // Check if we should alert + if failureRate >= a.cfg.FailureThreshold { + a.maybeSendAlert(failureRate, data.FailedCount, total) + } +} + +func (a *Alerter) maybeSendAlert(rate float64, failed, total int) { + a.mu.Lock() + defer a.mu.Unlock() + + // Check cooldown + if time.Since(a.lastAlertAt) < a.cfg.Cooldown { + return + } + + // Send alert + subject := fmt.Sprintf("[ProxmoxVED Alert] High Failure Rate: %.1f%%", rate) + body := fmt.Sprintf(`ProxmoxVE Helper Scripts - Telemetry Alert + +⚠️ High installation failure rate detected! + +Current Statistics (last 24h): +- Failure Rate: %.1f%% +- Failed Installations: %d +- Total Installations: %d +- Threshold: %.1f%% + +Time: %s + +Please check the dashboard for more details. + +--- +This is an automated alert from the telemetry service. +`, rate, failed, total, a.cfg.FailureThreshold, time.Now().Format(time.RFC1123)) + + if err := a.sendEmail(subject, body); err != nil { + log.Printf("ERROR: failed to send alert email: %v", err) + return + } + + a.lastAlertAt = time.Now() + a.alertHistory = append(a.alertHistory, AlertEvent{ + Timestamp: time.Now(), + Type: "high_failure_rate", + Message: fmt.Sprintf("Failure rate %.1f%% exceeded threshold %.1f%%", rate, a.cfg.FailureThreshold), + FailureRate: rate, + }) + + // Keep only last 100 alerts + if len(a.alertHistory) > 100 { + a.alertHistory = a.alertHistory[len(a.alertHistory)-100:] + } + + log.Printf("ALERT: sent high failure rate alert (%.1f%%)", rate) +} + +func (a *Alerter) sendEmail(subject, body string) error { + // Build message + var msg bytes.Buffer + msg.WriteString(fmt.Sprintf("From: %s\r\n", a.cfg.SMTPFrom)) + msg.WriteString(fmt.Sprintf("To: %s\r\n", strings.Join(a.cfg.SMTPTo, ", "))) + msg.WriteString(fmt.Sprintf("Subject: %s\r\n", subject)) + msg.WriteString("MIME-Version: 1.0\r\n") + msg.WriteString("Content-Type: text/plain; charset=UTF-8\r\n") + msg.WriteString("\r\n") + msg.WriteString(body) + + addr := fmt.Sprintf("%s:%d", a.cfg.SMTPHost, a.cfg.SMTPPort) + + var auth smtp.Auth + if a.cfg.SMTPUser != "" && a.cfg.SMTPPassword != "" { + auth = smtp.PlainAuth("", a.cfg.SMTPUser, a.cfg.SMTPPassword, a.cfg.SMTPHost) + } + + if a.cfg.UseTLS { + // TLS connection + tlsConfig := &tls.Config{ + ServerName: a.cfg.SMTPHost, + } + + conn, err := tls.Dial("tcp", addr, tlsConfig) + if err != nil { + return fmt.Errorf("TLS dial failed: %w", err) + } + defer conn.Close() + + client, err := smtp.NewClient(conn, a.cfg.SMTPHost) + if err != nil { + return fmt.Errorf("SMTP client failed: %w", err) + } + defer client.Close() + + if auth != nil { + if err := client.Auth(auth); err != nil { + return fmt.Errorf("SMTP auth failed: %w", err) + } + } + + if err := client.Mail(a.cfg.SMTPFrom); err != nil { + return fmt.Errorf("SMTP MAIL failed: %w", err) + } + + for _, to := range a.cfg.SMTPTo { + if err := client.Rcpt(to); err != nil { + return fmt.Errorf("SMTP RCPT failed: %w", err) + } + } + + w, err := client.Data() + if err != nil { + return fmt.Errorf("SMTP DATA failed: %w", err) + } + + _, err = w.Write(msg.Bytes()) + if err != nil { + return fmt.Errorf("SMTP write failed: %w", err) + } + + return w.Close() + } + + // Non-TLS (STARTTLS) + return smtp.SendMail(addr, auth, a.cfg.SMTPFrom, a.cfg.SMTPTo, msg.Bytes()) +} + +// GetAlertHistory returns recent alert events +func (a *Alerter) GetAlertHistory() []AlertEvent { + a.mu.Lock() + defer a.mu.Unlock() + result := make([]AlertEvent, len(a.alertHistory)) + copy(result, a.alertHistory) + return result +} + +// TestAlert sends a test alert email +func (a *Alerter) TestAlert() error { + if !a.cfg.Enabled || a.cfg.SMTPHost == "" { + return fmt.Errorf("alerting not configured") + } + + subject := "[ProxmoxVED] Test Alert" + body := fmt.Sprintf(`This is a test alert from ProxmoxVE Helper Scripts telemetry service. + +If you received this email, your alert configuration is working correctly. + +Time: %s +SMTP Host: %s +Recipients: %s + +--- +This is an automated test message. +`, time.Now().Format(time.RFC1123), a.cfg.SMTPHost, strings.Join(a.cfg.SMTPTo, ", ")) + + return a.sendEmail(subject, body) +} + +// Helper for timeout context +func newTimeoutContext(d time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), d) +} diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..54cc5f5 --- /dev/null +++ b/cache.go @@ -0,0 +1,158 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "sync" + "time" + + "github.com/redis/go-redis/v9" +) + +// CacheConfig holds cache configuration +type CacheConfig struct { + RedisURL string + EnableRedis bool + DefaultTTL time.Duration +} + +// Cache provides caching functionality with Redis or in-memory fallback +type Cache struct { + redis *redis.Client + useRedis bool + defaultTTL time.Duration + + // In-memory fallback + mu sync.RWMutex + memData map[string]cacheEntry +} + +type cacheEntry struct { + data []byte + expiresAt time.Time +} + +// NewCache creates a new cache instance +func NewCache(cfg CacheConfig) *Cache { + c := &Cache{ + defaultTTL: cfg.DefaultTTL, + memData: make(map[string]cacheEntry), + } + + if cfg.EnableRedis && cfg.RedisURL != "" { + opts, err := redis.ParseURL(cfg.RedisURL) + if err != nil { + log.Printf("WARN: invalid redis URL, using in-memory cache: %v", err) + return c + } + + client := redis.NewClient(opts) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + log.Printf("WARN: redis connection failed, using in-memory cache: %v", err) + return c + } + + c.redis = client + c.useRedis = true + log.Printf("INFO: connected to Redis for caching") + } + + // Start cleanup goroutine for in-memory cache + if !c.useRedis { + go c.cleanupLoop() + } + + return c +} + +func (c *Cache) cleanupLoop() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + c.mu.Lock() + now := time.Now() + for k, v := range c.memData { + if now.After(v.expiresAt) { + delete(c.memData, k) + } + } + c.mu.Unlock() + } +} + +// Get retrieves a value from cache +func (c *Cache) Get(ctx context.Context, key string, dest interface{}) bool { + if c.useRedis { + data, err := c.redis.Get(ctx, key).Bytes() + if err != nil { + return false + } + return json.Unmarshal(data, dest) == nil + } + + // In-memory fallback + c.mu.RLock() + entry, ok := c.memData[key] + c.mu.RUnlock() + + if !ok || time.Now().After(entry.expiresAt) { + return false + } + + return json.Unmarshal(entry.data, dest) == nil +} + +// Set stores a value in cache +func (c *Cache) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error { + if ttl == 0 { + ttl = c.defaultTTL + } + + data, err := json.Marshal(value) + if err != nil { + return err + } + + if c.useRedis { + return c.redis.Set(ctx, key, data, ttl).Err() + } + + // In-memory fallback + c.mu.Lock() + c.memData[key] = cacheEntry{ + data: data, + expiresAt: time.Now().Add(ttl), + } + c.mu.Unlock() + + return nil +} + +// Delete removes a key from cache +func (c *Cache) Delete(ctx context.Context, key string) error { + if c.useRedis { + return c.redis.Del(ctx, key).Err() + } + + c.mu.Lock() + delete(c.memData, key) + c.mu.Unlock() + return nil +} + +// InvalidateDashboard clears dashboard cache +func (c *Cache) InvalidateDashboard(ctx context.Context) { + // Delete all dashboard cache keys + for days := 1; days <= 365; days++ { + _ = c.Delete(ctx, dashboardCacheKey(days)) + } +} + +func dashboardCacheKey(days int) string { + return "dashboard:" + string(rune(days)) +} diff --git a/dashboard.go b/dashboard.go new file mode 100644 index 0000000..ca27901 --- /dev/null +++ b/dashboard.go @@ -0,0 +1,1487 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "time" +) + +// DashboardData holds aggregated statistics for the dashboard +type DashboardData struct { + TotalInstalls int `json:"total_installs"` + SuccessCount int `json:"success_count"` + FailedCount int `json:"failed_count"` + InstallingCount int `json:"installing_count"` + SuccessRate float64 `json:"success_rate"` + TopApps []AppCount `json:"top_apps"` + OsDistribution []OsCount `json:"os_distribution"` + MethodStats []MethodCount `json:"method_stats"` + PveVersions []PveCount `json:"pve_versions"` + TypeStats []TypeCount `json:"type_stats"` + ErrorAnalysis []ErrorGroup `json:"error_analysis"` + FailedApps []AppFailure `json:"failed_apps"` + RecentRecords []TelemetryRecord `json:"recent_records"` + DailyStats []DailyStat `json:"daily_stats"` +} + +type AppCount struct { + App string `json:"app"` + Count int `json:"count"` +} + +type OsCount struct { + Os string `json:"os"` + Count int `json:"count"` +} + +type MethodCount struct { + Method string `json:"method"` + Count int `json:"count"` +} + +type PveCount struct { + Version string `json:"version"` + Count int `json:"count"` +} + +type TypeCount struct { + Type string `json:"type"` + Count int `json:"count"` +} + +type ErrorGroup struct { + Pattern string `json:"pattern"` + Count int `json:"count"` + Apps string `json:"apps"` // Comma-separated list of affected apps +} + +type AppFailure struct { + App string `json:"app"` + TotalCount int `json:"total_count"` + FailedCount int `json:"failed_count"` + FailureRate float64 `json:"failure_rate"` +} + +type DailyStat struct { + Date string `json:"date"` + Success int `json:"success"` + Failed int `json:"failed"` +} + +// FetchDashboardData retrieves aggregated data from PocketBase +func (p *PBClient) FetchDashboardData(ctx context.Context, days int) (*DashboardData, error) { + if err := p.ensureAuth(ctx); err != nil { + return nil, err + } + + data := &DashboardData{} + + // Calculate date filter + since := time.Now().AddDate(0, 0, -days).Format("2006-01-02 00:00:00") + filter := url.QueryEscape(fmt.Sprintf("created >= '%s'", since)) + + // Fetch all records for the period + records, err := p.fetchRecords(ctx, filter, 500) + if err != nil { + return nil, err + } + + // Aggregate statistics + appCounts := make(map[string]int) + appFailures := make(map[string]int) + osCounts := make(map[string]int) + methodCounts := make(map[string]int) + pveCounts := make(map[string]int) + typeCounts := make(map[string]int) + errorPatterns := make(map[string]map[string]bool) // pattern -> set of apps + dailySuccess := make(map[string]int) + dailyFailed := make(map[string]int) + + for _, r := range records { + data.TotalInstalls++ + + switch r.Status { + case "success": + data.SuccessCount++ + case "failed": + data.FailedCount++ + // Track failed apps + if r.NSAPP != "" { + appFailures[r.NSAPP]++ + } + // Group errors by pattern + if r.Error != "" { + pattern := normalizeError(r.Error) + if errorPatterns[pattern] == nil { + errorPatterns[pattern] = make(map[string]bool) + } + if r.NSAPP != "" { + errorPatterns[pattern][r.NSAPP] = true + } + } + case "installing": + data.InstallingCount++ + } + + // Count apps + if r.NSAPP != "" { + appCounts[r.NSAPP]++ + } + + // Count OS + if r.OsType != "" { + osCounts[r.OsType]++ + } + + // Count methods + if r.Method != "" { + methodCounts[r.Method]++ + } + + // Count PVE versions + if r.PveVer != "" { + pveCounts[r.PveVer]++ + } + + // Count types (LXC vs VM) + if r.Type != "" { + typeCounts[r.Type]++ + } + + // Daily stats (use Created field if available) + if r.Created != "" { + date := r.Created[:10] // "2026-02-09" + if r.Status == "success" { + dailySuccess[date]++ + } else if r.Status == "failed" { + dailyFailed[date]++ + } + } + } + + // Calculate success rate + completed := data.SuccessCount + data.FailedCount + if completed > 0 { + data.SuccessRate = float64(data.SuccessCount) / float64(completed) * 100 + } + + // Convert maps to sorted slices (top 10) + data.TopApps = topN(appCounts, 10) + data.OsDistribution = topNOs(osCounts, 10) + data.MethodStats = topNMethod(methodCounts, 10) + data.PveVersions = topNPve(pveCounts, 10) + data.TypeStats = topNType(typeCounts, 10) + + // Error analysis + data.ErrorAnalysis = buildErrorAnalysis(errorPatterns, 10) + + // Failed apps with failure rates + data.FailedApps = buildFailedApps(appCounts, appFailures, 10) + + // Daily stats for chart + data.DailyStats = buildDailyStats(dailySuccess, dailyFailed, days) + + // Recent records (last 20) + if len(records) > 20 { + data.RecentRecords = records[:20] + } else { + data.RecentRecords = records + } + + return data, nil +} + +// TelemetryRecord includes Created timestamp +type TelemetryRecord struct { + TelemetryOut + Created string `json:"created"` +} + +func (p *PBClient) fetchRecords(ctx context.Context, filter string, limit int) ([]TelemetryRecord, error) { + var allRecords []TelemetryRecord + page := 1 + perPage := 100 + + for { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + fmt.Sprintf("%s/api/collections/%s/records?filter=%s&sort=-created&page=%d&perPage=%d", + p.baseURL, p.targetColl, filter, page, perPage), + nil, + ) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return nil, err + } + + var result struct { + Items []TelemetryRecord `json:"items"` + TotalItems int `json:"totalItems"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + resp.Body.Close() + return nil, err + } + resp.Body.Close() + + allRecords = append(allRecords, result.Items...) + + if len(allRecords) >= limit || len(allRecords) >= result.TotalItems { + break + } + page++ + } + + return allRecords, nil +} + +func topN(m map[string]int, n int) []AppCount { + result := make([]AppCount, 0, len(m)) + for k, v := range m { + result = append(result, AppCount{App: k, Count: v}) + } + // Simple bubble sort for small datasets + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNOs(m map[string]int, n int) []OsCount { + result := make([]OsCount, 0, len(m)) + for k, v := range m { + result = append(result, OsCount{Os: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNMethod(m map[string]int, n int) []MethodCount { + result := make([]MethodCount, 0, len(m)) + for k, v := range m { + result = append(result, MethodCount{Method: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNPve(m map[string]int, n int) []PveCount { + result := make([]PveCount, 0, len(m)) + for k, v := range m { + result = append(result, PveCount{Version: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +func topNType(m map[string]int, n int) []TypeCount { + result := make([]TypeCount, 0, len(m)) + for k, v := range m { + result = append(result, TypeCount{Type: k, Count: v}) + } + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + if len(result) > n { + return result[:n] + } + return result +} + +// normalizeError simplifies error messages into patterns for grouping +func normalizeError(err string) string { + err = strings.TrimSpace(err) + if err == "" { + return "unknown" + } + + // Normalize common patterns + err = strings.ToLower(err) + + // Remove specific numbers, IPs, paths that vary + // Keep it simple for now - just truncate and normalize + if len(err) > 60 { + err = err[:60] + } + + // Common error pattern replacements + patterns := map[string]string{ + "connection refused": "connection refused", + "timeout": "timeout", + "no space left": "disk full", + "permission denied": "permission denied", + "not found": "not found", + "failed to download": "download failed", + "apt": "apt error", + "dpkg": "dpkg error", + "curl": "network error", + "wget": "network error", + "docker": "docker error", + "systemctl": "systemd error", + "service": "service error", + } + + for pattern, label := range patterns { + if strings.Contains(err, pattern) { + return label + } + } + + // If no pattern matches, return first 40 chars + if len(err) > 40 { + return err[:40] + "..." + } + return err +} + +func buildErrorAnalysis(patterns map[string]map[string]bool, n int) []ErrorGroup { + result := make([]ErrorGroup, 0, len(patterns)) + + for pattern, apps := range patterns { + appList := make([]string, 0, len(apps)) + for app := range apps { + appList = append(appList, app) + } + + // Limit app list display + appsStr := strings.Join(appList, ", ") + if len(appsStr) > 50 { + appsStr = appsStr[:47] + "..." + } + + result = append(result, ErrorGroup{ + Pattern: pattern, + Count: len(apps), // Number of unique apps with this error + Apps: appsStr, + }) + } + + // Sort by count descending + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].Count > result[i].Count { + result[i], result[j] = result[j], result[i] + } + } + } + + if len(result) > n { + return result[:n] + } + return result +} + +func buildFailedApps(total, failed map[string]int, n int) []AppFailure { + result := make([]AppFailure, 0) + + for app, failCount := range failed { + totalCount := total[app] + if totalCount == 0 { + continue + } + + rate := float64(failCount) / float64(totalCount) * 100 + result = append(result, AppFailure{ + App: app, + TotalCount: totalCount, + FailedCount: failCount, + FailureRate: rate, + }) + } + + // Sort by failure rate descending + for i := 0; i < len(result)-1; i++ { + for j := i + 1; j < len(result); j++ { + if result[j].FailureRate > result[i].FailureRate { + result[i], result[j] = result[j], result[i] + } + } + } + + if len(result) > n { + return result[:n] + } + return result +} + +func buildDailyStats(success, failed map[string]int, days int) []DailyStat { + result := make([]DailyStat, 0, days) + for i := days - 1; i >= 0; i-- { + date := time.Now().AddDate(0, 0, -i).Format("2006-01-02") + result = append(result, DailyStat{ + Date: date, + Success: success[date], + Failed: failed[date], + }) + } + return result +} + +// DashboardHTML returns the embedded dashboard HTML +func DashboardHTML() string { + return ` + + + + + Telemetry Dashboard - ProxmoxVE Helper Scripts + + + + + + +
+

+ + + + + Telemetry Dashboard +

+
+ + + + + +
+
+ + + +
+
+
Total Installations
+
-
+
+
+
Successful
+
-
+
+
+
Failed
+
-
+
+
+
In Progress
+
-
+
+
+
Success Rate
+
-
+
+
+
LXC / VM
+
-
+
+
+ +
+

Proxmox VE Versions

+
+ Loading... +
+
+ +
+
+

Installations Over Time

+
+ +
+
+
+

Status Distribution

+
+ +
+
+
+ +
+
+

Top Applications

+
+ +
+
+
+

OS Distribution

+
+ +
+
+
+

Installation Method

+
+ +
+
+
+ +
+

+ + + + + + Error Analysis +

+
+ Loading... +
+
+ +
+

+ + + + + + Apps with Highest Failure Rates +

+
+ Loading... +
+
+ +
+

Recent Installations

+
+ + + +
+ + + + + + + + + + + + + + + + +
AppStatusOSTypeMethodResourcesExit CodeError
Loading...
+ +
+ + + + + +` +} diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..67c09b2 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,55 @@ +#!/bin/sh +set -e + +echo "=============================================" +echo " ProxmoxVED Telemetry Service" +echo "=============================================" + +# Map Coolify ENV names to migration script names +# Coolify uses PB_URL, PB_TARGET_COLLECTION +export POCKETBASE_URL="${POCKETBASE_URL:-$PB_URL}" +export POCKETBASE_COLLECTION="${POCKETBASE_COLLECTION:-$PB_TARGET_COLLECTION}" + +# Run migration if enabled +if [ "$RUN_MIGRATION" = "true" ]; then + echo "" + echo "🔄 Migration mode enabled" + echo " Source: $MIGRATION_SOURCE_URL" + echo " Target: $POCKETBASE_URL" + echo " Collection: $POCKETBASE_COLLECTION" + echo "" + + # Wait for PocketBase to be ready + echo "⏳ Waiting for PocketBase to be ready..." + RETRIES=30 + until wget -q --spider "$POCKETBASE_URL/api/health" 2>/dev/null; do + RETRIES=$((RETRIES - 1)) + if [ $RETRIES -le 0 ]; then + echo "❌ PocketBase not reachable after 30 attempts" + if [ "$MIGRATION_REQUIRED" = "true" ]; then + exit 1 + fi + echo "⚠️ Continuing without migration..." + break + fi + echo " Waiting... ($RETRIES attempts left)" + sleep 2 + done + + if wget -q --spider "$POCKETBASE_URL/api/health" 2>/dev/null; then + echo "✅ PocketBase is ready" + echo "" + echo "🚀 Starting migration..." + /app/migrate || { + if [ "$MIGRATION_REQUIRED" = "true" ]; then + echo "❌ Migration failed!" + exit 1 + fi + echo "⚠️ Migration failed, but continuing..." + } + echo "" + fi +fi + +echo "🚀 Starting telemetry service..." +exec /app/telemetry-service diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7c1021a --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/community-scripts/telemetry-service + +go 1.25.5 + +require github.com/redis/go-redis/v9 v9.7.0 + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f11d99f --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= diff --git a/migration/migrate.go b/migration/migrate.go new file mode 100644 index 0000000..7212c2d --- /dev/null +++ b/migration/migrate.go @@ -0,0 +1,366 @@ +// +build ignore + +// Migration script to import data from the old API to PocketBase +// Run with: go run migrate.go +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "time" +) + +const ( + defaultSourceAPI = "https://api.htl-braunau.at/dev/data" + defaultPBURL = "http://localhost:8090" + batchSize = 100 +) + +var ( + sourceAPI string + summaryAPI string + authToken string // PocketBase auth token +) + +// OldDataModel represents the data structure from the old API +type OldDataModel struct { + ID string `json:"id"` + CtType int `json:"ct_type"` + DiskSize int `json:"disk_size"` + CoreCount int `json:"core_count"` + RamSize int `json:"ram_size"` + OsType string `json:"os_type"` + OsVersion string `json:"os_version"` + DisableIP6 string `json:"disableip6"` + NsApp string `json:"nsapp"` + Method string `json:"method"` + CreatedAt string `json:"created_at"` + PveVersion string `json:"pve_version"` + Status string `json:"status"` + RandomID string `json:"random_id"` + Type string `json:"type"` + Error string `json:"error"` +} + +// PBRecord represents the PocketBase record format +type PBRecord struct { + CtType int `json:"ct_type"` + DiskSize int `json:"disk_size"` + CoreCount int `json:"core_count"` + RamSize int `json:"ram_size"` + OsType string `json:"os_type"` + OsVersion string `json:"os_version"` + DisableIP6 string `json:"disableip6"` + NsApp string `json:"nsapp"` + Method string `json:"method"` + PveVersion string `json:"pve_version"` + Status string `json:"status"` + RandomID string `json:"random_id"` + Type string `json:"type"` + Error string `json:"error"` + // created_at will be set automatically by PocketBase +} + +type Summary struct { + TotalEntries int `json:"total_entries"` +} + +func main() { + // Setup source URLs + baseURL := os.Getenv("MIGRATION_SOURCE_URL") + if baseURL == "" { + baseURL = defaultSourceAPI + } + sourceAPI = baseURL + "/paginated" + summaryAPI = baseURL + "/summary" + + // Support both POCKETBASE_URL and PB_URL (Coolify uses PB_URL) + pbURL := os.Getenv("POCKETBASE_URL") + if pbURL == "" { + pbURL = os.Getenv("PB_URL") + } + if pbURL == "" { + pbURL = defaultPBURL + } + + // Support both POCKETBASE_COLLECTION and PB_TARGET_COLLECTION + pbCollection := os.Getenv("POCKETBASE_COLLECTION") + if pbCollection == "" { + pbCollection = os.Getenv("PB_TARGET_COLLECTION") + } + if pbCollection == "" { + pbCollection = "_dev_telemetry_data" + } + + // Auth collection + authCollection := os.Getenv("PB_AUTH_COLLECTION") + if authCollection == "" { + authCollection = "_dev_telemetry_service" + } + + // Credentials + pbIdentity := os.Getenv("PB_IDENTITY") + pbPassword := os.Getenv("PB_PASSWORD") + + fmt.Println("===========================================") + fmt.Println(" Data Migration to PocketBase") + fmt.Println("===========================================") + fmt.Printf("Source API: %s\n", baseURL) + fmt.Printf("PocketBase URL: %s\n", pbURL) + fmt.Printf("Collection: %s\n", pbCollection) + fmt.Printf("Auth Collection: %s\n", authCollection) + fmt.Println("-------------------------------------------") + + // Authenticate with PocketBase + if pbIdentity != "" && pbPassword != "" { + fmt.Println("🔐 Authenticating with PocketBase...") + err := authenticate(pbURL, authCollection, pbIdentity, pbPassword) + if err != nil { + fmt.Printf("❌ Authentication failed: %v\n", err) + os.Exit(1) + } + fmt.Println("✅ Authentication successful") + } else { + fmt.Println("⚠️ No credentials provided, trying without auth...") + } + fmt.Println("-------------------------------------------") + + // Get total count + summary, err := getSummary() + if err != nil { + fmt.Printf("❌ Failed to get summary: %v\n", err) + os.Exit(1) + } + fmt.Printf("📊 Total entries to migrate: %d\n", summary.TotalEntries) + fmt.Println("-------------------------------------------") + + // Calculate pages + totalPages := (summary.TotalEntries + batchSize - 1) / batchSize + + var totalMigrated, totalFailed, totalSkipped int + + for page := 1; page <= totalPages; page++ { + fmt.Printf("📦 Fetching page %d/%d (items %d-%d)...\n", + page, totalPages, + (page-1)*batchSize+1, + min(page*batchSize, summary.TotalEntries)) + + data, err := fetchPage(page, batchSize) + if err != nil { + fmt.Printf(" ❌ Failed to fetch page %d: %v\n", page, err) + totalFailed += batchSize + continue + } + + for i, record := range data { + err := importRecord(pbURL, pbCollection, record) + if err != nil { + if isUniqueViolation(err) { + totalSkipped++ + continue + } + fmt.Printf(" ❌ Failed to import record %d: %v\n", (page-1)*batchSize+i+1, err) + totalFailed++ + continue + } + totalMigrated++ + } + + fmt.Printf(" ✅ Page %d complete (migrated: %d, skipped: %d, failed: %d)\n", + page, len(data), totalSkipped, totalFailed) + + // Small delay to avoid overwhelming the server + time.Sleep(100 * time.Millisecond) + } + + fmt.Println("===========================================") + fmt.Println(" Migration Complete") + fmt.Println("===========================================") + fmt.Printf("✅ Successfully migrated: %d\n", totalMigrated) + fmt.Printf("⏭️ Skipped (duplicates): %d\n", totalSkipped) + fmt.Printf("❌ Failed: %d\n", totalFailed) + fmt.Println("===========================================") +} + +func getSummary() (*Summary, error) { + resp, err := http.Get(summaryAPI) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var summary Summary + if err := json.NewDecoder(resp.Body).Decode(&summary); err != nil { + return nil, err + } + + return &summary, nil +} + +func authenticate(pbURL, authCollection, identity, password string) error { + body := map[string]string{ + "identity": identity, + "password": password, + } + jsonData, _ := json.Marshal(body) + + url := fmt.Sprintf("%s/api/collections/%s/auth-with-password", pbURL, authCollection) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + var result struct { + Token string `json:"token"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return err + } + if result.Token == "" { + return fmt.Errorf("no token in response") + } + + authToken = result.Token + return nil +} + +func fetchPage(page, limit int) ([]OldDataModel, error) { + url := fmt.Sprintf("%s?page=%d&limit=%d", sourceAPI, page, limit) + + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + var data []OldDataModel + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return nil, err + } + + return data, nil +} + +func importRecord(pbURL, collection string, old OldDataModel) error { + // Map status: "done" -> "success" + status := old.Status + switch status { + case "done": + status = "success" + case "installing", "failed", "unknown", "success": + // keep as-is + default: + status = "unknown" + } + + // Ensure ct_type is not 0 (required field) + ctType := old.CtType + if ctType == 0 { + ctType = 1 // default to unprivileged + } + + // Ensure type is set + recordType := old.Type + if recordType == "" { + recordType = "lxc" + } + + record := PBRecord{ + CtType: ctType, + DiskSize: old.DiskSize, + CoreCount: old.CoreCount, + RamSize: old.RamSize, + OsType: old.OsType, + OsVersion: old.OsVersion, + DisableIP6: old.DisableIP6, + NsApp: old.NsApp, + Method: old.Method, + PveVersion: old.PveVersion, + Status: status, + RandomID: old.RandomID, + Type: recordType, + Error: old.Error, + } + + jsonData, err := json.Marshal(record) + if err != nil { + return err + } + + url := fmt.Sprintf("%s/api/collections/%s/records", pbURL, collection) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + if authToken != "" { + req.Header.Set("Authorization", "Bearer "+authToken) + } + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +func isUniqueViolation(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return contains(errStr, "UNIQUE constraint failed") || + contains(errStr, "duplicate") || + contains(errStr, "already exists") || + contains(errStr, "validation_not_unique") +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/migration/migrate.sh b/migration/migrate.sh new file mode 100755 index 0000000..1da9e25 --- /dev/null +++ b/migration/migrate.sh @@ -0,0 +1,67 @@ +#!/bin/bash +# Migration script to import data from the old API to PocketBase +# Usage: ./migrate.sh [POCKETBASE_URL] [COLLECTION_NAME] +# +# Examples: +# ./migrate.sh # Uses defaults +# ./migrate.sh http://localhost:8090 # Custom PB URL +# ./migrate.sh http://localhost:8090 my_telemetry # Custom URL and collection + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# Default values +POCKETBASE_URL="${1:-http://localhost:8090}" +POCKETBASE_COLLECTION="${2:-_dev_telemetry_data}" + +echo "=============================================" +echo " ProxmoxVED Data Migration Tool" +echo "=============================================" +echo "" +echo "This script will migrate telemetry data from:" +echo " Source: https://api.htl-braunau.at/dev/data" +echo " Target: $POCKETBASE_URL" +echo " Collection: $POCKETBASE_COLLECTION" +echo "" + +# Check if PocketBase is reachable +echo "🔍 Checking PocketBase connection..." +if ! curl -sf "$POCKETBASE_URL/api/health" > /dev/null 2>&1; then + echo "❌ Cannot reach PocketBase at $POCKETBASE_URL" + echo " Make sure PocketBase is running and the URL is correct." + exit 1 +fi +echo "✅ PocketBase is reachable" +echo "" + +# Check source API +echo "🔍 Checking source API..." +SUMMARY=$(curl -sf "https://api.htl-braunau.at/dev/data/summary" 2>/dev/null || echo "") +if [ -z "$SUMMARY" ]; then + echo "❌ Cannot reach source API" + exit 1 +fi + +TOTAL=$(echo "$SUMMARY" | grep -o '"total_entries":[0-9]*' | cut -d: -f2) +echo "✅ Source API is reachable ($TOTAL entries available)" +echo "" + +# Confirm migration +read -p "⚠️ Do you want to start the migration? [y/N] " -n 1 -r +echo "" +if [[ ! $REPLY =~ ^[Yy]$ ]]; then + echo "Migration cancelled." + exit 0 +fi + +echo "" +echo "Starting migration..." +echo "" + +# Run the Go migration script +cd "$SCRIPT_DIR" +POCKETBASE_URL="$POCKETBASE_URL" POCKETBASE_COLLECTION="$POCKETBASE_COLLECTION" go run migrate.go + +echo "" +echo "Migration complete!" diff --git a/service.go b/service.go new file mode 100644 index 0000000..5782979 --- /dev/null +++ b/service.go @@ -0,0 +1,1027 @@ +package main + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "strings" + "sync" + "time" +) + +type Config struct { + ListenAddr string + TrustedProxiesCIDR []string + + // PocketBase + PBBaseURL string + PBAuthCollection string // "_dev_telemetry_service" + PBIdentity string // email + PBPassword string + PBTargetColl string // "_dev_telemetry_data" + + // Limits + MaxBodyBytes int64 + RateLimitRPM int // requests per minute per key + RateBurst int // burst tokens + RateKeyMode string // "ip" or "header" + RateKeyHeader string // e.g. "X-Telemetry-Key" + RequestTimeout time.Duration // upstream timeout + EnableReqLogging bool // default false (GDPR-friendly) + + // Cache + RedisURL string + EnableRedis bool + CacheTTL time.Duration + CacheEnabled bool + + // Alerts (SMTP) + AlertEnabled bool + SMTPHost string + SMTPPort int + SMTPUser string + SMTPPassword string + SMTPFrom string + SMTPTo []string + SMTPUseTLS bool + AlertFailureThreshold float64 + AlertCheckInterval time.Duration + AlertCooldown time.Duration +} + +// TelemetryIn matches payload from api.func (bash client) +type TelemetryIn struct { + // Required + RandomID string `json:"random_id"` // Session UUID + Type string `json:"type"` // "lxc" or "vm" + NSAPP string `json:"nsapp"` // Application name (e.g., "jellyfin") + Status string `json:"status"` // "installing", "success", "failed", "unknown" + + // Container/VM specs + CTType int `json:"ct_type,omitempty"` // 1=unprivileged, 2=privileged/VM + DiskSize int `json:"disk_size,omitempty"` // GB + CoreCount int `json:"core_count,omitempty"` // CPU cores + RAMSize int `json:"ram_size,omitempty"` // MB + + // System info + OsType string `json:"os_type,omitempty"` // "debian", "ubuntu", "alpine", etc. + OsVersion string `json:"os_version,omitempty"` // "12", "24.04", etc. + PveVer string `json:"pve_version,omitempty"` + + // Optional + Method string `json:"method,omitempty"` // "default", "advanced" + Error string `json:"error,omitempty"` // Error description (max 120 chars) + ExitCode int `json:"exit_code,omitempty"` // 0-255 +} + +// TelemetryOut is sent to PocketBase (matches _dev_telemetry_data collection) +type TelemetryOut struct { + RandomID string `json:"random_id"` + Type string `json:"type"` + NSAPP string `json:"nsapp"` + Status string `json:"status"` + CTType int `json:"ct_type,omitempty"` + DiskSize int `json:"disk_size,omitempty"` + CoreCount int `json:"core_count,omitempty"` + RAMSize int `json:"ram_size,omitempty"` + OsType string `json:"os_type,omitempty"` + OsVersion string `json:"os_version,omitempty"` + PveVer string `json:"pve_version,omitempty"` + Method string `json:"method,omitempty"` + Error string `json:"error,omitempty"` + ExitCode int `json:"exit_code,omitempty"` +} + +// TelemetryStatusUpdate contains only fields needed for status updates +type TelemetryStatusUpdate struct { + Status string `json:"status"` + Error string `json:"error,omitempty"` + ExitCode int `json:"exit_code"` +} + +type PBClient struct { + baseURL string + authCollection string + identity string + password string + targetColl string + + mu sync.Mutex + token string + exp time.Time + http *http.Client +} + +func NewPBClient(cfg Config) *PBClient { + return &PBClient{ + baseURL: strings.TrimRight(cfg.PBBaseURL, "/"), + authCollection: cfg.PBAuthCollection, + identity: cfg.PBIdentity, + password: cfg.PBPassword, + targetColl: cfg.PBTargetColl, + http: &http.Client{ + Timeout: cfg.RequestTimeout, + }, + } +} + +func (p *PBClient) ensureAuth(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + // refresh if token missing or expiring soon + if p.token != "" && time.Until(p.exp) > 60*time.Second { + return nil + } + + body := map[string]string{ + "identity": p.identity, + "password": p.password, + } + b, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + fmt.Sprintf("%s/api/collections/%s/auth-with-password", p.baseURL, p.authCollection), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10)) + return fmt.Errorf("pocketbase auth failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + + var out struct { + Token string `json:"token"` + // record omitted + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return err + } + if out.Token == "" { + return errors.New("pocketbase auth token missing") + } + + // PocketBase JWT exp can be parsed, but keep it simple: set 50 min + p.token = out.Token + p.exp = time.Now().Add(50 * time.Minute) + return nil +} + +// FindRecordByRandomID searches for an existing record by random_id +func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (string, error) { + if err := p.ensureAuth(ctx); err != nil { + return "", err + } + + // URL encode the filter + filter := fmt.Sprintf("random_id='%s'", randomID) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + fmt.Sprintf("%s/api/collections/%s/records?filter=%s&fields=id&perPage=1", + p.baseURL, p.targetColl, filter), + nil, + ) + if err != nil { + return "", err + } + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("pocketbase search failed: %s", resp.Status) + } + + var result struct { + Items []struct { + ID string `json:"id"` + } `json:"items"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return "", err + } + + if len(result.Items) == 0 { + return "", nil // Not found + } + return result.Items[0].ID, nil +} + +// UpdateTelemetryStatus updates only status, error, and exit_code of an existing record +func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, update TelemetryStatusUpdate) error { + if err := p.ensureAuth(ctx); err != nil { + return err + } + + b, _ := json.Marshal(update) + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, + fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, p.targetColl, recordID), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10)) + return fmt.Errorf("pocketbase update failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + return nil +} + +// FetchRecordsPaginated retrieves records with pagination and optional filters +func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType string) ([]TelemetryRecord, int, error) { + if err := p.ensureAuth(ctx); err != nil { + return nil, 0, err + } + + // Build filter + var filters []string + if status != "" { + filters = append(filters, fmt.Sprintf("status='%s'", status)) + } + if app != "" { + filters = append(filters, fmt.Sprintf("nsapp~'%s'", app)) + } + if osType != "" { + filters = append(filters, fmt.Sprintf("os_type='%s'", osType)) + } + + filterStr := "" + if len(filters) > 0 { + filterStr = "&filter=" + strings.Join(filters, "&&") + } + + reqURL := fmt.Sprintf("%s/api/collections/%s/records?sort=-created&page=%d&perPage=%d%s", + p.baseURL, p.targetColl, page, limit, filterStr) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) + if err != nil { + return nil, 0, err + } + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, 0, fmt.Errorf("pocketbase fetch failed: %s", resp.Status) + } + + var result struct { + Items []TelemetryRecord `json:"items"` + TotalItems int `json:"totalItems"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, 0, err + } + + return result.Items, result.TotalItems, nil +} + +// UpsertTelemetry handles both creation and updates intelligently +// - status="installing": Always creates a new record +// - status!="installing": Updates existing record (found by random_id) with status/error/exit_code only +func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) error { + // For "installing" status, always create new record + if payload.Status == "installing" { + return p.CreateTelemetry(ctx, payload) + } + + // For status updates (success/failed/unknown), find and update existing record + recordID, err := p.FindRecordByRandomID(ctx, payload.RandomID) + if err != nil { + // Search failed, log and return error + return fmt.Errorf("cannot find record to update: %w", err) + } + + if recordID == "" { + // Record not found - this shouldn't happen normally + // Create a full record as fallback + return p.CreateTelemetry(ctx, payload) + } + + // Update only status, error, and exit_code + update := TelemetryStatusUpdate{ + Status: payload.Status, + Error: payload.Error, + ExitCode: payload.ExitCode, + } + return p.UpdateTelemetryStatus(ctx, recordID, update) +} + +func (p *PBClient) CreateTelemetry(ctx context.Context, payload TelemetryOut) error { + if err := p.ensureAuth(ctx); err != nil { + return err + } + + b, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.targetColl), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10)) + return fmt.Errorf("pocketbase create failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + return nil +} + +// -------- Rate limiter (token bucket / minute window, simple) -------- + +type bucket struct { + tokens int + reset time.Time +} + +type RateLimiter struct { + mu sync.Mutex + buckets map[string]*bucket + rpm int + burst int + window time.Duration + cleanInt time.Duration +} + +func NewRateLimiter(rpm, burst int) *RateLimiter { + rl := &RateLimiter{ + buckets: make(map[string]*bucket), + rpm: rpm, + burst: burst, + window: time.Minute, + cleanInt: 5 * time.Minute, + } + go rl.cleanupLoop() + return rl +} + +func (r *RateLimiter) cleanupLoop() { + t := time.NewTicker(r.cleanInt) + defer t.Stop() + for range t.C { + now := time.Now() + r.mu.Lock() + for k, b := range r.buckets { + if now.After(b.reset.Add(2 * r.window)) { + delete(r.buckets, k) + } + } + r.mu.Unlock() + } +} + +func (r *RateLimiter) Allow(key string) bool { + if r.rpm <= 0 { + return true + } + now := time.Now() + r.mu.Lock() + defer r.mu.Unlock() + + b, ok := r.buckets[key] + if !ok || now.After(b.reset) { + r.buckets[key] = &bucket{tokens: min(r.burst, r.rpm), reset: now.Add(r.window)} + b = r.buckets[key] + } + if b.tokens <= 0 { + return false + } + b.tokens-- + return true +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// -------- Utility: GDPR-safe key extraction -------- + +type ProxyTrust struct { + nets []*net.IPNet +} + +func NewProxyTrust(cidrs []string) (*ProxyTrust, error) { + var nets []*net.IPNet + for _, c := range cidrs { + _, n, err := net.ParseCIDR(strings.TrimSpace(c)) + if err != nil { + return nil, err + } + nets = append(nets, n) + } + return &ProxyTrust{nets: nets}, nil +} + +func (pt *ProxyTrust) isTrusted(ip net.IP) bool { + for _, n := range pt.nets { + if n.Contains(ip) { + return true + } + } + return false +} + +func getClientIP(r *http.Request, pt *ProxyTrust) net.IP { + // If behind reverse proxy, trust X-Forwarded-For only if remote is trusted proxy. + host, _, _ := net.SplitHostPort(r.RemoteAddr) + remote := net.ParseIP(host) + if remote == nil { + return nil + } + + if pt != nil && pt.isTrusted(remote) { + xff := r.Header.Get("X-Forwarded-For") + if xff != "" { + parts := strings.Split(xff, ",") + ip := net.ParseIP(strings.TrimSpace(parts[0])) + if ip != nil { + return ip + } + } + } + return remote +} + +// -------- Validation (strict allowlist) -------- + +var ( + // Allowed values for 'type' field + allowedType = map[string]bool{"lxc": true, "vm": true} + + // Allowed values for 'status' field + allowedStatus = map[string]bool{"installing": true, "success": true, "failed": true, "unknown": true} + + // Allowed values for 'os_type' field + allowedOsType = map[string]bool{ + "debian": true, "ubuntu": true, "alpine": true, "devuan": true, + "fedora": true, "rocky": true, "alma": true, "centos": true, + "opensuse": true, "gentoo": true, "openeuler": true, + } +) + +func sanitizeShort(s string, max int) string { + s = strings.TrimSpace(s) + if s == "" { + return "" + } + // remove line breaks and high-risk chars + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + if len(s) > max { + s = s[:max] + } + return s +} + +func validate(in *TelemetryIn) error { + // Sanitize all string fields + in.RandomID = sanitizeShort(in.RandomID, 64) + in.Type = sanitizeShort(in.Type, 8) + in.NSAPP = sanitizeShort(in.NSAPP, 64) + in.Status = sanitizeShort(in.Status, 16) + in.OsType = sanitizeShort(in.OsType, 32) + in.OsVersion = sanitizeShort(in.OsVersion, 32) + in.PveVer = sanitizeShort(in.PveVer, 32) + in.Method = sanitizeShort(in.Method, 32) + + // IMPORTANT: "error" must be short and not contain identifiers/logs + in.Error = sanitizeShort(in.Error, 120) + + // Required fields for all requests + if in.RandomID == "" || in.Type == "" || in.NSAPP == "" || in.Status == "" { + return errors.New("missing required fields: random_id, type, nsapp, status") + } + + // Validate enums + if !allowedType[in.Type] { + return errors.New("invalid type (must be 'lxc' or 'vm')") + } + if !allowedStatus[in.Status] { + return errors.New("invalid status") + } + + // For status updates (not installing), skip numeric field validation + // These are only required for initial creation + isUpdate := in.Status != "installing" + + // os_type is optional but if provided must be valid + if in.OsType != "" && !allowedOsType[in.OsType] { + return errors.New("invalid os_type") + } + + // method is optional and flexible - just sanitized, no strict validation + // Values like "default", "advanced", "mydefaults-global", "mydefaults-app" are all valid + + // Validate numeric ranges (only strict for new records) + if !isUpdate { + if in.CTType < 0 || in.CTType > 2 { + return errors.New("invalid ct_type (must be 0, 1, or 2)") + } + } + if in.DiskSize < 0 || in.DiskSize > 100000 { + return errors.New("invalid disk_size") + } + if in.CoreCount < 0 || in.CoreCount > 256 { + return errors.New("invalid core_count") + } + if in.RAMSize < 0 || in.RAMSize > 1048576 { + return errors.New("invalid ram_size") + } + if in.ExitCode < 0 || in.ExitCode > 255 { + return errors.New("invalid exit_code") + } + + return nil +} + +// computeHash generates a hash for deduplication (GDPR-safe, no IP) +func computeHash(out TelemetryOut) string { + key := fmt.Sprintf("%s|%s|%s|%s|%d", + out.RandomID, out.NSAPP, out.Type, out.Status, out.ExitCode, + ) + sum := sha256.Sum256([]byte(key)) + return hex.EncodeToString(sum[:]) +} + +// -------- HTTP server -------- + +func main() { + cfg := Config{ + ListenAddr: env("LISTEN_ADDR", ":8080"), + TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")), + + PBBaseURL: mustEnv("PB_URL"), + PBAuthCollection: env("PB_AUTH_COLLECTION", "_dev_telemetry_service"), + PBIdentity: mustEnv("PB_IDENTITY"), + PBPassword: mustEnv("PB_PASSWORD"), + PBTargetColl: env("PB_TARGET_COLLECTION", "_dev_telemetry_data"), + + MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024), + RateLimitRPM: envInt("RATE_LIMIT_RPM", 60), + RateBurst: envInt("RATE_BURST", 20), + RateKeyMode: env("RATE_KEY_MODE", "ip"), // "ip" or "header" + RateKeyHeader: env("RATE_KEY_HEADER", "X-Telemetry-Key"), + RequestTimeout: time.Duration(envInt("UPSTREAM_TIMEOUT_MS", 4000)) * time.Millisecond, + EnableReqLogging: envBool("ENABLE_REQUEST_LOGGING", false), + + // Cache config + RedisURL: env("REDIS_URL", ""), + EnableRedis: envBool("ENABLE_REDIS", false), + CacheTTL: time.Duration(envInt("CACHE_TTL_SECONDS", 60)) * time.Second, + CacheEnabled: envBool("ENABLE_CACHE", true), + + // Alert config + AlertEnabled: envBool("ALERT_ENABLED", false), + SMTPHost: env("SMTP_HOST", ""), + SMTPPort: envInt("SMTP_PORT", 587), + SMTPUser: env("SMTP_USER", ""), + SMTPPassword: env("SMTP_PASSWORD", ""), + SMTPFrom: env("SMTP_FROM", "telemetry@proxmoxved.local"), + SMTPTo: splitCSV(env("SMTP_TO", "")), + SMTPUseTLS: envBool("SMTP_USE_TLS", false), + AlertFailureThreshold: envFloat("ALERT_FAILURE_THRESHOLD", 20.0), + AlertCheckInterval: time.Duration(envInt("ALERT_CHECK_INTERVAL_MIN", 15)) * time.Minute, + AlertCooldown: time.Duration(envInt("ALERT_COOLDOWN_MIN", 60)) * time.Minute, + } + + var pt *ProxyTrust + if strings.TrimSpace(env("TRUSTED_PROXIES_CIDR", "")) != "" { + p, err := NewProxyTrust(cfg.TrustedProxiesCIDR) + if err != nil { + log.Fatalf("invalid TRUSTED_PROXIES_CIDR: %v", err) + } + pt = p + } + + pb := NewPBClient(cfg) + rl := NewRateLimiter(cfg.RateLimitRPM, cfg.RateBurst) + + // Initialize cache + cache := NewCache(CacheConfig{ + RedisURL: cfg.RedisURL, + EnableRedis: cfg.EnableRedis, + DefaultTTL: cfg.CacheTTL, + }) + + // Initialize alerter + alerter := NewAlerter(AlertConfig{ + Enabled: cfg.AlertEnabled, + SMTPHost: cfg.SMTPHost, + SMTPPort: cfg.SMTPPort, + SMTPUser: cfg.SMTPUser, + SMTPPassword: cfg.SMTPPassword, + SMTPFrom: cfg.SMTPFrom, + SMTPTo: cfg.SMTPTo, + UseTLS: cfg.SMTPUseTLS, + FailureThreshold: cfg.AlertFailureThreshold, + CheckInterval: cfg.AlertCheckInterval, + Cooldown: cfg.AlertCooldown, + }, pb) + alerter.Start() + + mux := http.NewServeMux() + + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + // Check PocketBase connectivity + ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second) + defer cancel() + + status := map[string]interface{}{ + "status": "ok", + "time": time.Now().UTC().Format(time.RFC3339), + } + + if err := pb.ensureAuth(ctx); err != nil { + status["status"] = "degraded" + status["pocketbase"] = "disconnected" + w.WriteHeader(503) + } else { + status["pocketbase"] = "connected" + w.WriteHeader(200) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) + }) + + // Dashboard HTML page - serve on root + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + _, _ = w.Write([]byte(DashboardHTML())) + }) + + // Redirect /dashboard to / for backwards compatibility + mux.HandleFunc("/dashboard", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/", http.StatusMovedPermanently) + }) + + // Prometheus-style metrics endpoint + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + data, err := pb.FetchDashboardData(ctx, 1) // Last 24h only for metrics + if err != nil { + http.Error(w, "failed to fetch metrics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintf(w, "# HELP telemetry_installs_total Total number of installations\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_total counter\n") + fmt.Fprintf(w, "telemetry_installs_total %d\n\n", data.TotalInstalls) + fmt.Fprintf(w, "# HELP telemetry_installs_success_total Successful installations\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_success_total counter\n") + fmt.Fprintf(w, "telemetry_installs_success_total %d\n\n", data.SuccessCount) + fmt.Fprintf(w, "# HELP telemetry_installs_failed_total Failed installations\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_failed_total counter\n") + fmt.Fprintf(w, "telemetry_installs_failed_total %d\n\n", data.FailedCount) + fmt.Fprintf(w, "# HELP telemetry_installs_pending Current installing count\n") + fmt.Fprintf(w, "# TYPE telemetry_installs_pending gauge\n") + fmt.Fprintf(w, "telemetry_installs_pending %d\n\n", data.InstallingCount) + fmt.Fprintf(w, "# HELP telemetry_success_rate Success rate percentage\n") + fmt.Fprintf(w, "# TYPE telemetry_success_rate gauge\n") + fmt.Fprintf(w, "telemetry_success_rate %.2f\n", data.SuccessRate) + }) + + // Dashboard API endpoint (with caching) + mux.HandleFunc("/api/dashboard", func(w http.ResponseWriter, r *http.Request) { + days := 30 + if d := r.URL.Query().Get("days"); d != "" { + fmt.Sscanf(d, "%d", &days) + if days < 1 { + days = 1 + } + if days > 365 { + days = 365 + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + // Try cache first + cacheKey := fmt.Sprintf("dashboard:%d", days) + var data *DashboardData + if cfg.CacheEnabled && cache.Get(ctx, cacheKey, &data) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Cache", "HIT") + json.NewEncoder(w).Encode(data) + return + } + + data, err := pb.FetchDashboardData(ctx, days) + if err != nil { + log.Printf("dashboard fetch failed: %v", err) + http.Error(w, "failed to fetch data", http.StatusInternalServerError) + return + } + + // Cache the result + if cfg.CacheEnabled { + _ = cache.Set(ctx, cacheKey, data, cfg.CacheTTL) + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Cache", "MISS") + json.NewEncoder(w).Encode(data) + }) + + // Paginated records API + mux.HandleFunc("/api/records", func(w http.ResponseWriter, r *http.Request) { + page := 1 + limit := 50 + status := r.URL.Query().Get("status") + app := r.URL.Query().Get("app") + osType := r.URL.Query().Get("os") + + if p := r.URL.Query().Get("page"); p != "" { + fmt.Sscanf(p, "%d", &page) + if page < 1 { + page = 1 + } + } + if l := r.URL.Query().Get("limit"); l != "" { + fmt.Sscanf(l, "%d", &limit) + if limit < 1 { + limit = 1 + } + if limit > 100 { + limit = 100 + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType) + if err != nil { + log.Printf("records fetch failed: %v", err) + http.Error(w, "failed to fetch records", http.StatusInternalServerError) + return + } + + response := map[string]interface{}{ + "records": records, + "page": page, + "limit": limit, + "total": total, + "total_pages": (total + limit - 1) / limit, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + }) + + // Alert history and test endpoints + mux.HandleFunc("/api/alerts", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "enabled": cfg.AlertEnabled, + "history": alerter.GetAlertHistory(), + }) + }) + + mux.HandleFunc("/api/alerts/test", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + if err := alerter.TestAlert(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("test alert sent")) + }) + + mux.HandleFunc("/telemetry", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + // rate key: IP or header (header allows non-identifying keys, but header can be abused too) + var key string + switch cfg.RateKeyMode { + case "header": + key = strings.TrimSpace(r.Header.Get(cfg.RateKeyHeader)) + if key == "" { + key = "missing" + } + default: + ip := getClientIP(r, pt) + if ip == nil { + key = "unknown" + } else { + // GDPR: do NOT store IP anywhere permanent; use it only in-memory for RL key + key = ip.String() + } + } + if !rl.Allow(key) { + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + + r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodyBytes) + raw, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "invalid body", http.StatusBadRequest) + return + } + + // strict JSON decode (no unknown fields) + var in TelemetryIn + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + if err := dec.Decode(&in); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + if err := validate(&in); err != nil { + http.Error(w, "invalid payload", http.StatusBadRequest) + return + } + + // Map input to PocketBase schema + out := TelemetryOut{ + RandomID: in.RandomID, + Type: in.Type, + NSAPP: in.NSAPP, + Status: in.Status, + CTType: in.CTType, + DiskSize: in.DiskSize, + CoreCount: in.CoreCount, + RAMSize: in.RAMSize, + OsType: in.OsType, + OsVersion: in.OsVersion, + PveVer: in.PveVer, + Method: in.Method, + Error: in.Error, + ExitCode: in.ExitCode, + } + _ = computeHash(out) // For future deduplication + + ctx, cancel := context.WithTimeout(r.Context(), cfg.RequestTimeout) + defer cancel() + + // Upsert: Creates new record if random_id doesn't exist, updates if it does + if err := pb.UpsertTelemetry(ctx, out); err != nil { + // GDPR: don't log raw payload, don't log IPs; log only generic error + log.Printf("pocketbase write failed: %v", err) + http.Error(w, "upstream error", http.StatusBadGateway) + return + } + + if cfg.EnableReqLogging { + log.Printf("telemetry accepted nsapp=%s status=%s", out.NSAPP, out.Status) + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("accepted")) + }) + + srv := &http.Server{ + Addr: cfg.ListenAddr, + Handler: securityHeaders(mux), + ReadHeaderTimeout: 3 * time.Second, + } + + log.Printf("telemetry-ingest listening on %s", cfg.ListenAddr) + log.Fatal(srv.ListenAndServe()) +} + +func securityHeaders(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Minimal security headers (no cookies anyway) + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("X-Frame-Options", "DENY") + w.Header().Set("Referrer-Policy", "no-referrer") + next.ServeHTTP(w, r) + }) +} + +func env(k, def string) string { + v := os.Getenv(k) + if v == "" { + return def + } + return v +} +func mustEnv(k string) string { + v := os.Getenv(k) + if v == "" { + log.Fatalf("missing env %s", k) + } + return v +} +func envInt(k string, def int) int { + v := os.Getenv(k) + if v == "" { + return def + } + var i int + _, _ = fmt.Sscanf(v, "%d", &i) + if i == 0 && v != "0" { + return def + } + return i +} +func envInt64(k string, def int64) int64 { + v := os.Getenv(k) + if v == "" { + return def + } + var i int64 + _, _ = fmt.Sscanf(v, "%d", &i) + if i == 0 && v != "0" { + return def + } + return i +} +func envBool(k string, def bool) bool { + v := strings.ToLower(strings.TrimSpace(os.Getenv(k))) + if v == "" { + return def + } + return v == "1" || v == "true" || v == "yes" || v == "on" +} +func envFloat(k string, def float64) float64 { + v := os.Getenv(k) + if v == "" { + return def + } + var f float64 + _, _ = fmt.Sscanf(v, "%f", &f) + if f == 0 && v != "0" { + return def + } + return f +} +func splitCSV(s string) []string { + s = strings.TrimSpace(s) + if s == "" { + return nil + } + parts := strings.Split(s, ",") + var out []string + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +}