diff --git a/README.md b/README.md index 6e75d5a..464b991 100644 --- a/README.md +++ b/README.md @@ -179,8 +179,12 @@ for [Async Context](https://github.com/tc39/proposal-async-context) to solve thi ### `withStore` +Adding a `getStore()` method to the workflow context, which returns a store object, each store is linked to the workflow +context. + ```ts import { withStore } from "fluere/middleware/store"; + const workflow = withStore( () => ({ pendingTasks: new Set>(), @@ -203,7 +207,7 @@ const { getStore } = workflow.createContext(); ### `withValidation` -make first parameter of `handler` to be `sendEvent` and its type safe and runtime safe +Make first parameter of `handler` to be `sendEvent` and its type safe and runtime safe when you create a workflow using `withValidation`. ```ts @@ -236,6 +240,125 @@ workflow.handle([startEvent], (sendEvent, start) => { }); ``` +### `withTraceEvents` + +Adds tracing capabilities to your workflow, allowing you to monitor/decorate handler and debug event flows easily. + +When enabled, +it collects events based on the directed graph of the runtime and provide lifecycle hooks for each handler. + +```ts +import { withTraceEvents, runOnce } from "fluere/middleware/trace-events"; + +const workflow = withTraceEvents(createWorkflow()); + +workflow.handle( + [messageEvent], + runOnce(() => { + console.log("This message handler will only run once"); + }), +); + +workflow.handle([startEvent], () => { + getContext().sendEvent(messageEvent.with()); + getContext().sendEvent(messageEvent.with()); +}); + +{ + const { sendEvent } = workflow.createContext(); + sendEvent(startEvent.with()); + sendEvent(messageEvent.with()); + // This message handler will only run once! +} +{ + const { sendEvent } = workflow.createContext(); + // For each new context, the decorator is isolated. + sendEvent(startEvent.with()); + sendEvent(messageEvent.with()); + // This message handler will only run once! +} +``` + +#### `createHandlerDecorator` + +You can create your own handler decorator to modify the behavior of the handler. + +```ts +import { createHandlerDecorator } from "fluere/middleware/trace-events"; + +const noop: (...args: any[]) => void = function noop() {}; +export const runOnce = createHandlerDecorator({ + debugLabel: "onceHook", + getInitialValue: () => false, + onBeforeHandler: (handler, handlerContext, tracked) => + tracked ? noop : handler, + onAfterHandler: () => true, +}); +``` + +#### `HandlerContext` + +The `HandlerContext` includes the runtime information of the handler in the directed graph of the workflow. + +```ts +type BaseHandlerContext = { + // ... some other properties are hidden + handler: Handler[], any>; + inputEvents: WorkflowEvent[]; + // events data that are accepted by the handler + inputs: WorkflowEventData[]; + // events data that are emitted by the handler + outputs: WorkflowEventData[]; + + //#region linked list data structure + prev: HandlerContext; + next: Set; + root: HandlerContext; + //#endregion +}; + +type SyncHandlerContext = BaseHandlerContext & { + async: false; + pending: null; +}; + +type AsyncHandlerContext = BaseHandlerContext & { + async: true; + pending: Promise | void> | null; +}; + +type HandlerContext = AsyncHandlerContext | SyncHandlerContext; +``` + +For example, when you send two `startEvent` events, and send `messageEvent` twice (once in the handler and once in the global), +the `HandlerContext` from root to leaf is: + +```ts +let once = false; +workflow.handle([startEvent], () => { + const { sendEvent } = getContext(); + if (once) { + return; + } + once = true; + sendEvent(messageEvent.with()); +}); +const { sendEvent } = workflow.createContext(); +sendEvent(startEvent.with()); +sendEvent(startEvent.with()); +sendEvent(messageEvent.with()); +``` + +``` +rootHandlerContext(0) + ├── startEventContext(0) + │ └── messageEventContext(0) + ├── startEventContext(1) + └── messageEventContext(1) +``` + +You can use any directed graph library to visualize the directed graph of the workflow. + # LICENSE MIT diff --git a/packages/fluere/src/core/event.ts b/packages/fluere/src/core/event.ts index 092ee89..4845822 100644 --- a/packages/fluere/src/core/event.ts +++ b/packages/fluere/src/core/event.ts @@ -64,8 +64,6 @@ export const workflowEvent = ( }); event.toString = () => config?.debugLabel ?? `WorkflowEvent<${l1}>`; - Object.freeze(event); - return event; }; diff --git a/packages/fluere/src/core/internal/context.ts b/packages/fluere/src/core/internal/context.ts index 092fa0b..0c5d663 100644 --- a/packages/fluere/src/core/internal/context.ts +++ b/packages/fluere/src/core/internal/context.ts @@ -1,5 +1,40 @@ import { createAsyncContext } from "fluere/async-context"; -import type { WorkflowEventData } from "../event"; +import type { WorkflowEvent, WorkflowEventData } from "../event"; +import type { Handler } from "./handler"; + +type BaseHandlerContext = { + abortController: AbortController; + handler: Handler[], any>; + // events that are accepted by the handler + inputEvents: WorkflowEvent[]; + // events data that are accepted by the handler + inputs: WorkflowEventData[]; + // events data that are emitted by the handler + outputs: WorkflowEventData[]; + + //#region linked list data structure + prev: HandlerContext; + next: Set; + root: HandlerContext; + //#endregion +}; + +type SyncHandlerContext = BaseHandlerContext & { + async: false; + pending: null; +}; + +type AsyncHandlerContext = BaseHandlerContext & { + async: true; + pending: Promise | void> | null; +}; + +export type HandlerContext = AsyncHandlerContext | SyncHandlerContext; + +export type ContextNext = ( + context: HandlerContext, + next: (context: HandlerContext) => void, +) => void; export interface WorkflowContext { get stream(): ReadableStream>; @@ -9,13 +44,7 @@ export interface WorkflowContext { /** * @internal */ - __internal__call_context: Set< - ( - context: WorkflowContext, - inputs: WorkflowEventData[], - next: () => void, - ) => void - >; + __internal__call_context: Set; } export const _executorAsyncLocalStorage = createAsyncContext(); @@ -23,7 +52,7 @@ export const _executorAsyncLocalStorage = createAsyncContext(); export function getContext(): WorkflowContext { const context = _executorAsyncLocalStorage.getStore(); if (!context) { - throw new Error("No context found"); + throw new Error("No current context found"); } return context; } diff --git a/packages/fluere/src/core/internal/executor.ts b/packages/fluere/src/core/internal/executor.ts index caaaea9..b8f3c12 100644 --- a/packages/fluere/src/core/internal/executor.ts +++ b/packages/fluere/src/core/internal/executor.ts @@ -1,22 +1,12 @@ -import type { - Handler, - HandlerRef, - WorkflowEvent, - WorkflowEventData, -} from "fluere"; +import type { Handler, WorkflowEvent, WorkflowEventData } from "fluere"; import { flattenEvents, isEventData, isPromiseLike } from "../utils"; -import { _executorAsyncLocalStorage, type WorkflowContext } from "./context"; +import { + _executorAsyncLocalStorage, + type HandlerContext, + type WorkflowContext, +} from "./context"; import { createAsyncContext } from "fluere/async-context"; -type HandlerContext = { - abortController: AbortController; - handler: Handler[], any>; - inputs: WorkflowEventData[]; - outputs: WorkflowEventData[]; - prev: HandlerContext; - next: Set; -}; - const handlerContextAsyncLocalStorage = createAsyncContext(); const eventContextWeakMap = new WeakMap< @@ -27,7 +17,7 @@ const eventContextWeakMap = new WeakMap< export type ExecutorParams = { listeners: ReadonlyMap< WorkflowEvent[], - Set[], any>> + Set[], WorkflowEventData | void>> >; }; @@ -37,9 +27,10 @@ export const createContext = ({ const queue: WorkflowEventData[] = []; const runHandler = ( handler: Handler[], any>, + inputEvents: WorkflowEvent[], inputs: WorkflowEventData[], parentContext: HandlerContext, - ) => { + ): void => { let handlerAbortController: AbortController; const handlerContext: HandlerContext = { get abortController() { @@ -48,11 +39,20 @@ export const createContext = ({ } return handlerAbortController; }, + async: + "constructor" in handler + ? handler.constructor.name === "AsyncFunction" + : false, + pending: null, handler, + inputEvents, inputs, outputs: [], prev: parentContext, next: new Set(), + get root() { + return handlerRootContext; + }, }; handlerContext.prev.next.add(handlerContext); const workflowContext = createWorkflowContext(handlerContext); @@ -66,11 +66,11 @@ export const createContext = ({ _executorAsyncLocalStorage.run(workflowContext, () => { //#region middleware let i = 0; - const next = () => { + const next = (context: HandlerContext) => { if (i === cbs.length) { let result: any; try { - result = handler(...inputs); + result = context.handler(...context.inputs); } catch (error) { if (handlerAbortController ?? rootAbortController) { (handlerAbortController ?? rootAbortController).abort(error); @@ -81,6 +81,8 @@ export const createContext = ({ } // return value is a special event if (isPromiseLike(result)) { + (handlerContext as any).async = true; + (handlerContext as any).pending = result; result.then((event) => { if (isEventData(event)) { workflowContext.sendEvent(event); @@ -93,10 +95,10 @@ export const createContext = ({ const cb = cbs[i]; if (cb) { i++; - cb(workflowContext, inputs, next); + cb(context, next); } }; - next(); + next(handlerContext); //#endregion }); }); @@ -108,13 +110,13 @@ export const createContext = ({ const inputs = flattenEvents(events, queueSnapshot); return inputs.length === events.length; }) - .map(([events, refs]) => { + .map(([events, handlers]) => { const inputs = flattenEvents(events, queueSnapshot); inputs.forEach((input) => { queue.splice(queue.indexOf(input), 1); }); - for (const ref of refs) { - runHandler(ref.handler, inputs, handlerContext); + for (const handler of handlers) { + runHandler(handler, events, inputs, handlerContext); } }); }; @@ -163,11 +165,17 @@ export const createContext = ({ } return rootAbortController; }, + async: false, + pending: null, + inputEvents: [], inputs: [], outputs: [], handler: null!, prev: null!, next: new Set(), + get root() { + return handlerRootContext; + }, }; const rootWorkflowContext = createWorkflowContext(handlerRootContext); diff --git a/packages/fluere/src/core/workflow.ts b/packages/fluere/src/core/workflow.ts index 48d2ee6..6dc2aa5 100644 --- a/packages/fluere/src/core/workflow.ts +++ b/packages/fluere/src/core/workflow.ts @@ -1,7 +1,7 @@ -import { type WorkflowEvent } from "./event"; +import { type WorkflowEvent, type WorkflowEventData } from "./event"; import { createContext } from "./internal/executor"; -import { type Handler, type HandlerRef } from "./internal/handler"; -import type { WorkflowContext } from "./internal/context"; +import { type Handler } from "./internal/handler"; +import { getContext, type WorkflowContext } from "./internal/context"; export type Workflow = { handle< @@ -10,7 +10,7 @@ export type Workflow = { >( accept: AcceptEvents, handler: Handler, - ): HandlerRef; + ): void; createContext(): WorkflowContext; }; @@ -18,7 +18,7 @@ export function createWorkflow(): Workflow { const config = { steps: new Map< WorkflowEvent[], - Set[], any>> + Set[], WorkflowEventData | void>> >(), }; @@ -29,43 +29,31 @@ export function createWorkflow(): Workflow { >( accept: AcceptEvents, handler: Handler, - ): HandlerRef => { + ): void => { + // smoke test to check if we are outside the context + let success = false; + try { + getContext(); + console.error("Calling handle inside of context is not allowed."); + success = true; + } catch {} + if (success) { + throw new Error("Calling handle inside of context is not allowed."); + } if (config.steps.has(accept)) { const set = config.steps.get(accept) as Set< - HandlerRef + Handler >; - const ref: HandlerRef = { - get handler() { - return handler; - }, - unsubscribe: () => { - set.delete(ref); - if (set.size === 0) { - config.steps.delete(accept); - } - }, - }; - set.add(ref); - return ref; + set.add(handler); } else { - const set = new Set>(); - const ref: HandlerRef = { - get handler() { - return handler; - }, - unsubscribe: () => { - set.delete(ref); - if (set.size === 0) { - config.steps.delete(accept); - } - }, - }; - set.add(ref); + const set = new Set>(); + set.add(handler); config.steps.set( accept, - set as Set[], any>>, + set as Set< + Handler[], WorkflowEventData | void> + >, ); - return ref; } }, createContext() { diff --git a/packages/fluere/src/middleware/store.ts b/packages/fluere/src/middleware/store.ts index d132f9f..5794980 100644 --- a/packages/fluere/src/middleware/store.ts +++ b/packages/fluere/src/middleware/store.ts @@ -1,27 +1,51 @@ import { type WorkflowContext, type Workflow, getContext } from "fluere"; -export function withStore( +export function withStore< + T, + Input extends void, + WorkflowLike extends { + createContext(): WorkflowContext; + } = { + createContext(): WorkflowContext; + }, +>( createStore: () => T, - workflow: Workflow, -): Omit & { + workflow: WorkflowLike, +): Omit & { createContext(): WorkflowContext & { getStore(): T; }; getStore(): T; }; -export function withStore( +export function withStore< + T, + Input, + WorkflowLike extends { + createContext(): WorkflowContext; + } = { + createContext(): WorkflowContext; + }, +>( createStore: (input: Input) => T, - workflow: Workflow, -): Omit & { + workflow: WorkflowLike, +): Omit & { createContext(input: Input): WorkflowContext & { getStore(): T; }; getStore(): T; }; -export function withStore( +export function withStore< + T, + Input, + WorkflowLike extends { + createContext(): WorkflowContext; + } = { + createContext(): WorkflowContext; + }, +>( createStore: (input: Input) => T, - workflow: Workflow, -): Omit & { + workflow: WorkflowLike, +): Omit & { createContext(input: Input): WorkflowContext & { getStore(): T; }; @@ -40,9 +64,9 @@ export function withStore( const context = workflow.createContext() as WorkflowContext & { getStore: () => T; }; - context.__internal__call_context.add((context, _, next) => { - (context as any).getStore = () => currentStore; - next(); + context.__internal__call_context.add((_, next) => { + (getContext() as any).getStore = () => currentStore; + next(_); }); (context as any).getStore = () => currentStore; return context; diff --git a/packages/fluere/src/middleware/trace-events.ts b/packages/fluere/src/middleware/trace-events.ts new file mode 100644 index 0000000..0b12689 --- /dev/null +++ b/packages/fluere/src/middleware/trace-events.ts @@ -0,0 +1,152 @@ +import type { + Handler, + Workflow, + WorkflowContext, + WorkflowEvent, + WorkflowEventData, +} from "fluere"; +import { isPromiseLike } from "../core/utils"; +import { + createHandlerDecorator, + decoratorRegistry, +} from "./trace-events/create-handler-decorator"; +import { runOnce } from "./trace-events/run-once"; + +type TracingContext = Record; + +const tracingWeakMap = new WeakMap< + WorkflowContext, + WeakMap< + WorkflowEvent[], + WeakMap< + Handler[], WorkflowEventData | void>, + TracingContext + > + > +>(); + +export type HandlerRef< + AcceptEvents extends WorkflowEvent[], + Result extends ReturnType["with"]> | void, + Fn extends Handler, +> = { + get handler(): Fn; +}; + +export function withTraceEvents< + WorkflowLike extends { + handle< + const AcceptEvents extends WorkflowEvent[], + Result extends ReturnType["with"]> | void, + >( + accept: AcceptEvents, + handler: Handler, + ): void; + createContext(): WorkflowContext; + }, +>( + workflow: WorkflowLike, +): Omit & { + handle< + const AcceptEvents extends WorkflowEvent[], + Result extends ReturnType["with"]> | void, + Fn extends Handler, + >( + accept: AcceptEvents, + handler: Fn, + ): HandlerRef; + createContext(): WorkflowContext; +} { + return { + ...workflow, + handle: < + const AcceptEvents extends WorkflowEvent[], + Result extends ReturnType["with"]> | void, + Fn extends Handler, + >( + accept: AcceptEvents, + handler: Fn, + ): HandlerRef => { + workflow.handle(accept, handler); + return { + get handler(): Fn { + return handler; + }, + }; + }, + createContext(): WorkflowContext { + const context = workflow.createContext(); + tracingWeakMap.set(context, new WeakMap()); + context.__internal__call_context.add((handlerContext, next) => { + const inputEvents = handlerContext.inputEvents; + const handlersWeakMap = tracingWeakMap.get(context)!; + if (!handlersWeakMap.has(inputEvents)) { + handlersWeakMap.set(inputEvents, new WeakMap()); + } + const handlerWeakMap = handlersWeakMap.get(inputEvents)!; + + const originalHandler = handlerContext.handler; + let finalHandler = originalHandler; + let handlerMiddleware: Handler< + WorkflowEvent[], + WorkflowEventData | void + >; + if (!handlerWeakMap) { + throw new Error( + "Handler context is not defined, this should not happen. Please report this issue with a reproducible example.", + ); + } + const tracingContext: TracingContext = + handlerWeakMap.get(originalHandler) ?? {}; + if (!handlerWeakMap.has(originalHandler)) { + handlerWeakMap.set(originalHandler, tracingContext); + } + + const onAfterHandlers = [] as (() => void)[]; + const onBeforeHandlers = [] as (( + nextHandler: Handler< + WorkflowEvent[], + WorkflowEventData | void + >, + ) => Handler[], WorkflowEventData | void>)[]; + handlerMiddleware = (...args) => { + const result = onBeforeHandlers.reduce((next, cb) => { + return cb(next); + }, finalHandler)(...args); + if (isPromiseLike(result)) { + return result.then(() => { + onAfterHandlers.forEach((cb) => { + cb(); + }); + }); + } else { + onAfterHandlers.forEach((cb) => { + cb(); + }); + return result; + } + }; + [...decoratorRegistry].forEach( + ([name, { getInitialValue, onAfterHandler, onBeforeHandler }]) => { + if (!tracingContext[name]) { + tracingContext[name] = getInitialValue(); + } + onBeforeHandlers.push((next) => + onBeforeHandler(next, handlerContext, tracingContext[name]), + ); + onAfterHandlers.push(() => { + tracingContext[name] = onAfterHandler(tracingContext[name]); + }); + }, + ); + next({ + ...handlerContext, + handler: handlerMiddleware, + }); + }); + return context; + }, + }; +} + +export { createHandlerDecorator, runOnce }; diff --git a/packages/fluere/src/middleware/trace-events/create-handler-decorator.ts b/packages/fluere/src/middleware/trace-events/create-handler-decorator.ts new file mode 100644 index 0000000..652e5c2 --- /dev/null +++ b/packages/fluere/src/middleware/trace-events/create-handler-decorator.ts @@ -0,0 +1,46 @@ +import type { Handler, WorkflowEvent, WorkflowEventData } from "fluere"; +import type { HandlerContext } from "../../core/internal/context"; + +const namespace = "decorator" as const; + +let counter = 0; + +export const decoratorRegistry = new Map< + string, + { + debugLabel: string; + getInitialValue: () => any; + onBeforeHandler: ( + handler: Handler[], WorkflowEventData | void>, + handlerContext: Readonly, + metadata: any, + ) => Handler[], WorkflowEventData | void>; + onAfterHandler: (metadata: any) => any; + } +>(); + +export function createHandlerDecorator(config: { + debugLabel?: string; + getInitialValue: () => Metadata; + onBeforeHandler: ( + handler: Handler[], WorkflowEventData | void>, + handlerContext: HandlerContext, + metadata: Metadata, + ) => Handler[], WorkflowEventData | void>; + onAfterHandler: (metadata: Metadata) => Metadata; +}) { + const uid = `${namespace}:${counter++}`; + decoratorRegistry.set(uid, { + debugLabel: config.debugLabel ?? uid, + getInitialValue: config.getInitialValue, + onAfterHandler: config.onAfterHandler, + onBeforeHandler: config.onBeforeHandler, + }); + return function < + const AcceptEvents extends WorkflowEvent[], + Result extends ReturnType["with"]> | void, + Fn extends Handler, + >(handler: Fn) { + return handler; + }; +} diff --git a/packages/fluere/src/middleware/trace-events/run-once.ts b/packages/fluere/src/middleware/trace-events/run-once.ts new file mode 100644 index 0000000..aaa778f --- /dev/null +++ b/packages/fluere/src/middleware/trace-events/run-once.ts @@ -0,0 +1,10 @@ +import { createHandlerDecorator } from "./create-handler-decorator"; + +const noop: (...args: any[]) => void = function noop() {}; + +export const runOnce = createHandlerDecorator({ + debugLabel: "onceHook", + getInitialValue: () => false, + onBeforeHandler: (handler, _, tracked) => (tracked ? noop : handler), + onAfterHandler: () => true, +}); diff --git a/packages/fluere/src/middleware/validation.ts b/packages/fluere/src/middleware/validation.ts index ad62cee..3bd6c88 100644 --- a/packages/fluere/src/middleware/validation.ts +++ b/packages/fluere/src/middleware/validation.ts @@ -2,7 +2,6 @@ import { type WorkflowContext, getContext, type Handler, - type HandlerRef, type Workflow, type WorkflowEvent, type WorkflowEventData, @@ -44,7 +43,7 @@ export type WithValidationWorkflow< >( accept: AcceptEvents, handler: ValidationHandler, - ): HandlerRef; + ): void; createContext(): WorkflowContext; }; @@ -66,22 +65,18 @@ export function withValidation< const store = getContext(); const originalSendEvent = store.sendEvent; return (...inputs: WorkflowEventData[]) => { - let matched = false; for (let i = 0; i < outputs.length; i++) { const output = outputs[i]!; if (output.length === inputs.length) { if (output.every((e, idx) => e.include(inputs[idx]))) { - matched = true; - break; + return originalSendEvent(...inputs); } } } - if (matched) { - console.warn( - "Invalid input detected [%s]", - inputs.map((i) => i.data).join(", "), - ); - } + console.warn( + "Invalid input detected [%s]", + inputs.map((i) => i.data).join(", "), + ); return originalSendEvent(...inputs); }; }; @@ -102,9 +97,11 @@ export function withValidation< }, createContext(): WorkflowContext { const context = workflow.createContext(); - context.__internal__call_context.add((context, inputs, next) => { - (context as any).safeSendEvent = createSafeSendEvent(...inputs); - next(); + context.__internal__call_context.add((context, next) => { + (getContext() as any).safeSendEvent = createSafeSendEvent( + ...context.inputs, + ); + next(context); }); return context; }, diff --git a/packages/fluere/src/util/zod.ts b/packages/fluere/src/util/zod.ts index a95a85c..9862ee2 100644 --- a/packages/fluere/src/util/zod.ts +++ b/packages/fluere/src/util/zod.ts @@ -5,16 +5,15 @@ import { type WorkflowEventConfig, } from "fluere"; -export const zodEvent = ( +export const zodEvent = ( schema: z.ZodType, - config?: WorkflowEventConfig, -): WorkflowEvent => { - const event = workflowEvent(config); - return { - include: event.include, - with(data: T) { - schema.parse(data); - return event.with(data); - }, - } as unknown as WorkflowEvent; + config?: WorkflowEventConfig, +): WorkflowEvent => { + const event = workflowEvent(config); + const originalWith = event.with; + event.with = (data: T) => { + schema.parse(data); + return originalWith(data); + }; + return event; }; diff --git a/packages/fluere/tests/full-workflow.spec.ts b/packages/fluere/tests/full-workflow.spec.ts new file mode 100644 index 0000000..22afc61 --- /dev/null +++ b/packages/fluere/tests/full-workflow.spec.ts @@ -0,0 +1,79 @@ +import { describe, test, expect } from "vitest"; +import { createWorkflow, eventSource, type WorkflowEvent } from "fluere"; +import { withStore } from "fluere/middleware/store"; +import { withTraceEvents } from "fluere/middleware/trace-events"; +import { withValidation } from "fluere/middleware/validation"; +import { zodEvent } from "fluere/util/zod"; +import { z } from "zod"; +import { webcrypto } from "node:crypto"; + +describe("full workflow middleware", () => { + const createFullWorkflow = < + const Validation extends [ + inputs: WorkflowEvent[], + outputs: WorkflowEvent[], + ][], + T, + Input, + >( + validation: Validation, + createStore: (input: Input) => T, + ) => { + return withStore( + createStore, + withValidation(withTraceEvents(createWorkflow()), validation), + ); + }; + test("type check", () => { + const startEvent = zodEvent(z.string(), { + debugLabel: "start", + }); + const messageEvent = zodEvent(z.string(), { + debugLabel: "message", + }); + const stopEvent = zodEvent(z.string(), { + debugLabel: "stop", + }); + const workflow = createFullWorkflow( + [[[startEvent], [stopEvent]]], + () => ({}), + ); + workflow.handle([startEvent], (sendEvent, events) => { + // @ts-expect-error + sendEvent(messageEvent.with()); + sendEvent(stopEvent.with("")); + }); + }); + + test("basic", async () => { + const startEvent = zodEvent(z.string(), { + debugLabel: "start", + }); + const stopEvent = zodEvent(z.string(), { + debugLabel: "stop", + }); + const workflow = createFullWorkflow( + [[[startEvent], [stopEvent]]], + (id: string) => ({ + id, + }), + ); + workflow.handle([startEvent], (sendEvent, start) => { + expect(start.data).toBe("start"); + sendEvent(stopEvent.with(workflow.getStore().id)); + }); + + const id = webcrypto.randomUUID(); + const { sendEvent, stream } = workflow.createContext(id); + const events = []; + sendEvent(startEvent.with("start")); + for await (const event of stream) { + events.push(event); + if (stopEvent.include(event)) { + break; + } + } + expect(events.length).toBe(2); + expect(events.map(eventSource)).toEqual([startEvent, stopEvent]); + }); +}); diff --git a/packages/fluere/tests/with-trace-events.spec.ts b/packages/fluere/tests/with-trace-events.spec.ts new file mode 100644 index 0000000..50c04ec --- /dev/null +++ b/packages/fluere/tests/with-trace-events.spec.ts @@ -0,0 +1,110 @@ +import { describe, test, vi, expectTypeOf, type Mock, expect } from "vitest"; +import { createWorkflow, workflowEvent } from "fluere"; +import { + withTraceEvents, + runOnce, + createHandlerDecorator, +} from "fluere/middleware/trace-events"; + +describe("with trace events", () => { + test("runOnce", () => { + const workflow = withTraceEvents(createWorkflow()); + const startEvent = workflowEvent(); + const ref = workflow.handle([startEvent], vi.fn(runOnce(() => {}))); + expectTypeOf(ref.handler).toEqualTypeOf void>>(); + { + const { sendEvent } = workflow.createContext(); + expect(ref.handler).not.toHaveBeenCalled(); + sendEvent(startEvent.with()); + expect(ref.handler).toBeCalledTimes(1); + sendEvent(startEvent.with()); + expect(ref.handler).toBeCalledTimes(1); + } + ref.handler.mockReset(); + { + const { sendEvent } = workflow.createContext(); + expect(ref.handler).not.toHaveBeenCalled(); + sendEvent(startEvent.with()); + expect(ref.handler).toBeCalledTimes(1); + } + }); + + test("example: no parallel", async () => { + const workflow = withTraceEvents(createWorkflow()); + const startEvent = workflowEvent(); + type Metadata = { + running: boolean; + }; + const getInitialValue = vi.fn( + (): Metadata => ({ + running: false, + }), + ); + const noParallel = createHandlerDecorator({ + getInitialValue, + onAfterHandler: () => ({ + running: false, + }), + onBeforeHandler: (h, handlerContext, metadata) => async () => { + metadata.running = true; + const root = handlerContext.root; + const similarContexts = []; + const queue = [root]; + while (queue.length > 0) { + const current = queue.pop(); + if (!current) { + break; + } + if (current.handler === handlerContext.handler) { + similarContexts.push(current); + } + queue.push(...current.next); + } + const asyncContexts = similarContexts.filter((c) => c.async); + for (const context of asyncContexts) { + await context.pending; + } + return h(); + }, + }); + let count = 0; + let resolveNext: () => void = null!; + let p = new Promise((_resolve) => { + resolveNext = _resolve; + }); + let result: number[] = []; + const ref = workflow.handle( + [startEvent], + vi.fn( + noParallel(async () => { + count++; + const curr = count; + await p.then(() => { + result.push(curr); + p = new Promise((_resolve) => { + resolveNext = _resolve; + }); + }); + }), + ), + ); + expectTypeOf(ref.handler).toEqualTypeOf Promise>>(); + const { sendEvent } = workflow.createContext(); + expect(ref.handler).not.toHaveBeenCalled(); + sendEvent(startEvent.with()); + sendEvent(startEvent.with()); + sendEvent(startEvent.with()); + expect(ref.handler).toBeCalledTimes(1); + expect(result).toEqual([]); + resolveNext(); + vi.waitFor(() => expect(result).toEqual([1])); + resolveNext(); + vi.waitFor(() => expect(result).toEqual([1, 2])); + resolveNext(); + vi.waitFor(() => expect(result).toEqual([1, 2, 3])); + sendEvent(startEvent.with()); + vi.waitFor(() => expect(result).toEqual([1, 2, 3])); + resolveNext(); + vi.waitFor(() => expect(result).toEqual([1, 2, 3, 4])); + }); +}); diff --git a/packages/fluere/tests/with-validation.spec.ts b/packages/fluere/tests/with-validation.spec.ts index b4cb7d0..48775d7 100644 --- a/packages/fluere/tests/with-validation.spec.ts +++ b/packages/fluere/tests/with-validation.spec.ts @@ -13,12 +13,18 @@ describe("with directed graph", () => { }); test("basic", async () => { - const startEvent = workflowEvent(); + const startEvent = workflowEvent({ + debugLabel: "start", + }); const nonEvent = workflowEvent({ debugLabel: "non", }); - const parseEvent = workflowEvent(); - const stopEvent = workflowEvent(); + const parseEvent = workflowEvent({ + debugLabel: "parse", + }); + const stopEvent = workflowEvent({ + debugLabel: "stop", + }); const workflow = withValidation(createWorkflow(), [ [[startEvent], [stopEvent]], [[startEvent], [parseEvent, parseEvent]], @@ -39,11 +45,17 @@ describe("with directed graph", () => { sendEvent(startEvent.with()); await find(stream, stopEvent); expect(fn).toBeCalled(); - expect(consoleWarnMock).toHaveBeenCalledOnce(); - expect(consoleWarnMock).toHaveBeenLastCalledWith( + expect(consoleWarnMock).toBeCalledTimes(2); + expect(consoleWarnMock).toHaveBeenNthCalledWith( + 1, "Invalid input detected [%s]", "1", ); + expect(consoleWarnMock).toHaveBeenNthCalledWith( + 2, + "Invalid input detected [%s]", + "", + ); }); test("promise", async () => { diff --git a/packages/fluere/tsconfig.browser.json b/packages/fluere/tsconfig.browser.json index 03eec35..b52b110 100644 --- a/packages/fluere/tsconfig.browser.json +++ b/packages/fluere/tsconfig.browser.json @@ -6,7 +6,10 @@ "paths": { "fluere": ["./src/core/index.ts"], "fluere/stream": ["./src/stream/index.ts"], - "fluere/async-context": ["./src/async-context/index.browser.ts"] + "fluere/async-context": ["./src/async-context/index.browser.ts"], + "fluere/interrupter/*": ["./src/interrupter/*.ts"], + "fluere/middleware/*": ["./src/middleware/*.ts"], + "fluere/util/*": ["./src/util/*.ts"] } }, "include": ["./src"] diff --git a/packages/fluere/tsconfig.json b/packages/fluere/tsconfig.json index c90c3b0..7dfe6e3 100644 --- a/packages/fluere/tsconfig.json +++ b/packages/fluere/tsconfig.json @@ -21,8 +21,8 @@ "fluere/async-context": ["./src/async-context/index.ts"], "fluere/interrupter/*": ["./src/interrupter/*.ts"], "fluere/middleware/*": ["./src/middleware/*.ts"], - "fluere/util": ["./src/util/index.ts"], - "fluere/stream": ["./src/stream/index.ts"], + "fluere/util/*": ["./src/util/*.ts"], + "fluere/stream": ["./src/stream/index.ts"] } }, "include": ["./src"], diff --git a/vitest.workspace.ts b/vitest.workspace.ts index ff15a62..c1efc9a 100644 --- a/vitest.workspace.ts +++ b/vitest.workspace.ts @@ -1,3 +1 @@ -export default [ - 'packages/*', -] +export default ["packages/*"];