Files
heretek-openclaw-core/patches/a2a-protocol-infrastructure.patch
John Doe 762f51b890 docs: Create Heretek fork documentation and patch management system
- Add HERETEK_FORK.md with fork strategy and upstream sync workflow
- Add CHANGELOG_HERETEK.md tracking all Heretek-specific changes
- Create patches/ directory with README documentation
- Generate Phase 1 patch files:
  - a2a-protocol-infrastructure.patch
  - agent-lifecycle-steward-primary.patch
  - approval-system-liberation.patch
- Add patch management scripts:
  - patch-apply.sh - Apply all patches from .patchestoo
  - patch-create.sh - Create new patches from diffs
  - patch-status.sh - Show patch application status
  - upstream-sync.sh - Sync with upstream repository
- Add .patchestoo file listing patches in order
- Update package.json with patch-related npm scripts
- Add postinstall hook for automatic patch application

Phase 1 fixes include:
- A2A Protocol infrastructure (Redis messaging, Gateway, WebSocket bridge)
- Agent lifecycle improvements (auto-registration, heartbeat, /agent-status)
- Approval system liberation (auto-apply patches, approval bypass)
2026-04-01 12:53:16 -04:00

595 lines
18 KiB
Diff

---
# A2A Protocol Infrastructure Patch
# Heretek OpenClaw Core - Phase 1 Bug Fixes
# Date: 2026-04-01
#
# This patch adds the missing A2A (Agent-to-Agent) communication infrastructure:
# - Redis-based messaging module
# - WebSocket-to-Redis bridge
# - Gateway server for agent communication
# - Modular Docker Compose configurations
---
diff --git a/skills/a2a-message-send/a2a-redis.js b/skills/a2a-message-send/a2a-redis.js
new file mode 100644
index 0000000..heretek1
--- /dev/null
+++ b/skills/a2a-message-send/a2a-redis.js
@@ -0,0 +1,350 @@
+/**
+ * Heretek OpenClaw — A2A Redis Messaging Module
+ * ==============================================================================
+ * Redis-based Agent-to-Agent messaging with persistence and pub/sub.
+ *
+ * Features:
+ * - Message persistence using Redis lists
+ * - Agent registration and discovery
+ * - Broadcast via Redis pub/sub
+ * - Inbox management (get, count, clear, mark as read)
+ * - Ping/pong health checks
+ *
+ * Usage:
+ * const { sendMessage, getMessages, registerAgent } = require('./a2a-redis');
+ *
+ * await registerAgent('steward', { role: 'orchestrator' });
+ * await sendMessage('steward', 'alpha', 'Hello Alpha!');
+ * const messages = await getMessages('alpha', 10);
+ */
+
+const Redis = require('ioredis');
+
+// Redis connection
+let redisClient = null;
+
+/**
+ * Get Redis client (singleton)
+ */
+function getRedisClient() {
+ if (!redisClient) {
+ const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
+ redisClient = new Redis(redisUrl, {
+ maxRetriesPerRequest: 3,
+ retryDelayOnFailover: 100
+ });
+ }
+ return redisClient;
+}
+
+/**
+ * Send message to another agent
+ * @param {string} from - Sender agent ID
+ * @param {string} to - Target agent ID
+ * @param {Object|string} content - Message content
+ * @param {Object} options - Additional options
+ * @returns {Promise<Object>} Send result
+ */
+async function sendMessage(from, to, content, options = {}) {
+ const redis = getRedisClient();
+ const messageId = `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
+
+ const message = {
+ id: messageId,
+ from,
+ to,
+ content: typeof content === 'string' ? content : JSON.stringify(content),
+ timestamp: new Date().toISOString(),
+ type: options.type || 'task',
+ priority: options.priority || 'normal',
+ read: false
+ };
+
+ // Push to recipient's inbox
+ const inboxKey = `openclaw:a2a:inbox:${to}`;
+ await redis.lpush(inboxKey, JSON.stringify(message));
+
+ // Publish to broadcast channel for real-time delivery
+ await redis.publish('openclaw:a2a:broadcast', JSON.stringify({
+ type: 'message',
+ ...message
+ }));
+
+ return { success: true, messageId, timestamp: message.timestamp };
+}
+
+/**
+ * Get messages for an agent
+ * @param {string} agentId - Agent ID
+ * @param {number} limit - Max messages to return
+ * @returns {Promise<Array>} Messages
+ */
+async function getMessages(agentId, limit = 10) {
+ const redis = getRedisClient();
+ const inboxKey = `openclaw:a2a:inbox:${agentId}`;
+
+ const messages = await redis.lrange(inboxKey, 0, limit - 1);
+ return messages.map(m => JSON.parse(m));
+}
+
+/**
+ * Get unread messages for an agent
+ * @param {string} agentId - Agent ID
+ * @param {number} limit - Max messages to return
+ * @returns {Promise<Array>} Unread messages
+ */
+async function getUnreadMessages(agentId, limit = 10) {
+ const messages = await getMessages(agentId, limit);
+ const redis = getRedisClient();
+ const readKey = `openclaw:a2a:read:${agentId}`;
+
+ const unread = [];
+ for (const msg of messages) {
+ const isRead = await redis.sismember(readKey, msg.id);
+ if (!isRead) {
+ unread.push(msg);
+ }
+ }
+ return unread;
+}
+
+/**
+ * Mark message as read
+ * @param {string} agentId - Agent ID
+ * @param {string} messageId - Message ID
+ * @returns {Promise<boolean>} Success
+ */
+async function markAsRead(agentId, messageId) {
+ const redis = getRedisClient();
+ const readKey = `openclaw:a2a:read:${agentId}`;
+ await redis.sadd(readKey, messageId);
+ return true;
+}
+
+/**
+ * Count messages for an agent
+ * @param {string} agentId - Agent ID
+ * @returns {Promise<number>} Message count
+ */
+async function countMessages(agentId) {
+ const redis = getRedisClient();
+ const inboxKey = `openclaw:a2a:inbox:${agentId}`;
+ return redis.llen(inboxKey);
+}
+
+/**
+ * Clear all messages for an agent
+ * @param {string} agentId - Agent ID
+ * @returns {Promise<boolean>} Success
+ */
+async function clearMessages(agentId) {
+ const redis = getRedisClient();
+ const inboxKey = `openclaw:a2a:inbox:${agentId}`;
+ const readKey = `openclaw:a2a:read:${agentId}`;
+ await redis.del(inboxKey, readKey);
+ return true;
+}
+
+/**
+ * Broadcast message to all agents
+ * @param {string} from - Sender agent ID
+ * @param {Object|string} content - Message content
+ * @returns {Promise<Object>} Broadcast result
+ */
+async function broadcast(from, content) {
+ const redis = getRedisClient();
+ const message = {
+ id: `broadcast_${Date.now()}`,
+ from,
+ content: typeof content === 'string' ? content : JSON.stringify(content),
+ timestamp: new Date().toISOString(),
+ type: 'broadcast'
+ };
+
+ await redis.publish('openclaw:a2a:broadcast', JSON.stringify(message));
+ return { success: true, messageId: message.id };
+}
+
+/**
+ * Ping another agent (health check)
+ * @param {string} from - Sender agent ID
+ * @param {string} to - Target agent ID
+ * @returns {Promise<Object>} Ping result with latency
+ */
+async function pingAgent(from, to) {
+ const startTime = Date.now();
+ await sendMessage(from, to, { type: 'ping' }, { priority: 'high' });
+ const latency = Date.now() - startTime;
+ return { success: true, latency, timestamp: new Date().toISOString() };
+}
+
+/**
+ * Register agent
+ * @param {string} agentId - Agent ID
+ * @param {Object} metadata - Agent metadata
+ * @returns {Promise<Object>} Registration result
+ */
+async function registerAgent(agentId, metadata = {}) {
+ const redis = getRedisClient();
+ const agentKey = `openclaw:a2a:agent:${agentId}`;
+
+ await redis.sadd('openclaw:a2a:agents', agentId);
+ await redis.hmset(agentKey, {
+ id: agentId,
+ registeredAt: new Date().toISOString(),
+ status: 'active',
+ ...metadata
+ });
+
+ return { success: true, agentId, timestamp: new Date().toISOString() };
+}
+
+/**
+ * Unregister agent
+ * @param {string} agentId - Agent ID
+ * @returns {Promise<boolean>} Success
+ */
+async function unregisterAgent(agentId) {
+ const redis = getRedisClient();
+ await redis.srem('openclaw:a2a:agents', agentId);
+ await redis.del(`openclaw:a2a:agent:${agentId}`);
+ return true;
+}
+
+/**
+ * Get all registered agents
+ * @returns {Promise<Array>} Agent IDs
+ */
+async function getRegisteredAgents() {
+ const redis = getRedisClient();
+ return redis.smembers('openclaw:a2a:agents');
+}
+
+// Public API of the module. getRedisClient is exported as well, giving
+// callers access to the shared connection.
+module.exports = {
+ sendMessage,
+ getMessages,
+ getUnreadMessages,
+ markAsRead,
+ countMessages,
+ clearMessages,
+ broadcast,
+ pingAgent,
+ registerAgent,
+ unregisterAgent,
+ getRegisteredAgents,
+ getRedisClient
+};
diff --git a/modules/communication/redis-websocket-bridge.js b/modules/communication/redis-websocket-bridge.js
new file mode 100644
index 0000000..heretek2
--- /dev/null
+++ b/modules/communication/redis-websocket-bridge.js
@@ -0,0 +1,200 @@
+/**
+ * Heretek OpenClaw — Redis to WebSocket Bridge
+ * Bridges Redis pub/sub messages to WebSocket clients for real-time A2A updates.
+ */
+
+const WebSocket = require('ws');
+const Redis = require('ioredis');
+
+class RedisToWebSocketBridge {
+ constructor(config = {}) {
+ this.config = {
+ wsPort: config.wsPort || 3002,
+ redisUrl: config.redisUrl || process.env.REDIS_URL || 'redis://localhost:6379',
+ a2aChannel: config.a2aChannel || 'openclaw:a2a:broadcast',
+ ...config
+ };
+ this.isRunning = false;
+ this.clients = new Set();
+ this.wsServer = null;
+ this.pubSubClient = null;
+ }
+
+ async start() {
+ if (this.isRunning) return;
+
+ // Create Redis pub/sub client
+ this.pubSubClient = new Redis(this.config.redisUrl);
+ await this.pubSubClient.subscribe(this.config.a2aChannel);
+
+ // Handle Redis messages
+ this.pubSubClient.on('message', (channel, message) => {
+ this.broadcast(JSON.parse(message));
+ });
+
+ // Create WebSocket server
+ this.wsServer = new WebSocket.Server({ port: this.config.wsPort, path: '/a2a' });
+ this.wsServer.on('connection', (ws) => {
+ this.clients.add(ws);
+ ws.on('close', () => this.clients.delete(ws));
+ });
+
+ this.isRunning = true;
+ console.log(`[Redis-WS Bridge] Started on ws://0.0.0.0:${this.config.wsPort}`);
+ }
+
+ async stop() {
+ if (!this.isRunning) return;
+
+ this.clients.forEach(client => client.close());
+ this.clients.clear();
+
+ if (this.wsServer) await new Promise(resolve => this.wsServer.close(resolve));
+ if (this.pubSubClient) await this.pubSubClient.quit();
+
+ this.isRunning = false;
+ }
+
+ broadcast(message) {
+ const payload = JSON.stringify({ ...message, timestamp: new Date().toISOString() });
+ this.clients.forEach(client => {
+ if (client.readyState === WebSocket.OPEN) {
+ client.send(payload);
+ }
+ });
+ }
+
+ getStatus() {
+ return {
+ running: this.isRunning,
+ clients: this.clients.size,
+ port: this.config.wsPort
+ };
+ }
+}
+
+module.exports = { RedisToWebSocketBridge };
diff --git a/gateway/openclaw-gateway.js b/gateway/openclaw-gateway.js
new file mode 100644
index 0000000..heretek3
--- /dev/null
+++ b/gateway/openclaw-gateway.js
@@ -0,0 +1,250 @@
+/**
+ * Heretek OpenClaw — Gateway Server
+ * Central WebSocket RPC gateway for agent-to-agent (A2A) communication.
+ */
+
+const WebSocket = require('ws');
+const http = require('http');
+const Redis = require('ioredis');
+const EventEmitter = require('events');
+const crypto = require('crypto');
+
+const CONFIG = {
+ port: parseInt(process.env.GATEWAY_PORT || '18789', 10),
+ host: process.env.GATEWAY_HOST || '0.0.0.0',
+ redisUrl: process.env.REDIS_URL || 'redis://localhost:6379',
+ heartbeatInterval: 30000
+};
+
+const A2A_PREFIX = 'openclaw:a2a';
+
+class OpenClawGateway extends EventEmitter {
+ constructor(config = {}) {
+ super();
+ this.config = { ...CONFIG, ...config };
+ this.isRunning = false;
+ this.httpServer = null;
+ this.wsServer = null;
+ this.redisClient = null;
+ this.agents = new Map();
+ this.pendingResponses = new Map();
+ this.messageCounter = 0;
+ }
+
+ async start() {
+ if (this.isRunning) return;
+
+ this.httpServer = http.createServer(this._handleHttpRequest.bind(this));
+ this.wsServer = new WebSocket.Server({ server: this.httpServer, path: '/a2a' });
+
+ this.redisClient = new Redis(this.config.redisUrl);
+ this._setupWebSocketHandlers();
+
+ await new Promise(resolve => this.httpServer.listen(this.config.port, this.config.host, resolve));
+
+ this.isRunning = true;
+ console.log(`[Gateway] OpenClaw Gateway running on ws://${this.config.host}:${this.config.port}/a2a`);
+ }
+
+ async stop() {
+ if (!this.isRunning) return;
+
+ this.agents.forEach(agent => agent.ws?.close());
+ this.agents.clear();
+
+ if (this.wsServer) await new Promise(resolve => this.wsServer.close(resolve));
+ if (this.httpServer) await new Promise(resolve => this.httpServer.close(resolve));
+ if (this.redisClient) await this.redisClient.quit();
+
+ this.isRunning = false;
+ }
+
+ getConnectedAgents() {
+ return Array.from(this.agents.keys());
+ }
+
+ getStatus() {
+ return {
+ running: this.isRunning,
+ port: this.config.port,
+ connectedAgents: this.agents.size,
+ agents: this.getConnectedAgents(),
+ redisConnected: !!this.redisClient
+ };
+ }
+
+ _setupWebSocketHandlers() {
+ this.wsServer.on('connection', (ws, req) => {
+ const agentId = this._extractAgentId(req);
+ ws.send(JSON.stringify({ type: 'welcome', clientId: this._generateClientId() }));
+
+ ws.on('message', (data) => this._handleMessage(ws, agentId, data));
+ ws.on('close', () => this._handleDisconnect(ws, agentId));
+ ws.isAlive = true;
+ ws.on('pong', () => { ws.isAlive = true; });
+ });
+
+ // Heartbeat check
+ setInterval(() => {
+ this.wsServer.clients.forEach(ws => {
+ if (!ws.isAlive) return ws.terminate();
+ ws.isAlive = false;
+ ws.ping();
+ });
+ }, this.config.heartbeatInterval);
+ }
+
+ async _handleMessage(ws, agentId, data) {
+ try {
+ const message = JSON.parse(data.toString());
+
+ if (message.type === 'register') {
+ this.agents.set(message.agentId || agentId, { ws, registeredAt: new Date().toISOString() });
+ ws.send(JSON.stringify({ type: 'registered', agentId: message.agentId || agentId }));
+ } else if (message.type === 'message') {
+ const targetAgent = this.agents.get(message.agent || message.to);
+ if (targetAgent?.ws?.readyState === WebSocket.OPEN) {
+ targetAgent.ws.send(JSON.stringify({ ...message, from: agentId }));
+ }
+ } else if (message.type === 'ping') {
+ ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
+ }
+ } catch (error) {
+ console.error('[Gateway] Message error:', error.message);
+ }
+ }
+
+ _handleHttpRequest(req, res) {
+ const url = new URL(req.url, `http://${req.headers.host}`);
+
+ if (url.pathname === '/health') {
+ res.writeHead(200, { 'Content-Type': 'application/json' });
+ res.end(JSON.stringify(this.getStatus()));
+ } else if (url.pathname === '/agents') {
+ res.writeHead(200, { 'Content-Type': 'application/json' });
+ res.end(JSON.stringify({ connected: this.getConnectedAgents(), count: this.agents.size }));
+ } else if (url.pathname === '/agent-status') {
+ res.writeHead(200, { 'Content-Type': 'application/json' });
+ res.end(JSON.stringify({
+ timestamp: new Date().toISOString(),
+ totalAgents: this.agents.size,
+ agents: Array.from(this.agents.entries()).map(([id, a]) => ({
+ agentId: id,
+ status: a.ws?.readyState === WebSocket.OPEN ? 'online' : 'offline',
+ lastSeen: a.registeredAt
+ }))
+ }));
+ } else {
+ res.writeHead(404, { 'Content-Type': 'application/json' });
+ res.end(JSON.stringify({ error: 'Not found' }));
+ }
+ }
+
+ _handleDisconnect(ws, agentId) {
+ if (agentId && this.agents.has(agentId)) {
+ this.agents.delete(agentId);
+ }
+ }
+
+ _extractAgentId(req) {
+ try {
+ const url = new URL(req.url, `http://${req.headers.host}`);
+ return url.searchParams.get('agentId') || req.headers['x-agent-id'] || null;
+ } catch {
+ return null;
+ }
+ }
+
+ _generateClientId() {
+ return `gw-${Date.now()}-${crypto.randomBytes(4).toString('hex')}`;
+ }
+}
+
+if (require.main === module) {
+ const gateway = new OpenClawGateway();
+ gateway.start().catch(console.error);
+ process.on('SIGINT', () => gateway.stop());
+}
+
+module.exports = { OpenClawGateway, CONFIG, A2A_PREFIX };
diff --git a/docker-compose.redis.yml b/docker-compose.redis.yml
new file mode 100644
index 0000000..heretek4
--- /dev/null
+++ b/docker-compose.redis.yml
@@ -0,0 +1,20 @@
+version: '3.8'
+
+services:
+  redis:
+    image: redis:7-alpine
+    container_name: heretek-redis
+    ports:
+      - "6379:6379"
+    volumes:
+      - redis-data:/data
+    command: redis-server --appendonly yes  # enable append-only-file persistence
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 10s
+      timeout: 5s
+      retries: 3
+
+volumes:
+  redis-data:
+    driver: local
diff --git a/docker-compose.gateway.yml b/docker-compose.gateway.yml
new file mode 100644
index 0000000..heretek5
--- /dev/null
+++ b/docker-compose.gateway.yml
@@ -0,0 +1,25 @@
+version: '3.8'
+
+services:
+  gateway:
+    build:
+      context: .
+      dockerfile: Dockerfile.gateway
+    container_name: heretek-gateway
+    ports:
+      - "18789:18789"
+      - "18788:18788"  # NOTE(review): the gateway only listens on GATEWAY_PORT; confirm this mapping is needed
+    environment:
+      - GATEWAY_PORT=18789
+      - REDIS_URL=redis://redis:6379
+    depends_on:
+      - redis
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:18789/health"]  # /health is served on GATEWAY_PORT
+      interval: 30s
+      timeout: 10s
+      retries: 3
+
+  redis:
+    image: redis:7-alpine
+    container_name: heretek-gateway-redis
diff --git a/Dockerfile.gateway b/Dockerfile.gateway
new file mode 100644
index 0000000..heretek6
--- /dev/null
+++ b/Dockerfile.gateway
@@ -0,0 +1,20 @@
+FROM node:20-alpine
+
+WORKDIR /app
+
+# Install production dependencies only (--omit=dev replaces the deprecated --only=production)
+COPY package*.json ./
+RUN npm ci --omit=dev
+
+# Copy gateway code
+COPY gateway/openclaw-gateway.js ./gateway/
+
+# Install curl for healthcheck
+RUN apk add --no-cache curl
+
+EXPOSE 18789 18788
+
+CMD ["node", "gateway/openclaw-gateway.js"]
+
+HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+  CMD curl -f "http://localhost:${GATEWAY_PORT:-18789}/health" || exit 1