diff --git a/composables/task.ts b/composables/task.ts index a431074..7a0c8f2 100644 --- a/composables/task.ts +++ b/composables/task.ts @@ -1,7 +1,15 @@ import type { TaskMessage } from "~/server/internal/tasks"; +import { WebSocketHandler } from "./ws"; -let ws: WebSocket | undefined; -const msgQueue: Array = []; +const websocketHandler = new WebSocketHandler("/api/v1/task"); + +websocketHandler.listen((message) => { + const msg = JSON.parse(message) as TaskMessage; + const taskStates = useTaskStates(); + const state = taskStates.value[msg.id]; + if (!state) return; + state.value = msg; +}); const useTaskStates = () => useState<{ [key: string]: Ref }>("task-states", () => ({ @@ -15,32 +23,6 @@ const useTaskStates = () => })), })); -function initWs() { - const isSecure = location.protocol === "https:"; - const url = (isSecure ? "wss://" : "ws://") + location.host + "/api/v1/task"; - ws = new WebSocket(url); - ws.onmessage = (e) => { - const msg = JSON.parse(e.data) as TaskMessage; - const taskStates = useTaskStates(); - const state = taskStates.value[msg.id]; - if (!state) return; - state.value = msg; - }; - ws.onopen = () => { - for (const message of msgQueue) { - ws?.send(message); - } - }; - - return ws; -} - -function sendMessage(msg: string) { - if (!ws) return msgQueue.push(msg); - if (ws.readyState == 0) return msgQueue.push(msg); - return ws.send(msg); -} - export const useTaskReady = () => { const taskStates = useTaskStates(); return taskStates.value["connect"]; @@ -51,8 +33,6 @@ export const useTask = (taskId: string): Ref => { const taskStates = useTaskStates(); if (taskStates.value[taskId]) return taskStates.value[taskId]; - if (!ws) initWs(); - taskStates.value[taskId] = useState(`task-${taskId}`, () => ({ id: taskId, name: "loading...", @@ -61,6 +41,6 @@ export const useTask = (taskId: string): Ref => { error: undefined, log: [], })); - sendMessage(`connect/${taskId}`); + websocketHandler.send(`connect/${taskId}`); return taskStates.value[taskId]; }; diff --git a/composables/ws.ts b/composables/ws.ts new file mode 100644 index 0000000..0501a05 --- /dev/null +++ b/composables/ws.ts @@ -0,0 +1,48 @@ +export type WebSocketCallback = (message: string) => void; + +export class WebSocketHandler { + private listeners: Array = []; + private outQueue: Array = []; + private inQueue: Array = []; + private ws: WebSocket | undefined = undefined; + private connected: boolean = false; + + constructor(route: string) { + if (import.meta.server) return; + const isSecure = location.protocol === "https:"; + const url = (isSecure ? "wss://" : "ws://") + location.host + route; + this.ws = new WebSocket(url); + + this.ws.onopen = () => { + this.connected = true; + for (const message of this.outQueue) { + this.ws?.send(message); + } + }; + + this.ws.onmessage = (e) => { + const message = e.data; + if (this.listeners.length == 0) { + this.inQueue.push(message); + return; + } + + for (const listener of this.listeners) { + listener(message); + } + }; + } + + listen(callback: WebSocketCallback) { + this.listeners.push(callback); + } + + send(message: string) { + if (!this.connected || !this.ws) { + this.outQueue.push(message); + return; + } + + this.ws.send(message); + } +} diff --git a/pages/admin/task/[id]/index.vue b/pages/admin/task/[id]/index.vue index 8162323..36833a8 100644 --- a/pages/admin/task/[id]/index.vue +++ b/pages/admin/task/[id]/index.vue @@ -35,7 +35,7 @@