Files
openclaw/scripts/triad-sync-server.mjs
T
2026-03-24 15:33:35 -04:00

328 lines
8.9 KiB
JavaScript

#!/usr/bin/env node
/**
* Triad Sync Server — Direct HTTP inter-node communication
*
* Port: 8765 (configurable via TRIAD_SYNC_PORT)
* Endpoints: /state, /health, /sync, /broadcast
*
* Usage: node triad-sync-server.mjs
* Env: TRIAD_NODE_ID, TRIAD_SYNC_PORT
*/
import { execSync } from "child_process";
import fs from "fs";
import http from "http";
import path from "path";
import { fileURLToPath } from "url";
const __dirname = path.dirname(fileURLToPath(import.meta.url));
// Configuration
const PORT = parseInt(process.env.TRIAD_SYNC_PORT, 10) || 8765;
const WORKSPACE = process.env.TRIAD_WORKSPACE || "/home/openclaw/.openclaw/workspace";
const NODE_ID = process.env.TRIAD_NODE_ID || "TM-1";
// Triad node registry
const TRIAD_NODES = {
"TM-1": { host: "192.168.31.99", port: 8765, role: "authority" },
"TM-2": { host: "192.168.31.209", port: 8765, role: "consensus" },
"TM-3": { host: "192.168.31.85", port: 8765, role: "consensus" },
"TM-4": { host: "192.168.31.205", port: 8765, role: "code" },
};
// State cache
let stateCache = {
lastSyncHash: null,
lastSyncTime: null,
ledgerHash: null,
};
// Utility functions
function getGitHash() {
try {
return execSync("git rev-parse --short HEAD", {
cwd: WORKSPACE,
stdio: ["pipe", "pipe", "pipe"],
})
.toString()
.trim();
} catch {
return "unknown";
}
}
function getLedgerHash() {
try {
const db = path.join(WORKSPACE, ".aura/consensus.db");
if (!fs.existsSync(db)) {
return "ledger:not_found";
}
const count = execSync(`sqlite3 "${db}" "SELECT COUNT(*) FROM consensus_votes"`, {
stdio: ["pipe", "pipe", "pipe"],
})
.toString()
.trim();
return `votes:${count}`;
} catch {
return "ledger:unknown";
}
}
function updateStateCache() {
stateCache.lastSyncHash = getGitHash();
stateCache.lastSyncTime = new Date().toISOString();
stateCache.ledgerHash = getLedgerHash();
}
function log(message, level = "info") {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [${level}] [${NODE_ID}] ${message}`);
}
// Broadcast to all other nodes
async function broadcastToAll(payload, excludeNode = null) {
const results = [];
for (const [nodeId, config] of Object.entries(TRIAD_NODES)) {
if (nodeId === excludeNode) {
continue;
}
try {
const res = await fetch(`http://${config.host}:${config.port}/broadcast`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ...payload, relayed_from: NODE_ID }),
signal: AbortSignal.timeout(5000),
});
const data = await res.json();
results.push({ node: nodeId, success: res.ok, data });
log(`Broadcast to ${nodeId}: ${res.ok ? "success" : "failed"}`);
} catch (err) {
log(`Broadcast to ${nodeId} failed: ${err.message}`, "error");
results.push({ node: nodeId, success: false, error: err.message });
}
}
return results;
}
// Log broadcast to consensus ledger
function logBroadcastToLedger(payload) {
try {
const db = path.join(WORKSPACE, ".aura/consensus.db");
if (!fs.existsSync(db)) {
log("Ledger not found, skipping log", "warn");
return;
}
const sql = `
INSERT INTO consensus_votes (timestamp, proposal, result, signers, git_hash, processed)
VALUES (?, ?, ?, ?, ?, 1)
`;
const values = [
new Date().toISOString(),
payload.message || JSON.stringify(payload),
payload.type || "broadcast",
JSON.stringify([NODE_ID]),
getGitHash(),
];
execSync(
`sqlite3 "${db}" "${sql.replace(/\?/g, () => {
const val = values.shift();
return typeof val === "string" ? val.replace(/'/g, "''") : val;
})}"`,
{
stdio: ["pipe", "pipe", "pipe"],
},
);
log("Broadcast logged to ledger");
} catch (err) {
log(`Failed to log to ledger: ${err.message}`, "error");
}
}
// HTTP request handler
function handleRequest(req, res) {
const url = new URL(req.url, `http://localhost:${PORT}`);
const clientIP = req.socket.remoteAddress;
log(`${req.method} ${url.pathname} from ${clientIP}`);
res.setHeader("Content-Type", "application/json");
res.setHeader("Access-Control-Allow-Origin", "*");
res.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
res.setHeader("Access-Control-Allow-Headers", "Content-Type");
// CORS preflight
if (req.method === "OPTIONS") {
res.statusCode = 204;
res.end();
return;
}
// Route handling
if (url.pathname === "/state" && req.method === "GET") {
updateStateCache();
const response = {
node_id: NODE_ID,
git_hash: stateCache.lastSyncHash,
ledger_hash: stateCache.ledgerHash,
last_sync: stateCache.lastSyncTime,
timestamp: new Date().toISOString(),
role: TRIAD_NODES[NODE_ID]?.role || "unknown",
};
res.end(JSON.stringify(response, null, 2));
} else if (url.pathname === "/health" && req.method === "GET") {
const response = {
status: "ok",
node_id: NODE_ID,
timestamp: new Date().toISOString(),
uptime: process.uptime(),
};
res.end(JSON.stringify(response, null, 2));
} else if (url.pathname === "/sync" && req.method === "POST") {
let body = "";
req.on("data", (chunk) => (body += chunk));
req.on("end", async () => {
try {
const { from_node } = JSON.parse(body);
log(`Sync requested from ${from_node}`);
execSync("git fetch origin && git reset --hard origin/main", {
cwd: WORKSPACE,
stdio: ["pipe", "pipe", "pipe"],
});
const newHash = getGitHash();
updateStateCache();
log(`Sync complete: ${newHash}`);
res.end(
JSON.stringify({
success: true,
synced_from: from_node,
new_hash: newHash,
timestamp: new Date().toISOString(),
}),
);
} catch (err) {
log(`Sync failed: ${err.message}`, "error");
res.statusCode = 500;
res.end(
JSON.stringify({
success: false,
error: err.message,
timestamp: new Date().toISOString(),
}),
);
}
});
} else if (url.pathname === "/broadcast" && req.method === "POST") {
let body = "";
req.on("data", (chunk) => (body += chunk));
req.on("end", async () => {
try {
const payload = JSON.parse(body);
log(
`Broadcast received: ${payload.type || "unknown"}${payload.message?.substring(0, 50) || "no message"}`,
);
// Log to ledger
logBroadcastToLedger(payload);
// Forward to all other nodes (if not already relayed)
if (!payload.relayed_from) {
const results = await broadcastToAll(payload, NODE_ID);
res.end(
JSON.stringify({
success: true,
broadcasted: true,
timestamp: new Date().toISOString(),
relay_results: results,
}),
);
} else {
// Already relayed, just log
res.end(
JSON.stringify({
success: true,
broadcasted: true,
relayed: true,
timestamp: new Date().toISOString(),
}),
);
}
} catch (err) {
log(`Broadcast handling failed: ${err.message}`, "error");
res.statusCode = 500;
res.end(
JSON.stringify({
success: false,
error: err.message,
timestamp: new Date().toISOString(),
}),
);
}
});
} else if (url.pathname === "/nodes" && req.method === "GET") {
// Return node registry
res.end(
JSON.stringify({
nodes: TRIAD_NODES,
local_node: NODE_ID,
timestamp: new Date().toISOString(),
}),
);
} else {
res.statusCode = 404;
res.end(
JSON.stringify({
error: "Not found",
path: url.pathname,
timestamp: new Date().toISOString(),
}),
);
}
}
// Create and start server
const server = http.createServer(handleRequest);
server.listen(PORT, "0.0.0.0", () => {
log(`Triad sync server listening on port ${PORT}`);
updateStateCache();
log(`Initial state: git=${stateCache.lastSyncHash}, ledger=${stateCache.ledgerHash}`);
});
// Graceful shutdown
process.on("SIGTERM", () => {
log("SIGTERM received, shutting down gracefully");
server.close(() => {
log("Server closed");
process.exit(0);
});
});
process.on("SIGINT", () => {
log("SIGINT received, shutting down gracefully");
server.close(() => {
log("Server closed");
process.exit(0);
});
});
// Handle uncaught errors
process.on("uncaughtException", (err) => {
log(`Uncaught exception: ${err.message}`, "error");
process.exit(1);
});
process.on("unhandledRejection", (reason, _promise) => {
log(`Unhandled rejection: ${String(reason)}`, "error");
});