fix: pipeline migration now supports PB v0.22+ (fields vs schema)

The EnsurePipelineField migration was using PocketBase's legacy 'schema' key
which was renamed to 'fields' in PB v0.22+. Since the install script fetches
the latest PB release, the migration silently failed: it read an empty schema
array, PATCHed with the wrong key, and the pipeline field was never created.
All pipeline data sent to PB was silently dropped.

Changes:
- Auto-detect schema/fields key from PB collection response
- Use correct key and field format for both PB <0.22 and >=0.22
- Add retry logic (3 attempts with backoff) for startup race conditions
- Add diagnostic logging when pipeline steps are tracked
- Log which API format (schema vs fields) is being used
This commit is contained in:
CanbiZ (MickLesk)
2026-03-02 15:27:23 +01:00
parent 89493fdf12
commit d4d9cafc97
+62 -18
View File
@@ -413,6 +413,7 @@ func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, u
// EnsurePipelineField checks the PocketBase collection schema and adds the
// 'pipeline' JSON field if it does not yet exist. Called once on startup.
// Supports both PB <0.22 ("schema" key) and PB >=0.22 ("fields" key).
func (p *PBClient) EnsurePipelineField(ctx context.Context) error {
if err := p.ensureAuth(ctx); err != nil {
return err
@@ -436,31 +437,56 @@ func (p *PBClient) EnsurePipelineField(ctx context.Context) error {
return fmt.Errorf("read collection schema: HTTP %s", resp.Status)
}
var collection struct {
Schema []map[string]interface{} `json:"schema"`
}
// PB <0.22 uses "schema", PB >=0.22 uses "fields" for field definitions.
// Parse both and use whichever is present.
var raw map[string]json.RawMessage
body, _ := io.ReadAll(resp.Body)
if err := json.Unmarshal(body, &collection); err != nil {
return fmt.Errorf("decode collection schema: %w", err)
if err := json.Unmarshal(body, &raw); err != nil {
return fmt.Errorf("decode collection: %w", err)
}
// Determine which key holds the field definitions
fieldsKey := "fields" // PB >=0.22 default
fieldsDef, hasFields := raw["fields"]
schemaDef, hasSchema := raw["schema"]
if !hasFields && hasSchema {
fieldsKey = "schema" // PB <0.22 fallback
fieldsDef = schemaDef
} else if !hasFields && !hasSchema {
return fmt.Errorf("collection response has neither 'fields' nor 'schema' key")
}
var fields []map[string]interface{}
if fieldsDef != nil {
if err := json.Unmarshal(fieldsDef, &fields); err != nil {
return fmt.Errorf("decode %s array: %w", fieldsKey, err)
}
}
// Check if pipeline field already exists
for _, field := range collection.Schema {
for _, field := range fields {
if name, _ := field["name"].(string); name == "pipeline" {
log.Printf("[MIGRATION] Pipeline field already exists in '%s' (via %s key)", p.targetColl, fieldsKey)
return nil
}
}
// Append the pipeline field (PocketBase JSON type)
log.Printf("[MIGRATION] Adding 'pipeline' JSON field to collection '%s'", p.targetColl)
collection.Schema = append(collection.Schema, map[string]interface{}{
log.Printf("[MIGRATION] Adding 'pipeline' JSON field to collection '%s' (using %s key)", p.targetColl, fieldsKey)
newField := map[string]interface{}{
"name": "pipeline",
"type": "json",
"required": false,
"options": map[string]interface{}{},
})
}
// PB <0.22 expects "options", PB >=0.22 uses "maxSize" directly
if fieldsKey == "schema" {
newField["options"] = map[string]interface{}{}
} else {
newField["maxSize"] = 0 // 0 = unlimited
}
fields = append(fields, newField)
patchBody, _ := json.Marshal(map[string]interface{}{"schema": collection.Schema})
patchBody, _ := json.Marshal(map[string]interface{}{fieldsKey: fields})
patchReq, err := http.NewRequestWithContext(ctx, http.MethodPatch,
fmt.Sprintf("%s/api/collections/%s", p.baseURL, p.targetColl),
bytes.NewReader(patchBody))
@@ -478,10 +504,10 @@ func (p *PBClient) EnsurePipelineField(ctx context.Context) error {
if patchResp.StatusCode < 200 || patchResp.StatusCode >= 300 {
rb, _ := io.ReadAll(io.LimitReader(patchResp.Body, 4<<10))
return fmt.Errorf("add pipeline field: %s: %s", patchResp.Status, string(rb))
return fmt.Errorf("add pipeline field via %s: %s: %s", fieldsKey, patchResp.Status, string(rb))
}
log.Printf("[MIGRATION] Successfully added 'pipeline' field to '%s'", p.targetColl)
log.Printf("[MIGRATION] Successfully added 'pipeline' field to '%s' via %s key", p.targetColl, fieldsKey)
return nil
}
@@ -1390,13 +1416,26 @@ func main() {
pb := NewPBClient(cfg)
// Auto-migrate: ensure 'pipeline' field exists in PocketBase collection
// Auto-migrate: ensure 'pipeline' field exists in PocketBase collection.
// Retry up to 3 times with backoff in case PB is still starting up.
{
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := pb.EnsurePipelineField(ctx); err != nil {
log.Printf("[MIGRATION] Warning: could not ensure pipeline field: %v", err)
var migErr error
for attempt := 1; attempt <= 3; attempt++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
migErr = pb.EnsurePipelineField(ctx)
cancel()
if migErr == nil {
break
}
log.Printf("[MIGRATION] Attempt %d/3 failed: %v", attempt, migErr)
if attempt < 3 {
time.Sleep(time.Duration(attempt*2) * time.Second)
}
}
if migErr != nil {
log.Printf("[MIGRATION] WARNING: pipeline field migration failed after 3 attempts: %v", migErr)
log.Printf("[MIGRATION] Pipeline tracking will NOT work until the field is added manually or the service is restarted")
}
cancel()
}
// Persistent script stats stores (7d, 30d, all-time)
@@ -2177,6 +2216,11 @@ func main() {
defer cleanupPipeline(pKey)
}
if cfg.EnableReqLogging {
log.Printf("pipeline tracked: nsapp=%s status=%s key=%s steps=%d bytes=%d",
in.NSAPP, in.Status, pKey, bytes.Count(pipeline, []byte(`"s":`)), len(pipeline))
}
// Map input to PocketBase schema
out := TelemetryOut{
RandomID: in.RandomID,