diff --git a/.github/workflows/ci-ai.yml b/.github/workflows/ci-ai.yml index f0301ca8fa..306f4d8685 100644 --- a/.github/workflows/ci-ai.yml +++ b/.github/workflows/ci-ai.yml @@ -31,6 +31,12 @@ jobs: # because we want the Braintrust experiment to have accurate git metadata (on master it's empty) ref: ${{ github.event.pull_request.head.ref }} fetch-depth: 0 + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Stop/Start stack with Docker Compose run: | diff --git a/.github/workflows/ci-backend.yml b/.github/workflows/ci-backend.yml index d9ef0eb1e2..e286024cb2 100644 --- a/.github/workflows/ci-backend.yml +++ b/.github/workflows/ci-backend.yml @@ -54,6 +54,13 @@ jobs: # For pull requests it's not necessary to checkout the code, but we # also want this to run on master so we need to checkout - uses: actions/checkout@v4 + with: + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2 id: filter @@ -114,6 +121,13 @@ jobs: steps: - uses: actions/checkout@v4 + with: + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Stop/Start stack with Docker Compose run: | @@ -156,6 +170,7 @@ jobs: uses: actions/checkout@v4 with: ref: master + clean: false - name: Install python dependencies for master run: | @@ -179,6 +194,8 @@ jobs: # Now we can consider this PR's migrations - name: Checkout this PR uses: actions/checkout@v4 + with: + clean: false - name: Install python dependencies for this PR run: | @@ -550,6 +567,12 @@ jobs: ref: ${{ github.event.pull_request.head.ref }} # Use PostHog Bot token when not on forks to enable proper snapshot updating token: ${{ github.event.pull_request.head.repo.full_name == github.repository && secrets.POSTHOG_BOT_PAT || github.token }} + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: 'Safeguard: ensure no stray Python modules at product root' run: | @@ -891,6 +914,12 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 1 + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Start stack with Docker Compose run: | diff --git a/.github/workflows/ci-dagster.yml b/.github/workflows/ci-dagster.yml index 9750539f11..cec0330474 100644 --- a/.github/workflows/ci-dagster.yml +++ b/.github/workflows/ci-dagster.yml @@ -38,6 +38,8 @@ jobs: # For pull requests it's not necessary to checkout the code, but we # also want this to run on master so we 
need to checkout - uses: actions/checkout@v4 + with: + clean: false - uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2 id: filter @@ -71,6 +73,12 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 1 + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Start stack with Docker Compose run: | diff --git a/.github/workflows/ci-e2e-playwright.yml b/.github/workflows/ci-e2e-playwright.yml index c5af58dfd9..5347006333 100644 --- a/.github/workflows/ci-e2e-playwright.yml +++ b/.github/workflows/ci-e2e-playwright.yml @@ -124,6 +124,12 @@ jobs: repository: ${{ github.event.pull_request.head.repo.full_name }} token: ${{ secrets.POSTHOG_BOT_PAT || github.token }} fetch-depth: 50 # Need enough history for flap detection to find last human commit + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Stop/Start stack with Docker Compose shell: bash diff --git a/.github/workflows/ci-hobby.yml b/.github/workflows/ci-hobby.yml index 054a0492eb..927ca2656a 100644 --- a/.github/workflows/ci-hobby.yml +++ b/.github/workflows/ci-hobby.yml @@ -27,6 +27,13 @@ jobs: name: Setup DO Hobby Instance and test steps: - uses: actions/checkout@v4 + with: + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - uses: actions/setup-python@v5 with: python-version: '3.8' diff --git a/.github/workflows/ci-nodejs.yml b/.github/workflows/ci-nodejs.yml index 7f3659a70a..af22281a05 100644 --- a/.github/workflows/ci-nodejs.yml +++ b/.github/workflows/ci-nodejs.yml @@ -32,6 +32,8 @@ jobs: # For pull requests it's not necessary to checkout the code, but we # also want this to run on master so we need to checkout - uses: actions/checkout@v4 + with: + clean: false - uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2 id: filter @@ -55,6 +57,8 @@ jobs: runs-on: depot-ubuntu-24.04-4 steps: - uses: actions/checkout@v4 + with: + clean: false - name: Install pnpm uses: pnpm/action-setup@a7487c7e89a18df4991f7f222e4898a00d66ddda # v4 @@ -83,6 +87,8 @@ jobs: runs-on: depot-ubuntu-24.04-4 steps: - uses: actions/checkout@v4 + with: + clean: false - name: Install pnpm uses: pnpm/action-setup@a7487c7e89a18df4991f7f222e4898a00d66ddda # v4 @@ -133,6 +139,13 @@ jobs: steps: - name: Code check out uses: actions/checkout@v4 + with: + clean: false + - name: Clean up data directories with container permissions + run: | + # Use docker to clean up files created by containers + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Stop/Start stack with Docker Compose run: | diff --git a/.github/workflows/ci-rust.yml b/.github/workflows/ci-rust.yml index 184ca68b81..69b01fa889 100644 --- a/.github/workflows/ci-rust.yml +++ b/.github/workflows/ci-rust.yml @@ -23,6 +23,8 @@ jobs: # For pull requests it's not necessary to checkout 
the code, but we # also want this to run on master so we need to checkout - uses: actions/checkout@v4 + with: + clean: false - uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2 id: filter with: @@ -56,6 +58,7 @@ jobs: with: sparse-checkout: 'rust/' sparse-checkout-cone-mode: false + clean: false - name: Install rust if: needs.changes.outputs.rust == 'true' @@ -122,6 +125,15 @@ jobs: steps: - uses: actions/checkout@v4 if: needs.changes.outputs.rust == 'true' + with: + clean: false + - name: Clean up data directories with container permissions + if: needs.changes.outputs.rust == 'true' + working-directory: . + run: | + # Use docker to clean up files created by containers (from repo root, not rust/) + [ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true + continue-on-error: true - name: Setup main repo and dependencies if: needs.changes.outputs.rust == 'true' @@ -233,6 +245,7 @@ jobs: with: sparse-checkout: 'rust/' sparse-checkout-cone-mode: false + clean: false - name: Install rust if: needs.changes.outputs.rust == 'true' @@ -280,6 +293,7 @@ jobs: with: sparse-checkout: 'rust/' sparse-checkout-cone-mode: false + clean: false - name: Install cargo-binstall if: needs.changes.outputs.rust == 'true' diff --git a/.gitignore b/.gitignore index 649287abb4..2d8329049a 100644 --- a/.gitignore +++ b/.gitignore @@ -70,6 +70,8 @@ gen/ max-test-venv/ node_modules/ object_storage/ +data/seaweedfs/ +.migration-checkpoint.json playwright/e2e-vrt/**/*-darwin.png pnpm-error.log # pyright config (keep this until we have a standardized one) diff --git a/bin/migrate-minio-to-seaweedfs b/bin/migrate-minio-to-seaweedfs new file mode 100755 index 0000000000..e23bfd92a9 --- /dev/null +++ b/bin/migrate-minio-to-seaweedfs @@ -0,0 +1,12 @@ +#!/bin/bash +set -e + +# Wrapper script for the MinIO to SeaweedFS migration tool +# This allows calling the migration from the root directory + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" + +cd "$PROJECT_ROOT/plugin-server" +exec node bin/migrate-minio-to-seaweedfs.js "$@" + diff --git a/bin/migrate-session-recordings-hobby b/bin/migrate-session-recordings-hobby new file mode 100755 index 0000000000..3ed7c89839 --- /dev/null +++ b/bin/migrate-session-recordings-hobby @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +set -e + +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo "📦 PostHog Session Recording Migration (MinIO → SeaweedFS)" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo "" +echo "This script will migrate your existing session recordings from" +echo "MinIO to SeaweedFS storage." +echo "" +echo "⚠️ Important:" +echo " - Keep this script running until completion" +echo " - The migration runs in the background and won't block new recordings" +echo " - You can monitor progress in the output" +echo "" + +read -r -p "Do you want to start the migration? [y/N] " response +if [[ ! "$response" =~ ^([yY][eE][sS]|[yY])+$ ]] +then + echo "Migration cancelled." + exit 0 +fi + +echo "" +echo "🚀 Starting migration..." 
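+# Note: the command below runs inside the plugins container, so the endpoints use
+# compose service names (objectstorage:19000 for MinIO, seaweedfs:8333 for SeaweedFS);
+# extra flags such as --dry-run or --resume are forwarded to the Node tool via "$@".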
+echo "" + +# Run the migration script inside the plugins container +docker-compose run --rm \ + -e OBJECT_STORAGE_ENDPOINT=http://objectstorage:19000 \ + -e OBJECT_STORAGE_ACCESS_KEY_ID=object_storage_root_user \ + -e OBJECT_STORAGE_SECRET_ACCESS_KEY=object_storage_root_password \ + -e SESSION_RECORDING_V2_S3_ENDPOINT=http://seaweedfs:8333 \ + -e SESSION_RECORDING_V2_S3_ACCESS_KEY_ID=any \ + -e SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY=any \ + plugins \ + node plugin-server/bin/migrate-minio-to-seaweedfs.js "$@" + +if [ $? -eq 0 ]; then + echo "" + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + echo "✅ Migration completed successfully!" + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + echo "" + echo "To mark the migration as complete in your .env file, run:" + echo "" + echo " sed -i 's/SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=pending_migration/SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=completed/' .env" + echo "" +else + echo "" + echo "❌ Migration failed. Check the output above for errors." + echo "" + echo "You can retry by running this script again with --resume:" + echo "" + echo " ./bin/migrate-session-recordings-hobby --resume" + echo "" + exit 1 +fi + diff --git a/bin/migrate-storage-hobby b/bin/migrate-storage-hobby new file mode 100755 index 0000000000..e21f7ae7c8 --- /dev/null +++ b/bin/migrate-storage-hobby @@ -0,0 +1,81 @@ +#!/usr/bin/env bash +set -e + +# Wrapper script for syncing/migrating object storage between MinIO and SeaweedFS (Hobby) +# Runs inside the plugins container with compose network service names. + +# All available services +ALL_SERVICES=( + "query-cache" + "session-recordings" + "session-recordings-lts" + "media-uploads" + "exports" + "source-maps" +) + +SERVICE="" +MODE="sync" # default to bidirectional sync; use --mode migrate to one-way +EXTRA_ARGS=() + +# Parse args (supports: , --mode, --dry-run, --force, --workers, --resume, --revert, --conflict) +while [[ $# -gt 0 ]]; do + case "$1" in + --mode) + MODE="$2"; shift 2 ;; + --dry-run|--force|--resume|--revert) + EXTRA_ARGS+=("$1"); shift ;; + --workers|--conflict|--service) + EXTRA_ARGS+=("$1" "$2"); shift 2 ;; + -n) + EXTRA_ARGS+=("--dry-run"); shift ;; + query-cache|session-recordings|session-recordings-lts|media-uploads|exports|source-maps) + SERVICE="$1"; shift ;; + *) + echo "Unknown arg: $1"; exit 1 ;; + esac +done + +# Build env for endpoints: MinIO (objectstorage) -> source; SeaweedFS -> destination +# We pass these to the script; it will detect via env. 
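+# OBJECT_STORAGE_* is read for the MinIO side and SESSION_RECORDING_V2_S3_* (or
+# SEAWEEDFS_*) for the SeaweedFS side, falling back to localhost defaults when unset.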
+ENV_ARGS=( + -e OBJECT_STORAGE_ENDPOINT=http://objectstorage:19000 + -e OBJECT_STORAGE_ACCESS_KEY_ID=object_storage_root_user + -e OBJECT_STORAGE_SECRET_ACCESS_KEY=object_storage_root_password + -e SESSION_RECORDING_V2_S3_ENDPOINT=http://seaweedfs:8333 + -e SESSION_RECORDING_V2_S3_ACCESS_KEY_ID=any + -e SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY=any +) + +run_service() { + local service="$1" + echo "🔄 ${MODE^^}: $service (MinIO ⇄ SeaweedFS)" + docker-compose run --rm \ + "${ENV_ARGS[@]}" \ + plugins \ + node plugin-server/bin/migrate-minio-to-seaweedfs.js --service "$service" --mode "$MODE" "${EXTRA_ARGS[@]}" + echo "" +} + +if [ -z "$SERVICE" ]; then + echo "═══════════════════════════════════════════════════════════════" + echo "🔄 $MODE all services between MinIO and SeaweedFS (Hobby)" + echo "═══════════════════════════════════════════════════════════════" + echo "" + for s in "${ALL_SERVICES[@]}"; do + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + run_service "$s" + done + echo "═══════════════════════════════════════════════════════════════" + echo "✨ Completed: $MODE all services" + echo "═══════════════════════════════════════════════════════════════" +else + run_service "$SERVICE" + echo "✨ Done!" +fi + +echo "" +echo "Usage:" +echo " $0 # Sync all services" +echo " $0 --mode migrate # One-way migrate all services (MinIO → SeaweedFS)" +echo " $0 exports -n # Dry run exports only" diff --git a/bin/start-backend b/bin/start-backend index 611e211238..8a5f28c14b 100755 --- a/bin/start-backend +++ b/bin/start-backend @@ -2,7 +2,7 @@ set -e export OBJECT_STORAGE_ENDPOINT=http://localhost:19000 -export SESSION_RECORDING_V2_S3_ENDPOINT=http://localhost:19000 +export SESSION_RECORDING_V2_S3_ENDPOINT=http://localhost:8333 export CLICKHOUSE_API_USER="api" export CLICKHOUSE_API_PASSWORD="apipass" diff --git a/bin/sync-storage b/bin/sync-storage new file mode 100755 index 0000000000..b332256a67 --- /dev/null +++ b/bin/sync-storage @@ -0,0 +1,113 @@ +#!/usr/bin/env bash +set -e + +# Wrapper script for syncing object storage between MinIO and SeaweedFS +# This makes it easier to sync different services with sensible defaults + +# All available services (for explicit selection) +ALL_SERVICES=( + "query-cache" + "session-recordings" + "session-recordings-lts" + "media-uploads" + "exports" + "source-maps" +) + +# Default set when no service is specified (partial rollout): +# Only session recordings V2 and legacy LTS +DEFAULT_SERVICES=( + "session-recordings" + "session-recordings-lts" +) + +MODE="sync" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PLUGIN_SERVER_DIR="$SCRIPT_DIR/../plugin-server" + +# Ensure DEBUG is set for local dev +export DEBUG=1 + +cd "$PLUGIN_SERVER_DIR" + +# Parse arguments - handle both orders +SERVICE="" +DRY_RUN_FLAG="" + +for arg in "$@"; do + if [ "$arg" = "--dry-run" ] || [ "$arg" = "-n" ]; then + DRY_RUN_FLAG="--dry-run" + else + SERVICE="$arg" + fi +done + +# Function to sync a single service +sync_service() { + local service="$1" + local dry_run_flag="$2" + + echo "🔄 Syncing $service between MinIO and SeaweedFS..." 
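+    # DEBUG=1 (exported above) satisfies the migration tool's local-development safety
+    # check; in the default sync mode it compares MinIO (localhost:19000) with
+    # SeaweedFS (localhost:8333) and copies whatever is missing on either side.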
+ echo "" + + # Build the command + local cmd="node bin/migrate-minio-to-seaweedfs.js --service $service --mode $MODE" + + if [ -n "$dry_run_flag" ]; then + cmd="$cmd --dry-run" + fi + + # Execute + $cmd + echo "" +} + +# Show dry-run notice if applicable +if [ -n "$DRY_RUN_FLAG" ]; then + echo "📋 DRY RUN MODE - No changes will be made" + echo "" +fi + +# If no service specified, sync default subset (SR V2 + LTS) +if [ -z "$SERVICE" ]; then + echo "═══════════════════════════════════════════════════════════════" + echo "🔄 Syncing DEFAULT services between MinIO and SeaweedFS" + echo " (session-recordings, session-recordings-lts)" + echo "═══════════════════════════════════════════════════════════════" + echo "" + for service in "${DEFAULT_SERVICES[@]}"; do + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + sync_service "$service" "$DRY_RUN_FLAG" + done + + echo "═══════════════════════════════════════════════════════════════" + echo "✨ Default services synced!" + echo "═══════════════════════════════════════════════════════════════" +else + # Sync single service + sync_service "$SERVICE" "$DRY_RUN_FLAG" + echo "✨ Done!" +fi + +echo "" +echo "Available services:" +echo " - query-cache (ephemeral, safe to test)" +echo " - session-recordings (current V2 recordings)" +echo " - session-recordings-lts (long-term storage)" +echo " - media-uploads (user uploaded media)" +echo " - exports (CSV, PNG, PDF, videos)" +echo " - source-maps (error tracking source maps)" +echo "" +echo "Usage:" +echo " $0 # Sync DEFAULT services (SR V2 + LTS)" +echo " $0 [--dry-run] # Preview syncing DEFAULT services" +echo " $0 # Sync specific service" +echo " $0 [--dry-run] # Preview syncing specific service" +echo "" +echo "Examples:" +echo " $0 # Sync default services (SR V2 + LTS)" +echo " $0 --dry-run # Dry run for default services" +echo " $0 query-cache # Sync query cache only" +echo " $0 media-uploads -n # Dry run for media uploads only" + diff --git a/bin/upgrade-hobby b/bin/upgrade-hobby index 5907d8c703..a24dcfbe6d 100755 --- a/bin/upgrade-hobby +++ b/bin/upgrade-hobby @@ -118,6 +118,68 @@ if ! grep -q "SESSION_RECORDING_V2_METADATA_SWITCHOVER" .env; then source .env fi +# Check if session recording storage migration marker exists +if ! grep -q "SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS" .env; then + echo "" + echo "" + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + echo "📦 SESSION RECORDING STORAGE UPDATE" + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + echo "" + echo "PostHog now uses SeaweedFS for session recording storage." + echo "Your existing session recordings are stored in MinIO." + echo "" + echo "You have TWO options:" + echo "" + echo "1. 🔄 MIGRATE your existing session recordings to SeaweedFS" + echo " - Keeps all your existing recordings accessible" + echo " - Run migration script after upgrade" + echo " - New recordings will be stored in SeaweedFS" + echo " - Migration is best-effort for hobby deployments; support is limited" + echo "" + echo "2. ⚠️ ACCEPT DATA LOSS of existing session recordings" + echo " - Existing recordings will become inaccessible" + echo " - Only new recordings will be available" + echo " - Faster upgrade, no migration needed" + echo "" + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" + echo "" + read -r -p "Do you want to MIGRATE existing session recordings? 
[y/N] " response + if [[ "$response" =~ ^([yY][eE][sS]|[yY])+$ ]] + then + echo "" + echo "✅ You chose to MIGRATE existing recordings." + echo "" + echo "After the upgrade completes, run this command to migrate:" + echo "" + echo " ./bin/migrate-session-recordings-hobby" + echo "" + echo "Options:" + echo " --dry-run Preview what would be migrated" + echo " --resume Resume from last checkpoint if interrupted" + echo " --force Overwrite existing objects in destination" + echo " --workers Number of concurrent workers (default: 5)" + echo "" + read -r -p "Press ENTER to acknowledge and continue..." + echo "SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=pending_migration" >> .env + else + echo "" + echo "⚠️ You chose to ACCEPT DATA LOSS." + echo "Existing session recordings will not be accessible after upgrade." + echo "" + read -r -p "Are you sure? This cannot be undone. [y/N] " confirm + if [[ "$confirm" =~ ^([yY][eE][sS]|[yY])+$ ]] + then + echo "SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=data_loss_accepted" >> .env + echo "✅ Proceeding without migration." + else + echo "Upgrade cancelled. No changes made." + exit 1 + fi + fi + source .env +fi + export POSTHOG_APP_TAG="${POSTHOG_APP_TAG:-latest-release}" cd posthog diff --git a/common/hogli/manifest.yaml b/common/hogli/manifest.yaml index 2933d938ac..f31dd621ca 100644 --- a/common/hogli/manifest.yaml +++ b/common/hogli/manifest.yaml @@ -215,6 +215,20 @@ deploy: bin_script: upgrade-hobby description: Upgrade existing hobby deployment with data loss warnings and volume checks + deploy:migrate-recordings: + bin_script: migrate-session-recordings-hobby + description: Migrate session recordings from MinIO to SeaweedFS for hobby deployments + deploy:migrate-storage: + bin_script: migrate-minio-to-seaweedfs + description: "Migrate or sync object storage between MinIO and SeaweedFS\n\nServices:\ + \ session-recordings, session-recordings-lts, query-cache, \n media-uploads,\ + \ exports, source-maps\n\nModes:\n --mode migrate One-way copy (default)\n\ + \ --mode sync Bidirectional sync\n\nExamples:\n hogli deploy:migrate-storage\ + \ --service query-cache --mode sync --dry-run\n hogli deploy:migrate-storage\ + \ --service media-uploads --mode migrate" + deploy:migrate-storage-hobby: + bin_script: migrate-storage-hobby + description: "Hobby: Migrate or sync ALL services between MinIO and SeaweedFS via docker-compose\n\nExamples:\n hogli deploy:migrate-storage-hobby # Sync all services\n hogli deploy:migrate-storage-hobby --mode migrate # One-way migrate all (MinIO → SeaweedFS)\n hogli deploy:migrate-storage-hobby exports -n # Dry run exports only" build: build:frontend: cmd: pnpm --filter=@posthog/frontend build @@ -417,7 +431,11 @@ tools: hidden: true create:notebook:node: bin_script: create-notebook-node.sh - description: 'Create a new NotebookNode file and update types and editor references' + description: Create a new NotebookNode file and update types and editor references + hidden: true + sync:storage: + bin_script: sync-storage + description: 'TODO: add description for sync-storage' hidden: true utilities: utilities:posthog-worktree: diff --git a/common/ingestion/acceptance_tests/utils.py b/common/ingestion/acceptance_tests/utils.py index 2d6b416055..55b94314a2 100644 --- a/common/ingestion/acceptance_tests/utils.py +++ b/common/ingestion/acceptance_tests/utils.py @@ -10,7 +10,7 @@ def get_service_url(service: str = "proxy") -> str: service_urls = { "proxy": "http://localhost:8010", - "s3": "http://localhost:19000", + "s3": 
"http://localhost:8333", "clickhouse": "http://localhost:8123", } diff --git a/docker-compose.base.yml b/docker-compose.base.yml index db4efc99d1..2ca16079bc 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -485,6 +485,14 @@ services: volumes: - '/var/run/docker.sock:/var/run/docker.sock' + seaweedfs: + container_name: '${SEAWEEDFS_DOCKER_NAME:-seaweedfs-main}' + image: chrislusf/seaweedfs:latest + ports: + - '127.0.0.1:8333:8333' # S3 API + - '127.0.0.1:9333:9333' # Master server (for admin UI) + command: server -s3 -s3.port=8333 -dir=/data + networks: otel_network: driver: bridge diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 896a0e1071..46945cfd91 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -271,9 +271,20 @@ services: file: docker-compose.base.yml service: localstack + seaweedfs: + extends: + file: docker-compose.base.yml + service: seaweedfs + command: server -s3 -s3.port=8333 -filer -dir=/data + ports: + - '127.0.0.1:8888:8888' # SeaweedFS Filer UI + volumes: + - 'seaweedfs-data:/data' + networks: otel_network: driver: bridge volumes: redpanda-data: + seaweedfs-data: diff --git a/docker-compose.hobby.yml b/docker-compose.hobby.yml index b2d9cbeb03..3123ed0f54 100644 --- a/docker-compose.hobby.yml +++ b/docker-compose.hobby.yml @@ -85,7 +85,9 @@ services: OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user' OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password' OBJECT_STORAGE_ENDPOINT: http://objectstorage:19000 - SESSION_RECORDING_V2_S3_ENDPOINT: http://objectstorage:19000 + SESSION_RECORDING_V2_S3_ENDPOINT: http://seaweedfs:8333 + SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any' + SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any' OBJECT_STORAGE_ENABLED: true ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS image: $REGISTRY_URL:$POSTHOG_APP_TAG @@ -103,10 +105,10 @@ services: SECRET_KEY: $POSTHOG_SECRET OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user' OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password' - SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user' - SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password' + SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any' + SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any' OBJECT_STORAGE_ENDPOINT: http://objectstorage:19000 - SESSION_RECORDING_V2_S3_ENDPOINT: http://objectstorage:19000 + SESSION_RECORDING_V2_S3_ENDPOINT: http://seaweedfs:8333 OBJECT_STORAGE_ENABLED: true ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS OTEL_SERVICE_NAME: 'posthog' @@ -118,6 +120,7 @@ services: - clickhouse - kafka - objectstorage + - seaweedfs plugins: extends: @@ -129,10 +132,10 @@ services: SECRET_KEY: $POSTHOG_SECRET OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user' OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password' - SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user' - SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password' + SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any' + SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any' OBJECT_STORAGE_ENDPOINT: http://objectstorage:19000 - SESSION_RECORDING_V2_S3_ENDPOINT: http://objectstorage:19000 + SESSION_RECORDING_V2_S3_ENDPOINT: http://seaweedfs:8333 OBJECT_STORAGE_ENABLED: true CDP_REDIS_HOST: redis7 CDP_REDIS_PORT: 6379 @@ -148,6 +151,7 @@ services: - clickhouse - kafka - objectstorage + - seaweedfs proxy: extends: @@ -175,6 +179,17 @@ services: - '19000:19000' - '19001:19001' + seaweedfs: + extends: + file: docker-compose.base.yml + service: seaweedfs 
+ restart: on-failure + volumes: + - seaweedfs:/data + ports: + - '8333:8333' + - '9333:9333' + asyncmigrationscheck: extends: file: docker-compose.base.yml @@ -234,6 +249,7 @@ services: - clickhouse - kafka - objectstorage + - seaweedfs - temporal cyclotron-janitor: @@ -298,6 +314,7 @@ volumes: zookeeper-datalog: zookeeper-logs: objectstorage: + seaweedfs: postgres-data: clickhouse-data: caddy-data: diff --git a/plugin-server/bin/migrate-minio-to-seaweedfs.js b/plugin-server/bin/migrate-minio-to-seaweedfs.js new file mode 100755 index 0000000000..da1e809496 --- /dev/null +++ b/plugin-server/bin/migrate-minio-to-seaweedfs.js @@ -0,0 +1,961 @@ +#!/usr/bin/env node + +/** + * Migration and sync script for object storage between MinIO and SeaweedFS + * + * This script supports: + * - One-way migration (MinIO → SeaweedFS or SeaweedFS → MinIO) + * - Bidirectional sync (keeps both storages in sync) + * - Multiple services (session recordings, exports, media uploads, etc.) + * + * Usage: + * node bin/migrate-minio-to-seaweedfs.js [options] + * + * Options: + * --service Service to migrate (default: session-recordings) + * --mode Mode: migrate | sync (default: migrate) + * --force Overwrite existing objects in destination + * --dry-run Show what would be migrated without copying + * --workers Number of concurrent workers (default: 5) + * --resume Resume from last checkpoint + * --revert Copy from SeaweedFS back to MinIO (reverse direction) + * --conflict Conflict resolution: newest | largest | skip (default: newest) + * --help Show this help message + * + * Modes: + * migrate One-way copy from source to destination + * sync Bidirectional sync - copies missing objects in both directions + * + * Services: + * session-recordings Session recording blobs (V2) + * session-recordings-lts Long-term storage session recordings + * query-cache Query result cache + * media-uploads User uploaded media + * exports Exported assets (CSV, PNG, PDF, videos) + * source-maps Error tracking source maps + */ + +const { + S3Client, + ListObjectsV2Command, + GetObjectCommand, + PutObjectCommand, + HeadObjectCommand, + CreateBucketCommand, +} = require('@aws-sdk/client-s3') +const { existsSync, readFileSync, writeFileSync } = require('fs') + +// Service configurations +const SERVICES = { + 'session-recordings': { + bucket: 'posthog', + prefix: 'session_recordings/', + description: 'Session recording blobs (V2)', + bidirectional: true, + conflictResolution: 'newest', + }, + 'session-recordings-lts': { + bucket: 'posthog', + prefix: 'session_recordings_lts/', + description: 'Long-term storage session recordings', + bidirectional: true, + conflictResolution: 'newest', + }, + 'query-cache': { + bucket: 'posthog', + prefix: 'query_cache/', + description: 'Query result cache (ephemeral)', + bidirectional: true, + conflictResolution: 'skip', // Cache can be regenerated + }, + 'media-uploads': { + bucket: 'posthog', + prefix: 'media_uploads/', + description: 'User uploaded media files', + bidirectional: true, + conflictResolution: 'largest', // Keep largest to avoid corrupted files + critical: true, + }, + exports: { + bucket: 'posthog', + prefix: 'exports/', + description: 'Exported assets (CSV, PNG, PDF, videos)', + bidirectional: true, + conflictResolution: 'newest', + critical: true, + }, + 'source-maps': { + bucket: 'posthog', + prefix: 'symbolsets/', + description: 'Error tracking source maps', + bidirectional: true, + conflictResolution: 'newest', + critical: true, + }, +} + +// Checkpoint file for resumable migrations 
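+// The checkpoint is written to the current working directory; the file name is
+// git-ignored (see the .migration-checkpoint.json entry added to .gitignore).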
+const CHECKPOINT_FILE = '.migration-checkpoint.json' + +class Checkpoint { + constructor(serviceName) { + this.serviceName = serviceName + this.data = this.load() + } + + load() { + if (existsSync(CHECKPOINT_FILE)) { + try { + return JSON.parse(readFileSync(CHECKPOINT_FILE, 'utf-8')) + } catch (err) { + console.warn('⚠️ Failed to load checkpoint, starting fresh') + return {} + } + } + return {} + } + + save() { + writeFileSync(CHECKPOINT_FILE, JSON.stringify(this.data, null, 2)) + } + + getServiceData(serviceName) { + if (!this.data[serviceName]) { + this.data[serviceName] = { + completed: [], + failed: {}, + lastKey: null, + startTime: Date.now(), + } + } + return this.data[serviceName] + } + + markCompleted(serviceName, key) { + const data = this.getServiceData(serviceName) + data.completed.push(key) + data.lastKey = key + } + + markFailed(serviceName, key, error) { + const data = this.getServiceData(serviceName) + data.failed[key] = error.message || String(error) + } + + markSkipped(serviceName) { + const data = this.getServiceData(serviceName) + data.skipped = (data.skipped || 0) + 1 + } + + isCompleted(serviceName, key) { + const data = this.getServiceData(serviceName) + return data.completed.includes(key) + } + + getLastKey(serviceName) { + const data = this.getServiceData(serviceName) + return data.lastKey + } +} + +class ProgressTracker { + constructor(total) { + this.total = total + this.completed = 0 + this.failed = 0 + this.skipped = 0 + this.bytesTransferred = 0 + this.startTime = Date.now() + } + + increment(type, bytes = 0) { + this[type]++ + if (bytes > 0) { + this.bytesTransferred += bytes + } + this.print() + } + + print() { + const processed = this.completed + this.failed + this.skipped + const percent = Math.round((processed / this.total) * 100) + const elapsed = (Date.now() - this.startTime) / 1000 + const rate = elapsed > 0 ? processed / elapsed : 0 + const remaining = rate > 0 ? (this.total - processed) / rate : 0 + const mbTransferred = (this.bytesTransferred / 1024 / 1024).toFixed(2) + const mbPerSec = elapsed > 0 ? (this.bytesTransferred / 1024 / 1024 / elapsed).toFixed(2) : '0.00' + + const barLength = 30 + const filledLength = Math.floor((percent / 100) * barLength) + const bar = '█'.repeat(filledLength) + '░'.repeat(barLength - filledLength) + + process.stdout.write( + `\r[${bar}] ${percent}% | ${processed}/${this.total} objects | ` + + `⏱ ${Math.floor(elapsed)}s | Est. ${Math.floor(remaining)}s remaining | ` + + `📊 ${mbTransferred} MB (${mbPerSec} MB/s) | ✓ ${this.completed} | ✗ ${this.failed} | ⊘ ${this.skipped}` + ) + } + + finish() { + process.stdout.write('\n') + } +} + +function parseArgs() { + const args = process.argv.slice(2) + const options = { + service: 'session-recordings', + mode: 'migrate', + force: false, + dryRun: false, + workers: 5, + resume: false, + revert: false, + conflictResolution: 'newest', + } + + for (let i = 0; i < args.length; i++) { + switch (args[i]) { + case '--service': + options.service = args[++i] + break + case '--mode': + options.mode = args[++i] + if (!['migrate', 'sync'].includes(options.mode)) { + console.error(`Invalid mode: ${options.mode}. 
Must be 'migrate' or 'sync'`)
+                process.exit(1)
+            }
+            break
+        case '--force':
+            options.force = true
+            break
+        case '--dry-run':
+            options.dryRun = true
+            break
+        case '--workers':
+            options.workers = parseInt(args[++i], 10)
+            break
+        case '--resume':
+            options.resume = true
+            break
+        case '--revert':
+            options.revert = true
+            break
+        case '--conflict':
+            options.conflictResolution = args[++i]
+            if (!['newest', 'largest', 'skip'].includes(options.conflictResolution)) {
+                console.error(
+                    `Invalid conflict resolution: ${options.conflictResolution}. Must be 'newest', 'largest', or 'skip'`
+                )
+                process.exit(1)
+            }
+            break
+        case '--help':
+            console.log(
+                'Usage: node bin/migrate-minio-to-seaweedfs.js [--service <name>] [--mode migrate|sync] [--force] [--dry-run] [--workers <n>] [--resume] [--revert] [--conflict newest|largest|skip]'
+            )
+            process.exit(0)
+        default:
+            console.error(`Unknown option: ${args[i]}`)
+            process.exit(1)
+        }
+    }
+
+    return options
+}
+
+function ensureLocalDevelopmentOnly(minioEndpoint, seaweedfsEndpoint) {
+    // Check endpoints are localhost
+    const localPatterns = [/^https?:\/\/(localhost|127\.0\.0\.1|::1)(:\d+)?/, /^https?:\/\/[^.]+:\d+$/]
+
+    const isMinioLocal = localPatterns.some((p) => p.test(minioEndpoint))
+    const isSeaweedFSLocal = localPatterns.some((p) => p.test(seaweedfsEndpoint))
+
+    if (!isMinioLocal || !isSeaweedFSLocal) {
+        console.error('❌ SAFETY CHECK FAILED: Non-local endpoint detected')
+        console.error(`   MinIO endpoint: ${minioEndpoint}`)
+        console.error(`   SeaweedFS endpoint: ${seaweedfsEndpoint}`)
+        console.error('   This script is for LOCAL DEVELOPMENT ONLY.')
+        process.exit(1)
+    }
+
+    // Check NODE_ENV or DEBUG
+    const nodeEnv = process.env.NODE_ENV?.toLowerCase()
+    const isDebug = ['y', 'yes', 't', 'true', 'on', '1'].includes(String(process.env.DEBUG).toLowerCase())
+
+    let isDev = false
+    if (nodeEnv) {
+        isDev = nodeEnv.startsWith('dev') || nodeEnv.startsWith('test')
+    } else if (isDebug) {
+        isDev = true
+    }
+
+    if (!isDev) {
+        console.error('❌ SAFETY CHECK FAILED: Not running in development environment')
+        console.error(`   NODE_ENV: ${process.env.NODE_ENV || 'not set'}`)
+        console.error(`   DEBUG: ${process.env.DEBUG || 'not set'}`)
+        console.error('   This script is for LOCAL DEVELOPMENT ONLY.')
+        console.error('   Set NODE_ENV=development or DEBUG=1 to run.')
+        process.exit(1)
+    }
+
+    // Check for AWS production indicators
+    const awsIndicators = ['AWS_EXECUTION_ENV', 'AWS_LAMBDA_FUNCTION_NAME', 'ECS_CONTAINER_METADATA_URI']
+    const foundIndicators = awsIndicators.filter((key) => process.env[key])
+
+    if (foundIndicators.length > 0) {
+        console.error('❌ SAFETY CHECK FAILED: AWS production environment detected')
+        console.error(`   Found: ${foundIndicators.join(', ')}`)
+        console.error('   This script is for LOCAL DEVELOPMENT ONLY.')
+        process.exit(1)
+    }
+
+    console.log('✅ Safety checks passed: Local development environment confirmed')
+}
+
+async function createS3Client(endpoint, accessKeyId, secretAccessKey) {
+    return new S3Client({
+        endpoint,
+        region: 'us-east-1',
+        credentials: {
+            accessKeyId,
+            secretAccessKey,
+        },
+        forcePathStyle: true,
+    })
+}
+
+async function ensureBucketExists(client, bucket) {
+    try {
+        // Try to list objects to check if bucket exists
+        await client.send(new ListObjectsV2Command({ Bucket: bucket, MaxKeys: 1 }))
+        console.log(`✅ Bucket '${bucket}' exists`)
+    } catch (err) {
+        if (err.name === 'NoSuchBucket' || err.name === 'NotFound' || err.$metadata?.httpStatusCode === 404) {
+            console.log(`📦 Creating bucket '${bucket}'...`)
+            try {
+                await client.send(new CreateBucketCommand({ Bucket: bucket }))
+                console.log(`✅ Bucket '${bucket}' created`)
+            } catch (createErr) {
+                // Ignore error if bucket
already exists (race condition) + if (createErr.name === 'BucketAlreadyOwnedByYou' || createErr.name === 'BucketAlreadyExists') { + console.log(`✅ Bucket '${bucket}' already exists`) + } else { + console.error(`❌ Failed to create bucket '${bucket}':`, createErr.message) + throw createErr + } + } + } else { + // Ignore other errors (bucket might exist) + console.log(`✅ Assuming bucket '${bucket}' exists`) + } + } +} + +async function testConnectivity(client, endpoint, bucket) { + try { + const command = new ListObjectsV2Command({ Bucket: bucket, MaxKeys: 1 }) + await client.send(command) + console.log(`✅ Connected to ${endpoint}`) + return true + } catch (err) { + console.error(`❌ Cannot connect to ${endpoint} bucket '${bucket}': ${err.message}`) + return false + } +} + +async function objectExists(client, bucket, key) { + try { + await client.send(new HeadObjectCommand({ Bucket: bucket, Key: key })) + return true + } catch (err) { + if (err.name === 'NotFound' || err.$metadata?.httpStatusCode === 404) { + return false + } + throw err + } +} + +async function getObjectMetadata(client, bucket, key) { + try { + const response = await client.send(new HeadObjectCommand({ Bucket: bucket, Key: key })) + return { + Size: response.ContentLength, + LastModified: response.LastModified, + ETag: response.ETag, + } + } catch (err) { + if (err.name === 'NotFound' || err.$metadata?.httpStatusCode === 404) { + return null + } + throw err + } +} + +async function objectsAreSame(sourceClient, destClient, bucket, key) { + const [sourceMetadata, destMetadata] = await Promise.all([ + getObjectMetadata(sourceClient, bucket, key), + getObjectMetadata(destClient, bucket, key), + ]) + + // If destination doesn't exist, they're not the same + if (!destMetadata) { + return false + } + + // If source doesn't exist (shouldn't happen), consider them different + if (!sourceMetadata) { + return false + } + + // Use the needsSync logic (inverted) + return !needsSync(sourceMetadata, destMetadata) +} + +async function copyObject(sourceClient, destClient, bucket, key) { + const getCommand = new GetObjectCommand({ Bucket: bucket, Key: key }) + const response = await sourceClient.send(getCommand) + + const chunks = [] + for await (const chunk of response.Body) { + chunks.push(chunk) + } + const buffer = Buffer.concat(chunks) + + const putCommand = new PutObjectCommand({ + Bucket: bucket, + Key: key, + Body: buffer, + ContentType: response.ContentType || 'application/octet-stream', + }) + await destClient.send(putCommand) + + return { size: buffer.length } +} + +async function listAllObjectsWithMetadata(client, bucket, prefix) { + const allObjects = [] + let continuationToken = undefined + + do { + const command = new ListObjectsV2Command({ + Bucket: bucket, + Prefix: prefix, + ContinuationToken: continuationToken, + }) + const response = await client.send(command) + + if (response.Contents) { + allObjects.push(...response.Contents) + } + + continuationToken = response.NextContinuationToken + } while (continuationToken) + + return allObjects +} + +function needsSync(objA, objB) { + // Different size = different content + if (objA.Size !== objB.Size) return true + + // Different modification time (with tolerance of 2 seconds for clock skew) + const timeDiff = Math.abs(new Date(objA.LastModified) - new Date(objB.LastModified)) + if (timeDiff > 2000) return true + + // ETag comparison if available (not all S3 implementations provide this) + if (objA.ETag && objB.ETag && objA.ETag !== objB.ETag) return true + + return false +} + 
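+// Resolve a conflict (same key, differing metadata) between the two storages:
+// 'newest' keeps the most recently modified copy, 'largest' keeps the bigger one,
+// 'skip' leaves both sides untouched; the winning copy is then written over the other.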
+async function resolveConflict(conflict, strategy, minioClient, seaweedfsClient, bucket) { + const { key, minioObj, seaweedfsObj } = conflict + + let winnerObj, winnerClient, loserClient, winnerName + + switch (strategy) { + case 'newest': + const minioTime = new Date(minioObj.LastModified) + const seaweedfsTime = new Date(seaweedfsObj.LastModified) + if (minioTime > seaweedfsTime) { + winnerObj = minioObj + winnerClient = minioClient + loserClient = seaweedfsClient + winnerName = 'MinIO' + } else { + winnerObj = seaweedfsObj + winnerClient = seaweedfsClient + loserClient = minioClient + winnerName = 'SeaweedFS' + } + break + + case 'largest': + if (minioObj.Size > seaweedfsObj.Size) { + winnerObj = minioObj + winnerClient = minioClient + loserClient = seaweedfsClient + winnerName = 'MinIO' + } else { + winnerObj = seaweedfsObj + winnerClient = seaweedfsClient + loserClient = minioClient + winnerName = 'SeaweedFS' + } + break + + case 'skip': + return { action: 'skipped', key } + + default: + throw new Error(`Unknown conflict resolution strategy: ${strategy}`) + } + + // Copy winner to loser + await copyObject(winnerClient, loserClient, bucket, key) + return { action: 'resolved', key, winner: winnerName, size: winnerObj.Size } +} + +async function migrateService(serviceName, config, options, checkpoint) { + const direction = options.revert ? 'SeaweedFS → MinIO' : 'MinIO → SeaweedFS' + const sourceName = options.revert ? 'SeaweedFS' : 'MinIO' + const destName = options.revert ? 'MinIO' : 'SeaweedFS' + + console.log(`\n${'='.repeat(80)}`) + console.log(`📦 Migrating: ${serviceName}`) + console.log(` ${config.description}`) + console.log(` Direction: ${direction}`) + console.log(` Bucket: ${config.bucket}`) + console.log(` Prefix: ${config.prefix}`) + console.log(`${'='.repeat(80)}\n`) + + // Resolve endpoints and credentials from env (fallback to localhost defaults) + const MINIO_ENDPOINT = process.env.OBJECT_STORAGE_ENDPOINT || 'http://localhost:19000' + const MINIO_KEY = process.env.OBJECT_STORAGE_ACCESS_KEY_ID || 'object_storage_root_user' + const MINIO_SECRET = process.env.OBJECT_STORAGE_SECRET_ACCESS_KEY || 'object_storage_root_password' + + const SW_ENDPOINT = + process.env.SESSION_RECORDING_V2_S3_ENDPOINT || process.env.SEAWEEDFS_ENDPOINT || 'http://localhost:8333' + const SW_KEY = process.env.SESSION_RECORDING_V2_S3_ACCESS_KEY_ID || process.env.SEAWEEDFS_ACCESS_KEY_ID || 'any' + const SW_SECRET = + process.env.SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY || process.env.SEAWEEDFS_SECRET_ACCESS_KEY || 'any' + + // Create S3 clients + const minioClient = await createS3Client(MINIO_ENDPOINT, MINIO_KEY, MINIO_SECRET) + const seaweedfsClient = await createS3Client(SW_ENDPOINT, SW_KEY, SW_SECRET) + + // Determine source and destination based on direction + const sourceClient = options.revert ? seaweedfsClient : minioClient + const destClient = options.revert ? 
minioClient : seaweedfsClient + + // Ensure bucket exists in destination + await ensureBucketExists(destClient, config.bucket) + + // Test connectivity + const minioOk = await testConnectivity(minioClient, 'MinIO', config.bucket) + const seaweedfsOk = await testConnectivity(seaweedfsClient, 'SeaweedFS', config.bucket) + + if (!minioOk || !seaweedfsOk) { + throw new Error('Failed to connect to storage backends') + } + + // List objects from source + console.log(`📋 Listing objects from ${sourceName}...`) + let allObjects = [] + let continuationToken = undefined + + do { + const command = new ListObjectsV2Command({ + Bucket: config.bucket, + Prefix: config.prefix, + ContinuationToken: continuationToken, + }) + const response = await sourceClient.send(command) + + if (response.Contents) { + allObjects = allObjects.concat(response.Contents) + } + + continuationToken = response.NextContinuationToken + } while (continuationToken) + + console.log(`✅ Found ${allObjects.length} objects`) + + if (allObjects.length === 0) { + console.log('✨ No objects to migrate') + return + } + + // Filter objects based on checkpoint and resume + let objectsToProcess = allObjects + if (options.resume) { + const lastKey = checkpoint.getLastKey(serviceName) + if (lastKey) { + const lastIndex = allObjects.findIndex((obj) => obj.Key === lastKey) + if (lastIndex >= 0) { + objectsToProcess = allObjects.slice(lastIndex + 1) + console.log(`📍 Resuming from key: ${lastKey}`) + console.log(` ${objectsToProcess.length} objects remaining`) + } + } + } + + if (options.dryRun) { + console.log('\n🔍 DRY RUN MODE - No objects will be copied\n') + console.log('Objects that would be migrated:') + objectsToProcess.slice(0, 10).forEach((obj) => { + console.log(` - ${obj.Key} (${(obj.Size / 1024).toFixed(2)} KB)`) + }) + if (objectsToProcess.length > 10) { + console.log(` ... 
and ${objectsToProcess.length - 10} more`) + } + return + } + + // Copy objects with progress tracking + console.log(`\n🚀 Starting migration with ${options.workers} workers...\n`) + const progress = new ProgressTracker(objectsToProcess.length) + const failedObjects = [] + + // Worker pool + const workers = [] + const queue = [...objectsToProcess] + + for (let i = 0; i < options.workers; i++) { + workers.push( + (async () => { + while (true) { + const obj = queue.shift() + if (!obj) break + + try { + // Check if objects are identical (same size, timestamp, content) + const areSame = await objectsAreSame(sourceClient, destClient, config.bucket, obj.Key) + if (areSame && !options.force) { + checkpoint.markSkipped(serviceName) + progress.increment('skipped') + } else { + const result = await copyObject(sourceClient, destClient, config.bucket, obj.Key) + checkpoint.markCompleted(serviceName, obj.Key) + progress.increment('completed', result.size) + } + } catch (err) { + checkpoint.markFailed(serviceName, obj.Key, err) + progress.increment('failed') + failedObjects.push({ key: obj.Key, error: err.message }) + } + + // Save checkpoint every 100 objects + if ((progress.completed + progress.failed + progress.skipped) % 100 === 0) { + checkpoint.save() + } + } + })() + ) + } + + await Promise.all(workers) + progress.finish() + + // Final checkpoint save + checkpoint.save() + + // Summary + console.log(`\n${'='.repeat(80)}`) + console.log('📊 Migration Summary') + console.log(`${'='.repeat(80)}`) + console.log(`✅ Completed: ${progress.completed}`) + console.log(`⊘ Skipped: ${progress.skipped} (already identical in destination)`) + console.log(`✗ Failed: ${progress.failed}`) + console.log(`📦 Data transferred: ${(progress.bytesTransferred / 1024 / 1024).toFixed(2)} MB`) + console.log(`⏱ Total time: ${Math.floor((Date.now() - progress.startTime) / 1000)}s`) + + if (failedObjects.length > 0) { + console.log(`\n❌ Failed objects:`) + failedObjects.slice(0, 10).forEach((obj) => { + console.log(` ${obj.key}: ${obj.error}`) + }) + if (failedObjects.length > 10) { + console.log(` ... 
and ${failedObjects.length - 10} more`) + } + console.log(`\n💡 Run with --resume to retry failed objects`) + } +} + +async function syncService(serviceName, config, options, checkpoint) { + console.log(`\n${'='.repeat(80)}`) + console.log(`🔄 Syncing: ${serviceName}`) + console.log(` ${config.description}`) + console.log(` Mode: Bidirectional Sync`) + console.log(` Bucket: ${config.bucket}`) + console.log(` Prefix: ${config.prefix}`) + console.log(`${'='.repeat(80)}\n`) + + // Create S3 clients + const minioClient = await createS3Client( + 'http://localhost:19000', + 'object_storage_root_user', + 'object_storage_root_password' + ) + const seaweedfsClient = await createS3Client('http://localhost:8333', 'any', 'any') + + // Ensure bucket exists in both + await ensureBucketExists(minioClient, config.bucket) + await ensureBucketExists(seaweedfsClient, config.bucket) + + // Test connectivity + const minioOk = await testConnectivity(minioClient, 'MinIO', config.bucket) + const seaweedfsOk = await testConnectivity(seaweedfsClient, 'SeaweedFS', config.bucket) + + if (!minioOk || !seaweedfsOk) { + throw new Error('Failed to connect to storage backends') + } + + // List objects from both sides + console.log(`📋 Listing objects from both storages...`) + const [minioObjects, seaweedfsObjects] = await Promise.all([ + listAllObjectsWithMetadata(minioClient, config.bucket, config.prefix), + listAllObjectsWithMetadata(seaweedfsClient, config.bucket, config.prefix), + ]) + + console.log(`✅ MinIO: ${minioObjects.length} objects`) + console.log(`✅ SeaweedFS: ${seaweedfsObjects.length} objects`) + + // Build key maps + const minioMap = new Map(minioObjects.map((o) => [o.Key, o])) + const seaweedfsMap = new Map(seaweedfsObjects.map((o) => [o.Key, o])) + + // Find differences + const onlyInMinio = [] + const onlyInSeaweedfs = [] + const conflicts = [] + + for (const [key, minioObj] of minioMap) { + if (!seaweedfsMap.has(key)) { + onlyInMinio.push(minioObj) + } else { + const seaweedfsObj = seaweedfsMap.get(key) + if (needsSync(minioObj, seaweedfsObj)) { + conflicts.push({ key, minioObj, seaweedfsObj }) + } + } + } + + for (const [key, seaweedfsObj] of seaweedfsMap) { + if (!minioMap.has(key)) { + onlyInSeaweedfs.push(seaweedfsObj) + } + } + + // Calculate objects already in sync + const totalObjects = minioObjects.length + seaweedfsObjects.length + const inBothStorages = minioObjects.filter((obj) => seaweedfsMap.has(obj.Key)).length + const alreadyInSync = inBothStorages - conflicts.length + + console.log(`\n📊 Sync Analysis:`) + console.log(` Total objects: ${totalObjects}`) + console.log(` Already in sync: ${alreadyInSync} ✓`) + console.log(` MinIO → SeaweedFS: ${onlyInMinio.length} objects`) + console.log(` SeaweedFS → MinIO: ${onlyInSeaweedfs.length} objects`) + console.log(` Conflicts to resolve: ${conflicts.length} objects`) + + const totalOperations = onlyInMinio.length + onlyInSeaweedfs.length + conflicts.length + + if (totalOperations === 0) { + console.log(`\n✨ Storages are already in sync! No changes needed.`) + return + } + + if (options.dryRun) { + console.log('\n🔍 DRY RUN MODE - No objects will be copied\n') + if (onlyInMinio.length > 0) { + console.log('Would copy MinIO → SeaweedFS:') + onlyInMinio.slice(0, 5).forEach((obj) => { + console.log(` - ${obj.Key} (${(obj.Size / 1024).toFixed(2)} KB)`) + }) + if (onlyInMinio.length > 5) console.log(` ... 
and ${onlyInMinio.length - 5} more`) + } + if (onlyInSeaweedfs.length > 0) { + console.log('\nWould copy SeaweedFS → MinIO:') + onlyInSeaweedfs.slice(0, 5).forEach((obj) => { + console.log(` - ${obj.Key} (${(obj.Size / 1024).toFixed(2)} KB)`) + }) + if (onlyInSeaweedfs.length > 5) console.log(` ... and ${onlyInSeaweedfs.length - 5} more`) + } + if (conflicts.length > 0) { + const resolution = options.conflictResolution || config.conflictResolution + console.log(`\nWould resolve ${conflicts.length} conflicts using strategy: ${resolution}`) + conflicts.slice(0, 3).forEach((c) => { + console.log(` - ${c.key}:`) + console.log(` MinIO: ${(c.minioObj.Size / 1024).toFixed(2)} KB, ${c.minioObj.LastModified}`) + console.log( + ` SeaweedFS: ${(c.seaweedfsObj.Size / 1024).toFixed(2)} KB, ${c.seaweedfsObj.LastModified}` + ) + }) + if (conflicts.length > 3) console.log(` ... and ${conflicts.length - 3} more`) + } + return + } + + // Sync in both directions + console.log(`\n🚀 Starting bidirectional sync with ${options.workers} workers...\n`) + const progress = new ProgressTracker(totalOperations) + const failedObjects = [] + + // Copy MinIO → SeaweedFS + console.log(`📤 Copying ${onlyInMinio.length} objects from MinIO to SeaweedFS...`) + const queue1 = [...onlyInMinio] + const workers1 = [] + for (let i = 0; i < options.workers; i++) { + workers1.push( + (async () => { + while (true) { + const obj = queue1.shift() + if (!obj) break + try { + const result = await copyObject(minioClient, seaweedfsClient, config.bucket, obj.Key) + progress.increment('completed', result.size) + } catch (err) { + progress.increment('failed') + failedObjects.push({ key: obj.Key, error: err.message, direction: 'MinIO→SeaweedFS' }) + } + } + })() + ) + } + await Promise.all(workers1) + + // Copy SeaweedFS → MinIO + console.log(`📥 Copying ${onlyInSeaweedfs.length} objects from SeaweedFS to MinIO...`) + const queue2 = [...onlyInSeaweedfs] + const workers2 = [] + for (let i = 0; i < options.workers; i++) { + workers2.push( + (async () => { + while (true) { + const obj = queue2.shift() + if (!obj) break + try { + const result = await copyObject(seaweedfsClient, minioClient, config.bucket, obj.Key) + progress.increment('completed', result.size) + } catch (err) { + progress.increment('failed') + failedObjects.push({ key: obj.Key, error: err.message, direction: 'SeaweedFS→MinIO' }) + } + } + })() + ) + } + await Promise.all(workers2) + + // Resolve conflicts + if (conflicts.length > 0) { + const resolution = options.conflictResolution || config.conflictResolution + console.log(`\n⚔️ Resolving ${conflicts.length} conflicts using strategy: ${resolution}...`) + const queue3 = [...conflicts] + const workers3 = [] + for (let i = 0; i < options.workers; i++) { + workers3.push( + (async () => { + while (true) { + const conflict = queue3.shift() + if (!conflict) break + try { + const result = await resolveConflict( + conflict, + resolution, + minioClient, + seaweedfsClient, + config.bucket + ) + if (result.action === 'skipped') { + progress.increment('skipped') + } else { + progress.increment('completed', result.size) + } + } catch (err) { + progress.increment('failed') + failedObjects.push({ key: conflict.key, error: err.message, direction: 'conflict' }) + } + } + })() + ) + } + await Promise.all(workers3) + } + + progress.finish() + + // Summary + console.log(`\n${'='.repeat(80)}`) + console.log('📊 Sync Summary') + console.log(`${'='.repeat(80)}`) + console.log(`✅ Synced: ${progress.completed}`) + console.log(`⊘ Skipped: 
${progress.skipped}`) + console.log(`✗ Failed: ${progress.failed}`) + console.log(`📦 Data transferred: ${(progress.bytesTransferred / 1024 / 1024).toFixed(2)} MB`) + console.log(`⏱ Total time: ${Math.floor((Date.now() - progress.startTime) / 1000)}s`) + + if (failedObjects.length > 0) { + console.log(`\n❌ Failed objects:`) + failedObjects.slice(0, 10).forEach((obj) => { + console.log(` [${obj.direction}] ${obj.key}: ${obj.error}`) + }) + if (failedObjects.length > 10) { + console.log(` ... and ${failedObjects.length - 10} more`) + } + } + + if (config.critical && failedObjects.length > 0) { + console.log(`\n⚠️ WARNING: This is a CRITICAL service and ${failedObjects.length} objects failed to sync!`) + } +} + +async function main() { + const options = parseArgs() + + let title + if (options.mode === 'sync') { + title = 'Bidirectional Storage Sync Tool' + } else { + title = options.revert ? 'SeaweedFS to MinIO Migration Tool' : 'MinIO to SeaweedFS Migration Tool' + } + console.log(`🔄 ${title}`) + console.log('=====================================\n') + + // Validate service + const config = SERVICES[options.service] + if (!config) { + console.error(`❌ Unknown service: ${options.service}`) + console.error(`Available services: ${Object.keys(SERVICES).join(', ')}`) + process.exit(1) + } + + // Safety checks + const minioEndpoint = process.env.OBJECT_STORAGE_ENDPOINT || 'http://localhost:19000' + const seaweedEndpoint = + process.env.SESSION_RECORDING_V2_S3_ENDPOINT || process.env.SEAWEEDFS_ENDPOINT || 'http://localhost:8333' + ensureLocalDevelopmentOnly(minioEndpoint, seaweedEndpoint) + + // Initialize checkpoint + const checkpoint = new Checkpoint(options.service) + + try { + if (options.mode === 'sync') { + if (!config.bidirectional) { + console.warn(`⚠️ Warning: Service '${options.service}' is not configured for bidirectional sync.`) + console.warn(` Proceeding anyway, but this service may not be suitable for sync mode.`) + } + await syncService(options.service, config, options, checkpoint) + console.log('\n✨ Sync completed successfully!\n') + } else { + await migrateService(options.service, config, options, checkpoint) + console.log('\n✨ Migration completed successfully!\n') + } + } catch (err) { + console.error(`\n❌ ${options.mode === 'sync' ? 'Sync' : 'Migration'} failed:`, err.message) + console.error(err.stack) + process.exit(1) + } +} + +main() diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index f536004b7c..dc4a7fefe3 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -276,15 +276,15 @@ export function getDefaultConfig(): PluginsServerConfig { PERSON_JSONB_SIZE_ESTIMATE_ENABLE: 0, // defaults to off // Session recording V2 - SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB + SESSION_RECORDING_MAX_BATCH_SIZE_KB: isDevEnv() ? 
2 * 1024 : 100 * 1024, // 2MB on dev, 100MB on prod and test SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds SESSION_RECORDING_V2_S3_BUCKET: 'posthog', SESSION_RECORDING_V2_S3_PREFIX: 'session_recordings', - SESSION_RECORDING_V2_S3_ENDPOINT: 'http://localhost:19000', + SESSION_RECORDING_V2_S3_ENDPOINT: 'http://localhost:8333', SESSION_RECORDING_V2_S3_REGION: 'us-east-1', - SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user', - SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password', - SESSION_RECORDING_V2_S3_TIMEOUT_MS: 30000, + SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any', + SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any', + SESSION_RECORDING_V2_S3_TIMEOUT_MS: isDevEnv() ? 120000 : 30000, SESSION_RECORDING_V2_REPLAY_EVENTS_KAFKA_TOPIC: 'clickhouse_session_replay_events', SESSION_RECORDING_V2_CONSOLE_LOG_ENTRIES_KAFKA_TOPIC: 'log_entries', SESSION_RECORDING_V2_CONSOLE_LOG_STORE_SYNC_BATCH_LIMIT: 1000, diff --git a/posthog/settings/session_replay_v2.py b/posthog/settings/session_replay_v2.py index 9891475426..24ec534b18 100644 --- a/posthog/settings/session_replay_v2.py +++ b/posthog/settings/session_replay_v2.py @@ -6,12 +6,10 @@ from posthog.settings.base_variables import DEBUG, TEST from posthog.settings.utils import str_to_bool if TEST or DEBUG: - SESSION_RECORDING_V2_S3_ENDPOINT = os.getenv("SESSION_RECORDING_V2_S3_ENDPOINT", "http://objectstorage:19000") - SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: Optional[str] = os.getenv( - "SESSION_RECORDING_V2_S3_ACCESS_KEY_ID", "object_storage_root_user" - ) + SESSION_RECORDING_V2_S3_ENDPOINT = os.getenv("SESSION_RECORDING_V2_S3_ENDPOINT", "http://seaweedfs:8333") + SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: Optional[str] = os.getenv("SESSION_RECORDING_V2_S3_ACCESS_KEY_ID", "any") SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: Optional[str] = os.getenv( - "SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY", "object_storage_root_password" + "SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY", "any" ) else: SESSION_RECORDING_V2_S3_ENDPOINT = os.getenv("SESSION_RECORDING_V2_S3_ENDPOINT", "")
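
Once the dev stack is up, the tool's dry-run mode is a quick way to confirm both stores are reachable with the defaults wired above; an illustrative invocation from the repo root (DEBUG=1 satisfies the script's local-development safety check):

    # Preview a MinIO → SeaweedFS migration of V2 session recordings (nothing is copied)
    DEBUG=1 ./bin/migrate-minio-to-seaweedfs --service session-recordings --dry-run

    # Preview a bidirectional sync of the default services (SR V2 + LTS)
    ./bin/sync-storage --dry-run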