diff --git a/src/channels/plugins/matrix-channel.ts b/src/channels/plugins/matrix-channel.ts deleted file mode 100755 index b08829a488..0000000000 --- a/src/channels/plugins/matrix-channel.ts +++ /dev/null @@ -1,545 +0,0 @@ -/** - * Matrix Channel Plugin for OpenClaw - * - * Provides Matrix protocol support for triad node communication. - * Uses Dendrite homeserver with E2E encryption. - * - * @see docs/matrix-triad-setup.md for deployment guide - */ - -import type { OpenClawConfig } from "../../config/config.js"; -import type { ChannelPlugin } from "./types.plugin.js"; - -// Matrix API types -type MatrixRoomId = string; -type MatrixUserId = string; -type MatrixAccessToken = string; - -type MatrixMessageContent = { - msgtype: "m.text" | "m.notice" | "m.emote"; - body: string; - format?: "org.matrix.custom.html"; - formatted_body?: string; -}; - -type MatrixEventResponse = { - event_id: string; -}; - -type MatrixLoginResponse = { - access_token: MatrixAccessToken; - home_server: string; - user_id: MatrixUserId; - device_id?: string; -}; - -// Matrix homeserver configuration -type MatrixAccountConfig = { - homeserverUrl: string; - userId: MatrixUserId; - password?: string; - accessToken?: MatrixAccessToken; - deviceId?: string; -}; - -type MatrixProbeResult = { - ok: boolean; - homeserver: string; - userId?: MatrixUserId; - deviceId?: string; - error?: string; -}; - -/** - * Matrix API client - */ -class MatrixApi { - constructor( - private homeserverUrl: string, - private accessToken: MatrixAccessToken, - ) {} - - async sendMessage( - roomId: MatrixRoomId, - content: MatrixMessageContent, - ): Promise { - const url = `${this.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message`; - - const response = await fetch(url, { - method: "POST", - headers: { - Authorization: `Bearer ${this.accessToken}`, - "Content-Type": "application/json", - }, - body: JSON.stringify(content), - }); - - if (!response.ok) { - throw new Error(`Matrix API error: ${response.status} ${response.statusText}`); - } - - return response.json(); - } - - async readMessages(roomId: MatrixRoomId, limit: number = 10): Promise { - const url = `${this.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/messages?dir=b&limit=${limit}`; - - const response = await fetch(url, { - headers: { - Authorization: `Bearer ${this.accessToken}`, - }, - }); - - if (!response.ok) { - throw new Error(`Matrix API error: ${response.status}`); - } - - const data = await response.json(); - return data.chunk || []; - } - - async login(userId: string, password: string): Promise { - const url = `${this.homeserverUrl}/_matrix/client/v3/login`; - - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - type: "m.login.password", - identifier: { - type: "m.id.user", - user: userId, - }, - password: password, - }), - }); - - if (!response.ok) { - throw new Error(`Matrix login failed: ${response.status}`); - } - - return response.json(); - } - - async getJoinedRooms(): Promise<{ joined_rooms: MatrixRoomId[] }> { - const url = `${this.homeserverUrl}/_matrix/client/v3/join_rooms`; - - const response = await fetch(url, { - headers: { - Authorization: `Bearer ${this.accessToken}`, - }, - }); - - if (!response.ok) { - throw new Error(`Matrix API error: ${response.status}`); - } - - return response.json(); - } - - async createRoom(name: string, encrypted: boolean = true): Promise<{ room_id: MatrixRoomId }> { - const url = `${this.homeserverUrl}/_matrix/client/v3/createRoom`; - - const body: unknown = { - name: name, - visibility: "private", - preset: "private_chat", - }; - - if (encrypted) { - body.initial_state = [ - { - type: "m.room.encryption", - state_key: "", - content: { - algorithm: "m.megolm.v1.aes-shash", - }, - }, - ]; - } - - const response = await fetch(url, { - method: "POST", - headers: { - Authorization: `Bearer ${this.accessToken}`, - "Content-Type": "application/json", - }, - body: JSON.stringify(body), - }); - - if (!response.ok) { - throw new Error(`Matrix create room failed: ${response.status}`); - } - - return response.json(); - } - - async invite(roomId: MatrixRoomId, userId: MatrixUserId): Promise { - const url = `${this.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/invite`; - - const response = await fetch(url, { - method: "POST", - headers: { - Authorization: `Bearer ${this.accessToken}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ user_id: userId }), - }); - - if (!response.ok) { - throw new Error(`Matrix invite failed: ${response.status}`); - } - } - - async healthCheck(): Promise { - try { - const url = `${this.homeserverUrl}/health`; - const response = await fetch(url, { method: "GET" }); - return response.ok; - } catch { - return false; - } - } -} - -/** - * Probe Matrix homeserver connectivity - */ -async function probeMatrix(cfg?: OpenClawConfig): Promise { - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - - if (!matrixConfig?.homeserverUrl) { - return { ok: false, homeserver: "", error: "No Matrix homeserver configured" }; - } - - const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken || ""); - - try { - const healthy = await api.healthCheck(); - return { - ok: healthy, - homeserver: matrixConfig.homeserverUrl, - userId: matrixConfig.userId, - }; - } catch (error) { - return { - ok: false, - homeserver: matrixConfig.homeserverUrl, - error: error instanceof Error ? error.message : "Unknown error", - }; - } -} - -/** - * Send message to Matrix room - */ -async function sendMessageMatrix( - roomId: MatrixRoomId, - content: string, - accountId?: string, - cfg?: OpenClawConfig, -): Promise<{ ok: true; eventId: string }> { - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - - if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) { - throw new Error("Matrix not configured: missing homeserver URL or access token"); - } - - const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken); - - const result = await api.sendMessage(roomId, { - msgtype: "m.text", - body: content, - }); - - return { ok: true, eventId: result.event_id }; -} - -/** - * Read recent messages from Matrix room - */ -async function readMessagesMatrix( - roomId: MatrixRoomId, - limit: number = 10, - accountId?: string, - cfg?: OpenClawConfig, -): Promise { - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - - if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) { - throw new Error("Matrix not configured: missing Homeserver URL or access token"); - } - - const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken); - return api.readMessages(roomId, limit); -} - -/** - * Monitor Matrix provider health - */ -async function monitorMatrixProvider(cfg?: OpenClawConfig): Promise<{ - ok: boolean; - homeserver: string; - userId?: string; - error?: string; -}> { - return probeMatrix(cfg); -} - -/** - * Resolve Matrix channel allowlist - */ -async function resolveMatrixChannelAllowlist( - _roomId: MatrixRoomId, - _cfg?: OpenClawConfig, -): Promise { - // For triad use case, allowlist is the 4 triad nodes - const triadUsers = [ - "@tm1:triad.local", - "@tm2:triad.local", - "@tm3:triad.local", - "@tm4:triad.local", - ]; - return triadUsers; -} - -/** - * Resolve Matrix user allowlist - */ -async function resolveMatrixUserAllowlist(_cfg?: OpenClawConfig): Promise { - return ["@tm1:triad.local", "@tm2:triad.local", "@tm3:triad.local", "@tm4:triad.local"]; -} - -/** - * Matrix message actions (reactions, edits, deletes) - */ -const matrixMessageActions = { - async react(roomId: MatrixRoomId, eventId: string, emoji: string, cfg?: OpenClawConfig) { - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) { - throw new Error("Matrix not configured"); - } - - const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken); - // Matrix reactions are sent as relation annotations - await api.sendMessage(roomId, { - msgtype: "m.reaction", - body: emoji, - "m.relates_to": { - rel_type: "m.annotation", - event_id: eventId, - key: emoji, - }, - } as unknown as Record); - }, - - async edit(roomId: MatrixRoomId, eventId: string, newContent: string, cfg?: OpenClawConfig) { - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) { - throw new Error("Matrix not configured"); - } - - const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken); - await api.sendMessage(roomId, { - msgtype: "m.text", - body: newContent, - "m.new_content": { - msgtype: "m.text", - body: newContent, - }, - "m.relates_to": { - rel_type: "m.replace", - event_id: eventId, - }, - } as unknown as Record); - }, - - async delete(roomId: MatrixRoomId, eventId: string, cfg?: OpenClawConfig) { - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) { - throw new Error("Matrix not configured"); - } - - // Matrix uses redaction for deletion - void matrixConfig.homeserverUrl; // reserved for future use - void matrixConfig.accessToken; // reserved for future use - const url = `${matrixConfig.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/redact/${encodeURIComponent(eventId)}`; - await fetch(url, { - method: "POST", - headers: { - Authorization: `Bearer ${matrixConfig.accessToken}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ reason: "User requested deletion" }), - }); - }, -}; - -/** - * Thread binding helpers for Matrix (rooms as threads) - */ -async function setMatrixThreadBindingIdleTimeoutBySessionKey( - sessionKey: string, - timeoutMs: number, -): Promise { - // Matrix rooms don't have idle timeouts in the same way as Discord threads - // This is a no-op for Matrix, rooms persist indefinitely - console.debug(`[Matrix] Thread binding idle timeout set (no-op): ${sessionKey} = ${timeoutMs}ms`); -} - -async function setMatrixThreadBindingMaxAgeBySessionKey( - sessionKey: string, - maxAgeMs: number, -): Promise { - // Matrix rooms don't expire - console.debug(`[Matrix] Thread binding max age set (no-op): ${sessionKey} = ${maxAgeMs}ms`); -} - -/** - * Matrix Channel Plugin Definition - */ -export const matrixChannelPlugin: ChannelPlugin = { - id: "matrix", - meta: { - name: "Matrix", - description: "Matrix protocol support for triad communication", - version: "0.1.0", - author: "Tabula Myriad", - license: "MIT", - }, - capabilities: { - messaging: true, - threading: false, // Matrix uses rooms, not threads - reactions: true, - files: true, - voice: false, - video: false, - presence: true, - typing: false, // TODO: implement typing indicator - readReceipts: true, - encryption: true, // E2E encryption supported - federation: true, - }, - config: { - async resolveAccount(accountId, cfg) { - return cfg?.channels?.matrix as MatrixAccountConfig | null; - }, - async listAccounts(cfg) { - // For triad, single account configuration - const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined; - return matrixConfig ? [{ id: "matrix-default", config: matrixConfig }] : []; - }, - }, - setup: { - async validate(cfg) { - const result = await probeMatrix(cfg); - return { ok: result.ok, error: result.error }; - }, - async wizard() { - // Interactive setup wizard for Matrix configuration - return { - steps: [ - { - field: "homeserverUrl", - prompt: "Matrix homeserver URL (e.g., https://triad.local:8448)", - validate: (v: string) => v.startsWith("http"), - }, - { - field: "userId", - prompt: "Matrix user ID (e.g., @tm1:triad.local)", - validate: (v: string) => v.startsWith("@") && v.includes(":"), - }, - { - field: "password", - prompt: "Matrix password", - sensitive: true, - }, - ], - }; - }, - }, - pairing: { - async requestPairing(_channel: string, _accountId: string, _cfg?: OpenClawConfig) { - // Matrix uses user accounts, not bot pairing - return { status: "not_applicable", reason: "Matrix uses user authentication" }; - }, - }, - security: { - async auditPermissions(_channel: string, _accountId: string, _cfg?: OpenClawConfig) { - return { ok: true, warnings: [] }; - }, - }, - outbound: { - async send(message, target, accountId, cfg) { - const roomId = target; // Matrix target is room ID - return sendMessageMatrix(roomId, message, accountId, cfg); - }, - }, - status: { - async probe(cfg) { - return probeMatrix(cfg); - }, - async audit(_cfg?: OpenClawConfig) { - return { ok: true, findings: [] }; - }, - }, - gateway: { - methods: ["sendMessage", "readMessages", "probe"], - }, - allowlist: { - async resolveChannelAllowlist(roomId, cfg) { - return resolveMatrixChannelAllowlist(roomId, cfg); - }, - async resolveUserAllowlist(cfg) { - return resolveMatrixUserAllowlist(cfg); - }, - }, - messaging: { - async send(message, target, opts, cfg) { - return sendMessageMatrix(target, message, opts?.accountId, cfg); - }, - async read(target, limit, opts, cfg) { - return readMessagesMatrix(target, limit, opts?.accountId, cfg); - }, - }, - actions: { - async handle(action, payload, cfg) { - // Delegate to matrixMessageActions - const roomId = payload.roomId as string; - const eventId = payload.eventId as string; - - switch (action) { - case "react": - return matrixMessageActions.react(roomId, eventId, payload.emoji as string, cfg); - case "edit": - return matrixMessageActions.edit(roomId, eventId, payload.content as string, cfg); - case "delete": - return matrixMessageActions.delete(roomId, eventId, cfg); - default: - throw new Error(`Unknown Matrix action: ${action}`); - } - }, - }, - heartbeat: { - async check(cfg) { - const result = await monitorMatrixProvider(cfg); - return { ok: result.ok, message: result.error || "Matrix healthy" }; - }, - }, -}; - -// Export individual functions for direct use -export { - probeMatrix, - sendMessageMatrix, - readMessagesMatrix, - monitorMatrixProvider, - resolveMatrixChannelAllowlist, - resolveMatrixUserAllowlist, - matrixMessageActions, - setMatrixThreadBindingIdleTimeoutBySessionKey, - setMatrixThreadBindingMaxAgeBySessionKey, - MatrixApi, -}; diff --git a/src/services/node-sync-service.test.ts b/src/services/node-sync-service.test.ts deleted file mode 100755 index 0b6ca27782..0000000000 --- a/src/services/node-sync-service.test.ts +++ /dev/null @@ -1,362 +0,0 @@ -/** - * Node Sync Service Tests - * - * Tests for WebSocket peer sync, presence detection, and state sync - */ - -import { describe, it, expect, beforeEach, vi } from "vitest"; -import { - NodeSyncService, - PresenceDetector, - StateSyncManager, - PeerConnection, - TRIAD_NODES, - MessageType, - createNodeSyncService, -} from "./node-sync-service.js"; - -describe("NodeSyncService", () => { - describe("createNodeSyncService", () => { - it("creates a service instance for a valid node", () => { - const service = createNodeSyncService("TM-1"); - expect(service).toBeInstanceOf(NodeSyncService); - }); - - it("throws for invalid node ID", () => { - expect(() => createNodeSyncService("INVALID")).toThrow(); - }); - }); - - describe("TRIAD_NODES configuration", () => { - it("has all four triad nodes configured", () => { - expect(Object.keys(TRIAD_NODES)).toHaveLength(4); - expect(TRIAD_NODES["TM-1"]).toBeDefined(); - expect(TRIAD_NODES["TM-2"]).toBeDefined(); - expect(TRIAD_NODES["TM-3"]).toBeDefined(); - expect(TRIAD_NODES["TM-4"]).toBeDefined(); - }); - - it("TM-1 is configured as authority", () => { - expect(TRIAD_NODES["TM-1"].role).toBe("authority"); - }); - - it("other nodes are configured as participants", () => { - expect(TRIAD_NODES["TM-2"].role).toBe("participant"); - expect(TRIAD_NODES["TM-3"].role).toBe("participant"); - expect(TRIAD_NODES["TM-4"].role).toBe("participant"); - }); - }); -}); - -describe("PresenceDetector", () => { - let detector: PresenceDetector; - - beforeEach(() => { - detector = new PresenceDetector("TM-1"); - }); - - describe("updateHeartbeat", () => { - it("marks node as alive on heartbeat", () => { - detector.updateHeartbeat("TM-2"); - const presence = detector.getPresence("TM-2"); - expect(presence?.isAlive).toBe(true); - expect(presence?.connectionState).toBe("connected"); - }); - - it("emits node:online event when node comes alive", () => { - const callback = vi.fn(); - detector.on("node:online", callback); - detector.updateHeartbeat("TM-2"); - expect(callback).toHaveBeenCalledWith({ - nodeId: "TM-2", - presence: expect.anything(), - }); - }); - - it("updates latency when provided", () => { - detector.updateHeartbeat("TM-2", 5); - const presence = detector.getPresence("TM-2"); - expect(presence?.latencyMs).toBe(5); - }); - }); - - describe("checkTimeouts", () => { - it("marks node as offline after timeout", () => { - detector.updateHeartbeat("TM-2"); - vi.useFakeTimers(); - vi.advanceTimersByTime(16000); // Beyond 15s timeout - detector.checkTimeouts(); - const presence = detector.getPresence("TM-2"); - expect(presence?.isAlive).toBe(false); - expect(presence?.connectionState).toBe("disconnected"); - vi.useRealTimers(); - }); - - it("marks node as degraded at 70% timeout", () => { - detector.updateHeartbeat("TM-2"); - vi.useFakeTimers(); - vi.advanceTimersByTime(11000); // ~70% of 15s - detector.checkTimeouts(); - const presence = detector.getPresence("TM-2"); - expect(presence?.connectionState).toBe("degraded"); - vi.useRealTimers(); - }); - - it("emits node:offline event on timeout", () => { - detector.updateHeartbeat("TM-2"); - const callback = vi.fn(); - detector.on("node:offline", callback); - vi.useFakeTimers(); - vi.advanceTimersByTime(16000); - detector.checkTimeouts(); - expect(callback).toHaveBeenCalledWith({ - nodeId: "TM-2", - presence: expect.anything(), - elapsed: expect.any(Number), - }); - vi.useRealTimers(); - }); - }); - - describe("getQuorumStatus", () => { - it("counts self as alive", () => { - const status = detector.getQuorumStatus(); - expect(status.available).toBe(1); // Self - expect(status.hasQuorum).toBe(false); // Need 2-of-3 - }); - - it("has quorum when 2 nodes alive", () => { - detector.updateHeartbeat("TM-2"); - const status = detector.getQuorumStatus(); - expect(status.available).toBe(2); - expect(status.hasQuorum).toBe(true); - }); - - it("has quorum when all nodes alive", () => { - detector.updateHeartbeat("TM-2"); - detector.updateHeartbeat("TM-3"); - const status = detector.getQuorumStatus(); - expect(status.available).toBe(3); - expect(status.total).toBe(4); // 3 peers + self - expect(status.hasQuorum).toBe(true); - }); - }); - - describe("startMonitoring", () => { - it("returns interval ID", () => { - const interval = detector.startMonitoring(); - expect(interval).toBeDefined(); - }); - - it("clears interval on cleanup", () => { - const interval = detector.startMonitoring(); - detector.emit("cleanup"); - expect(clearInterval).toHaveBeenCalledWith(interval); - }); - }); -}); - -describe("StateSyncManager", () => { - let manager: StateSyncManager; - - beforeEach(() => { - manager = new StateSyncManager("TM-1"); - }); - - describe("updateLocalState", () => { - it("updates local state with provided fields", () => { - manager.updateLocalState({ - gitHash: "abc123", - ledgerHash: "def456", - }); - const state = manager.getState("TM-1") as unknown as { gitHash: string; ledgerHash: string }; - expect(state.gitHash).toBe("abc123"); - expect(state.ledgerHash).toBe("def456"); - }); - - it("updates lastSyncTimestamp", () => { - const before = Date.now(); - manager.updateLocalState({ gitHash: "test" }); - const state = manager.getState("TM-1") as unknown as { lastSyncTimestamp: number }; - expect(state.lastSyncTimestamp).toBeGreaterThanOrEqual(before); - }); - - it("emits state:updated event", () => { - const callback = vi.fn(); - manager.on("state:updated", callback); - manager.updateLocalState({ gitHash: "test" }); - expect(callback).toHaveBeenCalledWith({ - nodeId: "TM-1", - state: expect.anything(), - }); - }); - }); - - describe("receivePeerState", () => { - it("stores peer state", () => { - manager.receivePeerState("TM-2", { - gitHash: "xyz789", - ledgerHash: "uvw012", - memoryEntries: 100, - consensusVotes: 5, - lastSyncTimestamp: Date.now(), - }); - const state = manager.getState("TM-2"); - expect(state?.gitHash).toBe("xyz789"); - }); - - it("emits state:received event", () => { - const callback = vi.fn(); - manager.on("state:received", callback); - manager.receivePeerState("TM-2", { - gitHash: "test", - ledgerHash: "test", - memoryEntries: 0, - consensusVotes: 0, - lastSyncTimestamp: Date.now(), - }); - expect(callback).toHaveBeenCalledWith({ - nodeId: "TM-2", - state: expect.anything(), - }); - }); - }); - - describe("checkDivergence", () => { - it("returns no divergence when states match", () => { - manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" }); - manager.receivePeerState("TM-2", { - gitHash: "abc", - ledgerHash: "def", - memoryEntries: 0, - consensusVotes: 0, - lastSyncTimestamp: Date.now(), - }); - const result = manager.checkDivergence(); - expect(result.diverged).toBe(false); - }); - - it("detects git divergence", () => { - manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" }); - manager.receivePeerState("TM-2", { - gitHash: "xyz", - ledgerHash: "def", - memoryEntries: 0, - consensusVotes: 0, - lastSyncTimestamp: Date.now(), - }); - const result = manager.checkDivergence(); - expect(result.diverged).toBe(true); - expect(result.details).toContainEqual(expect.stringContaining("Git divergence")); - }); - - it("detects ledger divergence", () => { - manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" }); - manager.receivePeerState("TM-2", { - gitHash: "abc", - ledgerHash: "xyz", - memoryEntries: 0, - consensusVotes: 0, - lastSyncTimestamp: Date.now(), - }); - const result = manager.checkDivergence(); - expect(result.diverged).toBe(true); - expect(result.details).toContainEqual(expect.stringContaining("Ledger divergence")); - }); - - it("emits divergence:detected event", () => { - const callback = vi.fn(); - manager.on("divergence:detected", callback); - manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" }); - manager.receivePeerState("TM-2", { - gitHash: "xyz", - ledgerHash: "def", - memoryEntries: 0, - consensusVotes: 0, - lastSyncTimestamp: Date.now(), - }); - manager.checkDivergence(); - expect(callback).toHaveBeenCalledWith({ - diverged: true, - details: expect.any(Array), - }); - }); - }); -}); - -describe("PeerConnection", () => { - let peer: PeerConnection; - - beforeEach(() => { - peer = new PeerConnection("TM-2", "ws://192.168.31.209:8765"); - }); - - describe("constructor", () => { - it("initializes with node ID and URL", () => { - expect(peer.nodeId).toBe("TM-2"); - expect(peer.url).toBe("ws://192.168.31.209:8765"); - expect(peer.state).toBe("disconnected"); - }); - }); - - describe("message queue", () => { - it("queues messages when disconnected", () => { - const message = { - type: MessageType.HEARTBEAT, - sourceNodeId: "TM-1", - timestamp: Date.now(), - payload: {}, - }; - peer.send(message); - expect(peer.messageQueue).toHaveLength(1); - }); - - it("flushes queue on connect", () => { - // Mock WebSocket - peer.ws = { - send: vi.fn(), - close: vi.fn(), - on: vi.fn(), - } as unknown as WebSocket; - peer.state = "connected"; - - peer.messageQueue.push({ - type: MessageType.HEARTBEAT, - sourceNodeId: "TM-1", - timestamp: Date.now(), - payload: {}, - }); - - peer.flushMessageQueue(); - expect(peer.ws.send).toHaveBeenCalled(); - // ts-eslint: ignore-next-line unbound-method - expect(peer.messageQueue).toHaveLength(0); - }); - }); - - describe("reconnect", () => { - it("schedules reconnect with exponential backoff", () => { - peer.reconnectAttempts = 0; - peer.scheduleReconnect(); - expect(peer.reconnectAttempts).toBe(1); - expect(peer.state).toBe("reconnecting"); - }); - - it("stops after max attempts", () => { - peer.reconnectAttempts = 5; - peer.scheduleReconnect(); - expect(peer.state).toBe("disconnected"); - }); - }); -}); - -describe("MessageType enum", () => { - it("includes all message types", () => { - expect(MessageType.HEARTBEAT).toBe("heartbeat"); - expect(MessageType.PRESENCE_ANNOUNCE).toBe("presence:announce"); - expect(MessageType.STATE_SYNC_REQUEST).toBe("state:sync:request"); - expect(MessageType.CONSENSUS_PROPOSAL).toBe("consensus:proposal"); - expect(MessageType.MEMORY_REPLICATE).toBe("memory:replicate"); - expect(MessageType.ERROR_REPORT).toBe("error:report"); - }); -}); diff --git a/src/services/node-sync-service.ts b/src/services/node-sync-service.ts deleted file mode 100755 index 36aceaa712..0000000000 --- a/src/services/node-sync-service.ts +++ /dev/null @@ -1,754 +0,0 @@ -/** - * Node Sync Service — WebSocket Peer-to-Peer Architecture for Triad Nodes - * - * Provides direct inter-node communication beyond Discord: - * - WebSocket peer sync for real-time state propagation - * - Presence detection (heartbeat-based liveness) - * - State sync for non-Git data (consensus votes, memory entries) - * - Failure mode detection and recovery - * - * Triad Nodes: - * - TM-1: silica-animus (192.168.31.99) — Authority - * - TM-2: testbench (192.168.31.209) - * - TM-3: tabula-myriad-3 (192.168.31.85) - * - TM-4: tabula-myriad-4 (192.168.31.205) - * - * @module NodeSyncService - */ - -import { EventEmitter } from "events"; -import WebSocket from "ws"; -import { Logger } from "../logger.js"; - -const logger = new Logger("node-sync-service"); - -// ============================================================================ -// Configuration -// ============================================================================ - -export interface TriadNodeConfig { - nodeId: string; - hostname: string; - ipAddress: string; - wsPort: number; - role: "authority" | "participant"; -} - -export const TRIAD_NODES: Record = { - "TM-1": { - nodeId: "TM-1", - hostname: "silica-animus", - ipAddress: "192.168.31.99", - wsPort: 8765, - role: "authority", - }, - "TM-2": { - nodeId: "TM-2", - hostname: "testbench", - ipAddress: "192.168.31.209", - wsPort: 8765, - role: "participant", - }, - "TM-3": { - nodeId: "TM-3", - hostname: "tabula-myriad-3", - ipAddress: "192.168.31.85", - wsPort: 8765, - role: "participant", - }, - "TM-4": { - nodeId: "TM-4", - hostname: "tabula-myriad-4", - ipAddress: "192.168.31.205", - wsPort: 8765, - role: "participant", - }, -}; - -// ============================================================================ -// Message Types -// ============================================================================ - -export enum MessageType { - // Presence & Heartbeat - HEARTBEAT = "heartbeat", - PRESENCE_ANNOUNCE = "presence:announce", - PRESENCE_QUERY = "presence:query", - - // State Sync - STATE_SYNC_REQUEST = "state:sync:request", - STATE_SYNC_RESPONSE = "state:sync:response", - STATE_DELTA = "state:delta", - - // Consensus - CONSENSUS_PROPOSAL = "consensus:proposal", - CONSENSUS_VOTE = "consensus:vote", - CONSENSUS_RESULT = "consensus:result", - - // Memory & Knowledge - MEMORY_REPLICATE = "memory:replicate", - KNOWLEDGE_INDEX = "knowledge:index", - - // Coordination - TASK_ASSIGN = "task:assign", - TASK_COMPLETE = "task:complete", - - // Error & Recovery - ERROR_REPORT = "error:report", - RECOVERY_REQUEST = "recovery:request", -} - -export interface SyncMessage { - type: MessageType; - sourceNodeId: string; - targetNodeId?: string; - timestamp: number; - payload: unknown; - sequenceNumber?: number; -} - -// ============================================================================ -// Presence Detection -// ============================================================================ - -export interface NodePresence { - nodeId: string; - isAlive: boolean; - lastHeartbeat: number; - connectionState: "connected" | "disconnected" | "degraded"; - latencyMs?: number; -} - -export class PresenceDetector extends EventEmitter { - private nodes: Map = new Map(); - private heartbeatIntervalMs: number; - private timeoutThresholdMs: number; - private localNodeId: string; - - constructor(localNodeId: string, heartbeatIntervalMs = 5000, timeoutThresholdMs = 15000) { - super(); - this.localNodeId = localNodeId; - this.heartbeatIntervalMs = heartbeatIntervalMs; - this.timeoutThresholdMs = timeoutThresholdMs; - - // Initialize all triad nodes as unknown - for (const nodeId in TRIAD_NODES) { - if (nodeId !== localNodeId) { - this.nodes.set(nodeId, { - nodeId, - isAlive: false, - lastHeartbeat: 0, - connectionState: "disconnected", - }); - } - } - } - - updateHeartbeat(nodeId: string, latencyMs?: number) { - const presence = this.nodes.get(nodeId); - if (!presence) { - return; - } - - const wasAlive = presence.isAlive; - presence.isAlive = true; - presence.lastHeartbeat = Date.now(); - presence.latencyMs = latencyMs; - presence.connectionState = "connected"; - - if (!wasAlive) { - logger.info(`Node ${nodeId} is now alive`); - this.emit("node:online", { nodeId, presence }); - } - } - - checkTimeouts() { - const now = Date.now(); - for (const [nodeId, presence] of this.nodes) { - if (!presence.isAlive) { - continue; - } - - const elapsed = now - presence.lastHeartbeat; - if (elapsed > this.timeoutThresholdMs) { - logger.warn(`Node ${nodeId} heartbeat timeout (${elapsed}ms)`); - presence.isAlive = false; - presence.connectionState = "disconnected"; - this.emit("node:offline", { nodeId, presence, elapsed }); - } else if (elapsed > this.timeoutThresholdMs * 0.7) { - presence.connectionState = "degraded"; - this.emit("node:degraded", { nodeId, presence, elapsed }); - } - } - } - - getPresence(nodeId: string): NodePresence | undefined { - return this.nodes.get(nodeId); - } - - getAllPresence(): Map { - return this.nodes; - } - - getQuorumStatus(): { available: number; total: number; hasQuorum: boolean } { - let available = 0; - for (const presence of this.nodes.values()) { - if (presence.isAlive) { - available++; - } - } - // Add self - available++; - - const total = this.nodes.size + 1; - const hasQuorum = available >= 2; // 2-of-3 minimum - - return { available, total, hasQuorum }; - } - - startMonitoring() { - const interval = setInterval(() => this.checkTimeouts(), this.heartbeatIntervalMs); - this.on("cleanup", () => clearInterval(interval)); - return interval; - } -} - -// ============================================================================ -// WebSocket Peer Manager -// ============================================================================ - -export class PeerConnection { - nodeId: string; - ws: WebSocket | null = null; - url: string; - state: "connecting" | "connected" | "disconnected" | "reconnecting" = "disconnected"; - reconnectAttempts = 0; - maxReconnectAttempts = 5; - messageQueue: SyncMessage[] = []; - - constructor(nodeId: string, url: string) { - this.nodeId = nodeId; - this.url = url; - } - - connect(): Promise { - return new Promise((resolve, reject) => { - this.state = "connecting"; - - try { - this.ws = new WebSocket(this.url); - - this.ws.on("open", () => { - logger.info(`Connected to ${this.nodeId} at ${this.url}`); - this.state = "connected"; - this.reconnectAttempts = 0; - this.flushMessageQueue(); - resolve(); - }); - - this.ws.on("close", () => { - logger.warn(`Connection to ${this.nodeId} closed`); - this.state = "disconnected"; - this.scheduleReconnect(); - }); - - this.ws.on("error", (err) => { - logger.error(`Error connecting to ${this.nodeId}:`, err.message); - this.state = "disconnected"; - reject(err); - }); - - this.ws.on("message", (data: string) => { - this.handleMessage(data); - }); - } catch (err) { - reject(err); - } - }); - } - - private scheduleReconnect() { - if (this.reconnectAttempts >= this.maxReconnectAttempts) { - logger.error(`Max reconnect attempts reached for ${this.nodeId}`); - this.state = "disconnected"; - return; - } - - this.reconnectAttempts++; - const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000); - - logger.info(`Reconnecting to ${this.nodeId} in ${delay}ms (attempt ${this.reconnectAttempts})`); - this.state = "reconnecting"; - - setTimeout(() => this.connect(), delay); - } - - send(message: SyncMessage): boolean { - if (this.state !== "connected" || !this.ws) { - this.messageQueue.push(message); - return false; - } - - try { - this.ws.send(JSON.stringify(message)); - return true; - } catch (err) { - logger.error(`Failed to send to ${this.nodeId}:`, err); - this.messageQueue.push(message); - return false; - } - } - - private flushMessageQueue() { - while (this.messageQueue.length > 0) { - const msg = this.messageQueue.shift(); - if (msg && this.ws) { - this.ws.send(JSON.stringify(msg)); - } - } - } - - private handleMessage(data: string) { - try { - const message: SyncMessage = JSON.parse(data); - this.emit("message", message); - } catch (err) { - logger.error(`Failed to parse message from ${this.nodeId}:`, err); - } - } - - close() { - if (this.ws) { - this.ws.close(); - this.ws = null; - } - this.state = "disconnected"; - } -} - -// ============================================================================ -// State Sync Manager -// ============================================================================ - -export interface SyncState { - gitHash: string; - ledgerHash: string; - memoryEntries: number; - consensusVotes: number; - lastSyncTimestamp: number; -} - -export class StateSyncManager extends EventEmitter { - private localState: SyncState; - private peerStates: Map = new Map(); - private localNodeId: string; - - constructor(localNodeId: string) { - super(); - this.localNodeId = localNodeId; - this.localState = { - gitHash: "", - ledgerHash: "", - memoryEntries: 0, - consensusVotes: 0, - lastSyncTimestamp: Date.now(), - }; - } - - updateLocalState(state: Partial) { - this.localState = { ...this.localState, ...state, lastSyncTimestamp: Date.now() }; - this.emit("state:updated", { nodeId: this.localNodeId, state: this.localState }); - } - - receivePeerState(nodeId: string, state: SyncState) { - this.peerStates.set(nodeId, state); - this.emit("state:received", { nodeId, state }); - this.checkDivergence(); - } - - checkDivergence(): { diverged: boolean; details: string[] } { - const diverged: string[] = []; - - for (const [nodeId, state] of this.peerStates) { - if (state.gitHash !== this.localState.gitHash) { - diverged.push( - `Git divergence with ${nodeId}: local=${this.localState.gitHash}, remote=${state.gitHash}`, - ); - } - if (state.ledgerHash !== this.localState.ledgerHash) { - diverged.push( - `Ledger divergence with ${nodeId}: local=${this.localState.ledgerHash}, remote=${state.ledgerHash}`, - ); - } - } - - if (diverged.length > 0) { - logger.warn("State divergence detected:", diverged); - this.emit("divergence:detected", { diverged, details: diverged }); - return { diverged: true, details: diverged }; - } - - return { diverged: false, details: [] }; - } - - getState(nodeId?: string): SyncState | Map { - if (nodeId) { - return this.peerStates.get(nodeId) || this.localState; - } - return this.peerStates; - } - - async requestSyncFromPeer(nodeId: string): Promise { - return new Promise((resolve) => { - const handler = ({ nodeId: source, state }: { nodeId: string; state: SyncState }) => { - if (source === nodeId) { - this.off("state:received", handler); - resolve(state); - } - }; - - this.once("state:received", handler); - this.emit("state:sync:request", { targetNodeId: nodeId }); - - // Timeout after 5 seconds - setTimeout(() => { - this.off("state:received", handler); - resolve(null); - }, 5000); - }); - } -} - -// ============================================================================ -// Node Sync Service (Main Coordinator) -// ============================================================================ - -export class NodeSyncService extends EventEmitter { - private localNodeId: string; - private presenceDetector: PresenceDetector; - private stateSyncManager: StateSyncManager; - private peerConnections: Map = new Map(); - private wsServer?: WebSocket.Server; - private messageSequence = 0; - - constructor(localNodeId: string) { - super(); - this.localNodeId = localNodeId; - this.presenceDetector = new PresenceDetector(localNodeId); - this.stateSyncManager = new StateSyncManager(localNodeId); - - this.setupEventHandlers(); - } - - private setupEventHandlers() { - // Presence events - this.presenceDetector.on("node:online", (data) => { - this.emit("node:online", data); - }); - - this.presenceDetector.on("node:offline", (data) => { - this.emit("node:offline", data); - this.handleNodeOffline(data.nodeId); - }); - - this.presenceDetector.on("node:degraded", (data) => { - this.emit("node:degraded", data); - }); - - // State sync events - this.stateSyncManager.on("divergence:detected", (data) => { - this.emit("divergence:detected", data); - this.handleDivergence(data); - }); - - this.stateSyncManager.on("state:updated", (data) => { - this.broadcastState(data.state); - }); - } - - async initialize(): Promise { - logger.info(`Initializing Node Sync Service for ${this.localNodeId}`); - - // Start WebSocket server for incoming connections - await this.startServer(); - - // Connect to peer nodes - await this.connectToPeers(); - - // Start presence monitoring - this.presenceDetector.startMonitoring(); - - logger.info("Node Sync Service initialized"); - } - - private async startServer(): Promise { - const config = TRIAD_NODES[this.localNodeId]; - const port = config?.wsPort || 8765; - - return new Promise((resolve, reject) => { - try { - this.wsServer = new WebSocket.Server({ port }); - - this.wsServer.on("connection", (ws) => { - this.handleIncomingConnection(ws); - }); - - this.wsServer.on("listening", () => { - logger.info(`WebSocket server listening on port ${port}`); - resolve(); - }); - - this.wsServer.on("error", (err) => { - logger.error("WebSocket server error:", err); - reject(err); - }); - } catch (err) { - reject(err); - } - }); - } - - private handleIncomingConnection(ws: WebSocket) { - logger.info("Incoming WebSocket connection"); - - ws.on("message", (data: string) => { - try { - const message: SyncMessage = JSON.parse(data); - this.handleMessage(message); - } catch (err) { - logger.error("Failed to parse incoming message:", err); - } - }); - - ws.on("close", () => { - logger.info("WebSocket connection closed"); - }); - } - - private async connectToPeers(): Promise { - const config = TRIAD_NODES[this.localNodeId]; - if (!config) { - logger.error("Local node config not found"); - return; - } - - for (const [nodeId, nodeConfig] of Object.entries(TRIAD_NODES)) { - if (nodeId === this.localNodeId) { - continue; - } - - const url = `ws://${nodeConfig.ipAddress}:${nodeConfig.wsPort}`; - const peer = new PeerConnection(nodeId, url); - - peer.on("message", (msg: SyncMessage) => { - this.handleMessage(msg); - }); - - this.peerConnections.set(nodeId, peer); - - try { - await peer.connect(); - this.presenceDetector.updateHeartbeat(nodeId, peer.ws?.PING?.() ? 0 : undefined); - } catch (err) { - logger.error(`Failed to connect to ${nodeId}:`, err); - } - } - } - - private handleMessage(message: SyncMessage) { - logger.debug(`Received ${message.type} from ${message.sourceNodeId}`); - - switch (message.type) { - case MessageType.HEARTBEAT: - this.presenceDetector.updateHeartbeat(message.sourceNodeId); - break; - - case MessageType.STATE_SYNC_RESPONSE: - this.stateSyncManager.receivePeerState(message.sourceNodeId, message.payload as SyncState); - break; - - case MessageType.CONSENSUS_PROPOSAL: - this.emit("consensus:proposal", message); - break; - - case MessageType.CONSENSUS_VOTE: - this.emit("consensus:vote", message); - break; - - case MessageType.MEMORY_REPLICATE: - this.emit("memory:replicate", message); - break; - - default: - logger.debug(`Unhandled message type: ${message.type}`); - } - } - - private handleNodeOffline(nodeId: string) { - logger.warn(`Node ${nodeId} is offline, initiating recovery`); - - const peer = this.peerConnections.get(nodeId); - if (peer) { - peer.close(); - } - - this.emit("recovery:needed", { nodeId, reason: "offline" }); - } - - private handleDivergence(data: { diverged: boolean; details: string[] }) { - if (!data.diverged) { - return; - } - - logger.warn("Divergence detected, requesting sync from authority node"); - - // Request state sync from TM-1 (authority) - if (this.localNodeId !== "TM-1") { - this.requestStateSync("TM-1"); - } - } - - sendHeartbeat() { - const message: SyncMessage = { - type: MessageType.HEARTBEAT, - sourceNodeId: this.localNodeId, - timestamp: Date.now(), - payload: { uptime: process.uptime() }, - sequenceNumber: this.messageSequence++, - }; - - this.broadcast(message); - } - - broadcastState(state: SyncState) { - const message: SyncMessage = { - type: MessageType.STATE_SYNC_RESPONSE, - sourceNodeId: this.localNodeId, - timestamp: Date.now(), - payload: state, - sequenceNumber: this.messageSequence++, - }; - - this.broadcast(message); - } - - broadcast(message: SyncMessage, targetNodeId?: string) { - if (targetNodeId) { - const peer = this.peerConnections.get(targetNodeId); - if (peer) { - peer.send(message); - } - } else { - for (const peer of this.peerConnections.values()) { - peer.send(message); - } - } - } - - requestStateSync(targetNodeId: string) { - const message: SyncMessage = { - type: MessageType.STATE_SYNC_REQUEST, - sourceNodeId: this.localNodeId, - targetNodeId, - timestamp: Date.now(), - payload: { requestedFields: ["gitHash", "ledgerHash", "memoryEntries"] }, - sequenceNumber: this.messageSequence++, - }; - - this.broadcast(message, targetNodeId); - } - - proposeConsensus(proposal: string) { - const message: SyncMessage = { - type: MessageType.CONSENSUS_PROPOSAL, - sourceNodeId: this.localNodeId, - timestamp: Date.now(), - payload: { proposal, gitHash: this.stateSyncManager.localState.gitHash }, - sequenceNumber: this.messageSequence++, - }; - - this.broadcast(message); - this.emit("consensus:proposed", { proposal }); - } - - submitConsensusVote(proposalId: string, vote: "yes" | "no" | "abstain") { - const message: SyncMessage = { - type: MessageType.CONSENSUS_VOTE, - sourceNodeId: this.localNodeId, - timestamp: Date.now(), - payload: { proposalId, vote }, - sequenceNumber: this.messageSequence++, - }; - - this.broadcast(message); - } - - replicateMemory(entry: { key: string; value: unknown; tier: "fact" | "episodic" | "pad" }) { - const message: SyncMessage = { - type: MessageType.MEMORY_REPLICATE, - sourceNodeId: this.localNodeId, - timestamp: Date.now(), - payload: entry, - sequenceNumber: this.messageSequence++, - }; - - this.broadcast(message); - } - - getPresenceStatus() { - return this.presenceDetector.getQuorumStatus(); - } - - getDivergenceStatus() { - return this.stateSyncManager.checkDivergence(); - } - - async shutdown(): Promise { - logger.info("Shutting down Node Sync Service"); - - // Close all peer connections - for (const peer of this.peerConnections.values()) { - peer.close(); - } - - // Close server - if (this.wsServer) { - await new Promise((resolve) => { - this.wsServer?.close(() => resolve()); - }); - } - - this.emit("shutdown"); - } -} - -// ============================================================================ -// Factory Function -// ============================================================================ - -export function createNodeSyncService(localNodeId: string): NodeSyncService { - return new NodeSyncService(localNodeId); -} - -// ============================================================================ -// Integration with Triad Heartbeat -// ============================================================================ - -/** - * Integration hook for triad-heartbeat skill - * Called on each heartbeat wake to check sync state - */ -export async function triadHeartbeatIntegration( - service: NodeSyncService, - callback: (status: { - quorum: { available: number; total: number; hasQuorum: boolean }; - divergence: { diverged: boolean; details: string[] }; - presence: Map; - }) => void, -) { - const quorum = service.getPresenceStatus(); - const divergence = service.getDivergenceStatus(); - const presence = service.presenceDetector.getAllPresence(); - - callback({ quorum, divergence, presence }); -}