Make callbacks not be awaited by default, with option to enable waiting globally or per handler

This commit is contained in:
Nuno Campos
2023-05-24 16:31:33 +01:00
parent 4821c431a3
commit 57be29d000
7 changed files with 280 additions and 194 deletions
+1
View File
@@ -51,6 +51,7 @@ module.exports = {
"no-restricted-syntax": 0,
"no-shadow": 0,
"no-continue": 0,
"no-void": 0,
"no-underscore-dangle": 0,
"no-use-before-define": 0,
"no-useless-constructor": 0,
+6
View File
@@ -9,6 +9,12 @@ module.exports = {
transform: {
"^.+\\.m?[tj]sx?$": ["ts-jest", { useESM: true }],
},
transformIgnorePatterns: [
"/node_modules/",
"\\.pnp\\.[^\\/]+$",
"./scripts/jest-setup-after-env.js",
],
setupFiles: ["dotenv/config"],
setupFilesAfterEnv: ["./scripts/jest-setup-after-env.js"],
testTimeout: 20_000,
};
@@ -0,0 +1,3 @@
import { awaitAllCallbacks } from "../src/callbacks/promises.js";
afterAll(awaitAllCallbacks);
+6
View File
@@ -176,6 +176,12 @@ export abstract class BaseCallbackHandler
ignoreAgent = false;
awaitHandlers =
typeof process !== "undefined"
? // eslint-disable-next-line no-process-env
process.env?.LANGCHAIN_AWAIT_CALLBACKS === "true"
: false;
constructor(input?: BaseCallbackHandlerInput) {
super();
if (input) {
+2
View File
@@ -27,3 +27,5 @@ export {
TraceGroup,
traceAsGroup,
} from "./manager.js";
export { awaitAllCallbacks, consumeCallback } from "./promises.js";
+235 -194
View File
@@ -18,6 +18,7 @@ import {
LangChainTracer,
LangChainTracerFields,
} from "./handlers/tracer_langchain.js";
import { consumeCallback } from "./promises.js";
type BaseCallbackManagerMethods = {
[K in keyof CallbackHandlerMethods]?: (
@@ -56,15 +57,17 @@ class BaseRunManager {
async handleText(text: string): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
try {
await handler.handleText?.(text, this.runId, this._parentRunId);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleText: ${err}`
);
}
})
this.handlers.map((handler) =>
consumeCallback(async () => {
try {
await handler.handleText?.(text, this.runId, this._parentRunId);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleText: ${err}`
);
}
}, handler.awaitHandlers)
)
);
}
}
@@ -75,53 +78,67 @@ export class CallbackManagerForLLMRun
{
async handleLLMNewToken(token: string): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMNewToken?.(
token,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMNewToken: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMNewToken?.(
token,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMNewToken: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
async handleLLMError(err: Error | unknown): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMError?.(err, this.runId, this._parentRunId);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMError: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMError?.(
err,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMError: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
async handleLLMEnd(output: LLMResult): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMEnd?.(output, this.runId, this._parentRunId);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMEnd: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMEnd?.(
output,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMEnd: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
}
@@ -139,81 +156,89 @@ export class CallbackManagerForChainRun
async handleChainError(err: Error | unknown): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreChain) {
try {
await handler.handleChainError?.(
err,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleChainError: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreChain) {
try {
await handler.handleChainError?.(
err,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleChainError: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
async handleChainEnd(output: ChainValues): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreChain) {
try {
await handler.handleChainEnd?.(
output,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleChainEnd: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreChain) {
try {
await handler.handleChainEnd?.(
output,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleChainEnd: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
async handleAgentAction(action: AgentAction): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreAgent) {
try {
await handler.handleAgentAction?.(
action,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleAgentAction: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreAgent) {
try {
await handler.handleAgentAction?.(
action,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleAgentAction: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
async handleAgentEnd(action: AgentFinish): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreAgent) {
try {
await handler.handleAgentEnd?.(
action,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleAgentEnd: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreAgent) {
try {
await handler.handleAgentEnd?.(
action,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleAgentEnd: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
}
@@ -231,37 +256,45 @@ export class CallbackManagerForToolRun
async handleToolError(err: Error | unknown): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreAgent) {
try {
await handler.handleToolError?.(err, this.runId, this._parentRunId);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleToolError: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreAgent) {
try {
await handler.handleToolError?.(
err,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleToolError: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
async handleToolEnd(output: string): Promise<void> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreAgent) {
try {
await handler.handleToolEnd?.(
output,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleToolEnd: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreAgent) {
try {
await handler.handleToolEnd?.(
output,
this.runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleToolEnd: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
}
}
@@ -293,23 +326,25 @@ export class CallbackManager
extraParams: Record<string, unknown> | undefined = undefined
): Promise<CallbackManagerForLLMRun> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMStart?.(
llm,
prompts,
runId,
this._parentRunId,
extraParams
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMStart: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreLLM) {
try {
await handler.handleLLMStart?.(
llm,
prompts,
runId,
this._parentRunId,
extraParams
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMStart: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
return new CallbackManagerForLLMRun(
runId,
@@ -328,34 +363,36 @@ export class CallbackManager
): Promise<CallbackManagerForLLMRun> {
let messageStrings: string[];
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreLLM) {
try {
if (handler.handleChatModelStart)
await handler.handleChatModelStart?.(
llm,
messages,
runId,
this._parentRunId,
extraParams
);
else if (handler.handleLLMStart) {
messageStrings = messages.map((x) => getBufferString(x));
await handler.handleLLMStart?.(
llm,
messageStrings,
runId,
this._parentRunId,
extraParams
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreLLM) {
try {
if (handler.handleChatModelStart)
await handler.handleChatModelStart?.(
llm,
messages,
runId,
this._parentRunId,
extraParams
);
else if (handler.handleLLMStart) {
messageStrings = messages.map((x) => getBufferString(x));
await handler.handleLLMStart?.(
llm,
messageStrings,
runId,
this._parentRunId,
extraParams
);
}
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMStart: ${err}`
);
}
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleLLMStart: ${err}`
);
}
}
})
}, handler.awaitHandlers)
)
);
return new CallbackManagerForLLMRun(
runId,
@@ -371,22 +408,24 @@ export class CallbackManager
runId = uuidv4()
): Promise<CallbackManagerForChainRun> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreChain) {
try {
await handler.handleChainStart?.(
chain,
inputs,
runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleChainStart: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreChain) {
try {
await handler.handleChainStart?.(
chain,
inputs,
runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleChainStart: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
return new CallbackManagerForChainRun(
runId,
@@ -402,22 +441,24 @@ export class CallbackManager
runId = uuidv4()
): Promise<CallbackManagerForToolRun> {
await Promise.all(
this.handlers.map(async (handler) => {
if (!handler.ignoreAgent) {
try {
await handler.handleToolStart?.(
tool,
input,
runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleToolStart: ${err}`
);
this.handlers.map((handler) =>
consumeCallback(async () => {
if (!handler.ignoreAgent) {
try {
await handler.handleToolStart?.(
tool,
input,
runId,
this._parentRunId
);
} catch (err) {
console.error(
`Error in handler ${handler.constructor.name}, handleToolStart: ${err}`
);
}
}
}
})
}, handler.awaitHandlers)
)
);
return new CallbackManagerForToolRun(
runId,
+27
View File
@@ -0,0 +1,27 @@
import PQueueMod from "p-queue";
const PQueue =
"default" in PQueueMod ? /* #__PURE__ */ PQueueMod.default : PQueueMod;
const queue = /* #__PURE__ */ new PQueue({
autoStart: true,
});
/**
* Consume a promise, either adding it to the queue or waiting for it to resolve
* @param promise Promise to consume
* @param wait Whether to wait for the promise to resolve or resolve immediately
*/
export async function consumeCallback<T>(
promiseFn: () => Promise<T> | T | void,
wait: boolean
): Promise<void> {
if (wait === true) {
await promiseFn();
} else {
void queue.add(promiseFn);
}
}
export function awaitAllCallbacks(): Promise<void> {
return queue.onIdle();
}