feat: migrate replay from minio to seaweedfs (#41019)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Alex V
2025-11-10 16:50:05 +01:00
committed by GitHub
parent 7f3efd6290
commit a227a16675
22 changed files with 1446 additions and 20 deletions

View File

@@ -31,6 +31,12 @@ jobs:
# because we want the Braintrust experiment to have accurate git metadata (on master it's empty)
ref: ${{ github.event.pull_request.head.ref }}
fetch-depth: 0
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
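# (files under data/ written by containers are typically root-owned, so the runner user usually cannot remove them directly)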
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Stop/Start stack with Docker Compose
run: |

View File

@@ -54,6 +54,13 @@ jobs:
# For pull requests it's not necessary to checkout the code, but we
# also want this to run on master so we need to checkout
- uses: actions/checkout@v4
with:
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2
id: filter
@@ -114,6 +121,13 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Stop/Start stack with Docker Compose
run: |
@@ -156,6 +170,7 @@ jobs:
uses: actions/checkout@v4
with:
ref: master
clean: false
- name: Install python dependencies for master
run: |
@@ -179,6 +194,8 @@ jobs:
# Now we can consider this PR's migrations
- name: Checkout this PR
uses: actions/checkout@v4
with:
clean: false
- name: Install python dependencies for this PR
run: |
@@ -550,6 +567,12 @@ jobs:
ref: ${{ github.event.pull_request.head.ref }}
# Use PostHog Bot token when not on forks to enable proper snapshot updating
token: ${{ github.event.pull_request.head.repo.full_name == github.repository && secrets.POSTHOG_BOT_PAT || github.token }}
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: 'Safeguard: ensure no stray Python modules at product root'
run: |
@@ -891,6 +914,12 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 1
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Start stack with Docker Compose
run: |

View File

@@ -38,6 +38,8 @@ jobs:
# For pull requests it's not necessary to checkout the code, but we
# also want this to run on master so we need to checkout
- uses: actions/checkout@v4
with:
clean: false
- uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2
id: filter
@@ -71,6 +73,12 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 1
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Start stack with Docker Compose
run: |

View File

@@ -124,6 +124,12 @@ jobs:
repository: ${{ github.event.pull_request.head.repo.full_name }}
token: ${{ secrets.POSTHOG_BOT_PAT || github.token }}
fetch-depth: 50 # Need enough history for flap detection to find last human commit
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Stop/Start stack with Docker Compose
shell: bash

View File

@@ -27,6 +27,13 @@ jobs:
name: Setup DO Hobby Instance and test
steps:
- uses: actions/checkout@v4
with:
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- uses: actions/setup-python@v5
with:
python-version: '3.8'

View File

@@ -32,6 +32,8 @@ jobs:
# For pull requests it's not necessary to checkout the code, but we
# also want this to run on master so we need to checkout
- uses: actions/checkout@v4
with:
clean: false
- uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2
id: filter
@@ -55,6 +57,8 @@ jobs:
runs-on: depot-ubuntu-24.04-4
steps:
- uses: actions/checkout@v4
with:
clean: false
- name: Install pnpm
uses: pnpm/action-setup@a7487c7e89a18df4991f7f222e4898a00d66ddda # v4
@@ -83,6 +87,8 @@ jobs:
runs-on: depot-ubuntu-24.04-4
steps:
- uses: actions/checkout@v4
with:
clean: false
- name: Install pnpm
uses: pnpm/action-setup@a7487c7e89a18df4991f7f222e4898a00d66ddda # v4
@@ -133,6 +139,13 @@ jobs:
steps:
- name: Code check out
uses: actions/checkout@v4
with:
clean: false
- name: Clean up data directories with container permissions
run: |
# Use docker to clean up files created by containers
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Stop/Start stack with Docker Compose
run: |

View File

@@ -23,6 +23,8 @@ jobs:
# For pull requests it's not necessary to checkout the code, but we
# also want this to run on master so we need to checkout
- uses: actions/checkout@v4
with:
clean: false
- uses: dorny/paths-filter@4512585405083f25c027a35db413c2b3b9006d50 # v2
id: filter
with:
@@ -56,6 +58,7 @@ jobs:
with:
sparse-checkout: 'rust/'
sparse-checkout-cone-mode: false
clean: false
- name: Install rust
if: needs.changes.outputs.rust == 'true'
@@ -122,6 +125,15 @@ jobs:
steps:
- uses: actions/checkout@v4
if: needs.changes.outputs.rust == 'true'
with:
clean: false
- name: Clean up data directories with container permissions
if: needs.changes.outputs.rust == 'true'
working-directory: .
run: |
# Use docker to clean up files created by containers (from repo root, not rust/)
[ -d "data" ] && docker run --rm -v "$(pwd)/data:/data" alpine sh -c "rm -rf /data/seaweedfs /data/minio" || true
continue-on-error: true
- name: Setup main repo and dependencies
if: needs.changes.outputs.rust == 'true'
@@ -233,6 +245,7 @@ jobs:
with:
sparse-checkout: 'rust/'
sparse-checkout-cone-mode: false
clean: false
- name: Install rust
if: needs.changes.outputs.rust == 'true'
@@ -280,6 +293,7 @@ jobs:
with:
sparse-checkout: 'rust/'
sparse-checkout-cone-mode: false
clean: false
- name: Install cargo-binstall
if: needs.changes.outputs.rust == 'true'

2
.gitignore vendored
View File

@@ -70,6 +70,8 @@ gen/
max-test-venv/
node_modules/
object_storage/
data/seaweedfs/
.migration-checkpoint.json
playwright/e2e-vrt/**/*-darwin.png
pnpm-error.log
# pyright config (keep this until we have a standardized one)

12
bin/migrate-minio-to-seaweedfs Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
set -e
# Wrapper script for the MinIO to SeaweedFS migration tool
# This allows calling the migration from the root directory
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
cd "$PROJECT_ROOT/plugin-server"
exec node bin/migrate-minio-to-seaweedfs.js "$@"
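# Example invocations (a sketch; all flags below are parsed by plugin-server/bin/migrate-minio-to-seaweedfs.js in this change):
#   ./bin/migrate-minio-to-seaweedfs --service session-recordings --dry-run
#   ./bin/migrate-minio-to-seaweedfs --service query-cache --mode sync --workers 10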

View File

@@ -0,0 +1,60 @@
#!/usr/bin/env bash
set -e
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "📦 PostHog Session Recording Migration (MinIO → SeaweedFS)"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
echo "This script will migrate your existing session recordings from"
echo "MinIO to SeaweedFS storage."
echo ""
echo "⚠️ Important:"
echo " - Keep this script running until completion"
echo " - The migration runs in the background and won't block new recordings"
echo " - You can monitor progress in the output"
echo ""
read -r -p "Do you want to start the migration? [y/N] " response
if [[ ! "$response" =~ ^([yY][eE][sS]|[yY])+$ ]]
then
echo "Migration cancelled."
exit 0
fi
echo ""
echo "🚀 Starting migration..."
echo ""
# Run the migration script inside the plugins container
docker-compose run --rm \
-e OBJECT_STORAGE_ENDPOINT=http://objectstorage:19000 \
-e OBJECT_STORAGE_ACCESS_KEY_ID=object_storage_root_user \
-e OBJECT_STORAGE_SECRET_ACCESS_KEY=object_storage_root_password \
-e SESSION_RECORDING_V2_S3_ENDPOINT=http://seaweedfs:8333 \
-e SESSION_RECORDING_V2_S3_ACCESS_KEY_ID=any \
-e SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY=any \
plugins \
node plugin-server/bin/migrate-minio-to-seaweedfs.js "$@"
if [ $? -eq 0 ]; then
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "✅ Migration completed successfully!"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
echo "To mark the migration as complete in your .env file, run:"
echo ""
echo " sed -i 's/SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=pending_migration/SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=completed/' .env"
echo ""
else
echo ""
echo "❌ Migration failed. Check the output above for errors."
echo ""
echo "You can retry by running this script again with --resume:"
echo ""
echo " ./bin/migrate-session-recordings-hobby --resume"
echo ""
exit 1
fi
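# Suggested flow (a sketch, not official guidance): preview first, then run the migration for real
#   ./bin/migrate-session-recordings-hobby --dry-run
#   ./bin/migrate-session-recordings-hobby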

81
bin/migrate-storage-hobby Executable file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env bash
set -e
# Wrapper script for syncing/migrating object storage between MinIO and SeaweedFS (Hobby)
# Runs inside the plugins container with compose network service names.
# All available services
ALL_SERVICES=(
"query-cache"
"session-recordings"
"session-recordings-lts"
"media-uploads"
"exports"
"source-maps"
)
SERVICE=""
MODE="sync" # default to bidirectional sync; use --mode migrate to one-way
EXTRA_ARGS=()
# Parse args (supports: <service>, --mode, --dry-run, --force, --workers, --resume, --revert, --conflict)
while [[ $# -gt 0 ]]; do
case "$1" in
--mode)
MODE="$2"; shift 2 ;;
--dry-run|--force|--resume|--revert)
EXTRA_ARGS+=("$1"); shift ;;
--workers|--conflict|--service)
EXTRA_ARGS+=("$1" "$2"); shift 2 ;;
-n)
EXTRA_ARGS+=("--dry-run"); shift ;;
query-cache|session-recordings|session-recordings-lts|media-uploads|exports|source-maps)
SERVICE="$1"; shift ;;
*)
echo "Unknown arg: $1"; exit 1 ;;
esac
done
# Build env for endpoints: MinIO (objectstorage) -> source; SeaweedFS -> destination
# We pass these to the script; it will detect via env.
ENV_ARGS=(
-e OBJECT_STORAGE_ENDPOINT=http://objectstorage:19000
-e OBJECT_STORAGE_ACCESS_KEY_ID=object_storage_root_user
-e OBJECT_STORAGE_SECRET_ACCESS_KEY=object_storage_root_password
-e SESSION_RECORDING_V2_S3_ENDPOINT=http://seaweedfs:8333
-e SESSION_RECORDING_V2_S3_ACCESS_KEY_ID=any
-e SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY=any
)
run_service() {
local service="$1"
echo "🔄 ${MODE^^}: $service (MinIO ⇄ SeaweedFS)"
docker-compose run --rm \
"${ENV_ARGS[@]}" \
plugins \
node plugin-server/bin/migrate-minio-to-seaweedfs.js --service "$service" --mode "$MODE" "${EXTRA_ARGS[@]}"
echo ""
}
if [ -z "$SERVICE" ]; then
echo "═══════════════════════════════════════════════════════════════"
echo "🔄 $MODE all services between MinIO and SeaweedFS (Hobby)"
echo "═══════════════════════════════════════════════════════════════"
echo ""
for s in "${ALL_SERVICES[@]}"; do
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
run_service "$s"
done
echo "═══════════════════════════════════════════════════════════════"
echo "✨ Completed: $MODE all services"
echo "═══════════════════════════════════════════════════════════════"
else
run_service "$SERVICE"
echo "✨ Done!"
fi
echo ""
echo "Usage:"
echo " $0 # Sync all services"
echo " $0 --mode migrate # One-way migrate all services (MinIO → SeaweedFS)"
echo " $0 exports -n # Dry run exports only"

View File

@@ -2,7 +2,7 @@
set -e
export OBJECT_STORAGE_ENDPOINT=http://localhost:19000
-export SESSION_RECORDING_V2_S3_ENDPOINT=http://localhost:19000
+export SESSION_RECORDING_V2_S3_ENDPOINT=http://localhost:8333
export CLICKHOUSE_API_USER="api"
export CLICKHOUSE_API_PASSWORD="apipass"

113
bin/sync-storage Executable file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env bash
set -e
# Wrapper script for syncing object storage between MinIO and SeaweedFS
# This makes it easier to sync different services with sensible defaults
# All available services (for explicit selection)
ALL_SERVICES=(
"query-cache"
"session-recordings"
"session-recordings-lts"
"media-uploads"
"exports"
"source-maps"
)
# Default set when no service is specified (partial rollout):
# Only session recordings V2 and legacy LTS
DEFAULT_SERVICES=(
"session-recordings"
"session-recordings-lts"
)
MODE="sync"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PLUGIN_SERVER_DIR="$SCRIPT_DIR/../plugin-server"
# Ensure DEBUG is set for local dev
export DEBUG=1
cd "$PLUGIN_SERVER_DIR"
# Parse arguments - handle both orders
SERVICE=""
DRY_RUN_FLAG=""
for arg in "$@"; do
if [ "$arg" = "--dry-run" ] || [ "$arg" = "-n" ]; then
DRY_RUN_FLAG="--dry-run"
else
SERVICE="$arg"
fi
done
# Function to sync a single service
sync_service() {
local service="$1"
local dry_run_flag="$2"
echo "🔄 Syncing $service between MinIO and SeaweedFS..."
echo ""
# Build the command
local cmd="node bin/migrate-minio-to-seaweedfs.js --service $service --mode $MODE"
if [ -n "$dry_run_flag" ]; then
cmd="$cmd --dry-run"
fi
# Execute
$cmd
echo ""
}
# Show dry-run notice if applicable
if [ -n "$DRY_RUN_FLAG" ]; then
echo "📋 DRY RUN MODE - No changes will be made"
echo ""
fi
# If no service specified, sync default subset (SR V2 + LTS)
if [ -z "$SERVICE" ]; then
echo "═══════════════════════════════════════════════════════════════"
echo "🔄 Syncing DEFAULT services between MinIO and SeaweedFS"
echo " (session-recordings, session-recordings-lts)"
echo "═══════════════════════════════════════════════════════════════"
echo ""
for service in "${DEFAULT_SERVICES[@]}"; do
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
sync_service "$service" "$DRY_RUN_FLAG"
done
echo "═══════════════════════════════════════════════════════════════"
echo "✨ Default services synced!"
echo "═══════════════════════════════════════════════════════════════"
else
# Sync single service
sync_service "$SERVICE" "$DRY_RUN_FLAG"
echo "✨ Done!"
fi
echo ""
echo "Available services:"
echo " - query-cache (ephemeral, safe to test)"
echo " - session-recordings (current V2 recordings)"
echo " - session-recordings-lts (long-term storage)"
echo " - media-uploads (user uploaded media)"
echo " - exports (CSV, PNG, PDF, videos)"
echo " - source-maps (error tracking source maps)"
echo ""
echo "Usage:"
echo " $0 # Sync DEFAULT services (SR V2 + LTS)"
echo " $0 [--dry-run] # Preview syncing DEFAULT services"
echo " $0 <service> # Sync specific service"
echo " $0 <service> [--dry-run] # Preview syncing specific service"
echo ""
echo "Examples:"
echo " $0 # Sync default services (SR V2 + LTS)"
echo " $0 --dry-run # Dry run for default services"
echo " $0 query-cache # Sync query cache only"
echo " $0 media-uploads -n # Dry run for media uploads only"

View File

@@ -118,6 +118,68 @@ if ! grep -q "SESSION_RECORDING_V2_METADATA_SWITCHOVER" .env; then
source .env
fi
# Check if session recording storage migration marker exists
if ! grep -q "SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS" .env; then
echo ""
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "📦 SESSION RECORDING STORAGE UPDATE"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
echo "PostHog now uses SeaweedFS for session recording storage."
echo "Your existing session recordings are stored in MinIO."
echo ""
echo "You have TWO options:"
echo ""
echo "1. 🔄 MIGRATE your existing session recordings to SeaweedFS"
echo " - Keeps all your existing recordings accessible"
echo " - Run migration script after upgrade"
echo " - New recordings will be stored in SeaweedFS"
echo " - Migration is best-effort for hobby deployments; support is limited"
echo ""
echo "2. ⚠️ ACCEPT DATA LOSS of existing session recordings"
echo " - Existing recordings will become inaccessible"
echo " - Only new recordings will be available"
echo " - Faster upgrade, no migration needed"
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
read -r -p "Do you want to MIGRATE existing session recordings? [y/N] " response
if [[ "$response" =~ ^([yY][eE][sS]|[yY])+$ ]]
then
echo ""
echo "✅ You chose to MIGRATE existing recordings."
echo ""
echo "After the upgrade completes, run this command to migrate:"
echo ""
echo " ./bin/migrate-session-recordings-hobby"
echo ""
echo "Options:"
echo " --dry-run Preview what would be migrated"
echo " --resume Resume from last checkpoint if interrupted"
echo " --force Overwrite existing objects in destination"
echo " --workers <n> Number of concurrent workers (default: 5)"
echo ""
read -r -p "Press ENTER to acknowledge and continue..."
echo "SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=pending_migration" >> .env
else
echo ""
echo "⚠️ You chose to ACCEPT DATA LOSS."
echo "Existing session recordings will not be accessible after upgrade."
echo ""
read -r -p "Are you sure? This cannot be undone. [y/N] " confirm
if [[ "$confirm" =~ ^([yY][eE][sS]|[yY])+$ ]]
then
echo "SESSION_RECORDING_STORAGE_MIGRATED_TO_SEAWEEDFS=data_loss_accepted" >> .env
echo "✅ Proceeding without migration."
else
echo "Upgrade cancelled. No changes made."
exit 1
fi
fi
source .env
fi
export POSTHOG_APP_TAG="${POSTHOG_APP_TAG:-latest-release}"
cd posthog

View File

@@ -215,6 +215,20 @@ deploy:
bin_script: upgrade-hobby
description: Upgrade existing hobby deployment with data loss warnings and volume
checks
deploy:migrate-recordings:
bin_script: migrate-session-recordings-hobby
description: Migrate session recordings from MinIO to SeaweedFS for hobby deployments
deploy:migrate-storage:
bin_script: migrate-minio-to-seaweedfs
description: "Migrate or sync object storage between MinIO and SeaweedFS\n\nServices:\
\ session-recordings, session-recordings-lts, query-cache, \n media-uploads,\
\ exports, source-maps\n\nModes:\n --mode migrate One-way copy (default)\n\
\ --mode sync Bidirectional sync\n\nExamples:\n hogli deploy:migrate-storage\
\ --service query-cache --mode sync --dry-run\n hogli deploy:migrate-storage\
\ --service media-uploads --mode migrate"
deploy:migrate-storage-hobby:
bin_script: migrate-storage-hobby
description: "Hobby: Migrate or sync ALL services between MinIO and SeaweedFS via docker-compose\n\nExamples:\n hogli deploy:migrate-storage-hobby # Sync all services\n hogli deploy:migrate-storage-hobby --mode migrate # One-way migrate all (MinIO → SeaweedFS)\n hogli deploy:migrate-storage-hobby exports -n # Dry run exports only"
build:
build:frontend:
cmd: pnpm --filter=@posthog/frontend build
@@ -417,7 +431,11 @@ tools:
hidden: true
create:notebook:node:
bin_script: create-notebook-node.sh
-description: 'Create a new NotebookNode file and update types and editor references'
+description: Create a new NotebookNode file and update types and editor references
hidden: true
sync:storage:
bin_script: sync-storage
description: 'TODO: add description for sync-storage'
hidden: true
utilities:
utilities:posthog-worktree:

View File

@@ -10,7 +10,7 @@ def get_service_url(service: str = "proxy") -> str:
service_urls = {
"proxy": "http://localhost:8010",
-"s3": "http://localhost:19000",
+"s3": "http://localhost:8333",
"clickhouse": "http://localhost:8123",
}

View File

@@ -485,6 +485,14 @@ services:
volumes:
- '/var/run/docker.sock:/var/run/docker.sock'
seaweedfs:
container_name: '${SEAWEEDFS_DOCKER_NAME:-seaweedfs-main}'
image: chrislusf/seaweedfs:latest
ports:
- '127.0.0.1:8333:8333' # S3 API
- '127.0.0.1:9333:9333' # Master server (for admin UI)
command: server -s3 -s3.port=8333 -dir=/data
networks:
otel_network:
driver: bridge
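# Quick check that the SeaweedFS S3 API is reachable (a sketch; assumes the AWS CLI is installed locally
# and uses the placeholder credentials configured elsewhere in this change):
#   AWS_ACCESS_KEY_ID=any AWS_SECRET_ACCESS_KEY=any aws --endpoint-url http://localhost:8333 s3 ls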

View File

@@ -271,9 +271,20 @@ services:
file: docker-compose.base.yml
service: localstack
seaweedfs:
extends:
file: docker-compose.base.yml
service: seaweedfs
command: server -s3 -s3.port=8333 -filer -dir=/data
ports:
- '127.0.0.1:8888:8888' # SeaweedFS Filer UI
volumes:
- 'seaweedfs-data:/data'
networks:
otel_network:
driver: bridge
volumes:
redpanda-data:
seaweedfs-data:

View File

@@ -85,7 +85,9 @@ services:
OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user'
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password'
OBJECT_STORAGE_ENDPOINT: http://objectstorage:19000
-SESSION_RECORDING_V2_S3_ENDPOINT: http://objectstorage:19000
+SESSION_RECORDING_V2_S3_ENDPOINT: http://seaweedfs:8333
SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any'
SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any'
OBJECT_STORAGE_ENABLED: true
ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS
image: $REGISTRY_URL:$POSTHOG_APP_TAG
@@ -103,10 +105,10 @@ services:
SECRET_KEY: $POSTHOG_SECRET
OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user'
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password'
-SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user'
-SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password'
+SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any'
+SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any'
OBJECT_STORAGE_ENDPOINT: http://objectstorage:19000
-SESSION_RECORDING_V2_S3_ENDPOINT: http://objectstorage:19000
+SESSION_RECORDING_V2_S3_ENDPOINT: http://seaweedfs:8333
OBJECT_STORAGE_ENABLED: true
ENCRYPTION_SALT_KEYS: $ENCRYPTION_SALT_KEYS
OTEL_SERVICE_NAME: 'posthog'
@@ -118,6 +120,7 @@ services:
- clickhouse
- kafka
- objectstorage
- seaweedfs
plugins:
extends:
@@ -129,10 +132,10 @@ services:
SECRET_KEY: $POSTHOG_SECRET
OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user'
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password'
-SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user'
-SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password'
+SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any'
+SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any'
OBJECT_STORAGE_ENDPOINT: http://objectstorage:19000
-SESSION_RECORDING_V2_S3_ENDPOINT: http://objectstorage:19000
+SESSION_RECORDING_V2_S3_ENDPOINT: http://seaweedfs:8333
OBJECT_STORAGE_ENABLED: true
CDP_REDIS_HOST: redis7
CDP_REDIS_PORT: 6379
@@ -148,6 +151,7 @@ services:
- clickhouse
- kafka
- objectstorage
- seaweedfs
proxy:
extends:
@@ -175,6 +179,17 @@ services:
- '19000:19000'
- '19001:19001'
seaweedfs:
extends:
file: docker-compose.base.yml
service: seaweedfs
restart: on-failure
volumes:
- seaweedfs:/data
ports:
- '8333:8333'
- '9333:9333'
asyncmigrationscheck:
extends:
file: docker-compose.base.yml
@@ -234,6 +249,7 @@ services:
- clickhouse
- kafka
- objectstorage
- seaweedfs
- temporal
cyclotron-janitor:
@@ -298,6 +314,7 @@ volumes:
zookeeper-datalog:
zookeeper-logs:
objectstorage:
seaweedfs:
postgres-data:
clickhouse-data:
caddy-data:

View File

@@ -0,0 +1,961 @@
#!/usr/bin/env node
/**
* Migration and sync script for object storage between MinIO and SeaweedFS
*
* This script supports:
* - One-way migration (MinIO → SeaweedFS or SeaweedFS → MinIO)
* - Bidirectional sync (keeps both storages in sync)
* - Multiple services (session recordings, exports, media uploads, etc.)
*
* Usage:
* node bin/migrate-minio-to-seaweedfs.js [options]
*
* Options:
* --service <name> Service to migrate (default: session-recordings)
* --mode <mode> Mode: migrate | sync (default: migrate)
* --force Overwrite existing objects in destination
* --dry-run Show what would be migrated without copying
* --workers <n> Number of concurrent workers (default: 5)
* --resume Resume from last checkpoint
* --revert Copy from SeaweedFS back to MinIO (reverse direction)
* --conflict <strategy> Conflict resolution: newest | largest | skip (default: newest)
* --help Show this help message
*
* Modes:
* migrate One-way copy from source to destination
* sync Bidirectional sync - copies missing objects in both directions
*
* Services:
* session-recordings Session recording blobs (V2)
* session-recordings-lts Long-term storage session recordings
* query-cache Query result cache
* media-uploads User uploaded media
* exports Exported assets (CSV, PNG, PDF, videos)
* source-maps Error tracking source maps
*/
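// Example (a sketch using only the options documented above):
//   node bin/migrate-minio-to-seaweedfs.js --service session-recordings --mode migrate --dry-run
//   node bin/migrate-minio-to-seaweedfs.js --service exports --mode sync --conflict newest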
const {
S3Client,
ListObjectsV2Command,
GetObjectCommand,
PutObjectCommand,
HeadObjectCommand,
CreateBucketCommand,
} = require('@aws-sdk/client-s3')
const { existsSync, readFileSync, writeFileSync } = require('fs')
// Service configurations
const SERVICES = {
'session-recordings': {
bucket: 'posthog',
prefix: 'session_recordings/',
description: 'Session recording blobs (V2)',
bidirectional: true,
conflictResolution: 'newest',
},
'session-recordings-lts': {
bucket: 'posthog',
prefix: 'session_recordings_lts/',
description: 'Long-term storage session recordings',
bidirectional: true,
conflictResolution: 'newest',
},
'query-cache': {
bucket: 'posthog',
prefix: 'query_cache/',
description: 'Query result cache (ephemeral)',
bidirectional: true,
conflictResolution: 'skip', // Cache can be regenerated
},
'media-uploads': {
bucket: 'posthog',
prefix: 'media_uploads/',
description: 'User uploaded media files',
bidirectional: true,
conflictResolution: 'largest', // Keep largest to avoid corrupted files
critical: true,
},
exports: {
bucket: 'posthog',
prefix: 'exports/',
description: 'Exported assets (CSV, PNG, PDF, videos)',
bidirectional: true,
conflictResolution: 'newest',
critical: true,
},
'source-maps': {
bucket: 'posthog',
prefix: 'symbolsets/',
description: 'Error tracking source maps',
bidirectional: true,
conflictResolution: 'newest',
critical: true,
},
}
// Checkpoint file for resumable migrations
const CHECKPOINT_FILE = '.migration-checkpoint.json'
class Checkpoint {
constructor(serviceName) {
this.serviceName = serviceName
this.data = this.load()
}
load() {
if (existsSync(CHECKPOINT_FILE)) {
try {
return JSON.parse(readFileSync(CHECKPOINT_FILE, 'utf-8'))
} catch (err) {
console.warn('⚠️ Failed to load checkpoint, starting fresh')
return {}
}
}
return {}
}
save() {
writeFileSync(CHECKPOINT_FILE, JSON.stringify(this.data, null, 2))
}
getServiceData(serviceName) {
if (!this.data[serviceName]) {
this.data[serviceName] = {
completed: [],
failed: {},
lastKey: null,
startTime: Date.now(),
}
}
return this.data[serviceName]
}
markCompleted(serviceName, key) {
const data = this.getServiceData(serviceName)
data.completed.push(key)
data.lastKey = key
}
markFailed(serviceName, key, error) {
const data = this.getServiceData(serviceName)
data.failed[key] = error.message || String(error)
}
markSkipped(serviceName) {
const data = this.getServiceData(serviceName)
data.skipped = (data.skipped || 0) + 1
}
isCompleted(serviceName, key) {
const data = this.getServiceData(serviceName)
return data.completed.includes(key)
}
getLastKey(serviceName) {
const data = this.getServiceData(serviceName)
return data.lastKey
}
}
class ProgressTracker {
constructor(total) {
this.total = total
this.completed = 0
this.failed = 0
this.skipped = 0
this.bytesTransferred = 0
this.startTime = Date.now()
}
increment(type, bytes = 0) {
this[type]++
if (bytes > 0) {
this.bytesTransferred += bytes
}
this.print()
}
print() {
const processed = this.completed + this.failed + this.skipped
const percent = Math.round((processed / this.total) * 100)
const elapsed = (Date.now() - this.startTime) / 1000
const rate = elapsed > 0 ? processed / elapsed : 0
const remaining = rate > 0 ? (this.total - processed) / rate : 0
const mbTransferred = (this.bytesTransferred / 1024 / 1024).toFixed(2)
const mbPerSec = elapsed > 0 ? (this.bytesTransferred / 1024 / 1024 / elapsed).toFixed(2) : '0.00'
const barLength = 30
const filledLength = Math.floor((percent / 100) * barLength)
const bar = '█'.repeat(filledLength) + '░'.repeat(barLength - filledLength)
process.stdout.write(
`\r[${bar}] ${percent}% | ${processed}/${this.total} objects | ` +
`${Math.floor(elapsed)}s | Est. ${Math.floor(remaining)}s remaining | ` +
`📊 ${mbTransferred} MB (${mbPerSec} MB/s) | ✓ ${this.completed} | ✗ ${this.failed} | ⊘ ${this.skipped}`
)
}
finish() {
process.stdout.write('\n')
}
}
function parseArgs() {
const args = process.argv.slice(2)
const options = {
service: 'session-recordings',
mode: 'migrate',
force: false,
dryRun: false,
workers: 5,
resume: false,
revert: false,
conflictResolution: 'newest',
}
for (let i = 0; i < args.length; i++) {
switch (args[i]) {
case '--service':
options.service = args[++i]
break
case '--mode':
options.mode = args[++i]
if (!['migrate', 'sync'].includes(options.mode)) {
console.error(`Invalid mode: ${options.mode}. Must be 'migrate' or 'sync'`)
process.exit(1)
}
break
case '--force':
options.force = true
break
case '--dry-run':
options.dryRun = true
break
case '--workers':
options.workers = parseInt(args[++i], 10)
break
case '--resume':
options.resume = true
break
case '--revert':
options.revert = true
break
case '--conflict':
options.conflictResolution = args[++i]
if (!['newest', 'largest', 'skip'].includes(options.conflictResolution)) {
console.error(
`Invalid conflict resolution: ${options.conflictResolution}. Must be 'newest', 'largest', or 'skip'`
)
process.exit(1)
}
break
case '--help':
console.log(__doc__)
process.exit(0)
default:
console.error(`Unknown option: ${args[i]}`)
process.exit(1)
}
}
return options
}
function ensureLocalDevelopmentOnly(minioEndpoint, seaweedfsEndpoint) {
// Check endpoints are localhost
const localPatterns = [/^https?:\/\/(localhost|127\.0\.0\.1|::1)(:\d+)?/, /^https?:\/\/[^.]+:\d+$/]
const isMinioLocal = localPatterns.some((p) => p.test(minioEndpoint))
const isSeaweedFSLocal = localPatterns.some((p) => p.test(seaweedfsEndpoint))
if (!isMinioLocal || !isSeaweedFSLocal) {
console.error('❌ SAFETY CHECK FAILED: Non-local endpoint detected')
console.error(` MinIO endpoint: ${minioEndpoint}`)
console.error(` SeaweedFS endpoint: ${seaweedfsEndpoint}`)
console.error(' This script is for LOCAL DEVELOPMENT ONLY.')
process.exit(1)
}
// Check NODE_ENV or DEBUG
const nodeEnv = process.env.NODE_ENV?.toLowerCase()
const isDebug = ['y', 'yes', 't', 'true', 'on', '1'].includes(String(process.env.DEBUG).toLowerCase())
let isDev = false
if (nodeEnv) {
isDev = nodeEnv.startsWith('dev') || nodeEnv.startsWith('test')
} else if (isDebug) {
isDev = true
}
if (!isDev) {
console.error('❌ SAFETY CHECK FAILED: Not running in development environment')
console.error(` NODE_ENV: ${process.env.NODE_ENV || 'not set'}`)
console.error(` DEBUG: ${process.env.DEBUG || 'not set'}`)
console.error(' This script is for LOCAL DEVELOPMENT ONLY.')
console.error(' Set NODE_ENV=development or DEBUG=1 to run.')
process.exit(1)
}
// Check for AWS production indicators
const awsIndicators = ['AWS_EXECUTION_ENV', 'AWS_LAMBDA_FUNCTION_NAME', 'ECS_CONTAINER_METADATA_URI']
const foundIndicators = awsIndicators.filter((key) => process.env[key])
if (foundIndicators.length > 0) {
console.error('❌ SAFETY CHECK FAILED: AWS production environment detected')
console.error(` Found: ${foundIndicators.join(', ')}`)
console.error(' This script is for LOCAL DEVELOPMENT ONLY.')
process.exit(1)
}
console.log('✅ Safety checks passed: Local development environment confirmed')
}
async function createS3Client(endpoint, accessKeyId, secretAccessKey) {
return new S3Client({
endpoint,
region: 'us-east-1',
credentials: {
accessKeyId,
secretAccessKey,
},
forcePathStyle: true,
})
}
async function ensureBucketExists(client, bucket) {
try {
// Try to list objects to check if bucket exists
await client.send(new ListObjectsV2Command({ Bucket: bucket, MaxKeys: 1 }))
console.log(`✅ Bucket '${bucket}' exists`)
} catch (err) {
if (err.name === 'NoSuchBucket' || err.name === 'NotFound' || err.$metadata?.httpStatusCode === 404) {
console.log(`📦 Creating bucket '${bucket}'...`)
try {
await client.send(new CreateBucketCommand({ Bucket: bucket }))
console.log(`✅ Bucket '${bucket}' created`)
} catch (createErr) {
// Ignore error if bucket already exists (race condition)
if (createErr.name === 'BucketAlreadyOwnedByYou' || createErr.name === 'BucketAlreadyExists') {
console.log(`✅ Bucket '${bucket}' already exists`)
} else {
console.error(`❌ Failed to create bucket '${bucket}':`, createErr.message)
throw createErr
}
}
} else {
// Ignore other errors (bucket might exist)
console.log(`✅ Assuming bucket '${bucket}' exists`)
}
}
}
async function testConnectivity(client, endpoint, bucket) {
try {
const command = new ListObjectsV2Command({ Bucket: bucket, MaxKeys: 1 })
await client.send(command)
console.log(`✅ Connected to ${endpoint}`)
return true
} catch (err) {
console.error(`❌ Cannot connect to ${endpoint} bucket '${bucket}': ${err.message}`)
return false
}
}
async function objectExists(client, bucket, key) {
try {
await client.send(new HeadObjectCommand({ Bucket: bucket, Key: key }))
return true
} catch (err) {
if (err.name === 'NotFound' || err.$metadata?.httpStatusCode === 404) {
return false
}
throw err
}
}
async function getObjectMetadata(client, bucket, key) {
try {
const response = await client.send(new HeadObjectCommand({ Bucket: bucket, Key: key }))
return {
Size: response.ContentLength,
LastModified: response.LastModified,
ETag: response.ETag,
}
} catch (err) {
if (err.name === 'NotFound' || err.$metadata?.httpStatusCode === 404) {
return null
}
throw err
}
}
async function objectsAreSame(sourceClient, destClient, bucket, key) {
const [sourceMetadata, destMetadata] = await Promise.all([
getObjectMetadata(sourceClient, bucket, key),
getObjectMetadata(destClient, bucket, key),
])
// If destination doesn't exist, they're not the same
if (!destMetadata) {
return false
}
// If source doesn't exist (shouldn't happen), consider them different
if (!sourceMetadata) {
return false
}
// Use the needsSync logic (inverted)
return !needsSync(sourceMetadata, destMetadata)
}
async function copyObject(sourceClient, destClient, bucket, key) {
const getCommand = new GetObjectCommand({ Bucket: bucket, Key: key })
const response = await sourceClient.send(getCommand)
const chunks = []
for await (const chunk of response.Body) {
chunks.push(chunk)
}
const buffer = Buffer.concat(chunks)
const putCommand = new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: buffer,
ContentType: response.ContentType || 'application/octet-stream',
})
await destClient.send(putCommand)
return { size: buffer.length }
}
async function listAllObjectsWithMetadata(client, bucket, prefix) {
const allObjects = []
let continuationToken = undefined
do {
const command = new ListObjectsV2Command({
Bucket: bucket,
Prefix: prefix,
ContinuationToken: continuationToken,
})
const response = await client.send(command)
if (response.Contents) {
allObjects.push(...response.Contents)
}
continuationToken = response.NextContinuationToken
} while (continuationToken)
return allObjects
}
function needsSync(objA, objB) {
// Different size = different content
if (objA.Size !== objB.Size) return true
// Different modification time (with tolerance of 2 seconds for clock skew)
const timeDiff = Math.abs(new Date(objA.LastModified) - new Date(objB.LastModified))
if (timeDiff > 2000) return true
// ETag comparison if available (not all S3 implementations provide this)
if (objA.ETag && objB.ETag && objA.ETag !== objB.ETag) return true
return false
}
async function resolveConflict(conflict, strategy, minioClient, seaweedfsClient, bucket) {
const { key, minioObj, seaweedfsObj } = conflict
let winnerObj, winnerClient, loserClient, winnerName
switch (strategy) {
case 'newest':
const minioTime = new Date(minioObj.LastModified)
const seaweedfsTime = new Date(seaweedfsObj.LastModified)
if (minioTime > seaweedfsTime) {
winnerObj = minioObj
winnerClient = minioClient
loserClient = seaweedfsClient
winnerName = 'MinIO'
} else {
winnerObj = seaweedfsObj
winnerClient = seaweedfsClient
loserClient = minioClient
winnerName = 'SeaweedFS'
}
break
case 'largest':
if (minioObj.Size > seaweedfsObj.Size) {
winnerObj = minioObj
winnerClient = minioClient
loserClient = seaweedfsClient
winnerName = 'MinIO'
} else {
winnerObj = seaweedfsObj
winnerClient = seaweedfsClient
loserClient = minioClient
winnerName = 'SeaweedFS'
}
break
case 'skip':
return { action: 'skipped', key }
default:
throw new Error(`Unknown conflict resolution strategy: ${strategy}`)
}
// Copy winner to loser
await copyObject(winnerClient, loserClient, bucket, key)
return { action: 'resolved', key, winner: winnerName, size: winnerObj.Size }
}
async function migrateService(serviceName, config, options, checkpoint) {
const direction = options.revert ? 'SeaweedFS → MinIO' : 'MinIO → SeaweedFS'
const sourceName = options.revert ? 'SeaweedFS' : 'MinIO'
const destName = options.revert ? 'MinIO' : 'SeaweedFS'
console.log(`\n${'='.repeat(80)}`)
console.log(`📦 Migrating: ${serviceName}`)
console.log(` ${config.description}`)
console.log(` Direction: ${direction}`)
console.log(` Bucket: ${config.bucket}`)
console.log(` Prefix: ${config.prefix}`)
console.log(`${'='.repeat(80)}\n`)
// Resolve endpoints and credentials from env (fallback to localhost defaults)
const MINIO_ENDPOINT = process.env.OBJECT_STORAGE_ENDPOINT || 'http://localhost:19000'
const MINIO_KEY = process.env.OBJECT_STORAGE_ACCESS_KEY_ID || 'object_storage_root_user'
const MINIO_SECRET = process.env.OBJECT_STORAGE_SECRET_ACCESS_KEY || 'object_storage_root_password'
const SW_ENDPOINT =
process.env.SESSION_RECORDING_V2_S3_ENDPOINT || process.env.SEAWEEDFS_ENDPOINT || 'http://localhost:8333'
const SW_KEY = process.env.SESSION_RECORDING_V2_S3_ACCESS_KEY_ID || process.env.SEAWEEDFS_ACCESS_KEY_ID || 'any'
const SW_SECRET =
process.env.SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY || process.env.SEAWEEDFS_SECRET_ACCESS_KEY || 'any'
// Create S3 clients
const minioClient = await createS3Client(MINIO_ENDPOINT, MINIO_KEY, MINIO_SECRET)
const seaweedfsClient = await createS3Client(SW_ENDPOINT, SW_KEY, SW_SECRET)
// Determine source and destination based on direction
const sourceClient = options.revert ? seaweedfsClient : minioClient
const destClient = options.revert ? minioClient : seaweedfsClient
// Ensure bucket exists in destination
await ensureBucketExists(destClient, config.bucket)
// Test connectivity
const minioOk = await testConnectivity(minioClient, 'MinIO', config.bucket)
const seaweedfsOk = await testConnectivity(seaweedfsClient, 'SeaweedFS', config.bucket)
if (!minioOk || !seaweedfsOk) {
throw new Error('Failed to connect to storage backends')
}
// List objects from source
console.log(`📋 Listing objects from ${sourceName}...`)
let allObjects = []
let continuationToken = undefined
do {
const command = new ListObjectsV2Command({
Bucket: config.bucket,
Prefix: config.prefix,
ContinuationToken: continuationToken,
})
const response = await sourceClient.send(command)
if (response.Contents) {
allObjects = allObjects.concat(response.Contents)
}
continuationToken = response.NextContinuationToken
} while (continuationToken)
console.log(`✅ Found ${allObjects.length} objects`)
if (allObjects.length === 0) {
console.log('✨ No objects to migrate')
return
}
// Filter objects based on checkpoint and resume
let objectsToProcess = allObjects
if (options.resume) {
const lastKey = checkpoint.getLastKey(serviceName)
if (lastKey) {
const lastIndex = allObjects.findIndex((obj) => obj.Key === lastKey)
if (lastIndex >= 0) {
objectsToProcess = allObjects.slice(lastIndex + 1)
console.log(`📍 Resuming from key: ${lastKey}`)
console.log(` ${objectsToProcess.length} objects remaining`)
}
}
}
if (options.dryRun) {
console.log('\n🔍 DRY RUN MODE - No objects will be copied\n')
console.log('Objects that would be migrated:')
objectsToProcess.slice(0, 10).forEach((obj) => {
console.log(` - ${obj.Key} (${(obj.Size / 1024).toFixed(2)} KB)`)
})
if (objectsToProcess.length > 10) {
console.log(` ... and ${objectsToProcess.length - 10} more`)
}
return
}
// Copy objects with progress tracking
console.log(`\n🚀 Starting migration with ${options.workers} workers...\n`)
const progress = new ProgressTracker(objectsToProcess.length)
const failedObjects = []
// Worker pool
const workers = []
const queue = [...objectsToProcess]
for (let i = 0; i < options.workers; i++) {
workers.push(
(async () => {
while (true) {
const obj = queue.shift()
if (!obj) break
try {
// Check if objects are identical (same size, timestamp, content)
const areSame = await objectsAreSame(sourceClient, destClient, config.bucket, obj.Key)
if (areSame && !options.force) {
checkpoint.markSkipped(serviceName)
progress.increment('skipped')
} else {
const result = await copyObject(sourceClient, destClient, config.bucket, obj.Key)
checkpoint.markCompleted(serviceName, obj.Key)
progress.increment('completed', result.size)
}
} catch (err) {
checkpoint.markFailed(serviceName, obj.Key, err)
progress.increment('failed')
failedObjects.push({ key: obj.Key, error: err.message })
}
// Save checkpoint every 100 objects
if ((progress.completed + progress.failed + progress.skipped) % 100 === 0) {
checkpoint.save()
}
}
})()
)
}
await Promise.all(workers)
progress.finish()
// Final checkpoint save
checkpoint.save()
// Summary
console.log(`\n${'='.repeat(80)}`)
console.log('📊 Migration Summary')
console.log(`${'='.repeat(80)}`)
console.log(`✅ Completed: ${progress.completed}`)
console.log(`⊘ Skipped: ${progress.skipped} (already identical in destination)`)
console.log(`✗ Failed: ${progress.failed}`)
console.log(`📦 Data transferred: ${(progress.bytesTransferred / 1024 / 1024).toFixed(2)} MB`)
console.log(`⏱ Total time: ${Math.floor((Date.now() - progress.startTime) / 1000)}s`)
if (failedObjects.length > 0) {
console.log(`\n❌ Failed objects:`)
failedObjects.slice(0, 10).forEach((obj) => {
console.log(` ${obj.key}: ${obj.error}`)
})
if (failedObjects.length > 10) {
console.log(` ... and ${failedObjects.length - 10} more`)
}
console.log(`\n💡 Run with --resume to retry failed objects`)
}
}
async function syncService(serviceName, config, options, checkpoint) {
console.log(`\n${'='.repeat(80)}`)
console.log(`🔄 Syncing: ${serviceName}`)
console.log(` ${config.description}`)
console.log(` Mode: Bidirectional Sync`)
console.log(` Bucket: ${config.bucket}`)
console.log(` Prefix: ${config.prefix}`)
console.log(`${'='.repeat(80)}\n`)
// Create S3 clients
const minioClient = await createS3Client(
'http://localhost:19000',
'object_storage_root_user',
'object_storage_root_password'
)
const seaweedfsClient = await createS3Client('http://localhost:8333', 'any', 'any')
// Ensure bucket exists in both
await ensureBucketExists(minioClient, config.bucket)
await ensureBucketExists(seaweedfsClient, config.bucket)
// Test connectivity
const minioOk = await testConnectivity(minioClient, 'MinIO', config.bucket)
const seaweedfsOk = await testConnectivity(seaweedfsClient, 'SeaweedFS', config.bucket)
if (!minioOk || !seaweedfsOk) {
throw new Error('Failed to connect to storage backends')
}
// List objects from both sides
console.log(`📋 Listing objects from both storages...`)
const [minioObjects, seaweedfsObjects] = await Promise.all([
listAllObjectsWithMetadata(minioClient, config.bucket, config.prefix),
listAllObjectsWithMetadata(seaweedfsClient, config.bucket, config.prefix),
])
console.log(`✅ MinIO: ${minioObjects.length} objects`)
console.log(`✅ SeaweedFS: ${seaweedfsObjects.length} objects`)
// Build key maps
const minioMap = new Map(minioObjects.map((o) => [o.Key, o]))
const seaweedfsMap = new Map(seaweedfsObjects.map((o) => [o.Key, o]))
// Find differences
const onlyInMinio = []
const onlyInSeaweedfs = []
const conflicts = []
for (const [key, minioObj] of minioMap) {
if (!seaweedfsMap.has(key)) {
onlyInMinio.push(minioObj)
} else {
const seaweedfsObj = seaweedfsMap.get(key)
if (needsSync(minioObj, seaweedfsObj)) {
conflicts.push({ key, minioObj, seaweedfsObj })
}
}
}
for (const [key, seaweedfsObj] of seaweedfsMap) {
if (!minioMap.has(key)) {
onlyInSeaweedfs.push(seaweedfsObj)
}
}
// Calculate objects already in sync
const totalObjects = minioObjects.length + seaweedfsObjects.length
const inBothStorages = minioObjects.filter((obj) => seaweedfsMap.has(obj.Key)).length
const alreadyInSync = inBothStorages - conflicts.length
console.log(`\n📊 Sync Analysis:`)
console.log(` Total objects: ${totalObjects}`)
console.log(` Already in sync: ${alreadyInSync}`)
console.log(` MinIO → SeaweedFS: ${onlyInMinio.length} objects`)
console.log(` SeaweedFS → MinIO: ${onlyInSeaweedfs.length} objects`)
console.log(` Conflicts to resolve: ${conflicts.length} objects`)
const totalOperations = onlyInMinio.length + onlyInSeaweedfs.length + conflicts.length
if (totalOperations === 0) {
console.log(`\n✨ Storages are already in sync! No changes needed.`)
return
}
if (options.dryRun) {
console.log('\n🔍 DRY RUN MODE - No objects will be copied\n')
if (onlyInMinio.length > 0) {
console.log('Would copy MinIO → SeaweedFS:')
onlyInMinio.slice(0, 5).forEach((obj) => {
console.log(` - ${obj.Key} (${(obj.Size / 1024).toFixed(2)} KB)`)
})
if (onlyInMinio.length > 5) console.log(` ... and ${onlyInMinio.length - 5} more`)
}
if (onlyInSeaweedfs.length > 0) {
console.log('\nWould copy SeaweedFS → MinIO:')
onlyInSeaweedfs.slice(0, 5).forEach((obj) => {
console.log(` - ${obj.Key} (${(obj.Size / 1024).toFixed(2)} KB)`)
})
if (onlyInSeaweedfs.length > 5) console.log(` ... and ${onlyInSeaweedfs.length - 5} more`)
}
if (conflicts.length > 0) {
const resolution = options.conflictResolution || config.conflictResolution
console.log(`\nWould resolve ${conflicts.length} conflicts using strategy: ${resolution}`)
conflicts.slice(0, 3).forEach((c) => {
console.log(` - ${c.key}:`)
console.log(` MinIO: ${(c.minioObj.Size / 1024).toFixed(2)} KB, ${c.minioObj.LastModified}`)
console.log(
` SeaweedFS: ${(c.seaweedfsObj.Size / 1024).toFixed(2)} KB, ${c.seaweedfsObj.LastModified}`
)
})
if (conflicts.length > 3) console.log(` ... and ${conflicts.length - 3} more`)
}
return
}
// Sync in both directions
console.log(`\n🚀 Starting bidirectional sync with ${options.workers} workers...\n`)
const progress = new ProgressTracker(totalOperations)
const failedObjects = []
// Copy MinIO → SeaweedFS
console.log(`📤 Copying ${onlyInMinio.length} objects from MinIO to SeaweedFS...`)
const queue1 = [...onlyInMinio]
const workers1 = []
for (let i = 0; i < options.workers; i++) {
workers1.push(
(async () => {
while (true) {
const obj = queue1.shift()
if (!obj) break
try {
const result = await copyObject(minioClient, seaweedfsClient, config.bucket, obj.Key)
progress.increment('completed', result.size)
} catch (err) {
progress.increment('failed')
failedObjects.push({ key: obj.Key, error: err.message, direction: 'MinIO→SeaweedFS' })
}
}
})()
)
}
await Promise.all(workers1)
// Copy SeaweedFS → MinIO
console.log(`📥 Copying ${onlyInSeaweedfs.length} objects from SeaweedFS to MinIO...`)
const queue2 = [...onlyInSeaweedfs]
const workers2 = []
for (let i = 0; i < options.workers; i++) {
workers2.push(
(async () => {
while (true) {
const obj = queue2.shift()
if (!obj) break
try {
const result = await copyObject(seaweedfsClient, minioClient, config.bucket, obj.Key)
progress.increment('completed', result.size)
} catch (err) {
progress.increment('failed')
failedObjects.push({ key: obj.Key, error: err.message, direction: 'SeaweedFS→MinIO' })
}
}
})()
)
}
await Promise.all(workers2)
// Resolve conflicts
if (conflicts.length > 0) {
const resolution = options.conflictResolution || config.conflictResolution
console.log(`\n⚔️ Resolving ${conflicts.length} conflicts using strategy: ${resolution}...`)
const queue3 = [...conflicts]
const workers3 = []
for (let i = 0; i < options.workers; i++) {
workers3.push(
(async () => {
while (true) {
const conflict = queue3.shift()
if (!conflict) break
try {
const result = await resolveConflict(
conflict,
resolution,
minioClient,
seaweedfsClient,
config.bucket
)
if (result.action === 'skipped') {
progress.increment('skipped')
} else {
progress.increment('completed', result.size)
}
} catch (err) {
progress.increment('failed')
failedObjects.push({ key: conflict.key, error: err.message, direction: 'conflict' })
}
}
})()
)
}
await Promise.all(workers3)
}
progress.finish()
// Summary
console.log(`\n${'='.repeat(80)}`)
console.log('📊 Sync Summary')
console.log(`${'='.repeat(80)}`)
console.log(`✅ Synced: ${progress.completed}`)
console.log(`⊘ Skipped: ${progress.skipped}`)
console.log(`✗ Failed: ${progress.failed}`)
console.log(`📦 Data transferred: ${(progress.bytesTransferred / 1024 / 1024).toFixed(2)} MB`)
console.log(`⏱ Total time: ${Math.floor((Date.now() - progress.startTime) / 1000)}s`)
if (failedObjects.length > 0) {
console.log(`\n❌ Failed objects:`)
failedObjects.slice(0, 10).forEach((obj) => {
console.log(` [${obj.direction}] ${obj.key}: ${obj.error}`)
})
if (failedObjects.length > 10) {
console.log(` ... and ${failedObjects.length - 10} more`)
}
}
if (config.critical && failedObjects.length > 0) {
console.log(`\n⚠️ WARNING: This is a CRITICAL service and ${failedObjects.length} objects failed to sync!`)
}
}
async function main() {
const options = parseArgs()
let title
if (options.mode === 'sync') {
title = 'Bidirectional Storage Sync Tool'
} else {
title = options.revert ? 'SeaweedFS to MinIO Migration Tool' : 'MinIO to SeaweedFS Migration Tool'
}
console.log(`🔄 ${title}`)
console.log('=====================================\n')
// Validate service
const config = SERVICES[options.service]
if (!config) {
console.error(`❌ Unknown service: ${options.service}`)
console.error(`Available services: ${Object.keys(SERVICES).join(', ')}`)
process.exit(1)
}
// Safety checks
const minioEndpoint = process.env.OBJECT_STORAGE_ENDPOINT || 'http://localhost:19000'
const seaweedEndpoint =
process.env.SESSION_RECORDING_V2_S3_ENDPOINT || process.env.SEAWEEDFS_ENDPOINT || 'http://localhost:8333'
ensureLocalDevelopmentOnly(minioEndpoint, seaweedEndpoint)
// Initialize checkpoint
const checkpoint = new Checkpoint(options.service)
try {
if (options.mode === 'sync') {
if (!config.bidirectional) {
console.warn(`⚠️ Warning: Service '${options.service}' is not configured for bidirectional sync.`)
console.warn(` Proceeding anyway, but this service may not be suitable for sync mode.`)
}
await syncService(options.service, config, options, checkpoint)
console.log('\n✨ Sync completed successfully!\n')
} else {
await migrateService(options.service, config, options, checkpoint)
console.log('\n✨ Migration completed successfully!\n')
}
} catch (err) {
console.error(`\n${options.mode === 'sync' ? 'Sync' : 'Migration'} failed:`, err.message)
console.error(err.stack)
process.exit(1)
}
}
main()
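// Example recovery flow (a sketch): re-run with --resume to skip objects already recorded in .migration-checkpoint.json
//   node bin/migrate-minio-to-seaweedfs.js --service session-recordings --resume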

View File

@@ -276,15 +276,15 @@ export function getDefaultConfig(): PluginsServerConfig {
PERSON_JSONB_SIZE_ESTIMATE_ENABLE: 0, // defaults to off
// Session recording V2
-SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB
+SESSION_RECORDING_MAX_BATCH_SIZE_KB: isDevEnv() ? 2 * 1024 : 100 * 1024, // 2MB on dev, 100MB on prod and test
SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds
SESSION_RECORDING_V2_S3_BUCKET: 'posthog',
SESSION_RECORDING_V2_S3_PREFIX: 'session_recordings',
-SESSION_RECORDING_V2_S3_ENDPOINT: 'http://localhost:19000',
+SESSION_RECORDING_V2_S3_ENDPOINT: 'http://localhost:8333',
SESSION_RECORDING_V2_S3_REGION: 'us-east-1',
-SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user',
-SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password',
+SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'any',
+SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'any',
-SESSION_RECORDING_V2_S3_TIMEOUT_MS: 30000,
+SESSION_RECORDING_V2_S3_TIMEOUT_MS: isDevEnv() ? 120000 : 30000,
SESSION_RECORDING_V2_REPLAY_EVENTS_KAFKA_TOPIC: 'clickhouse_session_replay_events',
SESSION_RECORDING_V2_CONSOLE_LOG_ENTRIES_KAFKA_TOPIC: 'log_entries',
SESSION_RECORDING_V2_CONSOLE_LOG_STORE_SYNC_BATCH_LIMIT: 1000,

View File

@@ -6,12 +6,10 @@ from posthog.settings.base_variables import DEBUG, TEST
from posthog.settings.utils import str_to_bool
if TEST or DEBUG:
-SESSION_RECORDING_V2_S3_ENDPOINT = os.getenv("SESSION_RECORDING_V2_S3_ENDPOINT", "http://objectstorage:19000")
+SESSION_RECORDING_V2_S3_ENDPOINT = os.getenv("SESSION_RECORDING_V2_S3_ENDPOINT", "http://seaweedfs:8333")
-SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: Optional[str] = os.getenv(
-"SESSION_RECORDING_V2_S3_ACCESS_KEY_ID", "object_storage_root_user"
-)
+SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: Optional[str] = os.getenv("SESSION_RECORDING_V2_S3_ACCESS_KEY_ID", "any")
SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: Optional[str] = os.getenv(
-"SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY", "object_storage_root_password"
+"SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY", "any"
)
else:
SESSION_RECORDING_V2_S3_ENDPOINT = os.getenv("SESSION_RECORDING_V2_S3_ENDPOINT", "")