feat: middleware withTraceEvents (#24)

This commit is contained in:
Alex Yang
2025-04-05 21:53:15 -07:00
committed by GitHub
parent 5ac0b476af
commit d057a756aa
17 changed files with 696 additions and 120 deletions
+124 -1
View File
@@ -179,8 +179,12 @@ for [Async Context](https://github.com/tc39/proposal-async-context) to solve thi
### `withStore`
Adding a `getStore()` method to the workflow context, which returns a store object, each store is linked to the workflow
context.
```ts
import { withStore } from "fluere/middleware/store";
const workflow = withStore(
() => ({
pendingTasks: new Set<Promise<unknown>>(),
@@ -203,7 +207,7 @@ const { getStore } = workflow.createContext();
### `withValidation`
make first parameter of `handler` to be `sendEvent` and its type safe and runtime safe
Make first parameter of `handler` to be `sendEvent` and its type safe and runtime safe
when you create a workflow using `withValidation`.
```ts
@@ -236,6 +240,125 @@ workflow.handle([startEvent], (sendEvent, start) => {
});
```
### `withTraceEvents`
Adds tracing capabilities to your workflow, allowing you to monitor/decorate handler and debug event flows easily.
When enabled,
it collects events based on the directed graph of the runtime and provide lifecycle hooks for each handler.
```ts
import { withTraceEvents, runOnce } from "fluere/middleware/trace-events";
const workflow = withTraceEvents(createWorkflow());
workflow.handle(
[messageEvent],
runOnce(() => {
console.log("This message handler will only run once");
}),
);
workflow.handle([startEvent], () => {
getContext().sendEvent(messageEvent.with());
getContext().sendEvent(messageEvent.with());
});
{
const { sendEvent } = workflow.createContext();
sendEvent(startEvent.with());
sendEvent(messageEvent.with());
// This message handler will only run once!
}
{
const { sendEvent } = workflow.createContext();
// For each new context, the decorator is isolated.
sendEvent(startEvent.with());
sendEvent(messageEvent.with());
// This message handler will only run once!
}
```
#### `createHandlerDecorator`
You can create your own handler decorator to modify the behavior of the handler.
```ts
import { createHandlerDecorator } from "fluere/middleware/trace-events";
const noop: (...args: any[]) => void = function noop() {};
export const runOnce = createHandlerDecorator({
debugLabel: "onceHook",
getInitialValue: () => false,
onBeforeHandler: (handler, handlerContext, tracked) =>
tracked ? noop : handler,
onAfterHandler: () => true,
});
```
#### `HandlerContext`
The `HandlerContext` includes the runtime information of the handler in the directed graph of the workflow.
```ts
type BaseHandlerContext = {
// ... some other properties are hidden
handler: Handler<WorkflowEvent<any>[], any>;
inputEvents: WorkflowEvent<any>[];
// events data that are accepted by the handler
inputs: WorkflowEventData<any>[];
// events data that are emitted by the handler
outputs: WorkflowEventData<any>[];
//#region linked list data structure
prev: HandlerContext;
next: Set<HandlerContext>;
root: HandlerContext;
//#endregion
};
type SyncHandlerContext = BaseHandlerContext & {
async: false;
pending: null;
};
type AsyncHandlerContext = BaseHandlerContext & {
async: true;
pending: Promise<WorkflowEventData<any> | void> | null;
};
type HandlerContext = AsyncHandlerContext | SyncHandlerContext;
```
For example, when you send two `startEvent` events, and send `messageEvent` twice (once in the handler and once in the global),
the `HandlerContext` from root to leaf is:
```ts
let once = false;
workflow.handle([startEvent], () => {
const { sendEvent } = getContext();
if (once) {
return;
}
once = true;
sendEvent(messageEvent.with());
});
const { sendEvent } = workflow.createContext();
sendEvent(startEvent.with());
sendEvent(startEvent.with());
sendEvent(messageEvent.with());
```
```
rootHandlerContext(0)
├── startEventContext(0)
│ └── messageEventContext(0)
├── startEventContext(1)
└── messageEventContext(1)
```
You can use any directed graph library to visualize the directed graph of the workflow.
# LICENSE
MIT
-2
View File
@@ -64,8 +64,6 @@ export const workflowEvent = <Data = void, DebugLabel extends string = string>(
});
event.toString = () => config?.debugLabel ?? `WorkflowEvent<${l1}>`;
Object.freeze(event);
return event;
};
+38 -9
View File
@@ -1,5 +1,40 @@
import { createAsyncContext } from "fluere/async-context";
import type { WorkflowEventData } from "../event";
import type { WorkflowEvent, WorkflowEventData } from "../event";
import type { Handler } from "./handler";
type BaseHandlerContext = {
abortController: AbortController;
handler: Handler<WorkflowEvent<any>[], any>;
// events that are accepted by the handler
inputEvents: WorkflowEvent<any>[];
// events data that are accepted by the handler
inputs: WorkflowEventData<any>[];
// events data that are emitted by the handler
outputs: WorkflowEventData<any>[];
//#region linked list data structure
prev: HandlerContext;
next: Set<HandlerContext>;
root: HandlerContext;
//#endregion
};
type SyncHandlerContext = BaseHandlerContext & {
async: false;
pending: null;
};
type AsyncHandlerContext = BaseHandlerContext & {
async: true;
pending: Promise<WorkflowEventData<any> | void> | null;
};
export type HandlerContext = AsyncHandlerContext | SyncHandlerContext;
export type ContextNext = (
context: HandlerContext,
next: (context: HandlerContext) => void,
) => void;
export interface WorkflowContext {
get stream(): ReadableStream<WorkflowEventData<any>>;
@@ -9,13 +44,7 @@ export interface WorkflowContext {
/**
* @internal
*/
__internal__call_context: Set<
(
context: WorkflowContext,
inputs: WorkflowEventData<any>[],
next: () => void,
) => void
>;
__internal__call_context: Set<ContextNext>;
}
export const _executorAsyncLocalStorage = createAsyncContext<WorkflowContext>();
@@ -23,7 +52,7 @@ export const _executorAsyncLocalStorage = createAsyncContext<WorkflowContext>();
export function getContext(): WorkflowContext {
const context = _executorAsyncLocalStorage.getStore();
if (!context) {
throw new Error("No context found");
throw new Error("No current context found");
}
return context;
}
+33 -25
View File
@@ -1,22 +1,12 @@
import type {
Handler,
HandlerRef,
WorkflowEvent,
WorkflowEventData,
} from "fluere";
import type { Handler, WorkflowEvent, WorkflowEventData } from "fluere";
import { flattenEvents, isEventData, isPromiseLike } from "../utils";
import { _executorAsyncLocalStorage, type WorkflowContext } from "./context";
import {
_executorAsyncLocalStorage,
type HandlerContext,
type WorkflowContext,
} from "./context";
import { createAsyncContext } from "fluere/async-context";
type HandlerContext = {
abortController: AbortController;
handler: Handler<WorkflowEvent<any>[], any>;
inputs: WorkflowEventData<any>[];
outputs: WorkflowEventData<any>[];
prev: HandlerContext;
next: Set<HandlerContext>;
};
const handlerContextAsyncLocalStorage = createAsyncContext<HandlerContext>();
const eventContextWeakMap = new WeakMap<
@@ -27,7 +17,7 @@ const eventContextWeakMap = new WeakMap<
export type ExecutorParams = {
listeners: ReadonlyMap<
WorkflowEvent<any>[],
Set<HandlerRef<WorkflowEvent<any>[], any>>
Set<Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>>
>;
};
@@ -37,9 +27,10 @@ export const createContext = ({
const queue: WorkflowEventData<any>[] = [];
const runHandler = (
handler: Handler<WorkflowEvent<any>[], any>,
inputEvents: WorkflowEvent<any>[],
inputs: WorkflowEventData<any>[],
parentContext: HandlerContext,
) => {
): void => {
let handlerAbortController: AbortController;
const handlerContext: HandlerContext = {
get abortController() {
@@ -48,11 +39,20 @@ export const createContext = ({
}
return handlerAbortController;
},
async:
"constructor" in handler
? handler.constructor.name === "AsyncFunction"
: false,
pending: null,
handler,
inputEvents,
inputs,
outputs: [],
prev: parentContext,
next: new Set(),
get root() {
return handlerRootContext;
},
};
handlerContext.prev.next.add(handlerContext);
const workflowContext = createWorkflowContext(handlerContext);
@@ -66,11 +66,11 @@ export const createContext = ({
_executorAsyncLocalStorage.run(workflowContext, () => {
//#region middleware
let i = 0;
const next = () => {
const next = (context: HandlerContext) => {
if (i === cbs.length) {
let result: any;
try {
result = handler(...inputs);
result = context.handler(...context.inputs);
} catch (error) {
if (handlerAbortController ?? rootAbortController) {
(handlerAbortController ?? rootAbortController).abort(error);
@@ -81,6 +81,8 @@ export const createContext = ({
}
// return value is a special event
if (isPromiseLike(result)) {
(handlerContext as any).async = true;
(handlerContext as any).pending = result;
result.then((event) => {
if (isEventData(event)) {
workflowContext.sendEvent(event);
@@ -93,10 +95,10 @@ export const createContext = ({
const cb = cbs[i];
if (cb) {
i++;
cb(workflowContext, inputs, next);
cb(context, next);
}
};
next();
next(handlerContext);
//#endregion
});
});
@@ -108,13 +110,13 @@ export const createContext = ({
const inputs = flattenEvents(events, queueSnapshot);
return inputs.length === events.length;
})
.map(([events, refs]) => {
.map(([events, handlers]) => {
const inputs = flattenEvents(events, queueSnapshot);
inputs.forEach((input) => {
queue.splice(queue.indexOf(input), 1);
});
for (const ref of refs) {
runHandler(ref.handler, inputs, handlerContext);
for (const handler of handlers) {
runHandler(handler, events, inputs, handlerContext);
}
});
};
@@ -163,11 +165,17 @@ export const createContext = ({
}
return rootAbortController;
},
async: false,
pending: null,
inputEvents: [],
inputs: [],
outputs: [],
handler: null!,
prev: null!,
next: new Set(),
get root() {
return handlerRootContext;
},
};
const rootWorkflowContext = createWorkflowContext(handlerRootContext);
+23 -35
View File
@@ -1,7 +1,7 @@
import { type WorkflowEvent } from "./event";
import { type WorkflowEvent, type WorkflowEventData } from "./event";
import { createContext } from "./internal/executor";
import { type Handler, type HandlerRef } from "./internal/handler";
import type { WorkflowContext } from "./internal/context";
import { type Handler } from "./internal/handler";
import { getContext, type WorkflowContext } from "./internal/context";
export type Workflow = {
handle<
@@ -10,7 +10,7 @@ export type Workflow = {
>(
accept: AcceptEvents,
handler: Handler<AcceptEvents, Result>,
): HandlerRef<AcceptEvents, Result>;
): void;
createContext(): WorkflowContext;
};
@@ -18,7 +18,7 @@ export function createWorkflow(): Workflow {
const config = {
steps: new Map<
WorkflowEvent<any>[],
Set<HandlerRef<WorkflowEvent<any>[], any>>
Set<Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>>
>(),
};
@@ -29,43 +29,31 @@ export function createWorkflow(): Workflow {
>(
accept: AcceptEvents,
handler: Handler<AcceptEvents, Result>,
): HandlerRef<AcceptEvents, Result> => {
): void => {
// smoke test to check if we are outside the context
let success = false;
try {
getContext();
console.error("Calling handle inside of context is not allowed.");
success = true;
} catch {}
if (success) {
throw new Error("Calling handle inside of context is not allowed.");
}
if (config.steps.has(accept)) {
const set = config.steps.get(accept) as Set<
HandlerRef<AcceptEvents, Result>
Handler<AcceptEvents, Result>
>;
const ref: HandlerRef<AcceptEvents, Result> = {
get handler() {
return handler;
},
unsubscribe: () => {
set.delete(ref);
if (set.size === 0) {
config.steps.delete(accept);
}
},
};
set.add(ref);
return ref;
set.add(handler);
} else {
const set = new Set<HandlerRef<AcceptEvents, Result>>();
const ref: HandlerRef<AcceptEvents, Result> = {
get handler() {
return handler;
},
unsubscribe: () => {
set.delete(ref);
if (set.size === 0) {
config.steps.delete(accept);
}
},
};
set.add(ref);
const set = new Set<Handler<AcceptEvents, Result>>();
set.add(handler);
config.steps.set(
accept,
set as Set<HandlerRef<WorkflowEvent<any>[], any>>,
set as Set<
Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>
>,
);
return ref;
}
},
createContext() {
+36 -12
View File
@@ -1,27 +1,51 @@
import { type WorkflowContext, type Workflow, getContext } from "fluere";
export function withStore<T, Input extends void>(
export function withStore<
T,
Input extends void,
WorkflowLike extends {
createContext(): WorkflowContext;
} = {
createContext(): WorkflowContext;
},
>(
createStore: () => T,
workflow: Workflow,
): Omit<Workflow, "createContext"> & {
workflow: WorkflowLike,
): Omit<WorkflowLike, "createContext"> & {
createContext(): WorkflowContext & {
getStore(): T;
};
getStore(): T;
};
export function withStore<T, Input>(
export function withStore<
T,
Input,
WorkflowLike extends {
createContext(): WorkflowContext;
} = {
createContext(): WorkflowContext;
},
>(
createStore: (input: Input) => T,
workflow: Workflow,
): Omit<Workflow, "createContext"> & {
workflow: WorkflowLike,
): Omit<WorkflowLike, "createContext"> & {
createContext(input: Input): WorkflowContext & {
getStore(): T;
};
getStore(): T;
};
export function withStore<T, Input>(
export function withStore<
T,
Input,
WorkflowLike extends {
createContext(): WorkflowContext;
} = {
createContext(): WorkflowContext;
},
>(
createStore: (input: Input) => T,
workflow: Workflow,
): Omit<Workflow, "createContext"> & {
workflow: WorkflowLike,
): Omit<WorkflowLike, "createContext"> & {
createContext(input: Input): WorkflowContext & {
getStore(): T;
};
@@ -40,9 +64,9 @@ export function withStore<T, Input>(
const context = workflow.createContext() as WorkflowContext & {
getStore: () => T;
};
context.__internal__call_context.add((context, _, next) => {
(context as any).getStore = () => currentStore;
next();
context.__internal__call_context.add((_, next) => {
(getContext() as any).getStore = () => currentStore;
next(_);
});
(context as any).getStore = () => currentStore;
return context;
@@ -0,0 +1,152 @@
import type {
Handler,
Workflow,
WorkflowContext,
WorkflowEvent,
WorkflowEventData,
} from "fluere";
import { isPromiseLike } from "../core/utils";
import {
createHandlerDecorator,
decoratorRegistry,
} from "./trace-events/create-handler-decorator";
import { runOnce } from "./trace-events/run-once";
type TracingContext = Record<string, unknown>;
const tracingWeakMap = new WeakMap<
WorkflowContext,
WeakMap<
WorkflowEvent<any>[],
WeakMap<
Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>,
TracingContext
>
>
>();
export type HandlerRef<
AcceptEvents extends WorkflowEvent<any>[],
Result extends ReturnType<WorkflowEvent<any>["with"]> | void,
Fn extends Handler<AcceptEvents, Result>,
> = {
get handler(): Fn;
};
export function withTraceEvents<
WorkflowLike extends {
handle<
const AcceptEvents extends WorkflowEvent<any>[],
Result extends ReturnType<WorkflowEvent<any>["with"]> | void,
>(
accept: AcceptEvents,
handler: Handler<AcceptEvents, Result>,
): void;
createContext(): WorkflowContext;
},
>(
workflow: WorkflowLike,
): Omit<WorkflowLike, "handle"> & {
handle<
const AcceptEvents extends WorkflowEvent<any>[],
Result extends ReturnType<WorkflowEvent<any>["with"]> | void,
Fn extends Handler<AcceptEvents, Result>,
>(
accept: AcceptEvents,
handler: Fn,
): HandlerRef<AcceptEvents, Result, Fn>;
createContext(): WorkflowContext;
} {
return {
...workflow,
handle: <
const AcceptEvents extends WorkflowEvent<any>[],
Result extends ReturnType<WorkflowEvent<any>["with"]> | void,
Fn extends Handler<AcceptEvents, Result>,
>(
accept: AcceptEvents,
handler: Fn,
): HandlerRef<AcceptEvents, Result, Fn> => {
workflow.handle(accept, handler);
return {
get handler(): Fn {
return handler;
},
};
},
createContext(): WorkflowContext {
const context = workflow.createContext();
tracingWeakMap.set(context, new WeakMap());
context.__internal__call_context.add((handlerContext, next) => {
const inputEvents = handlerContext.inputEvents;
const handlersWeakMap = tracingWeakMap.get(context)!;
if (!handlersWeakMap.has(inputEvents)) {
handlersWeakMap.set(inputEvents, new WeakMap());
}
const handlerWeakMap = handlersWeakMap.get(inputEvents)!;
const originalHandler = handlerContext.handler;
let finalHandler = originalHandler;
let handlerMiddleware: Handler<
WorkflowEvent<any>[],
WorkflowEventData<any> | void
>;
if (!handlerWeakMap) {
throw new Error(
"Handler context is not defined, this should not happen. Please report this issue with a reproducible example.",
);
}
const tracingContext: TracingContext =
handlerWeakMap.get(originalHandler) ?? {};
if (!handlerWeakMap.has(originalHandler)) {
handlerWeakMap.set(originalHandler, tracingContext);
}
const onAfterHandlers = [] as (() => void)[];
const onBeforeHandlers = [] as ((
nextHandler: Handler<
WorkflowEvent<any>[],
WorkflowEventData<any> | void
>,
) => Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>)[];
handlerMiddleware = (...args) => {
const result = onBeforeHandlers.reduce((next, cb) => {
return cb(next);
}, finalHandler)(...args);
if (isPromiseLike(result)) {
return result.then(() => {
onAfterHandlers.forEach((cb) => {
cb();
});
});
} else {
onAfterHandlers.forEach((cb) => {
cb();
});
return result;
}
};
[...decoratorRegistry].forEach(
([name, { getInitialValue, onAfterHandler, onBeforeHandler }]) => {
if (!tracingContext[name]) {
tracingContext[name] = getInitialValue();
}
onBeforeHandlers.push((next) =>
onBeforeHandler(next, handlerContext, tracingContext[name]),
);
onAfterHandlers.push(() => {
tracingContext[name] = onAfterHandler(tracingContext[name]);
});
},
);
next({
...handlerContext,
handler: handlerMiddleware,
});
});
return context;
},
};
}
export { createHandlerDecorator, runOnce };
@@ -0,0 +1,46 @@
import type { Handler, WorkflowEvent, WorkflowEventData } from "fluere";
import type { HandlerContext } from "../../core/internal/context";
const namespace = "decorator" as const;
let counter = 0;
export const decoratorRegistry = new Map<
string,
{
debugLabel: string;
getInitialValue: () => any;
onBeforeHandler: (
handler: Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>,
handlerContext: Readonly<HandlerContext>,
metadata: any,
) => Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>;
onAfterHandler: (metadata: any) => any;
}
>();
export function createHandlerDecorator<Metadata>(config: {
debugLabel?: string;
getInitialValue: () => Metadata;
onBeforeHandler: (
handler: Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>,
handlerContext: HandlerContext,
metadata: Metadata,
) => Handler<WorkflowEvent<any>[], WorkflowEventData<any> | void>;
onAfterHandler: (metadata: Metadata) => Metadata;
}) {
const uid = `${namespace}:${counter++}`;
decoratorRegistry.set(uid, {
debugLabel: config.debugLabel ?? uid,
getInitialValue: config.getInitialValue,
onAfterHandler: config.onAfterHandler,
onBeforeHandler: config.onBeforeHandler,
});
return function <
const AcceptEvents extends WorkflowEvent<any>[],
Result extends ReturnType<WorkflowEvent<any>["with"]> | void,
Fn extends Handler<AcceptEvents, Result>,
>(handler: Fn) {
return handler;
};
}
@@ -0,0 +1,10 @@
import { createHandlerDecorator } from "./create-handler-decorator";
const noop: (...args: any[]) => void = function noop() {};
export const runOnce = createHandlerDecorator({
debugLabel: "onceHook",
getInitialValue: () => false,
onBeforeHandler: (handler, _, tracked) => (tracked ? noop : handler),
onAfterHandler: () => true,
});
+11 -14
View File
@@ -2,7 +2,6 @@ import {
type WorkflowContext,
getContext,
type Handler,
type HandlerRef,
type Workflow,
type WorkflowEvent,
type WorkflowEventData,
@@ -44,7 +43,7 @@ export type WithValidationWorkflow<
>(
accept: AcceptEvents,
handler: ValidationHandler<Validation, AcceptEvents, Result>,
): HandlerRef<AcceptEvents, Result>;
): void;
createContext(): WorkflowContext;
};
@@ -66,22 +65,18 @@ export function withValidation<
const store = getContext();
const originalSendEvent = store.sendEvent;
return (...inputs: WorkflowEventData<any>[]) => {
let matched = false;
for (let i = 0; i < outputs.length; i++) {
const output = outputs[i]!;
if (output.length === inputs.length) {
if (output.every((e, idx) => e.include(inputs[idx]))) {
matched = true;
break;
return originalSendEvent(...inputs);
}
}
}
if (matched) {
console.warn(
"Invalid input detected [%s]",
inputs.map((i) => i.data).join(", "),
);
}
console.warn(
"Invalid input detected [%s]",
inputs.map((i) => i.data).join(", "),
);
return originalSendEvent(...inputs);
};
};
@@ -102,9 +97,11 @@ export function withValidation<
},
createContext(): WorkflowContext {
const context = workflow.createContext();
context.__internal__call_context.add((context, inputs, next) => {
(context as any).safeSendEvent = createSafeSendEvent(...inputs);
next();
context.__internal__call_context.add((context, next) => {
(getContext() as any).safeSendEvent = createSafeSendEvent(
...context.inputs,
);
next(context);
});
return context;
},
+10 -11
View File
@@ -5,16 +5,15 @@ import {
type WorkflowEventConfig,
} from "fluere";
export const zodEvent = <T>(
export const zodEvent = <T, DebugLabel extends string>(
schema: z.ZodType<T>,
config?: WorkflowEventConfig,
): WorkflowEvent<T> => {
const event = workflowEvent<T>(config);
return {
include: event.include,
with(data: T) {
schema.parse(data);
return event.with(data);
},
} as unknown as WorkflowEvent<T>;
config?: WorkflowEventConfig<DebugLabel>,
): WorkflowEvent<T, DebugLabel> => {
const event = workflowEvent<T, DebugLabel>(config);
const originalWith = event.with;
event.with = (data: T) => {
schema.parse(data);
return originalWith(data);
};
return event;
};
@@ -0,0 +1,79 @@
import { describe, test, expect } from "vitest";
import { createWorkflow, eventSource, type WorkflowEvent } from "fluere";
import { withStore } from "fluere/middleware/store";
import { withTraceEvents } from "fluere/middleware/trace-events";
import { withValidation } from "fluere/middleware/validation";
import { zodEvent } from "fluere/util/zod";
import { z } from "zod";
import { webcrypto } from "node:crypto";
describe("full workflow middleware", () => {
const createFullWorkflow = <
const Validation extends [
inputs: WorkflowEvent<any>[],
outputs: WorkflowEvent<any>[],
][],
T,
Input,
>(
validation: Validation,
createStore: (input: Input) => T,
) => {
return withStore(
createStore,
withValidation<Validation>(withTraceEvents(createWorkflow()), validation),
);
};
test("type check", () => {
const startEvent = zodEvent(z.string(), {
debugLabel: "start",
});
const messageEvent = zodEvent(z.string(), {
debugLabel: "message",
});
const stopEvent = zodEvent(z.string(), {
debugLabel: "stop",
});
const workflow = createFullWorkflow(
[[[startEvent], [stopEvent]]],
() => ({}),
);
workflow.handle([startEvent], (sendEvent, events) => {
// @ts-expect-error
sendEvent(messageEvent.with());
sendEvent(stopEvent.with(""));
});
});
test("basic", async () => {
const startEvent = zodEvent(z.string(), {
debugLabel: "start",
});
const stopEvent = zodEvent(z.string(), {
debugLabel: "stop",
});
const workflow = createFullWorkflow(
[[[startEvent], [stopEvent]]],
(id: string) => ({
id,
}),
);
workflow.handle([startEvent], (sendEvent, start) => {
expect(start.data).toBe("start");
sendEvent(stopEvent.with(workflow.getStore().id));
});
const id = webcrypto.randomUUID();
const { sendEvent, stream } = workflow.createContext(id);
const events = [];
sendEvent(startEvent.with("start"));
for await (const event of stream) {
events.push(event);
if (stopEvent.include(event)) {
break;
}
}
expect(events.length).toBe(2);
expect(events.map(eventSource)).toEqual([startEvent, stopEvent]);
});
});
@@ -0,0 +1,110 @@
import { describe, test, vi, expectTypeOf, type Mock, expect } from "vitest";
import { createWorkflow, workflowEvent } from "fluere";
import {
withTraceEvents,
runOnce,
createHandlerDecorator,
} from "fluere/middleware/trace-events";
describe("with trace events", () => {
test("runOnce", () => {
const workflow = withTraceEvents(createWorkflow());
const startEvent = workflowEvent();
const ref = workflow.handle([startEvent], vi.fn(runOnce(() => {})));
expectTypeOf(ref.handler).toEqualTypeOf<Mock<() => void>>();
{
const { sendEvent } = workflow.createContext();
expect(ref.handler).not.toHaveBeenCalled();
sendEvent(startEvent.with());
expect(ref.handler).toBeCalledTimes(1);
sendEvent(startEvent.with());
expect(ref.handler).toBeCalledTimes(1);
}
ref.handler.mockReset();
{
const { sendEvent } = workflow.createContext();
expect(ref.handler).not.toHaveBeenCalled();
sendEvent(startEvent.with());
expect(ref.handler).toBeCalledTimes(1);
}
});
test("example: no parallel", async () => {
const workflow = withTraceEvents(createWorkflow());
const startEvent = workflowEvent();
type Metadata = {
running: boolean;
};
const getInitialValue = vi.fn(
(): Metadata => ({
running: false,
}),
);
const noParallel = createHandlerDecorator<Metadata>({
getInitialValue,
onAfterHandler: () => ({
running: false,
}),
onBeforeHandler: (h, handlerContext, metadata) => async () => {
metadata.running = true;
const root = handlerContext.root;
const similarContexts = [];
const queue = [root];
while (queue.length > 0) {
const current = queue.pop();
if (!current) {
break;
}
if (current.handler === handlerContext.handler) {
similarContexts.push(current);
}
queue.push(...current.next);
}
const asyncContexts = similarContexts.filter((c) => c.async);
for (const context of asyncContexts) {
await context.pending;
}
return h();
},
});
let count = 0;
let resolveNext: () => void = null!;
let p = new Promise<void>((_resolve) => {
resolveNext = _resolve;
});
let result: number[] = [];
const ref = workflow.handle(
[startEvent],
vi.fn(
noParallel(async () => {
count++;
const curr = count;
await p.then(() => {
result.push(curr);
p = new Promise<void>((_resolve) => {
resolveNext = _resolve;
});
});
}),
),
);
expectTypeOf(ref.handler).toEqualTypeOf<Mock<() => Promise<void>>>();
const { sendEvent } = workflow.createContext();
expect(ref.handler).not.toHaveBeenCalled();
sendEvent(startEvent.with());
sendEvent(startEvent.with());
sendEvent(startEvent.with());
expect(ref.handler).toBeCalledTimes(1);
expect(result).toEqual([]);
resolveNext();
vi.waitFor(() => expect(result).toEqual([1]));
resolveNext();
vi.waitFor(() => expect(result).toEqual([1, 2]));
resolveNext();
vi.waitFor(() => expect(result).toEqual([1, 2, 3]));
sendEvent(startEvent.with());
vi.waitFor(() => expect(result).toEqual([1, 2, 3]));
resolveNext();
vi.waitFor(() => expect(result).toEqual([1, 2, 3, 4]));
});
});
+17 -5
View File
@@ -13,12 +13,18 @@ describe("with directed graph", () => {
});
test("basic", async () => {
const startEvent = workflowEvent<void, "start">();
const startEvent = workflowEvent<void, "start">({
debugLabel: "start",
});
const nonEvent = workflowEvent<number, "non">({
debugLabel: "non",
});
const parseEvent = workflowEvent<string, "parse">();
const stopEvent = workflowEvent<number, "stop">();
const parseEvent = workflowEvent<string, "parse">({
debugLabel: "parse",
});
const stopEvent = workflowEvent<number, "stop">({
debugLabel: "stop",
});
const workflow = withValidation(createWorkflow(), [
[[startEvent], [stopEvent]],
[[startEvent], [parseEvent, parseEvent]],
@@ -39,11 +45,17 @@ describe("with directed graph", () => {
sendEvent(startEvent.with());
await find(stream, stopEvent);
expect(fn).toBeCalled();
expect(consoleWarnMock).toHaveBeenCalledOnce();
expect(consoleWarnMock).toHaveBeenLastCalledWith(
expect(consoleWarnMock).toBeCalledTimes(2);
expect(consoleWarnMock).toHaveBeenNthCalledWith(
1,
"Invalid input detected [%s]",
"1",
);
expect(consoleWarnMock).toHaveBeenNthCalledWith(
2,
"Invalid input detected [%s]",
"",
);
});
test("promise", async () => {
+4 -1
View File
@@ -6,7 +6,10 @@
"paths": {
"fluere": ["./src/core/index.ts"],
"fluere/stream": ["./src/stream/index.ts"],
"fluere/async-context": ["./src/async-context/index.browser.ts"]
"fluere/async-context": ["./src/async-context/index.browser.ts"],
"fluere/interrupter/*": ["./src/interrupter/*.ts"],
"fluere/middleware/*": ["./src/middleware/*.ts"],
"fluere/util/*": ["./src/util/*.ts"]
}
},
"include": ["./src"]
+2 -2
View File
@@ -21,8 +21,8 @@
"fluere/async-context": ["./src/async-context/index.ts"],
"fluere/interrupter/*": ["./src/interrupter/*.ts"],
"fluere/middleware/*": ["./src/middleware/*.ts"],
"fluere/util": ["./src/util/index.ts"],
"fluere/stream": ["./src/stream/index.ts"],
"fluere/util/*": ["./src/util/*.ts"],
"fluere/stream": ["./src/stream/index.ts"]
}
},
"include": ["./src"],
+1 -3
View File
@@ -1,3 +1 @@
export default [
'packages/*',
]
export default ["packages/*"];