chore(workflows): remove Mailjet as email provider for Workflows (#41141)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Haven
2025-11-10 10:43:59 -06:00
committed by GitHub
parent 331865023a
commit fa9b9401d3
21 changed files with 93 additions and 1034 deletions

View File

@@ -125,7 +125,6 @@ export class CdpApi {
router.post('/public/webhooks/:webhook_id', asyncHandler(this.handleWebhook()))
router.get('/public/webhooks/:webhook_id', asyncHandler(this.handleWebhook()))
router.get('/public/m/pixel', asyncHandler(this.getEmailTrackingPixel()))
router.post('/public/m/mailjet_webhook', asyncHandler(this.postMailjetWebhook()))
router.post('/public/m/ses_webhook', express.text(), asyncHandler(this.postSesWebhook()))
router.get('/public/m/redirect', asyncHandler(this.getEmailTrackingRedirect()))
@@ -510,17 +509,6 @@ export class CdpApi {
}
}
private postMailjetWebhook =
() =>
async (req: ModifiedRequest, res: express.Response): Promise<any> => {
try {
const { status, message } = await this.emailTrackingService.handleMailjetWebhook(req)
return res.status(status).json({ message })
} catch (error) {
return res.status(500).json({ error: 'Internal error' })
}
}
private postSesWebhook =
() =>
async (req: ModifiedRequest, res: express.Response): Promise<any> => {

View File

@@ -6,21 +6,16 @@ import supertest from 'supertest'
import express from 'ultimate-express'
import { setupExpressApp } from '~/api/router'
import { FixtureHogFlowBuilder } from '~/cdp/_tests/builders/hogflow.builder'
import { insertHogFunction } from '~/cdp/_tests/fixtures'
import { insertHogFlow } from '~/cdp/_tests/fixtures-hogflows'
import { CdpApi } from '~/cdp/cdp-api'
import { HogFunctionType } from '~/cdp/types'
import { KAFKA_APP_METRICS_2 } from '~/config/kafka-topics'
import { HogFlow } from '~/schema/hogflow'
import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql'
import { closeHub, createHub } from '~/utils/db/hub'
import { UUIDT } from '~/utils/utils'
import { Hub, Team } from '../../../types'
import { PIXEL_GIF } from './email-tracking.service'
import { generateEmailTrackingCode } from './helpers/tracking-code'
import { MailjetEventBase, MailjetWebhookEvent } from './types'
describe('EmailTrackingService', () => {
let hub: Hub
@@ -28,10 +23,7 @@ describe('EmailTrackingService', () => {
beforeEach(async () => {
await resetTestDatabase()
hub = await createHub({
MAILJET_SECRET_KEY: 'mailjet-secret-key',
MAILJET_PUBLIC_KEY: 'mailjet-public-key',
})
hub = await createHub({})
team = await getFirstTeam(hub)
mockFetch.mockClear()
@@ -46,10 +38,8 @@ describe('EmailTrackingService', () => {
let api: CdpApi
let app: express.Application
let hogFunction: HogFunctionType
let hogFlow: HogFlow
const invocationId = 'invocation-id'
let server: Server
let exampleEvent: MailjetWebhookEvent
beforeEach(async () => {
api = new CdpApi(hub)
@@ -58,117 +48,12 @@ describe('EmailTrackingService', () => {
server = app.listen(0, () => {})
hogFunction = await insertHogFunction(hub.postgres, team.id)
hogFlow = await insertHogFlow(hub.postgres, new FixtureHogFlowBuilder().withTeamId(team.id).build())
exampleEvent = {
event: 'sent',
time: Date.now(),
email: 'test@example.com',
mj_campaign_id: 1,
mj_contact_id: 1,
mj_message_id: 'test-message-id',
smtp_reply: 'test-smtp-reply',
MessageID: 1,
Message_GUID: 'test-message-guid',
customcampaign: 'test-custom-campaign',
CustomID: '',
Payload: generateEmailTrackingCode({ functionId: hogFunction.id, id: invocationId }),
}
})
afterEach(() => {
server.close()
})
describe('mailjet webhook', () => {
const sendValidEvent = async (mailjetEvent: MailjetEventBase): Promise<supertest.Response> => {
const payload = JSON.stringify(mailjetEvent)
const res = await supertest(app)
.post(`/public/m/mailjet_webhook`)
.set({
'content-type': 'application/json',
})
.send(payload)
return res
}
describe('validation', () => {
it('should return 403 if body is missing', async () => {
const res = await supertest(app).post(`/public/m/mailjet_webhook`).send()
expect(res.status).toBe(403)
expect(res.body).toEqual({
message: 'Missing request body',
})
})
})
it('should not track a metric if the hog function or flow is not found', async () => {
const mailjetEvent: MailjetEventBase = {
...exampleEvent,
Payload: 'ph_fn_id=invalid-function-id&ph_inv_id=invalid-invocation-id',
}
const res = await sendValidEvent(mailjetEvent)
expect(res.status).toBe(200)
expect(res.body).toEqual({ message: 'OK' })
const messages = mockProducerObserver.getProducedKafkaMessagesForTopic(KAFKA_APP_METRICS_2)
expect(messages).toHaveLength(0)
})
it('should track a hog flow if given', async () => {
const mailjetEvent: MailjetEventBase = {
...exampleEvent,
Payload: generateEmailTrackingCode({ functionId: hogFlow.id, id: invocationId }),
}
const res = await sendValidEvent(mailjetEvent)
expect(res.status).toBe(200)
expect(res.body).toEqual({ message: 'OK' })
const messages = mockProducerObserver.getProducedKafkaMessagesForTopic(KAFKA_APP_METRICS_2)
expect(messages).toHaveLength(1)
expect(messages[0].value).toMatchObject({
app_source: 'hog_flow',
app_source_id: hogFlow.id,
count: 1,
instance_id: invocationId,
metric_kind: 'email',
metric_name: 'email_sent',
team_id: team.id,
})
})
it.each([
['open', 'email_opened'],
['click', 'email_link_clicked'],
['bounce', 'email_bounced'],
['spam', 'email_spam'],
['unsub', 'email_unsubscribed'],
] as const)('should handle valid %s event', async (event, metric) => {
const mailjetEvent: MailjetEventBase = {
...exampleEvent,
event,
}
const res = await sendValidEvent(mailjetEvent)
expect(res.status).toBe(200)
expect(res.body).toEqual({ message: 'OK' })
const messages = mockProducerObserver.getProducedKafkaMessagesForTopic(KAFKA_APP_METRICS_2)
expect(messages).toHaveLength(1)
expect(messages[0].value).toMatchObject({
app_source: 'hog_function',
app_source_id: hogFunction.id,
count: 1,
instance_id: invocationId,
metric_kind: 'email',
metric_name: metric,
team_id: team.id,
})
})
})
describe('handleEmailTrackingRedirect', () => {
it('should redirect to the target url and track the click metric', async () => {
const res = await supertest(app).get(

View File

@@ -13,27 +13,12 @@ import { HogFlowManagerService } from '../hogflows/hogflow-manager.service'
import { HogFunctionManagerService } from '../managers/hog-function-manager.service'
import { HogFunctionMonitoringService } from '../monitoring/hog-function-monitoring.service'
import { SesWebhookHandler } from './helpers/ses'
import {
generateEmailTrackingCode,
generateEmailTrackingPixelUrl,
parseEmailTrackingCode,
} from './helpers/tracking-code'
import { MailjetEventType, MailjetWebhookEvent } from './types'
import { generateEmailTrackingCode, generateEmailTrackingPixelUrl } from './helpers/tracking-code'
export const PIXEL_GIF = Buffer.from('R0lGODlhAQABAAAAACH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==', 'base64')
const LINK_REGEX =
/<a\b[^>]*\bhref\s*=\s*(?:"(?!javascript:)([^"]*)"|'(?!javascript:)([^']*)'|(?!javascript:)([^'">\s]+))[^>]*>([\s\S]*?)<\/a>/gi
const EVENT_TYPE_TO_CATEGORY: Record<MailjetEventType, MinimalAppMetric['metric_name'] | undefined> = {
sent: 'email_sent',
open: 'email_opened',
click: 'email_link_clicked',
bounce: 'email_bounced',
blocked: 'email_blocked',
spam: 'email_spam',
unsub: 'email_unsubscribed',
}
const trackingEventsCounter = new Counter({
name: 'email_tracking_events_total',
help: 'Total number of email tracking events received',
@@ -90,7 +75,7 @@ export class EmailTrackingService {
functionId?: string
invocationId?: string
metricName: MinimalAppMetric['metric_name']
source: 'mailjet' | 'direct' | 'ses'
source: 'direct' | 'ses'
}): Promise<void> {
if (!functionId || !invocationId) {
logger.error('[EmailTrackingService] trackMetric: Invalid custom ID', {
@@ -143,40 +128,6 @@ export class EmailTrackingService {
})
}
public async handleMailjetWebhook(req: ModifiedRequest): Promise<{ status: number; message?: string }> {
const okResponse = { status: 200, message: 'OK' }
if (!req.rawBody) {
return { status: 403, message: 'Missing request body' }
}
try {
const event = req.body as MailjetWebhookEvent
const { functionId, invocationId } = parseEmailTrackingCode(event.Payload || '') || {}
const category = EVENT_TYPE_TO_CATEGORY[event.event]
if (!category) {
logger.error('[EmailTrackingService] trackMetric: Unmapped event type', { event })
emailTrackingErrorsCounter.inc({ error_type: 'unmapped_event_type' })
return { status: 400, message: 'Unmapped event type' }
}
await this.trackMetric({
functionId,
invocationId,
metricName: category,
source: 'mailjet',
})
return okResponse
} catch (error) {
emailTrackingErrorsCounter.inc({ error_type: error.name || 'unknown' })
logger.error('[EmailService] handleWebhook: Mailjet webhook error', { error })
throw error
}
}
public async handleSesWebhook(req: ModifiedRequest): Promise<{ status: number; message?: string }> {
if (!req.body) {
return { status: 403, message: 'Missing request body' }

View File

@@ -6,7 +6,6 @@ import { CyclotronInvocationQueueParametersEmailType } from '~/schema/cyclotron'
import { waitForExpect } from '~/tests/helpers/expectations'
import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql'
import { closeHub, createHub } from '~/utils/db/hub'
import { parseJSON } from '~/utils/json-parse'
import { Hub, Team } from '../../../types'
import { EmailService } from './email.service'
@@ -31,7 +30,7 @@ describe('EmailService', () => {
let team: Team
beforeEach(async () => {
await resetTestDatabase()
hub = await createHub({ MAILJET_SECRET_KEY: 'mailjet-secret-key', MAILJET_PUBLIC_KEY: 'mailjet-public-key' })
hub = await createHub({})
team = await getFirstTeam(hub)
service = new EmailService(hub)
mockFetch.mockClear()
@@ -41,6 +40,7 @@ describe('EmailService', () => {
})
describe('executeSendEmail', () => {
let invocation: CyclotronJobInvocationHogFunction
let sendEmailSpy: jest.SpyInstance
beforeEach(async () => {
await insertIntegration(hub.postgres, team.id, {
id: 1,
@@ -50,7 +50,7 @@ describe('EmailService', () => {
name: 'Test User',
domain: 'posthog.com',
verified: true,
provider: 'mailjet',
provider: 'ses',
},
})
invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' })
@@ -59,6 +59,11 @@ describe('EmailService', () => {
stack: [],
} as any
invocation.queueParameters = createEmailParams({ from: { integrationId: 1, email: 'test@posthog.com' } })
// Mock SES sendEmail to avoid actual AWS calls
sendEmailSpy = jest.spyOn(service.ses, 'sendEmail').mockReturnValue({
promise: () => Promise.resolve({ MessageId: 'test-message-id' }),
} as any)
})
describe('integration validation', () => {
beforeEach(async () => {
@@ -106,14 +111,8 @@ describe('EmailService', () => {
})
const result = await service.executeSendEmail(invocation)
expect(result.error).toBeUndefined()
expect(parseJSON(mockFetch.mock.calls[0][1].body).Messages[0].From).toMatchInlineSnapshot(
`
{
"Email": "test@posthog.com",
"Name": "Test User",
}
`
)
expect(sendEmailSpy).toHaveBeenCalled()
expect(sendEmailSpy.mock.calls[0][0].Source).toBe('"Test User" <test@posthog.com>')
})
it('should validate if the email domain is not verified', async () => {
invocation.queueParameters = createEmailParams({
@@ -134,21 +133,23 @@ describe('EmailService', () => {
it('should send an email', async () => {
const result = await service.executeSendEmail(invocation)
expect(result.error).toBeUndefined()
expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(
`
[
"https://api.mailjet.com/v3.1/send",
{
"body": "{"Messages":[{"From":{"Email":"test@posthog.com","Name":"Test User"},"To":[{"Email":"test@example.com","Name":"Test User"}],"Subject":"Test Subject","TextPart":"Test Text","HTMLPart":"Test HTML","EventPayload":"ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE"}]}",
"headers": {
"Authorization": "Basic bWFpbGpldC1wdWJsaWMta2V5Om1haWxqZXQtc2VjcmV0LWtleQ==",
"Content-Type": "application/json",
expect(sendEmailSpy).toHaveBeenCalled()
expect(sendEmailSpy.mock.calls[0][0]).toMatchObject({
Source: '"Test User" <test@posthog.com>',
Destination: {
ToAddresses: ['"Test User" <test@example.com>'],
},
Message: {
Subject: {
Data: 'Test Subject',
},
"method": "POST",
},
]
`
)
Body: {
Text: {
Data: 'Test Text',
},
},
},
})
})
})
})
@@ -160,8 +161,6 @@ describe('EmailService', () => {
mockFetch.mockImplementation((...args: any[]): Promise<any> => {
return actualFetch(...args) as any
})
hub.MAILJET_PUBLIC_KEY = ''
hub.MAILJET_SECRET_KEY = ''
await insertIntegration(hub.postgres, team.id, {
id: 1,
kind: 'email',
@@ -216,8 +215,6 @@ describe('EmailService', () => {
mockFetch.mockImplementation((...args: any[]): Promise<any> => {
return actualFetch(...args) as any
})
hub.MAILJET_PUBLIC_KEY = ''
hub.MAILJET_SECRET_KEY = ''
await insertIntegration(hub.postgres, team.id, {
id: 1,
kind: 'email',

View File

@@ -4,7 +4,6 @@ import { CyclotronJobInvocationHogFunction, CyclotronJobInvocationResult, Integr
import { createAddLogFunction, logEntry } from '~/cdp/utils'
import { createInvocationResult } from '~/cdp/utils/invocation-utils'
import { CyclotronInvocationQueueParametersEmailType } from '~/schema/cyclotron'
import { fetch } from '~/utils/request'
import { Hub } from '../../../types'
import { addTrackingToEmail } from './email-tracking.service'
@@ -52,13 +51,10 @@ export class EmailService {
this.validateEmailDomain(integration, params)
switch (integration.config.provider ?? 'mailjet') {
switch (integration.config.provider ?? 'ses') {
case 'maildev':
await this.sendEmailWithMaildev(result, params)
break
case 'mailjet':
await this.sendEmailWithMailjet(result, params)
break
case 'ses':
await this.sendEmailWithSES(result, params)
break
@@ -109,49 +105,6 @@ export class EmailService {
params.from.name = integration.config.name
}
private async sendEmailWithMailjet(
result: CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>,
params: CyclotronInvocationQueueParametersEmailType
): Promise<void> {
// First we need to lookup the email sending domain of the given team
const response = await fetch('https://api.mailjet.com/v3.1/send', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Basic ${Buffer.from(
`${this.hub.MAILJET_PUBLIC_KEY}:${this.hub.MAILJET_SECRET_KEY}`
).toString('base64')}`,
},
body: JSON.stringify({
Messages: [
{
From: {
Email: params.from.email,
Name: params.from.name,
},
To: [
{
Email: params.to.email,
Name: params.to.name,
},
],
Subject: params.subject,
TextPart: params.text,
HTMLPart: params.html,
EventPayload: generateEmailTrackingCode(result.invocation),
},
],
}),
})
// TODO: Add support for retries - in fact if it fails should we actually crash out the service?
if (response.status >= 400) {
throw new Error(
`Failed to send email to ${params.to.email} (status ${response.status}): ${await response.text()}`
)
}
}
// Send email to local maildev instance for testing (DEBUG=1 only)
private async sendEmailWithMaildev(
result: CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>,

View File

@@ -1,186 +0,0 @@
// Mailjet Webhook Event Types
// Based on official Mailjet documentation: https://dev.mailjet.com/email/guides/webhooks/
export type MailjetEventType = 'sent' | 'open' | 'click' | 'bounce' | 'blocked' | 'spam' | 'unsub'
// Base interface shared by all events
export interface MailjetEventBase {
/** The event type */
event: MailjetEventType
/** Unix timestamp when the event occurred */
time: number
/** Legacy Message ID (numeric) */
MessageID: number
/** Unique 128-bit ID for this message (UUID format) */
Message_GUID: string
/** Recipient email address */
email: string
/** Mailjet campaign ID */
mj_campaign_id: number
/** Mailjet contact ID */
mj_contact_id: number
/** Custom campaign identifier */
customcampaign: string
/** Custom ID provided when sending (for tracking) */
CustomID?: string
/** Custom payload provided when sending */
Payload?: string
}
// Specific event interfaces
export interface MailjetSentEvent extends MailjetEventBase {
event: 'sent'
/** Mailjet message ID (string format) */
mj_message_id: string
/** SMTP server response */
smtp_reply: string
}
export interface MailjetOpenEvent extends MailjetEventBase {
event: 'open'
/** IP address where the open occurred */
ip: string
/** Geographic location (country code) */
geo: string
/** User agent string of the client */
agent: string
}
export interface MailjetClickEvent extends MailjetEventBase {
event: 'click'
/** The URL that was clicked */
url: string
/** IP address where the click occurred */
ip: string
/** Geographic location (country code) */
geo: string
/** User agent string of the client */
agent: string
}
export interface MailjetBounceEvent extends MailjetEventBase {
event: 'bounce'
/** Whether this is a blocked email */
blocked: boolean
/** Whether this is a hard bounce (permanent failure) */
hard_bounce: boolean
/** What the error is related to (e.g., "recipient", "content") */
error_related_to: string
/** Detailed error message */
error: string
/** Additional comments about the bounce */
comment?: string
}
export interface MailjetBlockedEvent extends MailjetEventBase {
event: 'blocked'
/** What the error is related to (e.g., "recipient", "content") */
error_related_to: string
/** Detailed error message */
error: string
}
export interface MailjetSpamEvent extends MailjetEventBase {
event: 'spam'
/** Source of the spam report (e.g., "JMRPP") */
source: string
}
export interface MailjetUnsubEvent extends MailjetEventBase {
event: 'unsub'
/** Mailjet list ID from which the user unsubscribed */
mj_list_id: number
/** IP address where the unsubscribe occurred */
ip: string
/** Geographic location (country code) */
geo: string
/** User agent string of the client */
agent: string
}
// Union type for all possible webhook events
export type MailjetWebhookEvent =
| MailjetSentEvent
| MailjetOpenEvent
| MailjetClickEvent
| MailjetBounceEvent
| MailjetBlockedEvent
| MailjetSpamEvent
| MailjetUnsubEvent
// Event type to category mapping (for your existing code compatibility)
export const EVENT_TYPE_TO_CATEGORY = {
sent: 'email_sent',
open: 'email_opened',
click: 'email_link_clicked',
bounce: 'email_bounced',
blocked: 'email_blocked',
spam: 'email_spam',
unsub: 'email_unsubscribed',
} as const
// Type guards to narrow event types
export function isSentEvent(event: MailjetWebhookEvent): event is MailjetSentEvent {
return event.event === 'sent'
}
export function isOpenEvent(event: MailjetWebhookEvent): event is MailjetOpenEvent {
return event.event === 'open'
}
export function isClickEvent(event: MailjetWebhookEvent): event is MailjetClickEvent {
return event.event === 'click'
}
export function isBounceEvent(event: MailjetWebhookEvent): event is MailjetBounceEvent {
return event.event === 'bounce'
}
export function isBlockedEvent(event: MailjetWebhookEvent): event is MailjetBlockedEvent {
return event.event === 'blocked'
}
export function isSpamEvent(event: MailjetWebhookEvent): event is MailjetSpamEvent {
return event.event === 'spam'
}
export function isUnsubEvent(event: MailjetWebhookEvent): event is MailjetUnsubEvent {
return event.event === 'unsub'
}
// Error categorization for bounce/blocked events
export type MailjetErrorType =
| 'recipient' // Invalid recipient
| 'content' // Content-related issue
| 'domain' // Domain-related issue
| 'reputation' // Sender reputation issue
| 'policy' // Policy violation
| 'system' // System error
| 'timeout' // Connection timeout
| 'quota' // Quota exceeded
| 'unknown' // Unknown error
// Common bounce/error reasons
export type MailjetBounceReason =
| 'user unknown' // Email address doesn't exist
| 'domain not found' // Domain doesn't exist
| 'mailbox full' // Recipient's mailbox is full
| 'message too large' // Message exceeds size limits
| 'content blocked' // Content triggered spam filters
| 'policy violation' // Violated sending policy
| 'reputation blocked' // Sender reputation issue
| 'rate limit exceeded' // Too many messages sent
| 'connection timeout' // Connection timed out
| 'duplicate in campaign' // X-Mailjet-DeduplicateCampaign duplicate
| 'preblocked' // Address preblocked by Mailjet
| 'spam content' // Content classified as spam
| string // Other specific error messages
// Extended interfaces with more specific error typing
export interface MailjetBounceEventTyped extends Omit<MailjetBounceEvent, 'error'> {
error: MailjetBounceReason
}
export interface MailjetBlockedEventTyped extends Omit<MailjetBlockedEvent, 'error'> {
error: MailjetBounceReason
}

View File

@@ -343,11 +343,7 @@ export function getDefaultConfig(): PluginsServerConfig {
GROUPS_DUAL_WRITE_COMPARISON_ENABLED: false,
USE_DYNAMIC_EVENT_INGESTION_RESTRICTION_CONFIG: false,
// Workflows
MAILJET_PUBLIC_KEY: '',
MAILJET_SECRET_KEY: '',
// SES
// SES (Workflows email sending)
SES_ENDPOINT: isTestEnv() || isDevEnv() ? 'http://localhost:4566' : '',
SES_ACCESS_KEY_ID: isTestEnv() || isDevEnv() ? 'test' : '',
SES_SECRET_ACCESS_KEY: isTestEnv() || isDevEnv() ? 'test' : '',

View File

@@ -473,11 +473,7 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig,
PERSON_JSONB_SIZE_ESTIMATE_ENABLE: number
USE_DYNAMIC_EVENT_INGESTION_RESTRICTION_CONFIG: boolean
// Workflows
MAILJET_PUBLIC_KEY: string
MAILJET_SECRET_KEY: string
// SES
// SES (Workflows email sending)
SES_ENDPOINT: string
SES_ACCESS_KEY_ID: string
SES_SECRET_ACCESS_KEY: string