mirror of
https://github.com/Heretek-AI/telemetry-service.git
synced 2026-07-01 13:54:38 -04:00
feat: add execution_id field for unique record identification
- Add ExecutionID to TelemetryIn, TelemetryOut, TelemetryStatusUpdate structs - Add FindRecordByExecutionID() for O(1) unique-index lookups - Update UpsertTelemetry to prefer execution_id lookup with random_id fallback - Add execution_id sanitization in validate() - Map execution_id in handler (in→out)
This commit is contained in:
+75
-11
@@ -73,10 +73,11 @@ type Config struct {
|
||||
// TelemetryIn matches payload from api.func (bash client)
|
||||
type TelemetryIn struct {
|
||||
// Required
|
||||
RandomID string `json:"random_id"` // Session UUID
|
||||
Type string `json:"type"` // "lxc", "vm", "tool", "addon"
|
||||
NSAPP string `json:"nsapp"` // Application name (e.g., "jellyfin")
|
||||
Status string `json:"status"` // "installing", "success", "failed", "aborted", "unknown"
|
||||
RandomID string `json:"random_id"` // Session UUID
|
||||
ExecutionID string `json:"execution_id,omitempty"` // Unique execution ID (unique-indexed in PocketBase)
|
||||
Type string `json:"type"` // "lxc", "vm", "tool", "addon"
|
||||
NSAPP string `json:"nsapp"` // Application name (e.g., "jellyfin")
|
||||
Status string `json:"status"` // "installing", "success", "failed", "aborted", "unknown"
|
||||
|
||||
// Container/VM specs
|
||||
CTType int `json:"ct_type,omitempty"` // 1=unprivileged, 2=privileged/VM
|
||||
@@ -120,11 +121,12 @@ type TelemetryIn struct {
|
||||
|
||||
// TelemetryOut is sent to PocketBase (matches telemetry collection)
|
||||
type TelemetryOut struct {
|
||||
RandomID string `json:"random_id"`
|
||||
Type string `json:"type"`
|
||||
NSAPP string `json:"nsapp"`
|
||||
Status string `json:"status"`
|
||||
CTType int `json:"ct_type,omitempty"`
|
||||
RandomID string `json:"random_id"`
|
||||
ExecutionID string `json:"execution_id,omitempty"`
|
||||
Type string `json:"type"`
|
||||
NSAPP string `json:"nsapp"`
|
||||
Status string `json:"status"`
|
||||
CTType int `json:"ct_type,omitempty"`
|
||||
DiskSize int `json:"disk_size,omitempty"`
|
||||
CoreCount int `json:"core_count,omitempty"`
|
||||
RAMSize int `json:"ram_size,omitempty"`
|
||||
@@ -152,6 +154,7 @@ type TelemetryOut struct {
|
||||
// TelemetryStatusUpdate contains only fields needed for status updates
|
||||
type TelemetryStatusUpdate struct {
|
||||
Status string `json:"status"`
|
||||
ExecutionID string `json:"execution_id,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
ExitCode int `json:"exit_code"`
|
||||
InstallDuration int `json:"install_duration,omitempty"`
|
||||
@@ -291,6 +294,48 @@ func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (s
|
||||
return result.Items[0].ID, nil
|
||||
}
|
||||
|
||||
// FindRecordByExecutionID searches for an existing record by execution_id (unique-indexed, O(1) lookup)
|
||||
func (p *PBClient) FindRecordByExecutionID(ctx context.Context, executionID string) (string, error) {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
filter := fmt.Sprintf("execution_id='%s'", executionID)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||
fmt.Sprintf("%s/api/collections/%s/records?filter=%s&fields=id&perPage=1",
|
||||
p.baseURL, p.targetColl, url.QueryEscape(filter)),
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+p.token)
|
||||
|
||||
resp, err := p.http.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return "", fmt.Errorf("pocketbase search by execution_id failed: %s", resp.Status)
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Items []struct {
|
||||
ID string `json:"id"`
|
||||
} `json:"items"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(result.Items) == 0 {
|
||||
return "", nil // Not found
|
||||
}
|
||||
return result.Items[0].ID, nil
|
||||
}
|
||||
|
||||
// UpdateTelemetryStatus updates only status, error, and exit_code of an existing record
|
||||
func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, update TelemetryStatusUpdate) error {
|
||||
if err := p.ensureAuth(ctx); err != nil {
|
||||
@@ -422,7 +467,9 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
||||
// All records go to the same collection; repo_source is stored as a field.
|
||||
//
|
||||
// For status="installing": always creates a new record.
|
||||
// For status!="installing": updates existing record (found by random_id).
|
||||
// For status!="installing": updates existing record.
|
||||
// - Prefers execution_id lookup (unique-indexed, O(1)) when available.
|
||||
// - Falls back to random_id lookup (filter query) for old clients.
|
||||
func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) error {
|
||||
// For "installing" status, always create new record
|
||||
if payload.Status == "installing" {
|
||||
@@ -430,7 +477,21 @@ func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) er
|
||||
}
|
||||
|
||||
// For status updates (success/failed/unknown), find and update existing record
|
||||
recordID, err := p.FindRecordByRandomID(ctx, payload.RandomID)
|
||||
// Prefer execution_id (unique-indexed) over random_id (filter query) for faster lookups
|
||||
var recordID string
|
||||
var err error
|
||||
|
||||
if payload.ExecutionID != "" {
|
||||
recordID, err = p.FindRecordByExecutionID(ctx, payload.ExecutionID)
|
||||
if err != nil {
|
||||
// Execution ID lookup failed, fall back to random_id
|
||||
recordID, err = p.FindRecordByRandomID(ctx, payload.RandomID)
|
||||
}
|
||||
} else {
|
||||
// Old client without execution_id — use random_id lookup
|
||||
recordID, err = p.FindRecordByRandomID(ctx, payload.RandomID)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// Search failed, log and return error
|
||||
return fmt.Errorf("cannot find record to update: %w", err)
|
||||
@@ -445,6 +506,7 @@ func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) er
|
||||
// Update only status, error, exit_code, and new metrics fields
|
||||
update := TelemetryStatusUpdate{
|
||||
Status: payload.Status,
|
||||
ExecutionID: payload.ExecutionID,
|
||||
Error: payload.Error,
|
||||
ExitCode: payload.ExitCode,
|
||||
InstallDuration: payload.InstallDuration,
|
||||
@@ -744,6 +806,7 @@ func sanitizeMultiLine(s string, max int) string {
|
||||
func validate(in *TelemetryIn) error {
|
||||
// Sanitize all string fields
|
||||
in.RandomID = sanitizeShort(in.RandomID, 64)
|
||||
in.ExecutionID = sanitizeShort(in.ExecutionID, 64)
|
||||
in.Type = sanitizeShort(in.Type, 8)
|
||||
in.NSAPP = sanitizeShort(in.NSAPP, 64)
|
||||
in.Status = sanitizeShort(in.Status, 16)
|
||||
@@ -1784,6 +1847,7 @@ func main() {
|
||||
// Map input to PocketBase schema
|
||||
out := TelemetryOut{
|
||||
RandomID: in.RandomID,
|
||||
ExecutionID: in.ExecutionID,
|
||||
Type: in.Type,
|
||||
NSAPP: in.NSAPP,
|
||||
Status: in.Status,
|
||||
|
||||
Reference in New Issue
Block a user