From e1c1d7ea396ceffb4dffe1e94fb1e6f08f07b862 Mon Sep 17 00:00:00 2001 From: DecDuck Date: Mon, 21 Oct 2024 21:50:21 +1100 Subject: [PATCH] fixed task system --- server/api/v1/task/index.get.ts | 22 ++++++++++++++-------- server/internal/tasks/index.ts | 10 +++++----- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/server/api/v1/task/index.get.ts b/server/api/v1/task/index.get.ts index 669ffcc..f511c51 100644 --- a/server/api/v1/task/index.get.ts +++ b/server/api/v1/task/index.get.ts @@ -3,12 +3,16 @@ import session from "~/server/internal/session"; import { v4 as uuidv4 } from "uuid"; import taskHandler, { TaskMessage } from "~/server/internal/tasks"; +// TODO add web socket sessions for horizontal scaling +// ID to admin +const socketSessions: { [key: string]: boolean } = {}; + export default defineWebSocketHandler({ open(peer) { const dummyEvent = { node: { req: { - headers: peer.headers, + headers: peer.request?.headers, }, }, } as unknown as H3Event; @@ -18,29 +22,31 @@ export default defineWebSocketHandler({ return; } const admin = session.getAdminUser(dummyEvent); - const peerId = uuidv4(); - peer.ctx.id = peerId; - peer.ctx.admin = admin !== undefined; + socketSessions[peer.id] = admin !== undefined; const rtMsg: TaskMessage = { id: "connect", + name: "Connect", success: true, progress: 0, error: undefined, log: [], }; - peer.send(rtMsg); + peer.send(JSON.stringify(rtMsg)); }, message(peer, message) { - if (!peer.ctx.id) return; + if (!peer.id) return; + if (socketSessions[peer.id] === undefined) return; const text = message.text(); if (text.startsWith("connect/")) { const id = text.substring("connect/".length); - taskHandler.connect(peer.ctx.id, id, peer, peer.ctx.admin); + taskHandler.connect(peer.id, id, peer, socketSessions[peer.id]); return; } }, close(peer, details) { - if (!peer.ctx.id) return; + if (!peer.id) return; + if (socketSessions[peer.id] === undefined) return; + delete socketSessions[peer.id]; }, }); diff --git a/server/internal/tasks/index.ts b/server/internal/tasks/index.ts index ce8a249..57bae6c 100644 --- a/server/internal/tasks/index.ts +++ b/server/internal/tasks/index.ts @@ -41,7 +41,7 @@ class TaskHandler { }; for (const client of Object.keys(taskEntry.clients)) { if (!this.clientRegistry[client]) continue; - this.clientRegistry[client].send(taskMessage); + this.clientRegistry[client].send(JSON.stringify(taskMessage)); } updateCollectTimeout = undefined; }, 100); @@ -91,9 +91,9 @@ class TaskHandler { connect(id: string, taskId: string, peer: PeerImpl, isAdmin = false) { const task = this.taskRegistry[taskId]; - if (!task) return false; + if (!task) return "Invalid task"; - if (task.requireAdmin && !isAdmin) return false; + if (task.requireAdmin && !isAdmin) return "Requires admin"; this.clientRegistry[id] = peer; this.taskRegistry[taskId].clients[id] = true; // Uniquely insert client to avoid sending duplicate traffic @@ -106,7 +106,7 @@ class TaskHandler { log: task.log, progress: task.progress, }; - peer.send(catchupMessage); + peer.send(JSON.stringify(catchupMessage)); return true; } @@ -150,7 +150,7 @@ export type TaskMessage = { }; export type PeerImpl = { - send: (message: TaskMessage) => void; + send: (message: string) => void; }; export const taskHandler = new TaskHandler();