mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(llma): eval retrieval performance (#40861)
This commit is contained in:
@@ -47,7 +47,7 @@ describe('TemporalService', () => {
|
||||
|
||||
describe('connection management', () => {
|
||||
it('creates client with correct config', async () => {
|
||||
await service.startEvaluationRunWorkflow('test', 'test')
|
||||
await service.startEvaluationRunWorkflow('test', 'test', '2024-01-01T00:00:00Z')
|
||||
|
||||
expect(Connection.connect).toHaveBeenCalledWith({
|
||||
address: 'localhost:7233',
|
||||
@@ -61,7 +61,7 @@ describe('TemporalService', () => {
|
||||
hub.TEMPORAL_CLIENT_KEY = 'client-key'
|
||||
|
||||
const newService = new TemporalService(hub)
|
||||
await newService.startEvaluationRunWorkflow('test', 'test')
|
||||
await newService.startEvaluationRunWorkflow('test', 'test', '2024-01-01T00:00:00Z')
|
||||
|
||||
expect(Connection.connect).toHaveBeenCalledWith({
|
||||
address: 'localhost:7233',
|
||||
@@ -76,7 +76,7 @@ describe('TemporalService', () => {
|
||||
})
|
||||
|
||||
it('disconnects client properly', async () => {
|
||||
await service.startEvaluationRunWorkflow('test', 'test')
|
||||
await service.startEvaluationRunWorkflow('test', 'test', '2024-01-01T00:00:00Z')
|
||||
await service.disconnect()
|
||||
|
||||
expect(mockConnection.close).toHaveBeenCalled()
|
||||
@@ -85,7 +85,7 @@ describe('TemporalService', () => {
|
||||
|
||||
describe('workflow triggering', () => {
|
||||
it('starts evaluation run workflow with correct parameters', async () => {
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-456')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-456', '2024-01-01T00:00:00Z')
|
||||
|
||||
expect(mockClient.workflow.start).toHaveBeenCalledWith('run-evaluation', {
|
||||
taskQueue: 'general-purpose-task-queue',
|
||||
@@ -95,14 +95,15 @@ describe('TemporalService', () => {
|
||||
{
|
||||
evaluation_id: 'eval-123',
|
||||
target_event_id: 'event-456',
|
||||
timestamp: '2024-01-01T00:00:00Z',
|
||||
},
|
||||
],
|
||||
})
|
||||
})
|
||||
|
||||
it('generates deterministic workflow IDs', async () => {
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-456')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-456')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-456', '2024-01-01T00:00:00Z')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-456', '2024-01-01T00:00:00Z')
|
||||
|
||||
const calls = (mockClient.workflow.start as jest.Mock).mock.calls
|
||||
const workflowId1 = calls[0][1].workflowId
|
||||
@@ -113,8 +114,8 @@ describe('TemporalService', () => {
|
||||
})
|
||||
|
||||
it('generates different workflow IDs for different events', async () => {
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-1')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-2')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-1', '2024-01-01T00:00:00Z')
|
||||
await service.startEvaluationRunWorkflow('eval-123', 'event-2', '2024-01-01T00:00:00Z')
|
||||
|
||||
const calls = (mockClient.workflow.start as jest.Mock).mock.calls
|
||||
const workflowId1 = calls[0][1].workflowId
|
||||
@@ -126,7 +127,7 @@ describe('TemporalService', () => {
|
||||
})
|
||||
|
||||
it('returns workflow handle on success', async () => {
|
||||
const handle = await service.startEvaluationRunWorkflow('eval-123', 'event-456')
|
||||
const handle = await service.startEvaluationRunWorkflow('eval-123', 'event-456', '2024-01-01T00:00:00Z')
|
||||
|
||||
expect(handle).toBeDefined()
|
||||
expect(handle).toBe(mockWorkflowHandle)
|
||||
@@ -135,9 +136,9 @@ describe('TemporalService', () => {
|
||||
it('throws on workflow start failure', async () => {
|
||||
;(mockClient.workflow.start as jest.Mock).mockRejectedValue(new Error('Temporal unavailable'))
|
||||
|
||||
await expect(service.startEvaluationRunWorkflow('eval-123', 'event-456')).rejects.toThrow(
|
||||
'Temporal unavailable'
|
||||
)
|
||||
await expect(
|
||||
service.startEvaluationRunWorkflow('eval-123', 'event-456', '2024-01-01T00:00:00Z')
|
||||
).rejects.toThrow('Temporal unavailable')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -93,7 +93,11 @@ export class TemporalService {
|
||||
return client
|
||||
}
|
||||
|
||||
async startEvaluationRunWorkflow(evaluationId: string, targetEventId: string): Promise<WorkflowHandle> {
|
||||
async startEvaluationRunWorkflow(
|
||||
evaluationId: string,
|
||||
targetEventId: string,
|
||||
timestamp: string
|
||||
): Promise<WorkflowHandle> {
|
||||
const client = await this.ensureConnected()
|
||||
|
||||
const workflowId = `${evaluationId}-${targetEventId}-ingestion`
|
||||
@@ -103,6 +107,7 @@ export class TemporalService {
|
||||
{
|
||||
evaluation_id: evaluationId,
|
||||
target_event_id: targetEventId,
|
||||
timestamp: timestamp,
|
||||
},
|
||||
],
|
||||
taskQueue: EVALUATION_TASK_QUEUE,
|
||||
@@ -116,6 +121,7 @@ export class TemporalService {
|
||||
workflowId,
|
||||
evaluationId,
|
||||
targetEventId,
|
||||
timestamp,
|
||||
})
|
||||
|
||||
return handle
|
||||
|
||||
@@ -69,6 +69,7 @@ export function filterAndParseMessages(messages: Message[]): RawKafkaEvent[] {
|
||||
}
|
||||
})
|
||||
.filter((event): event is RawKafkaEvent => event !== null)
|
||||
.filter((event) => event.event === '$ai_generation')
|
||||
}
|
||||
|
||||
export function groupEventsByTeam(events: RawKafkaEvent[]): Map<number, RawKafkaEvent[]> {
|
||||
@@ -288,6 +289,6 @@ async function processEventEvaluationMatch(
|
||||
|
||||
evaluationMatchesCounter.labels({ outcome: 'matched' }).inc()
|
||||
|
||||
await temporalService.startEvaluationRunWorkflow(evaluationDefinition.id, event.uuid)
|
||||
await temporalService.startEvaluationRunWorkflow(evaluationDefinition.id, event.uuid, event.timestamp)
|
||||
evaluationSchedulerEventsProcessed.labels({ status: 'success' }).inc()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user