refactor: remove uncompiled node-sync-service + matrix-channel + tests

These files have TypeScript type mismatches with current ChannelPluginContract
and EventEmitter patterns. Will re-implement after build pipeline stabilizes.

Keeping:
- docker-compose.matrix.yml (Matrix homeserver config)
- docs/matrix-triad-setup.md (setup documentation)
- docs/mcp-triad-integration.md (MCP integration docs)
- docs/mcp-curiosity-mapping.md (capability mapping)
- docs/node-sync-architecture.md (architecture design)
- scripts/triad-corruption-check.mjs (corruption detection)
- docs/triad-resilience.md (recovery procedures)
- .secure/deployment-logs/README.md (schema v2)
- skills/triad-heartbeat/SKILL.md (heartbeat integration)
- scripts/npm-publish.mjs (NPM automation)
- .github/workflows/npm-publish.yml (GitHub Actions)
- docs/npm-publish-guide.md (publish documentation)
This commit is contained in:
Tabula Myriad
2026-03-24 02:59:07 -04:00
parent f625b3d8a1
commit dc306695cb
3 changed files with 0 additions and 1661 deletions
-545
View File
@@ -1,545 +0,0 @@
/**
* Matrix Channel Plugin for OpenClaw
*
* Provides Matrix protocol support for triad node communication.
* Uses Dendrite homeserver with E2E encryption.
*
* @see docs/matrix-triad-setup.md for deployment guide
*/
import type { OpenClawConfig } from "../../config/config.js";
import type { ChannelPlugin } from "./types.plugin.js";
// Matrix API types
type MatrixRoomId = string;
type MatrixUserId = string;
type MatrixAccessToken = string;
type MatrixMessageContent = {
msgtype: "m.text" | "m.notice" | "m.emote";
body: string;
format?: "org.matrix.custom.html";
formatted_body?: string;
};
type MatrixEventResponse = {
event_id: string;
};
type MatrixLoginResponse = {
access_token: MatrixAccessToken;
home_server: string;
user_id: MatrixUserId;
device_id?: string;
};
// Matrix homeserver configuration
type MatrixAccountConfig = {
homeserverUrl: string;
userId: MatrixUserId;
password?: string;
accessToken?: MatrixAccessToken;
deviceId?: string;
};
type MatrixProbeResult = {
ok: boolean;
homeserver: string;
userId?: MatrixUserId;
deviceId?: string;
error?: string;
};
/**
* Matrix API client
*/
class MatrixApi {
constructor(
private homeserverUrl: string,
private accessToken: MatrixAccessToken,
) {}
async sendMessage(
roomId: MatrixRoomId,
content: MatrixMessageContent,
): Promise<MatrixEventResponse> {
const url = `${this.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/send/m.room.message`;
const response = await fetch(url, {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify(content),
});
if (!response.ok) {
throw new Error(`Matrix API error: ${response.status} ${response.statusText}`);
}
return response.json();
}
async readMessages(roomId: MatrixRoomId, limit: number = 10): Promise<unknown[]> {
const url = `${this.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/messages?dir=b&limit=${limit}`;
const response = await fetch(url, {
headers: {
Authorization: `Bearer ${this.accessToken}`,
},
});
if (!response.ok) {
throw new Error(`Matrix API error: ${response.status}`);
}
const data = await response.json();
return data.chunk || [];
}
async login(userId: string, password: string): Promise<MatrixLoginResponse> {
const url = `${this.homeserverUrl}/_matrix/client/v3/login`;
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
type: "m.login.password",
identifier: {
type: "m.id.user",
user: userId,
},
password: password,
}),
});
if (!response.ok) {
throw new Error(`Matrix login failed: ${response.status}`);
}
return response.json();
}
async getJoinedRooms(): Promise<{ joined_rooms: MatrixRoomId[] }> {
const url = `${this.homeserverUrl}/_matrix/client/v3/join_rooms`;
const response = await fetch(url, {
headers: {
Authorization: `Bearer ${this.accessToken}`,
},
});
if (!response.ok) {
throw new Error(`Matrix API error: ${response.status}`);
}
return response.json();
}
async createRoom(name: string, encrypted: boolean = true): Promise<{ room_id: MatrixRoomId }> {
const url = `${this.homeserverUrl}/_matrix/client/v3/createRoom`;
const body: unknown = {
name: name,
visibility: "private",
preset: "private_chat",
};
if (encrypted) {
body.initial_state = [
{
type: "m.room.encryption",
state_key: "",
content: {
algorithm: "m.megolm.v1.aes-shash",
},
},
];
}
const response = await fetch(url, {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
});
if (!response.ok) {
throw new Error(`Matrix create room failed: ${response.status}`);
}
return response.json();
}
async invite(roomId: MatrixRoomId, userId: MatrixUserId): Promise<void> {
const url = `${this.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/invite`;
const response = await fetch(url, {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ user_id: userId }),
});
if (!response.ok) {
throw new Error(`Matrix invite failed: ${response.status}`);
}
}
async healthCheck(): Promise<boolean> {
try {
const url = `${this.homeserverUrl}/health`;
const response = await fetch(url, { method: "GET" });
return response.ok;
} catch {
return false;
}
}
}
/**
* Probe Matrix homeserver connectivity
*/
async function probeMatrix(cfg?: OpenClawConfig): Promise<MatrixProbeResult> {
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
if (!matrixConfig?.homeserverUrl) {
return { ok: false, homeserver: "", error: "No Matrix homeserver configured" };
}
const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken || "");
try {
const healthy = await api.healthCheck();
return {
ok: healthy,
homeserver: matrixConfig.homeserverUrl,
userId: matrixConfig.userId,
};
} catch (error) {
return {
ok: false,
homeserver: matrixConfig.homeserverUrl,
error: error instanceof Error ? error.message : "Unknown error",
};
}
}
/**
* Send message to Matrix room
*/
async function sendMessageMatrix(
roomId: MatrixRoomId,
content: string,
accountId?: string,
cfg?: OpenClawConfig,
): Promise<{ ok: true; eventId: string }> {
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) {
throw new Error("Matrix not configured: missing homeserver URL or access token");
}
const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken);
const result = await api.sendMessage(roomId, {
msgtype: "m.text",
body: content,
});
return { ok: true, eventId: result.event_id };
}
/**
* Read recent messages from Matrix room
*/
async function readMessagesMatrix(
roomId: MatrixRoomId,
limit: number = 10,
accountId?: string,
cfg?: OpenClawConfig,
): Promise<unknown[]> {
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) {
throw new Error("Matrix not configured: missing Homeserver URL or access token");
}
const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken);
return api.readMessages(roomId, limit);
}
/**
* Monitor Matrix provider health
*/
async function monitorMatrixProvider(cfg?: OpenClawConfig): Promise<{
ok: boolean;
homeserver: string;
userId?: string;
error?: string;
}> {
return probeMatrix(cfg);
}
/**
* Resolve Matrix channel allowlist
*/
async function resolveMatrixChannelAllowlist(
_roomId: MatrixRoomId,
_cfg?: OpenClawConfig,
): Promise<string[]> {
// For triad use case, allowlist is the 4 triad nodes
const triadUsers = [
"@tm1:triad.local",
"@tm2:triad.local",
"@tm3:triad.local",
"@tm4:triad.local",
];
return triadUsers;
}
/**
* Resolve Matrix user allowlist
*/
async function resolveMatrixUserAllowlist(_cfg?: OpenClawConfig): Promise<string[]> {
return ["@tm1:triad.local", "@tm2:triad.local", "@tm3:triad.local", "@tm4:triad.local"];
}
/**
* Matrix message actions (reactions, edits, deletes)
*/
const matrixMessageActions = {
async react(roomId: MatrixRoomId, eventId: string, emoji: string, cfg?: OpenClawConfig) {
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) {
throw new Error("Matrix not configured");
}
const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken);
// Matrix reactions are sent as relation annotations
await api.sendMessage(roomId, {
msgtype: "m.reaction",
body: emoji,
"m.relates_to": {
rel_type: "m.annotation",
event_id: eventId,
key: emoji,
},
} as unknown as Record<string, unknown>);
},
async edit(roomId: MatrixRoomId, eventId: string, newContent: string, cfg?: OpenClawConfig) {
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) {
throw new Error("Matrix not configured");
}
const api = new MatrixApi(matrixConfig.homeserverUrl, matrixConfig.accessToken);
await api.sendMessage(roomId, {
msgtype: "m.text",
body: newContent,
"m.new_content": {
msgtype: "m.text",
body: newContent,
},
"m.relates_to": {
rel_type: "m.replace",
event_id: eventId,
},
} as unknown as Record<string, unknown>);
},
async delete(roomId: MatrixRoomId, eventId: string, cfg?: OpenClawConfig) {
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
if (!matrixConfig?.homeserverUrl || !matrixConfig?.accessToken) {
throw new Error("Matrix not configured");
}
// Matrix uses redaction for deletion
void matrixConfig.homeserverUrl; // reserved for future use
void matrixConfig.accessToken; // reserved for future use
const url = `${matrixConfig.homeserverUrl}/_matrix/client/v3/rooms/${encodeURIComponent(roomId)}/redact/${encodeURIComponent(eventId)}`;
await fetch(url, {
method: "POST",
headers: {
Authorization: `Bearer ${matrixConfig.accessToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ reason: "User requested deletion" }),
});
},
};
/**
* Thread binding helpers for Matrix (rooms as threads)
*/
async function setMatrixThreadBindingIdleTimeoutBySessionKey(
sessionKey: string,
timeoutMs: number,
): Promise<void> {
// Matrix rooms don't have idle timeouts in the same way as Discord threads
// This is a no-op for Matrix, rooms persist indefinitely
console.debug(`[Matrix] Thread binding idle timeout set (no-op): ${sessionKey} = ${timeoutMs}ms`);
}
async function setMatrixThreadBindingMaxAgeBySessionKey(
sessionKey: string,
maxAgeMs: number,
): Promise<void> {
// Matrix rooms don't expire
console.debug(`[Matrix] Thread binding max age set (no-op): ${sessionKey} = ${maxAgeMs}ms`);
}
/**
* Matrix Channel Plugin Definition
*/
export const matrixChannelPlugin: ChannelPlugin = {
id: "matrix",
meta: {
name: "Matrix",
description: "Matrix protocol support for triad communication",
version: "0.1.0",
author: "Tabula Myriad",
license: "MIT",
},
capabilities: {
messaging: true,
threading: false, // Matrix uses rooms, not threads
reactions: true,
files: true,
voice: false,
video: false,
presence: true,
typing: false, // TODO: implement typing indicator
readReceipts: true,
encryption: true, // E2E encryption supported
federation: true,
},
config: {
async resolveAccount(accountId, cfg) {
return cfg?.channels?.matrix as MatrixAccountConfig | null;
},
async listAccounts(cfg) {
// For triad, single account configuration
const matrixConfig = cfg?.channels?.matrix as MatrixAccountConfig | undefined;
return matrixConfig ? [{ id: "matrix-default", config: matrixConfig }] : [];
},
},
setup: {
async validate(cfg) {
const result = await probeMatrix(cfg);
return { ok: result.ok, error: result.error };
},
async wizard() {
// Interactive setup wizard for Matrix configuration
return {
steps: [
{
field: "homeserverUrl",
prompt: "Matrix homeserver URL (e.g., https://triad.local:8448)",
validate: (v: string) => v.startsWith("http"),
},
{
field: "userId",
prompt: "Matrix user ID (e.g., @tm1:triad.local)",
validate: (v: string) => v.startsWith("@") && v.includes(":"),
},
{
field: "password",
prompt: "Matrix password",
sensitive: true,
},
],
};
},
},
pairing: {
async requestPairing(_channel: string, _accountId: string, _cfg?: OpenClawConfig) {
// Matrix uses user accounts, not bot pairing
return { status: "not_applicable", reason: "Matrix uses user authentication" };
},
},
security: {
async auditPermissions(_channel: string, _accountId: string, _cfg?: OpenClawConfig) {
return { ok: true, warnings: [] };
},
},
outbound: {
async send(message, target, accountId, cfg) {
const roomId = target; // Matrix target is room ID
return sendMessageMatrix(roomId, message, accountId, cfg);
},
},
status: {
async probe(cfg) {
return probeMatrix(cfg);
},
async audit(_cfg?: OpenClawConfig) {
return { ok: true, findings: [] };
},
},
gateway: {
methods: ["sendMessage", "readMessages", "probe"],
},
allowlist: {
async resolveChannelAllowlist(roomId, cfg) {
return resolveMatrixChannelAllowlist(roomId, cfg);
},
async resolveUserAllowlist(cfg) {
return resolveMatrixUserAllowlist(cfg);
},
},
messaging: {
async send(message, target, opts, cfg) {
return sendMessageMatrix(target, message, opts?.accountId, cfg);
},
async read(target, limit, opts, cfg) {
return readMessagesMatrix(target, limit, opts?.accountId, cfg);
},
},
actions: {
async handle(action, payload, cfg) {
// Delegate to matrixMessageActions
const roomId = payload.roomId as string;
const eventId = payload.eventId as string;
switch (action) {
case "react":
return matrixMessageActions.react(roomId, eventId, payload.emoji as string, cfg);
case "edit":
return matrixMessageActions.edit(roomId, eventId, payload.content as string, cfg);
case "delete":
return matrixMessageActions.delete(roomId, eventId, cfg);
default:
throw new Error(`Unknown Matrix action: ${action}`);
}
},
},
heartbeat: {
async check(cfg) {
const result = await monitorMatrixProvider(cfg);
return { ok: result.ok, message: result.error || "Matrix healthy" };
},
},
};
// Export individual functions for direct use
export {
probeMatrix,
sendMessageMatrix,
readMessagesMatrix,
monitorMatrixProvider,
resolveMatrixChannelAllowlist,
resolveMatrixUserAllowlist,
matrixMessageActions,
setMatrixThreadBindingIdleTimeoutBySessionKey,
setMatrixThreadBindingMaxAgeBySessionKey,
MatrixApi,
};
-362
View File
@@ -1,362 +0,0 @@
/**
* Node Sync Service Tests
*
* Tests for WebSocket peer sync, presence detection, and state sync
*/
import { describe, it, expect, beforeEach, vi } from "vitest";
import {
NodeSyncService,
PresenceDetector,
StateSyncManager,
PeerConnection,
TRIAD_NODES,
MessageType,
createNodeSyncService,
} from "./node-sync-service.js";
describe("NodeSyncService", () => {
describe("createNodeSyncService", () => {
it("creates a service instance for a valid node", () => {
const service = createNodeSyncService("TM-1");
expect(service).toBeInstanceOf(NodeSyncService);
});
it("throws for invalid node ID", () => {
expect(() => createNodeSyncService("INVALID")).toThrow();
});
});
describe("TRIAD_NODES configuration", () => {
it("has all four triad nodes configured", () => {
expect(Object.keys(TRIAD_NODES)).toHaveLength(4);
expect(TRIAD_NODES["TM-1"]).toBeDefined();
expect(TRIAD_NODES["TM-2"]).toBeDefined();
expect(TRIAD_NODES["TM-3"]).toBeDefined();
expect(TRIAD_NODES["TM-4"]).toBeDefined();
});
it("TM-1 is configured as authority", () => {
expect(TRIAD_NODES["TM-1"].role).toBe("authority");
});
it("other nodes are configured as participants", () => {
expect(TRIAD_NODES["TM-2"].role).toBe("participant");
expect(TRIAD_NODES["TM-3"].role).toBe("participant");
expect(TRIAD_NODES["TM-4"].role).toBe("participant");
});
});
});
describe("PresenceDetector", () => {
let detector: PresenceDetector;
beforeEach(() => {
detector = new PresenceDetector("TM-1");
});
describe("updateHeartbeat", () => {
it("marks node as alive on heartbeat", () => {
detector.updateHeartbeat("TM-2");
const presence = detector.getPresence("TM-2");
expect(presence?.isAlive).toBe(true);
expect(presence?.connectionState).toBe("connected");
});
it("emits node:online event when node comes alive", () => {
const callback = vi.fn();
detector.on("node:online", callback);
detector.updateHeartbeat("TM-2");
expect(callback).toHaveBeenCalledWith({
nodeId: "TM-2",
presence: expect.anything(),
});
});
it("updates latency when provided", () => {
detector.updateHeartbeat("TM-2", 5);
const presence = detector.getPresence("TM-2");
expect(presence?.latencyMs).toBe(5);
});
});
describe("checkTimeouts", () => {
it("marks node as offline after timeout", () => {
detector.updateHeartbeat("TM-2");
vi.useFakeTimers();
vi.advanceTimersByTime(16000); // Beyond 15s timeout
detector.checkTimeouts();
const presence = detector.getPresence("TM-2");
expect(presence?.isAlive).toBe(false);
expect(presence?.connectionState).toBe("disconnected");
vi.useRealTimers();
});
it("marks node as degraded at 70% timeout", () => {
detector.updateHeartbeat("TM-2");
vi.useFakeTimers();
vi.advanceTimersByTime(11000); // ~70% of 15s
detector.checkTimeouts();
const presence = detector.getPresence("TM-2");
expect(presence?.connectionState).toBe("degraded");
vi.useRealTimers();
});
it("emits node:offline event on timeout", () => {
detector.updateHeartbeat("TM-2");
const callback = vi.fn();
detector.on("node:offline", callback);
vi.useFakeTimers();
vi.advanceTimersByTime(16000);
detector.checkTimeouts();
expect(callback).toHaveBeenCalledWith({
nodeId: "TM-2",
presence: expect.anything(),
elapsed: expect.any(Number),
});
vi.useRealTimers();
});
});
describe("getQuorumStatus", () => {
it("counts self as alive", () => {
const status = detector.getQuorumStatus();
expect(status.available).toBe(1); // Self
expect(status.hasQuorum).toBe(false); // Need 2-of-3
});
it("has quorum when 2 nodes alive", () => {
detector.updateHeartbeat("TM-2");
const status = detector.getQuorumStatus();
expect(status.available).toBe(2);
expect(status.hasQuorum).toBe(true);
});
it("has quorum when all nodes alive", () => {
detector.updateHeartbeat("TM-2");
detector.updateHeartbeat("TM-3");
const status = detector.getQuorumStatus();
expect(status.available).toBe(3);
expect(status.total).toBe(4); // 3 peers + self
expect(status.hasQuorum).toBe(true);
});
});
describe("startMonitoring", () => {
it("returns interval ID", () => {
const interval = detector.startMonitoring();
expect(interval).toBeDefined();
});
it("clears interval on cleanup", () => {
const interval = detector.startMonitoring();
detector.emit("cleanup");
expect(clearInterval).toHaveBeenCalledWith(interval);
});
});
});
describe("StateSyncManager", () => {
let manager: StateSyncManager;
beforeEach(() => {
manager = new StateSyncManager("TM-1");
});
describe("updateLocalState", () => {
it("updates local state with provided fields", () => {
manager.updateLocalState({
gitHash: "abc123",
ledgerHash: "def456",
});
const state = manager.getState("TM-1") as unknown as { gitHash: string; ledgerHash: string };
expect(state.gitHash).toBe("abc123");
expect(state.ledgerHash).toBe("def456");
});
it("updates lastSyncTimestamp", () => {
const before = Date.now();
manager.updateLocalState({ gitHash: "test" });
const state = manager.getState("TM-1") as unknown as { lastSyncTimestamp: number };
expect(state.lastSyncTimestamp).toBeGreaterThanOrEqual(before);
});
it("emits state:updated event", () => {
const callback = vi.fn();
manager.on("state:updated", callback);
manager.updateLocalState({ gitHash: "test" });
expect(callback).toHaveBeenCalledWith({
nodeId: "TM-1",
state: expect.anything(),
});
});
});
describe("receivePeerState", () => {
it("stores peer state", () => {
manager.receivePeerState("TM-2", {
gitHash: "xyz789",
ledgerHash: "uvw012",
memoryEntries: 100,
consensusVotes: 5,
lastSyncTimestamp: Date.now(),
});
const state = manager.getState("TM-2");
expect(state?.gitHash).toBe("xyz789");
});
it("emits state:received event", () => {
const callback = vi.fn();
manager.on("state:received", callback);
manager.receivePeerState("TM-2", {
gitHash: "test",
ledgerHash: "test",
memoryEntries: 0,
consensusVotes: 0,
lastSyncTimestamp: Date.now(),
});
expect(callback).toHaveBeenCalledWith({
nodeId: "TM-2",
state: expect.anything(),
});
});
});
describe("checkDivergence", () => {
it("returns no divergence when states match", () => {
manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" });
manager.receivePeerState("TM-2", {
gitHash: "abc",
ledgerHash: "def",
memoryEntries: 0,
consensusVotes: 0,
lastSyncTimestamp: Date.now(),
});
const result = manager.checkDivergence();
expect(result.diverged).toBe(false);
});
it("detects git divergence", () => {
manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" });
manager.receivePeerState("TM-2", {
gitHash: "xyz",
ledgerHash: "def",
memoryEntries: 0,
consensusVotes: 0,
lastSyncTimestamp: Date.now(),
});
const result = manager.checkDivergence();
expect(result.diverged).toBe(true);
expect(result.details).toContainEqual(expect.stringContaining("Git divergence"));
});
it("detects ledger divergence", () => {
manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" });
manager.receivePeerState("TM-2", {
gitHash: "abc",
ledgerHash: "xyz",
memoryEntries: 0,
consensusVotes: 0,
lastSyncTimestamp: Date.now(),
});
const result = manager.checkDivergence();
expect(result.diverged).toBe(true);
expect(result.details).toContainEqual(expect.stringContaining("Ledger divergence"));
});
it("emits divergence:detected event", () => {
const callback = vi.fn();
manager.on("divergence:detected", callback);
manager.updateLocalState({ gitHash: "abc", ledgerHash: "def" });
manager.receivePeerState("TM-2", {
gitHash: "xyz",
ledgerHash: "def",
memoryEntries: 0,
consensusVotes: 0,
lastSyncTimestamp: Date.now(),
});
manager.checkDivergence();
expect(callback).toHaveBeenCalledWith({
diverged: true,
details: expect.any(Array),
});
});
});
});
describe("PeerConnection", () => {
let peer: PeerConnection;
beforeEach(() => {
peer = new PeerConnection("TM-2", "ws://192.168.31.209:8765");
});
describe("constructor", () => {
it("initializes with node ID and URL", () => {
expect(peer.nodeId).toBe("TM-2");
expect(peer.url).toBe("ws://192.168.31.209:8765");
expect(peer.state).toBe("disconnected");
});
});
describe("message queue", () => {
it("queues messages when disconnected", () => {
const message = {
type: MessageType.HEARTBEAT,
sourceNodeId: "TM-1",
timestamp: Date.now(),
payload: {},
};
peer.send(message);
expect(peer.messageQueue).toHaveLength(1);
});
it("flushes queue on connect", () => {
// Mock WebSocket
peer.ws = {
send: vi.fn(),
close: vi.fn(),
on: vi.fn(),
} as unknown as WebSocket;
peer.state = "connected";
peer.messageQueue.push({
type: MessageType.HEARTBEAT,
sourceNodeId: "TM-1",
timestamp: Date.now(),
payload: {},
});
peer.flushMessageQueue();
expect(peer.ws.send).toHaveBeenCalled();
// ts-eslint: ignore-next-line unbound-method
expect(peer.messageQueue).toHaveLength(0);
});
});
describe("reconnect", () => {
it("schedules reconnect with exponential backoff", () => {
peer.reconnectAttempts = 0;
peer.scheduleReconnect();
expect(peer.reconnectAttempts).toBe(1);
expect(peer.state).toBe("reconnecting");
});
it("stops after max attempts", () => {
peer.reconnectAttempts = 5;
peer.scheduleReconnect();
expect(peer.state).toBe("disconnected");
});
});
});
describe("MessageType enum", () => {
it("includes all message types", () => {
expect(MessageType.HEARTBEAT).toBe("heartbeat");
expect(MessageType.PRESENCE_ANNOUNCE).toBe("presence:announce");
expect(MessageType.STATE_SYNC_REQUEST).toBe("state:sync:request");
expect(MessageType.CONSENSUS_PROPOSAL).toBe("consensus:proposal");
expect(MessageType.MEMORY_REPLICATE).toBe("memory:replicate");
expect(MessageType.ERROR_REPORT).toBe("error:report");
});
});
-754
View File
@@ -1,754 +0,0 @@
/**
* Node Sync Service — WebSocket Peer-to-Peer Architecture for Triad Nodes
*
* Provides direct inter-node communication beyond Discord:
* - WebSocket peer sync for real-time state propagation
* - Presence detection (heartbeat-based liveness)
* - State sync for non-Git data (consensus votes, memory entries)
* - Failure mode detection and recovery
*
* Triad Nodes:
* - TM-1: silica-animus (192.168.31.99) — Authority
* - TM-2: testbench (192.168.31.209)
* - TM-3: tabula-myriad-3 (192.168.31.85)
* - TM-4: tabula-myriad-4 (192.168.31.205)
*
* @module NodeSyncService
*/
import { EventEmitter } from "events";
import WebSocket from "ws";
import { Logger } from "../logger.js";
const logger = new Logger("node-sync-service");
// ============================================================================
// Configuration
// ============================================================================
export interface TriadNodeConfig {
nodeId: string;
hostname: string;
ipAddress: string;
wsPort: number;
role: "authority" | "participant";
}
export const TRIAD_NODES: Record<string, TriadNodeConfig> = {
"TM-1": {
nodeId: "TM-1",
hostname: "silica-animus",
ipAddress: "192.168.31.99",
wsPort: 8765,
role: "authority",
},
"TM-2": {
nodeId: "TM-2",
hostname: "testbench",
ipAddress: "192.168.31.209",
wsPort: 8765,
role: "participant",
},
"TM-3": {
nodeId: "TM-3",
hostname: "tabula-myriad-3",
ipAddress: "192.168.31.85",
wsPort: 8765,
role: "participant",
},
"TM-4": {
nodeId: "TM-4",
hostname: "tabula-myriad-4",
ipAddress: "192.168.31.205",
wsPort: 8765,
role: "participant",
},
};
// ============================================================================
// Message Types
// ============================================================================
export enum MessageType {
// Presence & Heartbeat
HEARTBEAT = "heartbeat",
PRESENCE_ANNOUNCE = "presence:announce",
PRESENCE_QUERY = "presence:query",
// State Sync
STATE_SYNC_REQUEST = "state:sync:request",
STATE_SYNC_RESPONSE = "state:sync:response",
STATE_DELTA = "state:delta",
// Consensus
CONSENSUS_PROPOSAL = "consensus:proposal",
CONSENSUS_VOTE = "consensus:vote",
CONSENSUS_RESULT = "consensus:result",
// Memory & Knowledge
MEMORY_REPLICATE = "memory:replicate",
KNOWLEDGE_INDEX = "knowledge:index",
// Coordination
TASK_ASSIGN = "task:assign",
TASK_COMPLETE = "task:complete",
// Error & Recovery
ERROR_REPORT = "error:report",
RECOVERY_REQUEST = "recovery:request",
}
export interface SyncMessage {
type: MessageType;
sourceNodeId: string;
targetNodeId?: string;
timestamp: number;
payload: unknown;
sequenceNumber?: number;
}
// ============================================================================
// Presence Detection
// ============================================================================
export interface NodePresence {
nodeId: string;
isAlive: boolean;
lastHeartbeat: number;
connectionState: "connected" | "disconnected" | "degraded";
latencyMs?: number;
}
export class PresenceDetector extends EventEmitter {
private nodes: Map<string, NodePresence> = new Map();
private heartbeatIntervalMs: number;
private timeoutThresholdMs: number;
private localNodeId: string;
constructor(localNodeId: string, heartbeatIntervalMs = 5000, timeoutThresholdMs = 15000) {
super();
this.localNodeId = localNodeId;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.timeoutThresholdMs = timeoutThresholdMs;
// Initialize all triad nodes as unknown
for (const nodeId in TRIAD_NODES) {
if (nodeId !== localNodeId) {
this.nodes.set(nodeId, {
nodeId,
isAlive: false,
lastHeartbeat: 0,
connectionState: "disconnected",
});
}
}
}
updateHeartbeat(nodeId: string, latencyMs?: number) {
const presence = this.nodes.get(nodeId);
if (!presence) {
return;
}
const wasAlive = presence.isAlive;
presence.isAlive = true;
presence.lastHeartbeat = Date.now();
presence.latencyMs = latencyMs;
presence.connectionState = "connected";
if (!wasAlive) {
logger.info(`Node ${nodeId} is now alive`);
this.emit("node:online", { nodeId, presence });
}
}
checkTimeouts() {
const now = Date.now();
for (const [nodeId, presence] of this.nodes) {
if (!presence.isAlive) {
continue;
}
const elapsed = now - presence.lastHeartbeat;
if (elapsed > this.timeoutThresholdMs) {
logger.warn(`Node ${nodeId} heartbeat timeout (${elapsed}ms)`);
presence.isAlive = false;
presence.connectionState = "disconnected";
this.emit("node:offline", { nodeId, presence, elapsed });
} else if (elapsed > this.timeoutThresholdMs * 0.7) {
presence.connectionState = "degraded";
this.emit("node:degraded", { nodeId, presence, elapsed });
}
}
}
getPresence(nodeId: string): NodePresence | undefined {
return this.nodes.get(nodeId);
}
getAllPresence(): Map<string, NodePresence> {
return this.nodes;
}
getQuorumStatus(): { available: number; total: number; hasQuorum: boolean } {
let available = 0;
for (const presence of this.nodes.values()) {
if (presence.isAlive) {
available++;
}
}
// Add self
available++;
const total = this.nodes.size + 1;
const hasQuorum = available >= 2; // 2-of-3 minimum
return { available, total, hasQuorum };
}
startMonitoring() {
const interval = setInterval(() => this.checkTimeouts(), this.heartbeatIntervalMs);
this.on("cleanup", () => clearInterval(interval));
return interval;
}
}
// ============================================================================
// WebSocket Peer Manager
// ============================================================================
export class PeerConnection {
nodeId: string;
ws: WebSocket | null = null;
url: string;
state: "connecting" | "connected" | "disconnected" | "reconnecting" = "disconnected";
reconnectAttempts = 0;
maxReconnectAttempts = 5;
messageQueue: SyncMessage[] = [];
constructor(nodeId: string, url: string) {
this.nodeId = nodeId;
this.url = url;
}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.state = "connecting";
try {
this.ws = new WebSocket(this.url);
this.ws.on("open", () => {
logger.info(`Connected to ${this.nodeId} at ${this.url}`);
this.state = "connected";
this.reconnectAttempts = 0;
this.flushMessageQueue();
resolve();
});
this.ws.on("close", () => {
logger.warn(`Connection to ${this.nodeId} closed`);
this.state = "disconnected";
this.scheduleReconnect();
});
this.ws.on("error", (err) => {
logger.error(`Error connecting to ${this.nodeId}:`, err.message);
this.state = "disconnected";
reject(err);
});
this.ws.on("message", (data: string) => {
this.handleMessage(data);
});
} catch (err) {
reject(err);
}
});
}
private scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
logger.error(`Max reconnect attempts reached for ${this.nodeId}`);
this.state = "disconnected";
return;
}
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
logger.info(`Reconnecting to ${this.nodeId} in ${delay}ms (attempt ${this.reconnectAttempts})`);
this.state = "reconnecting";
setTimeout(() => this.connect(), delay);
}
send(message: SyncMessage): boolean {
if (this.state !== "connected" || !this.ws) {
this.messageQueue.push(message);
return false;
}
try {
this.ws.send(JSON.stringify(message));
return true;
} catch (err) {
logger.error(`Failed to send to ${this.nodeId}:`, err);
this.messageQueue.push(message);
return false;
}
}
private flushMessageQueue() {
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift();
if (msg && this.ws) {
this.ws.send(JSON.stringify(msg));
}
}
}
private handleMessage(data: string) {
try {
const message: SyncMessage = JSON.parse(data);
this.emit("message", message);
} catch (err) {
logger.error(`Failed to parse message from ${this.nodeId}:`, err);
}
}
close() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
this.state = "disconnected";
}
}
// ============================================================================
// State Sync Manager
// ============================================================================
export interface SyncState {
gitHash: string;
ledgerHash: string;
memoryEntries: number;
consensusVotes: number;
lastSyncTimestamp: number;
}
export class StateSyncManager extends EventEmitter {
private localState: SyncState;
private peerStates: Map<string, SyncState> = new Map();
private localNodeId: string;
constructor(localNodeId: string) {
super();
this.localNodeId = localNodeId;
this.localState = {
gitHash: "",
ledgerHash: "",
memoryEntries: 0,
consensusVotes: 0,
lastSyncTimestamp: Date.now(),
};
}
updateLocalState(state: Partial<SyncState>) {
this.localState = { ...this.localState, ...state, lastSyncTimestamp: Date.now() };
this.emit("state:updated", { nodeId: this.localNodeId, state: this.localState });
}
receivePeerState(nodeId: string, state: SyncState) {
this.peerStates.set(nodeId, state);
this.emit("state:received", { nodeId, state });
this.checkDivergence();
}
checkDivergence(): { diverged: boolean; details: string[] } {
const diverged: string[] = [];
for (const [nodeId, state] of this.peerStates) {
if (state.gitHash !== this.localState.gitHash) {
diverged.push(
`Git divergence with ${nodeId}: local=${this.localState.gitHash}, remote=${state.gitHash}`,
);
}
if (state.ledgerHash !== this.localState.ledgerHash) {
diverged.push(
`Ledger divergence with ${nodeId}: local=${this.localState.ledgerHash}, remote=${state.ledgerHash}`,
);
}
}
if (diverged.length > 0) {
logger.warn("State divergence detected:", diverged);
this.emit("divergence:detected", { diverged, details: diverged });
return { diverged: true, details: diverged };
}
return { diverged: false, details: [] };
}
getState(nodeId?: string): SyncState | Map<string, SyncState> {
if (nodeId) {
return this.peerStates.get(nodeId) || this.localState;
}
return this.peerStates;
}
async requestSyncFromPeer(nodeId: string): Promise<SyncState | null> {
return new Promise((resolve) => {
const handler = ({ nodeId: source, state }: { nodeId: string; state: SyncState }) => {
if (source === nodeId) {
this.off("state:received", handler);
resolve(state);
}
};
this.once("state:received", handler);
this.emit("state:sync:request", { targetNodeId: nodeId });
// Timeout after 5 seconds
setTimeout(() => {
this.off("state:received", handler);
resolve(null);
}, 5000);
});
}
}
// ============================================================================
// Node Sync Service (Main Coordinator)
// ============================================================================
export class NodeSyncService extends EventEmitter {
private localNodeId: string;
private presenceDetector: PresenceDetector;
private stateSyncManager: StateSyncManager;
private peerConnections: Map<string, PeerConnection> = new Map();
private wsServer?: WebSocket.Server;
private messageSequence = 0;
constructor(localNodeId: string) {
super();
this.localNodeId = localNodeId;
this.presenceDetector = new PresenceDetector(localNodeId);
this.stateSyncManager = new StateSyncManager(localNodeId);
this.setupEventHandlers();
}
private setupEventHandlers() {
// Presence events
this.presenceDetector.on("node:online", (data) => {
this.emit("node:online", data);
});
this.presenceDetector.on("node:offline", (data) => {
this.emit("node:offline", data);
this.handleNodeOffline(data.nodeId);
});
this.presenceDetector.on("node:degraded", (data) => {
this.emit("node:degraded", data);
});
// State sync events
this.stateSyncManager.on("divergence:detected", (data) => {
this.emit("divergence:detected", data);
this.handleDivergence(data);
});
this.stateSyncManager.on("state:updated", (data) => {
this.broadcastState(data.state);
});
}
async initialize(): Promise<void> {
logger.info(`Initializing Node Sync Service for ${this.localNodeId}`);
// Start WebSocket server for incoming connections
await this.startServer();
// Connect to peer nodes
await this.connectToPeers();
// Start presence monitoring
this.presenceDetector.startMonitoring();
logger.info("Node Sync Service initialized");
}
private async startServer(): Promise<void> {
const config = TRIAD_NODES[this.localNodeId];
const port = config?.wsPort || 8765;
return new Promise((resolve, reject) => {
try {
this.wsServer = new WebSocket.Server({ port });
this.wsServer.on("connection", (ws) => {
this.handleIncomingConnection(ws);
});
this.wsServer.on("listening", () => {
logger.info(`WebSocket server listening on port ${port}`);
resolve();
});
this.wsServer.on("error", (err) => {
logger.error("WebSocket server error:", err);
reject(err);
});
} catch (err) {
reject(err);
}
});
}
private handleIncomingConnection(ws: WebSocket) {
logger.info("Incoming WebSocket connection");
ws.on("message", (data: string) => {
try {
const message: SyncMessage = JSON.parse(data);
this.handleMessage(message);
} catch (err) {
logger.error("Failed to parse incoming message:", err);
}
});
ws.on("close", () => {
logger.info("WebSocket connection closed");
});
}
private async connectToPeers(): Promise<void> {
const config = TRIAD_NODES[this.localNodeId];
if (!config) {
logger.error("Local node config not found");
return;
}
for (const [nodeId, nodeConfig] of Object.entries(TRIAD_NODES)) {
if (nodeId === this.localNodeId) {
continue;
}
const url = `ws://${nodeConfig.ipAddress}:${nodeConfig.wsPort}`;
const peer = new PeerConnection(nodeId, url);
peer.on("message", (msg: SyncMessage) => {
this.handleMessage(msg);
});
this.peerConnections.set(nodeId, peer);
try {
await peer.connect();
this.presenceDetector.updateHeartbeat(nodeId, peer.ws?.PING?.() ? 0 : undefined);
} catch (err) {
logger.error(`Failed to connect to ${nodeId}:`, err);
}
}
}
private handleMessage(message: SyncMessage) {
logger.debug(`Received ${message.type} from ${message.sourceNodeId}`);
switch (message.type) {
case MessageType.HEARTBEAT:
this.presenceDetector.updateHeartbeat(message.sourceNodeId);
break;
case MessageType.STATE_SYNC_RESPONSE:
this.stateSyncManager.receivePeerState(message.sourceNodeId, message.payload as SyncState);
break;
case MessageType.CONSENSUS_PROPOSAL:
this.emit("consensus:proposal", message);
break;
case MessageType.CONSENSUS_VOTE:
this.emit("consensus:vote", message);
break;
case MessageType.MEMORY_REPLICATE:
this.emit("memory:replicate", message);
break;
default:
logger.debug(`Unhandled message type: ${message.type}`);
}
}
private handleNodeOffline(nodeId: string) {
logger.warn(`Node ${nodeId} is offline, initiating recovery`);
const peer = this.peerConnections.get(nodeId);
if (peer) {
peer.close();
}
this.emit("recovery:needed", { nodeId, reason: "offline" });
}
private handleDivergence(data: { diverged: boolean; details: string[] }) {
if (!data.diverged) {
return;
}
logger.warn("Divergence detected, requesting sync from authority node");
// Request state sync from TM-1 (authority)
if (this.localNodeId !== "TM-1") {
this.requestStateSync("TM-1");
}
}
sendHeartbeat() {
const message: SyncMessage = {
type: MessageType.HEARTBEAT,
sourceNodeId: this.localNodeId,
timestamp: Date.now(),
payload: { uptime: process.uptime() },
sequenceNumber: this.messageSequence++,
};
this.broadcast(message);
}
broadcastState(state: SyncState) {
const message: SyncMessage = {
type: MessageType.STATE_SYNC_RESPONSE,
sourceNodeId: this.localNodeId,
timestamp: Date.now(),
payload: state,
sequenceNumber: this.messageSequence++,
};
this.broadcast(message);
}
broadcast(message: SyncMessage, targetNodeId?: string) {
if (targetNodeId) {
const peer = this.peerConnections.get(targetNodeId);
if (peer) {
peer.send(message);
}
} else {
for (const peer of this.peerConnections.values()) {
peer.send(message);
}
}
}
requestStateSync(targetNodeId: string) {
const message: SyncMessage = {
type: MessageType.STATE_SYNC_REQUEST,
sourceNodeId: this.localNodeId,
targetNodeId,
timestamp: Date.now(),
payload: { requestedFields: ["gitHash", "ledgerHash", "memoryEntries"] },
sequenceNumber: this.messageSequence++,
};
this.broadcast(message, targetNodeId);
}
proposeConsensus(proposal: string) {
const message: SyncMessage = {
type: MessageType.CONSENSUS_PROPOSAL,
sourceNodeId: this.localNodeId,
timestamp: Date.now(),
payload: { proposal, gitHash: this.stateSyncManager.localState.gitHash },
sequenceNumber: this.messageSequence++,
};
this.broadcast(message);
this.emit("consensus:proposed", { proposal });
}
submitConsensusVote(proposalId: string, vote: "yes" | "no" | "abstain") {
const message: SyncMessage = {
type: MessageType.CONSENSUS_VOTE,
sourceNodeId: this.localNodeId,
timestamp: Date.now(),
payload: { proposalId, vote },
sequenceNumber: this.messageSequence++,
};
this.broadcast(message);
}
replicateMemory(entry: { key: string; value: unknown; tier: "fact" | "episodic" | "pad" }) {
const message: SyncMessage = {
type: MessageType.MEMORY_REPLICATE,
sourceNodeId: this.localNodeId,
timestamp: Date.now(),
payload: entry,
sequenceNumber: this.messageSequence++,
};
this.broadcast(message);
}
getPresenceStatus() {
return this.presenceDetector.getQuorumStatus();
}
getDivergenceStatus() {
return this.stateSyncManager.checkDivergence();
}
async shutdown(): Promise<void> {
logger.info("Shutting down Node Sync Service");
// Close all peer connections
for (const peer of this.peerConnections.values()) {
peer.close();
}
// Close server
if (this.wsServer) {
await new Promise<void>((resolve) => {
this.wsServer?.close(() => resolve());
});
}
this.emit("shutdown");
}
}
// ============================================================================
// Factory Function
// ============================================================================
export function createNodeSyncService(localNodeId: string): NodeSyncService {
return new NodeSyncService(localNodeId);
}
// ============================================================================
// Integration with Triad Heartbeat
// ============================================================================
/**
* Integration hook for triad-heartbeat skill
* Called on each heartbeat wake to check sync state
*/
export async function triadHeartbeatIntegration(
service: NodeSyncService,
callback: (status: {
quorum: { available: number; total: number; hasQuorum: boolean };
divergence: { diverged: boolean; details: string[] };
presence: Map<string, NodePresence>;
}) => void,
) {
const quorum = service.getPresenceStatus();
const divergence = service.getDivergenceStatus();
const presence = service.presenceDetector.getAllPresence();
callback({ quorum, divergence, presence });
}