refactor: add drop reason metric (#39613)

This commit is contained in:
Paweł Ledwoń
2025-10-14 12:29:23 +02:00
committed by GitHub
parent ac032db67f
commit 5889fed1bd
16 changed files with 36 additions and 33 deletions

View File

@@ -293,7 +293,7 @@ export class CookielessManager {
// Drop all cookieless events if there are any errors.
// We fail close here as Cookieless is a new feature, not available for general use yet, and we don't want any
// errors to interfere with the processing of other events.
return this.dropAllCookielessEvents(events, 'cookieless-fail-close')
return this.dropAllCookielessEvents(events, 'cookieless_fail_close')
}
}
@@ -324,7 +324,7 @@ export class CookielessManager {
drop_cause: 'cookieless_disallowed_event',
})
.inc()
results[i] = drop('Event type not supported in cookieless mode')
results[i] = drop('cookieless_unsupported_event')
continue
}
if (
@@ -338,7 +338,7 @@ export class CookielessManager {
drop_cause: 'cookieless_stateless_disallowed_identify',
})
.inc()
results[i] = drop('$identify not supported in stateless cookieless mode')
results[i] = drop('cookieless_stateless_no_identify')
continue
}
@@ -353,7 +353,7 @@ export class CookielessManager {
drop_cause: 'cookieless_team_disabled',
})
.inc()
results[i] = drop('Cookieless disabled for team')
results[i] = drop('cookieless_team_disabled')
continue
}
const timestamp = event.timestamp ?? event.sent_at ?? event.now
@@ -365,7 +365,7 @@ export class CookielessManager {
drop_cause: 'cookieless_no_timestamp',
})
.inc()
results[i] = drop('Missing timestamp')
results[i] = drop('cookieless_missing_timestamp')
continue
}
@@ -398,7 +398,9 @@ export class CookielessManager {
: 'cookieless_missing_host',
})
.inc()
results[i] = drop(!userAgent ? 'Missing user agent' : !ip ? 'Missing IP' : 'Missing host')
results[i] = drop(
!userAgent ? 'cookieless_missing_ua' : !ip ? 'cookieless_missing_ip' : 'cookieless_missing_host'
)
continue
}

View File

@@ -46,7 +46,7 @@ describe('createApplyDropRestrictionsStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Event dropped due to token restrictions'))
expect(result).toEqual(drop('blocked_token'))
expect(eventIngestionRestrictionManager.shouldDropEvent).toHaveBeenCalledWith(
'blocked-token-abc',
'blocked-user-def'

View File

@@ -27,7 +27,7 @@ export function createApplyDropRestrictionsStep<T extends { headers: EventHeader
drop_cause: 'blocked_token',
})
.inc()
return drop('Event dropped due to token restrictions')
return drop('blocked_token')
}
return Promise.resolve(ok(input))

View File

@@ -28,7 +28,7 @@ describe('createDropExceptionEventsStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Exception events are processed separately in Rust'))
expect(result).toEqual(drop('exception_event'))
})
it('should allow non-exception events', async () => {

View File

@@ -14,7 +14,7 @@ export function createDropExceptionEventsStep<T extends { event: IncomingEvent }
drop_cause: 'exception_event',
})
.inc()
return Promise.resolve(drop('Exception events are processed separately in Rust'))
return Promise.resolve(drop('exception_event'))
}
return Promise.resolve(ok(input))

View File

@@ -218,7 +218,7 @@ describe('createParseKafkaMessageStep', () => {
const input = { message: mockMessage }
const result = await step(input)
expect(result).toEqual(drop('Failed to parse Kafka message'))
expect(result).toEqual(drop('failed_parse_message'))
expect(mockLogger.warn).toHaveBeenCalledWith('Failed to parse Kafka message', {
error: expect.any(Error),
})
@@ -232,7 +232,7 @@ describe('createParseKafkaMessageStep', () => {
const input = { message: mockMessage }
const result = await step(input)
expect(result).toEqual(drop('Failed to parse Kafka message'))
expect(result).toEqual(drop('failed_parse_message'))
expect(mockLogger.warn).toHaveBeenCalledWith('Failed to parse Kafka message', {
error: expect.any(Error),
})
@@ -246,7 +246,7 @@ describe('createParseKafkaMessageStep', () => {
const input = { message: mockMessage }
const result = await step(input)
expect(result).toEqual(drop('Failed to parse Kafka message'))
expect(result).toEqual(drop('failed_parse_message'))
expect(mockLogger.warn).toHaveBeenCalledWith('Failed to parse Kafka message', {
error: expect.any(Error),
})
@@ -265,7 +265,7 @@ describe('createParseKafkaMessageStep', () => {
const input = { message: mockMessage }
const result = await step(input)
expect(result).toEqual(drop('Failed to parse Kafka message'))
expect(result).toEqual(drop('failed_parse_message'))
expect(mockLogger.warn).toHaveBeenCalledWith('Failed to parse Kafka message', {
error: expect.any(Error),
})
@@ -286,7 +286,7 @@ describe('createParseKafkaMessageStep', () => {
const input = { message: mockMessage }
const result = await step(input)
expect(result).toEqual(drop('Failed to parse Kafka message'))
expect(result).toEqual(drop('failed_parse_message'))
expect(mockLogger.warn).toHaveBeenCalledWith('Failed to parse Kafka message', {
error: expect.any(Error),
})
@@ -305,7 +305,7 @@ describe('createParseKafkaMessageStep', () => {
const input = { message: mockMessage }
const result = await step(input)
expect(result).toEqual(drop('Failed to parse Kafka message'))
expect(result).toEqual(drop('failed_parse_message'))
expect(mockLogger.warn).toHaveBeenCalledWith('Failed to parse Kafka message', {
error: expect.any(Error),
})

View File

@@ -30,7 +30,7 @@ export function createParseKafkaMessageStep<T extends { message: Message }>(): P
const parsedEvent = parseKafkaMessage(message)
if (!parsedEvent) {
return Promise.resolve(drop('Failed to parse Kafka message'))
return Promise.resolve(drop('failed_parse_message'))
}
return Promise.resolve(ok({ ...input, event: parsedEvent }))

View File

@@ -77,7 +77,7 @@ describe('createResolveTeamStep()', () => {
event: { event: { ...pipelineEvent }, message: {} as Message } as IncomingEvent,
}
const response = await step(input)
expect(response).toEqual(drop('Failed to resolve team'))
expect(response).toEqual(drop('failed_resolve_team'))
expect(await getMetricValues('ingestion_event_dropped_total')).toEqual([
{
labels: {
@@ -96,7 +96,7 @@ describe('createResolveTeamStep()', () => {
event: { event: { ...pipelineEvent, token: 'unknown' }, message: {} as Message } as IncomingEvent,
}
const response = await step(input)
expect(response).toEqual(drop('Failed to resolve team'))
expect(response).toEqual(drop('failed_resolve_team'))
expect(await getMetricValues('ingestion_event_dropped_total')).toEqual([
{
labels: {
@@ -137,7 +137,7 @@ describe('createResolveTeamStep()', () => {
event: { event: { ...pipelineEvent, team_id: 3 }, message: {} as Message } as IncomingEvent,
}
const response = await step(input)
expect(response).toEqual(drop('Failed to resolve team'))
expect(response).toEqual(drop('failed_resolve_team'))
expect(await getMetricValues('ingestion_event_dropped_total')).toEqual([
{
labels: {

View File

@@ -58,7 +58,7 @@ export function createResolveTeamStep<T extends { message: Message; headers: Eve
const eventWithTeam = await resolveTeam(hub, message, headers, event.event)
if (!eventWithTeam) {
return drop('Failed to resolve team')
return drop('failed_resolve_team')
}
return ok({ ...input, eventWithTeam })

View File

@@ -55,7 +55,7 @@ describe('createValidateEventPropertiesStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Group key too long'))
expect(result).toEqual(drop('group_key_too_long'))
})
it('should allow $groupidentify events with group_key shorter than 400 characters', async () => {

View File

@@ -35,7 +35,7 @@ export function createValidateEventPropertiesStep<T extends { eventWithTeam: Inc
drop_cause: 'group_key_too_long',
})
.inc()
return drop('Group key too long')
return drop('group_key_too_long')
}
}

View File

@@ -61,7 +61,7 @@ describe('createValidateEventUuidStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Event has invalid UUID'))
expect(result).toEqual(drop('invalid_uuid'))
expect(mockCaptureIngestionWarning).toHaveBeenCalledWith(
mockHub.db.kafkaProducer,
1,
@@ -76,7 +76,7 @@ describe('createValidateEventUuidStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Event has invalid UUID'))
expect(result).toEqual(drop('invalid_uuid'))
expect(mockCaptureIngestionWarning).toHaveBeenCalledWith(
mockHub.db.kafkaProducer,
1,
@@ -91,7 +91,7 @@ describe('createValidateEventUuidStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Event has invalid UUID'))
expect(result).toEqual(drop('invalid_uuid'))
expect(mockCaptureIngestionWarning).toHaveBeenCalledWith(
mockHub.db.kafkaProducer,
1,
@@ -106,7 +106,7 @@ describe('createValidateEventUuidStep', () => {
const result = await step(input)
expect(result).toEqual(drop('Event has invalid UUID'))
expect(result).toEqual(drop('invalid_uuid'))
expect(mockCaptureIngestionWarning).toHaveBeenCalledWith(
mockHub.db.kafkaProducer,
1,

View File

@@ -44,7 +44,7 @@ export function createValidateEventUuidStep<T extends { eventWithTeam: IncomingE
const { eventWithTeam } = input
const isValid = await isEventUuidValid(eventWithTeam, hub)
if (!isValid) {
return drop('Event has invalid UUID')
return drop('invalid_uuid')
}
return ok(input)
}

View File

@@ -46,6 +46,7 @@ export const invalidTimestampCounter = new Counter({
export const droppedEventCounter = new Counter({
name: 'event_pipeline_dropped_events_total',
help: 'Count of events dropped by plugin server',
labelNames: ['reason'],
})
export const tokenOrTeamPresentCounter = new Counter({

View File

@@ -197,7 +197,7 @@ export class EventPipelineRunner {
)
)
return drop('Invalid event for provided flags', kafkaAcks)
return drop('invalid_event_for_flags', kafkaAcks)
}
// If person processing is disabled, go ahead and remove person related keys before
@@ -239,7 +239,7 @@ export class EventPipelineRunner {
{ alwaysSend: true }
)
return drop('Client ingestion warning event', kafkaAcks)
return drop('client_ingestion_warning', kafkaAcks)
}
if (event.event === '$$heatmap') {
@@ -261,7 +261,7 @@ export class EventPipelineRunner {
if (dropOldEventsResult == null) {
// TODO: We pass kafkaAcks, so the side effects should be merged, but this needs to be refactored
return drop('Event too old', kafkaAcks)
return drop('event_too_old', kafkaAcks)
}
const transformResult = await this.runStep<TransformationResult, typeof transformEventStep>(
@@ -279,7 +279,7 @@ export class EventPipelineRunner {
if (transformedEvent === null) {
// TODO: We pass kafkaAcks, so the side effects should be merged, but this needs to be refactored
return drop('Event dropped by transformation', kafkaAcks)
return drop('dropped_by_transformation', kafkaAcks)
}
const normalizeResult = await this.runStep<[PluginEvent, DateTime], typeof normalizeEventStep>(

View File

@@ -330,5 +330,5 @@ export function logDroppedMessage(originalMessage: Message, reason: string, step
reason,
})
droppedEventCounter.inc()
droppedEventCounter.labels({ reason }).inc()
}