diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 11b24cacaa..14fd4e74e5 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -442,6 +442,17 @@ services: networks: - otel_network + localstack: + container_name: '${LOCALSTACK_DOCKER_NAME:-localstack-main}' + image: localstack/localstack + ports: + - '127.0.0.1:4566:4566' # LocalStack Gateway + - '127.0.0.1:4510-4559:4510-4559' # external services port range + environment: + - DEBUG=${DEBUG:-0} + volumes: + - '/var/run/docker.sock:/var/run/docker.sock' + networks: otel_network: driver: bridge diff --git a/docker-compose.dev-full.yml b/docker-compose.dev-full.yml index 8a6088d2ae..da3d9c90de 100644 --- a/docker-compose.dev-full.yml +++ b/docker-compose.dev-full.yml @@ -258,6 +258,11 @@ services: file: docker-compose.base.yml service: jaeger + localstack: + extends: + file: docker-compose.base.yml + service: localstack + networks: otel_network: driver: bridge diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index a7d24a2ce1..98307511b2 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -248,6 +248,11 @@ services: file: docker-compose.base.yml service: jaeger + localstack: + extends: + file: docker-compose.base.yml + service: localstack + networks: otel_network: driver: bridge diff --git a/frontend/__snapshots__/scenes-app-insights-funnels--funnel-left-to-right--light.png b/frontend/__snapshots__/scenes-app-insights-funnels--funnel-left-to-right--light.png index fc27f4e6b1..31e8143052 100644 Binary files a/frontend/__snapshots__/scenes-app-insights-funnels--funnel-left-to-right--light.png and b/frontend/__snapshots__/scenes-app-insights-funnels--funnel-left-to-right--light.png differ diff --git a/frontend/__snapshots__/scenes-app-settings-environment--settings-environment-details--dark.png b/frontend/__snapshots__/scenes-app-settings-environment--settings-environment-details--dark.png index 06e51eede3..f7079cd7e9 100644 Binary files a/frontend/__snapshots__/scenes-app-settings-environment--settings-environment-details--dark.png and b/frontend/__snapshots__/scenes-app-settings-environment--settings-environment-details--dark.png differ diff --git a/frontend/src/lib/constants.tsx b/frontend/src/lib/constants.tsx index 789b5efc98..634bf61a91 100644 --- a/frontend/src/lib/constants.tsx +++ b/frontend/src/lib/constants.tsx @@ -209,6 +209,7 @@ export const FEATURE_FLAGS = { MESSAGING: 'messaging', // owner @haven #team-messaging MESSAGING_EARLY_ACCESS: 'messaging-product', // owner @haven #team-messaging MESSAGING_TRIGGER_WEBHOOK: 'messaging-trigger-webhook', // owner #team-messaging + MESSAGING_SES: 'messaging-ses', // owner #team-messaging ENVIRONMENTS_ROLLBACK: 'environments-rollback', // owner: @yasen-posthog #team-platform-features SELF_SERVE_CREDIT_OVERRIDE: 'self-serve-credit-override', // owner: @zach CUSTOM_CSS_THEMES: 'custom-css-themes', // owner: @daibhin diff --git a/frontend/src/lib/integrations/IntegrationEmailDomainView.tsx b/frontend/src/lib/integrations/IntegrationEmailDomainView.tsx index d4cea45f3f..5a00ed7d65 100644 --- a/frontend/src/lib/integrations/IntegrationEmailDomainView.tsx +++ b/frontend/src/lib/integrations/IntegrationEmailDomainView.tsx @@ -16,7 +16,7 @@ const isVerificationRequired = (integration: IntegrationType): boolean => { const isVerified = (integration: IntegrationType): boolean => { switch (integration.kind) { case 'email': - return integration.config.mailjet_verified === true + return integration.config.verified === true default: return true } diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index 5f995e5adb..9ce2131a57 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -126,6 +126,7 @@ export class CdpApi { router.get('/public/webhooks/:webhook_id', asyncHandler(this.handleWebhook())) router.get('/public/m/pixel', asyncHandler(this.getEmailTrackingPixel())) router.post('/public/m/mailjet_webhook', asyncHandler(this.postMailjetWebhook())) + router.post('/public/m/ses_webhook', asyncHandler(this.postSesWebhook())) router.get('/public/m/redirect', asyncHandler(this.getEmailTrackingRedirect())) return router @@ -555,6 +556,17 @@ export class CdpApi { } } + private postSesWebhook = + () => + async (req: ModifiedRequest, res: express.Response): Promise => { + try { + const { status, message } = await this.emailTrackingService.handleSesWebhook(req) + return res.status(status).json({ message }) + } catch (error) { + return res.status(500).json({ error: 'Internal error' }) + } + } + private getEmailTrackingPixel = () => async (req: ModifiedRequest, res: express.Response): Promise => { diff --git a/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts b/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts index c63d5f91bb..5490faef8b 100644 --- a/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts +++ b/plugin-server/src/cdp/services/messaging/email-tracking.service.test.ts @@ -18,7 +18,8 @@ import { closeHub, createHub } from '~/utils/db/hub' import { UUIDT } from '~/utils/utils' import { Hub, Team } from '../../../types' -import { PIXEL_GIF, generateEmailTrackingCode } from './email-tracking.service' +import { PIXEL_GIF } from './email-tracking.service' +import { generateEmailTrackingCode } from './helpers/tracking-code' import { MailjetEventBase, MailjetWebhookEvent } from './types' describe('EmailTrackingService', () => { diff --git a/plugin-server/src/cdp/services/messaging/email-tracking.service.ts b/plugin-server/src/cdp/services/messaging/email-tracking.service.ts index 3fb75648cb..0494b45d3b 100644 --- a/plugin-server/src/cdp/services/messaging/email-tracking.service.ts +++ b/plugin-server/src/cdp/services/messaging/email-tracking.service.ts @@ -2,7 +2,7 @@ import { Counter } from 'prom-client' import express from 'ultimate-express' import { ModifiedRequest } from '~/api/router' -import { AppMetricType, CyclotronJobInvocationHogFunction, MinimalAppMetric } from '~/cdp/types' +import { CyclotronJobInvocationHogFunction, MinimalAppMetric } from '~/cdp/types' import { defaultConfig } from '~/config/config' import { captureException } from '~/utils/posthog' @@ -11,6 +11,12 @@ import { logger } from '../../../utils/logger' import { HogFlowManagerService } from '../hogflows/hogflow-manager.service' import { HogFunctionManagerService } from '../managers/hog-function-manager.service' import { HogFunctionMonitoringService } from '../monitoring/hog-function-monitoring.service' +import { SesWebhookHandler } from './helpers/ses' +import { + generateEmailTrackingCode, + generateEmailTrackingPixelUrl, + parseEmailTrackingCode, +} from './helpers/tracking-code' import { MailjetEventType, MailjetWebhookEvent } from './types' export const PIXEL_GIF = Buffer.from('R0lGODlhAQABAAAAACH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==', 'base64') @@ -39,38 +45,11 @@ const emailTrackingErrorsCounter = new Counter({ labelNames: ['error_type', 'source'], }) -export const parseEmailTrackingCode = (customId: string): { functionId: string; invocationId: string } | null => { - // customId is like ph_fn_id=function-1&ph_inv_id=invocation-1 - try { - const params = new URLSearchParams(customId) - const functionId = params.get('ph_fn_id') - const invocationId = params.get('ph_inv_id') - if (!functionId || !invocationId) { - return null - } - return { functionId, invocationId } - } catch { - return null - } -} - -export const generateEmailTrackingCode = ( - invocation: Pick -): string => { - return `ph_fn_id=${invocation.functionId}&ph_inv_id=${invocation.id}` -} - -export const generateEmailTrackingPixelUrl = ( - invocation: Pick -): string => { - return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/pixel?${generateEmailTrackingCode(invocation)}` -} - export const generateTrackingRedirectUrl = ( invocation: Pick, targetUrl: string ): string => { - return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/redirect?${generateEmailTrackingCode(invocation)}&target=${encodeURIComponent(targetUrl)}` + return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/redirect?ph_id=${generateEmailTrackingCode(invocation)}&target=${encodeURIComponent(targetUrl)}` } export const addTrackingToEmail = (html: string, invocation: CyclotronJobInvocationHogFunction): string => { @@ -90,12 +69,16 @@ export const addTrackingToEmail = (html: string, invocation: CyclotronJobInvocat } export class EmailTrackingService { + private sesWebhookHandler: SesWebhookHandler + constructor( private hub: Hub, private hogFunctionManager: HogFunctionManagerService, private hogFlowManager: HogFlowManagerService, private hogFunctionMonitoringService: HogFunctionMonitoringService - ) {} + ) { + this.sesWebhookHandler = new SesWebhookHandler() + } private async trackMetric({ functionId, @@ -106,7 +89,7 @@ export class EmailTrackingService { functionId?: string invocationId?: string metricName: MinimalAppMetric['metric_name'] - source: 'mailjet' | 'direct' + source: 'mailjet' | 'direct' | 'ses' }): Promise { if (!functionId || !invocationId) { logger.error('[EmailTrackingService] trackMetric: Invalid custom ID', { @@ -159,9 +142,7 @@ export class EmailTrackingService { }) } - public async handleMailjetWebhook( - req: ModifiedRequest - ): Promise<{ status: number; message?: string; metrics?: AppMetricType[] }> { + public async handleMailjetWebhook(req: ModifiedRequest): Promise<{ status: number; message?: string }> { const okResponse = { status: 200, message: 'OK' } if (!req.rawBody) { @@ -195,6 +176,35 @@ export class EmailTrackingService { } } + public async handleSesWebhook(req: ModifiedRequest): Promise<{ status: number; message?: string }> { + if (!req.rawBody) { + return { status: 403, message: 'Missing request body' } + } + + try { + const { status, body, metrics } = await this.sesWebhookHandler.handleWebhook({ + body: req.rawBody, + headers: req.headers, + verifySignature: true, + }) + + for (const metric of metrics || []) { + await this.trackMetric({ + functionId: metric.functionId, + invocationId: metric.invocationId, + metricName: metric.metricName, + source: 'ses', + }) + } + + return { status, message: body as string } + } catch (error) { + emailTrackingErrorsCounter.inc({ error_type: error.name || 'unknown' }) + logger.error('[EmailService] handleWebhook: SES webhook error', { error }) + throw error + } + } + public async handleEmailTrackingPixel(req: ModifiedRequest, res: express.Response): Promise { // NOTE: this is somewhat naieve. We should expand with UA checking for things like apple's tracking prevention etc. const { ph_fn_id, ph_inv_id } = req.query diff --git a/plugin-server/src/cdp/services/messaging/email.service.test.ts b/plugin-server/src/cdp/services/messaging/email.service.test.ts index f6075132c1..e5648a27fb 100644 --- a/plugin-server/src/cdp/services/messaging/email.service.test.ts +++ b/plugin-server/src/cdp/services/messaging/email.service.test.ts @@ -45,11 +45,19 @@ describe('EmailService', () => { await insertIntegration(hub.postgres, team.id, { id: 1, kind: 'email', - config: { email: 'test@posthog.com', name: 'Test User', domain: 'posthog.com', mailjet_verified: true }, + config: { + email: 'test@posthog.com', + name: 'Test User', + domain: 'posthog.com', + verified: true, + provider: 'mailjet', + }, }) invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' }) invocation.id = 'invocation-1' - invocation.state.vmState = { stack: [] } as any + invocation.state.vmState = { + stack: [], + } as any invocation.queueParameters = createEmailParams({ from: { integrationId: 1, email: 'test@posthog.com' } }) }) describe('integration validation', () => { @@ -61,7 +69,7 @@ describe('EmailService', () => { email: 'test@other-domain.com', name: 'Test User', domain: 'other-domain.com', - mailjet_verified: false, + verified: false, }, }) await insertIntegration(hub.postgres, team.id, { @@ -131,7 +139,7 @@ describe('EmailService', () => { [ "https://api.mailjet.com/v3.1/send", { - "body": "{"Messages":[{"From":{"Email":"test@posthog.com","Name":"Test User"},"To":[{"Email":"test@example.com","Name":"Test User"}],"Subject":"Test Subject","TextPart":"Test Text","HTMLPart":"Test HTML","EventPayload":"ph_fn_id=function-1&ph_inv_id=invocation-1"}]}", + "body": "{"Messages":[{"From":{"Email":"test@posthog.com","Name":"Test User"},"To":[{"Email":"test@example.com","Name":"Test User"}],"Subject":"Test Subject","TextPart":"Test Text","HTMLPart":"Test HTML","EventPayload":"ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE"}]}", "headers": { "Authorization": "Basic bWFpbGpldC1wdWJsaWMta2V5Om1haWxqZXQtc2VjcmV0LWtleQ==", "Content-Type": "application/json", @@ -144,10 +152,9 @@ describe('EmailService', () => { }) }) }) - describe('native email sending', () => { + describe('native email sending with maildev', () => { let invocation: CyclotronJobInvocationHogFunction const mailDevAPI = new MailDevAPI() - beforeEach(async () => { const actualFetch = jest.requireActual('~/utils/request').fetch as jest.Mock mockFetch.mockImplementation((...args: any[]): Promise => { @@ -158,41 +165,36 @@ describe('EmailService', () => { await insertIntegration(hub.postgres, team.id, { id: 1, kind: 'email', - config: { email: 'test@posthog.com', name: 'Test User', domain: 'posthog.com', mailjet_verified: true }, + config: { + email: 'test@posthog.com', + name: 'Test User', + domain: 'posthog.com', + verified: true, + provider: 'maildev', + }, }) invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' }) invocation.id = 'invocation-1' - invocation.state.vmState = { stack: [] } as any + invocation.state.vmState = { + stack: [], + } as any invocation.queueParameters = createEmailParams({ from: { integrationId: 1, email: 'test@posthog.com' } }) await mailDevAPI.clearEmails() }) - it('should send an email', async () => { const result = await service.executeSendEmail(invocation) expect(result.error).toBeUndefined() - await waitForExpect(async () => expect(mailDevAPI.getEmails()).resolves.toHaveLength(1)) const emails = await mailDevAPI.getEmails() expect(emails).toHaveLength(1) expect(emails[0]).toMatchObject({ - from: [ - { - address: 'test@posthog.com', - name: 'Test User', - }, - ], + from: [{ address: 'test@posthog.com', name: 'Test User' }], html: 'Test HTML', subject: 'Test Subject', text: 'Test Text', - to: [ - { - address: 'test@example.com', - name: 'Test User', - }, - ], + to: [{ address: 'test@example.com', name: 'Test User' }], }) }) - it('should include tracking code in the email', async () => { invocation.queueParameters = createEmailParams({ html: 'Hi! Click me', @@ -202,8 +204,93 @@ describe('EmailService', () => { const emails = await mailDevAPI.getEmails() expect(emails).toHaveLength(1) expect(emails[0].html).toEqual( - `Hi! Click me` + `Hi! Click me` ) }) }) + describe('native email sending with ses', () => { + let invocation: CyclotronJobInvocationHogFunction + let sendEmailSpy: jest.SpyInstance + beforeEach(async () => { + const actualFetch = jest.requireActual('~/utils/request').fetch as jest.Mock + mockFetch.mockImplementation((...args: any[]): Promise => { + return actualFetch(...args) as any + }) + hub.MAILJET_PUBLIC_KEY = '' + hub.MAILJET_SECRET_KEY = '' + await insertIntegration(hub.postgres, team.id, { + id: 1, + kind: 'email', + config: { + email: 'test@posthog-test.com', + name: 'Test User', + domain: 'posthog-test.com', + verified: true, + provider: 'ses', + }, + }) + invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' }) + invocation.id = 'invocation-1' + invocation.state.vmState = { + stack: [], + } as any + invocation.queueParameters = createEmailParams({ + from: { integrationId: 1, email: 'test@posthog-test.com' }, + }) + sendEmailSpy = jest.spyOn(service.ses, 'sendEmail') + + // Check if identity exists before trying to delete it to avoid localstack bug + await service.ses + .deleteIdentity({ Identity: 'posthog-test.com' }) + .promise() + .catch(() => {}) // Ensure the domain is deleted - we dont care if it fails + }) + + it('should error if not verified', async () => { + const result = await service.executeSendEmail(invocation) + expect(result.error).toEqual( + 'Failed to send email via SES: Email address not verified "Test User" ' + ) + }) + + it('should send an email if verified', async () => { + // Localstack auto-approves verification + await service.ses.verifyDomainIdentity({ Domain: 'posthog-test.com' }).promise() + const result = await service.executeSendEmail(invocation) + expect(result.error).toBeUndefined() + expect(sendEmailSpy.mock.calls[0][0]).toMatchInlineSnapshot(` + { + "ConfigurationSetName": "posthog-messaging", + "Destination": { + "ToAddresses": [ + ""Test User" ", + ], + }, + "Message": { + "Body": { + "Html": { + "Charset": "UTF-8", + "Data": "Test HTML", + }, + "Text": { + "Charset": "UTF-8", + "Data": "Test Text", + }, + }, + "Subject": { + "Charset": "UTF-8", + "Data": "Test Subject", + }, + }, + "Source": ""Test User" ", + "Tags": [ + { + "Name": "ph_id", + "Value": "ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE", + }, + ], + } + `) + }) + }) }) diff --git a/plugin-server/src/cdp/services/messaging/email.service.ts b/plugin-server/src/cdp/services/messaging/email.service.ts index 2eb9c28693..fda8817498 100644 --- a/plugin-server/src/cdp/services/messaging/email.service.ts +++ b/plugin-server/src/cdp/services/messaging/email.service.ts @@ -1,3 +1,5 @@ +import AWS from 'aws-sdk' + import { CyclotronJobInvocationHogFunction, CyclotronJobInvocationResult, IntegrationType } from '~/cdp/types' import { createAddLogFunction, logEntry } from '~/cdp/utils' import { createInvocationResult } from '~/cdp/utils/invocation-utils' @@ -5,11 +7,21 @@ import { CyclotronInvocationQueueParametersEmailType } from '~/schema/cyclotron' import { fetch } from '~/utils/request' import { Hub } from '../../../types' -import { addTrackingToEmail, generateEmailTrackingCode } from './email-tracking.service' +import { addTrackingToEmail } from './email-tracking.service' import { mailDevTransport, mailDevWebUrl } from './helpers/maildev' +import { generateEmailTrackingCode } from './helpers/tracking-code' export class EmailService { - constructor(private hub: Hub) {} + ses: AWS.SES + + constructor(private hub: Hub) { + this.ses = new AWS.SES({ + accessKeyId: this.hub.SES_ACCESS_KEY_ID, + secretAccessKey: this.hub.SES_SECRET_ACCESS_KEY, + region: this.hub.SES_REGION, + endpoint: this.hub.SES_ENDPOINT || undefined, + }) + } // Send email public async executeSendEmail( @@ -40,13 +52,16 @@ export class EmailService { this.validateEmailDomain(integration, params) - switch (this.getEmailDeliveryMode()) { + switch (integration.config.provider ?? 'mailjet') { case 'maildev': await this.sendEmailWithMaildev(result, params) break case 'mailjet': await this.sendEmailWithMailjet(result, params) break + case 'ses': + await this.sendEmailWithSES(result, params) + break case 'unsupported': throw new Error('Email delivery mode not supported') } @@ -82,7 +97,7 @@ export class EmailService { ): void { // Currently we enforce using the name and email set on the integration - if (!integration.config.mailjet_verified) { + if (!integration.config.verified) { throw new Error('The selected email integration domain is not verified') } @@ -94,17 +109,6 @@ export class EmailService { params.from.name = integration.config.name } - private getEmailDeliveryMode(): 'mailjet' | 'maildev' | 'unsupported' { - if (this.hub.MAILJET_PUBLIC_KEY && this.hub.MAILJET_SECRET_KEY) { - return 'mailjet' - } - - if (mailDevTransport) { - return 'maildev' - } - return 'unsupported' - } - private async sendEmailWithMailjet( result: CyclotronJobInvocationResult, params: CyclotronInvocationQueueParametersEmailType @@ -168,4 +172,46 @@ export class EmailService { result.logs.push(logEntry('debug', `Email sent to your local maildev server: ${mailDevWebUrl}`)) } + + private async sendEmailWithSES( + result: CyclotronJobInvocationResult, + params: CyclotronInvocationQueueParametersEmailType + ): Promise { + const trackingCode = generateEmailTrackingCode(result.invocation) + const htmlWithTracking = addTrackingToEmail(params.html, result.invocation) + + const sendEmailParams = { + Source: params.from.name ? `"${params.from.name}" <${params.from.email}>` : params.from.email, + Destination: { + ToAddresses: [params.to.name ? `"${params.to.name}" <${params.to.email}>` : params.to.email], + }, + Message: { + Subject: { + Data: params.subject, + Charset: 'UTF-8', + }, + Body: { + Html: { + Data: htmlWithTracking, + Charset: 'UTF-8', + }, + Text: { + Data: params.text, + Charset: 'UTF-8', + }, + }, + }, + ConfigurationSetName: 'posthog-messaging', // This triggers the SNS notifications for email tracking + Tags: [{ Name: 'ph_id', Value: trackingCode }], + } + + try { + const response = await this.ses.sendEmail(sendEmailParams).promise() + if (!response.MessageId) { + throw new Error('No messageId returned from SES') + } + } catch (error) { + throw new Error(`Failed to send email via SES: ${error.message}`) + } + } } diff --git a/plugin-server/src/cdp/services/messaging/helpers/ses.ts b/plugin-server/src/cdp/services/messaging/helpers/ses.ts new file mode 100644 index 0000000000..4ce0347ccd --- /dev/null +++ b/plugin-server/src/cdp/services/messaging/helpers/ses.ts @@ -0,0 +1,357 @@ +import crypto from 'node:crypto' +import { z } from 'zod' + +import { MinimalAppMetric } from '~/cdp/types' +import { parseJSON } from '~/utils/json-parse' +import { logger } from '~/utils/logger' +import { fetch } from '~/utils/request' + +import { parseEmailTrackingCode } from './tracking-code' + +/** + * ---------- SNS envelope types ---------- + * If raw_message_delivery=false (default), SNS wraps your message in this envelope. + */ +const SnsEnvelopeSchema = z.object({ + Type: z.enum(['SubscriptionConfirmation', 'Notification', 'UnsubscribeConfirmation']), + MessageId: z.string(), + Token: z.string().optional(), + TopicArn: z.string(), + Subject: z.string().optional(), + Message: z.string(), // either SES event JSON (Notification) or a confirmation message + Timestamp: z.string(), + SignatureVersion: z.enum(['1']), + Signature: z.string(), + SigningCertURL: z.string().url(), + UnsubscribeURL: z.string().url().optional(), +}) + +export type SnsEnvelope = z.infer + +/** + * ---------- SES event types ---------- + * AWS posts an array of records in the "Message" (or directly as body if raw_message_delivery=true). + * We model the common fields and create specific event detail types. + */ +const SesMailSchema = z.object({ + timestamp: z.string(), + source: z.string(), // From address + messageId: z.string(), + destination: z.array(z.string()), + headersTruncated: z.boolean().optional(), + headers: z.array(z.object({ name: z.string(), value: z.string() })).optional(), + tags: z.record(z.array(z.string())).optional(), // your message tags: { user_id: ["u_123"] } +}) + +const SesCommonEventBase = z.object({ + eventType: z.enum([ + 'Send', + 'Reject', + 'Bounce', + 'Complaint', + 'Delivery', + 'Open', + 'Click', + 'RenderingFailure', + 'DeliveryDelay', + ]), + mail: SesMailSchema, +}) + +const SesOpenEventSchema = SesCommonEventBase.extend({ + eventType: z.literal('Open'), + open: z.object({ + ipAddress: z.string().optional(), + userAgent: z.string().optional(), + timestamp: z.string(), + }), +}) + +const SesClickEventSchema = SesCommonEventBase.extend({ + eventType: z.literal('Click'), + click: z.object({ + ipAddress: z.string().optional(), + link: z.string(), + userAgent: z.string().optional(), + timestamp: z.string(), + }), +}) + +const SesDeliveryEventSchema = SesCommonEventBase.extend({ + eventType: z.literal('Delivery'), + delivery: z.object({ + processingTimeMillis: z.number().optional(), + smtpResponse: z.string().optional(), + reportingMTA: z.string().optional(), + timestamp: z.string(), + recipients: z.array(z.string()).optional(), + }), +}) + +const SesBounceEventSchema = SesCommonEventBase.extend({ + eventType: z.literal('Bounce'), + bounce: z.object({ + bounceType: z.enum(['Undetermined', 'Permanent', 'Transient']), + bounceSubType: z.string().optional(), + bouncedRecipients: z.array( + z.object({ + emailAddress: z.string(), + action: z.string().optional(), + status: z.string().optional(), + diagnosticCode: z.string().optional(), + }) + ), + timestamp: z.string(), + reportingMTA: z.string().optional(), + }), +}) + +const SesComplaintEventSchema = SesCommonEventBase.extend({ + eventType: z.literal('Complaint'), + complaint: z.object({ + complainedRecipients: z.array(z.object({ emailAddress: z.string() })), + timestamp: z.string(), + complaintFeedbackType: z.string().optional(), + userAgent: z.string().optional(), + feedbackId: z.string().optional(), + }), +}) + +const SesRenderingFailureSchema = SesCommonEventBase.extend({ + eventType: z.literal('RenderingFailure'), + renderingFailure: z.object({ + errorMessage: z.string(), + templateName: z.string().optional(), + }), +}) + +const SesSendEventSchema = SesCommonEventBase.extend({ eventType: z.literal('Send') }) +const SesRejectEventSchema = SesCommonEventBase.extend({ eventType: z.literal('Reject') }) + +const SesEventRecordSchema = z.union([ + SesOpenEventSchema, + SesClickEventSchema, + SesDeliveryEventSchema, + SesBounceEventSchema, + SesComplaintEventSchema, + SesRenderingFailureSchema, + SesSendEventSchema, + SesRejectEventSchema, +]) + +const SesEventBatchSchema = z.array(SesEventRecordSchema) + +export type SesEventRecord = z.infer + +const EVENT_TYPE_TO_METRIC_NAME: Record = { + Open: 'email_opened', + Click: 'email_link_clicked', + // Delivery: 'email_sent', + Bounce: 'email_bounced', + Complaint: 'email_blocked', + RenderingFailure: 'email_failed', + Send: 'email_sent', + Reject: 'email_failed', + Delivery: 'email_sent', +} + +export class SesWebhookHandler { + certCache: Record | undefined> = {} + + private async fetchText(url: string): Promise { + const response = await fetch(url) + if (response.status >= 400) { + throw new Error(`Failed to fetch ${url}: ${response.status}`) + } + return await response.text() + } + + private async fetchCert(url: string): Promise { + // Validate that the URL is from AWS SNS + if (!this.isValidSnsCertUrl(url)) { + throw new Error(`Invalid SNS certificate URL: ${url}`) + } + + if (this.certCache[url]) { + return await this.certCache[url]! + } + this.certCache[url] = this.fetchText(url) + return this.certCache[url]! + } + + private isValidSnsCertUrl(url: string): boolean { + try { + const parsedUrl = new URL(url) + + // Must be HTTPS + if (parsedUrl.protocol !== 'https:') { + return false + } + + // Must be from sns.{region}.amazonaws.com + const hostname = parsedUrl.hostname + if (!hostname.match(/^sns\.[a-z0-9-]+\.amazonaws\.com$/)) { + return false + } + + // Must end with .pem + if (!parsedUrl.pathname.endsWith('.pem')) { + return false + } + + return true + } catch { + return false + } + } + + /** + * Parse incoming body accounting for SNS raw vs envelope + */ + private parseIncomingBody( + body: unknown + ): + | { mode: 'raw' | 'sns'; records: SesEventRecord[] } + | { mode: 'sns'; envelope: SnsEnvelope; records?: SesEventRecord[] } { + // If it's already an object with "Type", it's probably the SNS envelope (raw=false) + if (body && typeof body === 'object' && 'Type' in (body as any)) { + const env = SnsEnvelopeSchema.parse(body) + if (env.Type === 'Notification') { + const inner = parseJSON(env.Message) + const records = SesEventBatchSchema.parse(inner) + return { mode: 'sns', envelope: env, records } + } + // For non-Notification, return envelope; caller decides how to handle + return { mode: 'sns', envelope: env } + } + + // raw_message_delivery=true → body is already the SES array + const records = SesEventBatchSchema.parse(body) + return { mode: 'raw', records } + } + + /** + * Verify SNS signature + * Best practice: verify the signature unless you're behind a trusted ALB / private VPC. + */ + private async verifySnsSignature(envelope: SnsEnvelope): Promise { + try { + // 1) Fetch cert + const cert = await this.fetchCert(envelope.SigningCertURL) + // 2) Build string to sign (per SNS docs) + const stringToSign = this.buildStringToSign(envelope) + // 3) Verify + const verifier = crypto.createVerify('RSA-SHA1') // SNS SignatureVersion=1 uses SHA1 + verifier.update(stringToSign, 'utf8') + return verifier.verify(cert, envelope.Signature, 'base64') + } catch { + return false + } + } + + /** + * Build string to sign for SNS signature verification + */ + private buildStringToSign(m: SnsEnvelope): string { + // Follows AWS SNS docs for SignatureVersion=1 + // For Notification: + // "Message\n{Message}\nMessageId\n{MessageId}\nSubject\n{Subject}\nTimestamp\n{Timestamp}\nTopicArn\n{TopicArn}\nType\n{Type}\n" + // Subject line omitted if not present. + const lines: string[] = [] + const pushKV = (k: string, v?: string) => { + if (v !== undefined) { + lines.push(k) + lines.push(v) + } + } + if (m.Type === 'Notification') { + pushKV('Message', m.Message) + pushKV('MessageId', m.MessageId) + pushKV('Subject', m.Subject) + pushKV('Timestamp', m.Timestamp) + pushKV('TopicArn', m.TopicArn) + pushKV('Type', m.Type) + } else if (m.Type === 'SubscriptionConfirmation' || m.Type === 'UnsubscribeConfirmation') { + pushKV('Message', m.Message) + pushKV('MessageId', m.MessageId) + pushKV('SubscribeURL', (m as any).SubscribeURL) // present in confirmation payload body, not in envelope schema + pushKV('Timestamp', m.Timestamp) + pushKV('Token', m.Token!) + pushKV('TopicArn', m.TopicArn) + pushKV('Type', m.Type) + } + return lines.join('\n') + '\n' + } + + async handleWebhook(opts: { + body: any + headers: Record + verifySignature?: boolean + }): Promise<{ + status: number + body: unknown + metrics?: { + functionId?: string + invocationId?: string + metricName: MinimalAppMetric['metric_name'] + }[] + }> { + logger.info('[SesWebhookHandler] handleWebhook', { body: opts.body, headers: opts.headers }) + const parsed = this.parseIncomingBody(opts.body) + + logger.info('[SesWebhookHandler] parsed', { parsed }) + + // If SNS envelope present and verification requested, verify signature + if ('envelope' in parsed && opts.verifySignature) { + logger.info('[SesWebhookHandler] verifying signature', { envelope: parsed.envelope }) + const ok = await this.verifySnsSignature(parsed.envelope) + logger.info('[SesWebhookHandler] signature verified', { ok }) + if (!ok) { + return { status: 403, body: { error: 'Invalid SNS signature' } } + } + } + + // Handle confirmation flow + if (parsed.mode === 'sns' && 'envelope' in parsed && parsed.envelope?.Type === 'SubscriptionConfirmation') { + logger.info('[SesWebhookHandler] confirming subscription', { envelope: parsed.envelope }) + // Confirm by visiting SubscribeURL (contained in the *message JSON*, not envelope.Message field here) + // We need to fetch the inner message JSON to get SubscribeURL + const env = parsed.envelope + const inner = parseJSON(env.Message) as { SubscribeURL?: string } + logger.info('[SesWebhookHandler] confirming subscription', { inner }) + if (inner.SubscribeURL) { + await this.fetchText(inner.SubscribeURL) + } + return { status: 200, body: { ok: true } } + } + + if (parsed.mode === 'sns' && 'envelope' in parsed && parsed.envelope?.Type === 'UnsubscribeConfirmation') { + logger.info('[SesWebhookHandler] confirming unsubscribe', { envelope: parsed.envelope }) + return { status: 200, body: { ok: true } } + } + + // Process SES events + const records = parsed.mode === 'raw' ? parsed.records : parsed.records! + const metrics: { + functionId?: string + invocationId?: string + metricName: MinimalAppMetric['metric_name'] + }[] = [] + + for (const rec of records) { + logger.info('[SesWebhookHandler] processing record', { rec }) + const tags = rec.mail.tags + const { functionId, invocationId } = parseEmailTrackingCode(tags?.ph_id?.[0] || '') || {} + + if (!functionId && !invocationId) { + logger.error('[SesWebhookHandler] handleWebhook: No functionId or invocationId found', { rec }) + continue + } + + const metricName = EVENT_TYPE_TO_METRIC_NAME[rec.eventType] + metrics.push({ functionId, invocationId, metricName }) + } + + return { status: 200, body: { ok: true }, metrics } + } +} diff --git a/plugin-server/src/cdp/services/messaging/helpers/tracking-code.ts b/plugin-server/src/cdp/services/messaging/helpers/tracking-code.ts new file mode 100644 index 0000000000..5f83b33864 --- /dev/null +++ b/plugin-server/src/cdp/services/messaging/helpers/tracking-code.ts @@ -0,0 +1,48 @@ +import { CyclotronJobInvocationHogFunction } from '~/cdp/types' +import { defaultConfig } from '~/config/config' + +function toBase64UrlSafe(input: string) { + // Encode to normal base64 + const b64 = Buffer.from(input, 'utf8').toString('base64') + // Make URL safe and strip padding + return b64.replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '') +} + +function fromBase64UrlSafe(b64url: string) { + // Restore base64 from URL-safe variant + let b64 = b64url.replace(/-/g, '+').replace(/_/g, '/') + // Pad to length multiple of 4 + while (b64.length % 4) { + b64 += '=' + } + return Buffer.from(b64, 'base64').toString('utf8') +} + +export const parseEmailTrackingCode = ( + encodedTrackingCode: string +): { functionId: string; invocationId: string } | null => { + // customId is like ph_fn_id=function-1&ph_inv_id=invocation-1 + const decodedTrackingCode = fromBase64UrlSafe(encodedTrackingCode) + try { + const [functionId, invocationId] = decodedTrackingCode.split(':') + if (!functionId || !invocationId) { + return null + } + return { functionId, invocationId } + } catch { + return null + } +} + +export const generateEmailTrackingCode = ( + invocation: Pick +): string => { + // Generate a base64 encoded string free of equal signs + return toBase64UrlSafe(`${invocation.functionId}:${invocation.id}`) +} + +export const generateEmailTrackingPixelUrl = ( + invocation: Pick +): string => { + return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/pixel?ph_id=${generateEmailTrackingCode(invocation)}` +} diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index c167acc4ee..2a4665033f 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -318,6 +318,12 @@ export function getDefaultConfig(): PluginsServerConfig { // Messaging MAILJET_PUBLIC_KEY: '', MAILJET_SECRET_KEY: '', + + // SES + SES_ENDPOINT: isTestEnv() || isDevEnv() ? 'http://localhost:4566' : '', + SES_ACCESS_KEY_ID: isTestEnv() || isDevEnv() ? 'test' : '', + SES_SECRET_ACCESS_KEY: isTestEnv() || isDevEnv() ? 'test' : '', + SES_REGION: 'us-east-1', } } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 33fccab785..002ba8d4e5 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -460,6 +460,12 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig // Messaging MAILJET_PUBLIC_KEY: string MAILJET_SECRET_KEY: string + + // SES + SES_ENDPOINT: string + SES_ACCESS_KEY_ID: string + SES_SECRET_ACCESS_KEY: string + SES_REGION: string } export interface Hub extends PluginsServerConfig { diff --git a/posthog/api/integration.py b/posthog/api/integration.py index cd9b025718..5a1769aa6d 100644 --- a/posthog/api/integration.py +++ b/posthog/api/integration.py @@ -3,6 +3,7 @@ import json from typing import Any from urllib.parse import urlencode +from django.conf import settings from django.core.cache import cache from django.http import HttpResponse from django.shortcuts import redirect @@ -35,6 +36,7 @@ from posthog.models.integration import ( class NativeEmailIntegrationSerializer(serializers.Serializer): email = serializers.EmailField() name = serializers.CharField() + provider = serializers.ChoiceField(choices=["ses", "mailjet", "maildev"] if settings.DEBUG else ["ses", "mailjet"]) class IntegrationSerializer(serializers.ModelSerializer): diff --git a/posthog/api/test/test_integration.py b/posthog/api/test/test_integration.py index a20c0239cc..f1ef15f84f 100644 --- a/posthog/api/test/test_integration.py +++ b/posthog/api/test/test_integration.py @@ -226,8 +226,8 @@ class TestEmailIntegration: "email": self.valid_config["email"], "name": self.valid_config["name"], "domain": "posthog.com", - "mailjet_verified": False, - "aws_ses_verified": False, + "verified": False, + "provider": "mailjet", } assert integration.sensitive_config == {} assert integration.created_by == self.user @@ -274,8 +274,8 @@ class TestEmailIntegration: "email": self.valid_config["email"], "name": self.valid_config["name"], "domain": "posthog.com", - "mailjet_verified": False, - "aws_ses_verified": False, + "verified": False, + "provider": "mailjet", } @patch("posthog.models.integration.MailjetProvider") @@ -318,8 +318,8 @@ class TestEmailIntegration: "email": self.valid_config["email"], "name": self.valid_config["name"], "domain": "posthog.com", - "mailjet_verified": True, - "aws_ses_verified": False, + "verified": True, + "provider": "mailjet", } @patch("posthog.models.integration.MailjetProvider") @@ -371,10 +371,10 @@ class TestEmailIntegration: self.user, ) - assert not integration1.config["mailjet_verified"] - assert not integration2.config["mailjet_verified"] - assert not integrationOtherDomain.config["mailjet_verified"] - assert not integrationOtherTeam.config["mailjet_verified"] + assert not integration1.config["verified"] + assert not integration2.config["verified"] + assert not integrationOtherDomain.config["verified"] + assert not integrationOtherTeam.config["verified"] email_integration = EmailIntegration(integration1) verification_result = email_integration.verify() @@ -385,7 +385,7 @@ class TestEmailIntegration: integrationOtherDomain.refresh_from_db() integrationOtherTeam.refresh_from_db() - assert integration1.config["mailjet_verified"] - assert integration2.config["mailjet_verified"] - assert not integrationOtherDomain.config["mailjet_verified"] - assert not integrationOtherTeam.config["mailjet_verified"] + assert integration1.config["verified"] + assert integration2.config["verified"] + assert not integrationOtherDomain.config["verified"] + assert not integrationOtherTeam.config["verified"] diff --git a/posthog/models/integration.py b/posthog/models/integration.py index 02c49dc88c..66a075f5c6 100644 --- a/posthog/models/integration.py +++ b/posthog/models/integration.py @@ -33,7 +33,7 @@ from posthog.models.user import User from posthog.plugins.plugin_server_api import reload_integrations_on_workers from posthog.sync import database_sync_to_async -from products.messaging.backend.providers import MailjetProvider, TwilioProvider +from products.messaging.backend.providers import MailjetProvider, SESProvider, TwilioProvider logger = structlog.get_logger(__name__) @@ -1075,16 +1075,28 @@ class EmailIntegration: def mailjet_provider(self) -> MailjetProvider: return MailjetProvider() + @property + def ses_provider(self) -> SESProvider: + return SESProvider() + @classmethod def create_native_integration(cls, config: dict, team_id: int, created_by: User | None = None) -> Integration: email_address: str = config["email"] name: str = config["name"] domain: str = email_address.split("@")[1] + provider: str = config.get("provider", "mailjet") # Default to mailjet for backward compatibility - mailjet = MailjetProvider() - - # TODO: Look for integration belonging to the team with the same domain - mailjet.create_email_domain(domain, team_id=team_id) + # Create domain in the appropriate provider + if provider == "ses": + ses = SESProvider() + ses.create_email_domain(domain, team_id=team_id) + elif provider == "mailjet": + mailjet = MailjetProvider() + mailjet.create_email_domain(domain, team_id=team_id) + elif provider == "maildev" and settings.DEBUG: + pass + else: + raise ValueError(f"Invalid provider: must be either 'ses' or 'mailjet'") integration, created = Integration.objects.update_or_create( team_id=team_id, @@ -1095,8 +1107,8 @@ class EmailIntegration: "email": email_address, "domain": domain, "name": name, - "mailjet_verified": False, - "aws_ses_verified": False, + "provider": provider, + "verified": True if provider == "maildev" else False, }, "created_by": created_by, }, @@ -1135,18 +1147,31 @@ class EmailIntegration: def verify(self): domain = self.integration.config.get("domain") + provider = self.integration.config.get("provider", "mailjet") - verification_result = self.mailjet_provider.verify_email_domain(domain, team_id=self.integration.team_id) + # Use the appropriate provider for verification + if provider == "ses": + verification_result = self.ses_provider.verify_email_domain(domain, team_id=self.integration.team_id) + elif provider == "mailjet": + verification_result = self.mailjet_provider.verify_email_domain(domain, team_id=self.integration.team_id) + elif provider == "maildev": + verification_result = { + "status": "success", + "dnsRecords": [], + } + else: + raise ValueError(f"Invalid provider: {provider}") if verification_result.get("status") == "success": - # We can validate all other integrations with the same domain + # We can validate all other integrations with the same domain and provider other_integrations = Integration.objects.filter( team_id=self.integration.team_id, kind="email", config__domain=domain, + config__provider=provider, ) for integration in other_integrations: - integration.config["mailjet_verified"] = True + integration.config["verified"] = True integration.save() return verification_result diff --git a/posthog/settings/__init__.py b/posthog/settings/__init__.py index 9750732f08..eabfe27383 100644 --- a/posthog/settings/__init__.py +++ b/posthog/settings/__init__.py @@ -48,6 +48,7 @@ from posthog.settings.session_replay_v2 import * from posthog.settings.integrations import * from posthog.settings.pagerduty import * from posthog.settings.payments import * +from posthog.settings.ses import * from posthog.settings.utils import get_from_env, str_to_bool diff --git a/posthog/settings/ses.py b/posthog/settings/ses.py new file mode 100644 index 0000000000..91f636f428 --- /dev/null +++ b/posthog/settings/ses.py @@ -0,0 +1,15 @@ +import os +from typing import Optional + +from posthog.settings.base_variables import DEBUG, TEST + +if TEST or DEBUG: + SES_ENDPOINT = os.getenv("SES_ENDPOINT", "http://localhost:4566") + SES_ACCESS_KEY_ID: Optional[str] = os.getenv("SES_ACCESS_KEY_ID", "test") + SES_SECRET_ACCESS_KEY: Optional[str] = os.getenv("SES_SECRET_ACCESS_KEY", "test") +else: + SES_ENDPOINT = os.getenv("SES_ENDPOINT", "") + SES_ACCESS_KEY_ID = os.getenv("SES_ACCESS_KEY_ID", "") or None + SES_SECRET_ACCESS_KEY = os.getenv("SES_SECRET_ACCESS_KEY", "") or None + +SES_REGION = os.getenv("SES_REGION", "us-east-1") diff --git a/products/messaging/backend/providers/__init__.py b/products/messaging/backend/providers/__init__.py index 4117abad96..2a40e63f18 100644 --- a/products/messaging/backend/providers/__init__.py +++ b/products/messaging/backend/providers/__init__.py @@ -1,4 +1,5 @@ from .mailjet import MailjetProvider +from .ses import SESProvider from .twilio import TwilioProvider -__all__ = ["MailjetProvider", "TwilioProvider"] +__all__ = ["MailjetProvider", "TwilioProvider", "SESProvider"] diff --git a/products/messaging/backend/providers/ses.py b/products/messaging/backend/providers/ses.py new file mode 100644 index 0000000000..792d5ee15d --- /dev/null +++ b/products/messaging/backend/providers/ses.py @@ -0,0 +1,165 @@ +import re +import logging + +from django.conf import settings + +import boto3 +from botocore.exceptions import BotoCoreError, ClientError +from rest_framework import exceptions + +logger = logging.getLogger(__name__) + + +class SESProvider: + def __init__(self): + self.access_key_id = self.get_access_key_id() + self.secret_access_key = self.get_secret_access_key() + self.region = self.get_region() + self.endpoint_url = self.get_endpoint_url() + + # Initialize SES client + self.client = boto3.client( + "ses", + aws_access_key_id=self.access_key_id, + aws_secret_access_key=self.secret_access_key, + region_name=self.region, + endpoint_url=self.endpoint_url if self.endpoint_url else None, + ) + + @classmethod + def get_access_key_id(cls) -> str: + access_key_id = settings.SES_ACCESS_KEY_ID + if not access_key_id: + raise ValueError("SES_ACCESS_KEY_ID is not set in environment or settings") + return access_key_id + + @classmethod + def get_secret_access_key(cls) -> str: + secret_access_key = settings.SES_SECRET_ACCESS_KEY + if not secret_access_key: + raise ValueError("SES_SECRET_ACCESS_KEY is not set in environment or settings") + return secret_access_key + + @classmethod + def get_region(cls) -> str: + return settings.SES_REGION + + @classmethod + def get_endpoint_url(cls) -> str | None: + return settings.SES_ENDPOINT + + def create_email_domain(self, domain: str, team_id: int): + # NOTE: For sesv1 creation is done through verification + self.verify_email_domain(domain, team_id) + + def verify_email_domain(self, domain: str, team_id: int): + # Validate the domain contains valid characters for a domain name + DOMAIN_REGEX = r"(?i)^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$" + if not re.match(DOMAIN_REGEX, domain): + raise exceptions.ValidationError("Please enter a valid domain or subdomain name.") + + dns_records = [] + + # Start/ensure domain verification (TXT at _amazonses.domain) --- + verification_token = None + try: + resp = self.client.verify_domain_identity(Domain=domain) + verification_token = resp.get("VerificationToken") + except ClientError as e: + # If already requested/exists, carry on; SES v1 is idempotent-ish here + if e.response["Error"]["Code"] not in ("InvalidParameterValue",): + raise + + if verification_token: + dns_records.append( + { + "type": "verification", + "recordType": "TXT", + "recordHostname": f"_amazonses.{domain}", + "recordValue": verification_token, + "status": "pending", + } + ) + + # Start/ensure DKIM (three CNAMEs) --- + dkim_tokens: list[str] = [] + try: + resp = self.client.verify_domain_dkim(Domain=domain) + dkim_tokens = resp.get("DkimTokens", []) or [] + except ClientError as e: + if e.response["Error"]["Code"] not in ("InvalidParameterValue",): + raise + + for t in dkim_tokens: + dns_records.append( + { + "type": "dkim", + "recordType": "CNAME", + "recordHostname": f"{t}._domainkey.{domain}", + "recordValue": f"{t}.dkim.amazonses.com", + "status": "pending", + } + ) + + dns_records.append( + { + "type": "spf", + "recordType": "TXT", + "recordHostname": "@", + "recordValue": "v=spf1 include:amazonses.com ~all", + "status": "pending", + } + ) + + # Current verification / DKIM statuses to compute overall status & per-record statuses --- + try: + id_attrs = self.client.get_identity_verification_attributes(Identities=[domain]) + verification_status = ( + id_attrs["VerificationAttributes"].get(domain, {}).get("VerificationStatus", "Unknown") + ) + except ClientError: + verification_status = "Unknown" + + try: + dkim_attrs = self.client.get_identity_dkim_attributes(Identities=[domain]) + dkim_status = dkim_attrs["DkimAttributes"].get(domain, {}).get("DkimVerificationStatus", "Unknown") + except ClientError: + dkim_status = "Unknown" + + # Normalize overall status + if verification_status == "Success" and dkim_status == "Success": + overall = "success" + elif "Failed" in (verification_status, dkim_status): + overall = "failed" + else: + overall = "pending" + + # Upgrade per-record statuses if SES reports success + # - Domain verification TXT is considered verified when VerificationStatus == Success + # - DKIM CNAMEs considered verified when DkimVerificationStatus == Success + if verification_status == "Success": + for r in dns_records: + if r["type"] == "verification": + r["status"] = "success" + if dkim_status == "Success": + for r in dns_records: + if r["type"] == "dkim": + r["status"] = "success" + + # If MAIL FROM attrs said Success earlier, MX already marked verified + + return { + "status": overall, + "dnsRecords": dns_records if overall != "success" else [], + } + + def delete_identity(self, identity: str): + """ + Delete an identity from SES + """ + try: + self.client.delete_identity(Identity=identity) + logger.info(f"Identity {identity} deleted from SES") + except (ClientError, BotoCoreError) as e: + logger.exception(f"SES API error deleting identity: {e}") + raise diff --git a/products/messaging/backend/test/test_ses_provider.py b/products/messaging/backend/test/test_ses_provider.py new file mode 100644 index 0000000000..5aee9222ea --- /dev/null +++ b/products/messaging/backend/test/test_ses_provider.py @@ -0,0 +1,131 @@ +import pytest +from unittest import TestCase +from unittest.mock import patch + +from django.test import override_settings + +from products.messaging.backend.providers.ses import SESProvider + +TEST_DOMAIN = "test.posthog.com" + + +class TestSESProvider(TestCase): + def setUp(self): + # Remove all domains from SES + ses_provider = SESProvider() + if TEST_DOMAIN in ses_provider.client.list_identities()["Identities"]: + ses_provider.delete_identity(TEST_DOMAIN) + + def test_init_with_valid_credentials(self): + with override_settings( + SES_ACCESS_KEY_ID="test_access_key", + SES_SECRET_ACCESS_KEY="test_secret_key", + SES_REGION="us-east-1", + SES_ENDPOINT="", + ): + provider = SESProvider() + assert provider.access_key_id == "test_access_key" + assert provider.secret_access_key == "test_secret_key" + assert provider.region == "us-east-1" + + def test_init_missing_access_key(self): + with override_settings(SES_ACCESS_KEY_ID="", SES_SECRET_ACCESS_KEY="test_secret_key"): + with pytest.raises(ValueError, match="SES_ACCESS_KEY_ID is not set"): + SESProvider() + + def test_init_missing_secret_key(self): + with override_settings(SES_ACCESS_KEY_ID="test_access_key", SES_SECRET_ACCESS_KEY=""): + with pytest.raises(ValueError, match="SES_SECRET_ACCESS_KEY is not set"): + SESProvider() + + def test_create_email_domain_success(self): + provider = SESProvider() + provider.create_email_domain(TEST_DOMAIN, team_id=1) + + @patch("products.messaging.backend.providers.ses.boto3.client") + def test_create_email_domain_invalid_domain(self, mock_boto_client): + with override_settings( + SES_ACCESS_KEY_ID="test_access_key", SES_SECRET_ACCESS_KEY="test_secret_key", SES_REGION="us-east-1" + ): + provider = SESProvider() + with pytest.raises(Exception, match="Please enter a valid domain"): + provider.create_email_domain("invalid-domain", team_id=1) + + def test_verify_email_domain_initial_setup(self): + provider = SESProvider() + + # Mock the client on the provider instance + with patch.object(provider, "client") as mock_client: + # Mock the verification attributes to return a non-success status + mock_client.get_identity_verification_attributes.return_value = { + "VerificationAttributes": { + TEST_DOMAIN: { + "VerificationStatus": "Pending", # Non-success status + "VerificationToken": "test-token-123", + } + } + } + + # Mock DKIM attributes to return a non-success status + mock_client.get_identity_dkim_attributes.return_value = { + "DkimAttributes": { + TEST_DOMAIN: { + "DkimVerificationStatus": "Pending" # Non-success status + } + } + } + + # Mock the domain verification and DKIM setup calls + mock_client.verify_domain_identity.return_value = {"VerificationToken": "test-token-123"} + mock_client.verify_domain_dkim.return_value = {"DkimTokens": ["token1", "token2", "token3"]} + + result = provider.verify_email_domain(TEST_DOMAIN, team_id=1) + + # Should return pending status with DNS records + assert result == { + "status": "pending", + "dnsRecords": [ + { + "type": "verification", + "recordType": "TXT", + "recordHostname": "_amazonses.test.posthog.com", + "recordValue": "test-token-123", + "status": "pending", + }, + { + "type": "dkim", + "recordType": "CNAME", + "recordHostname": "token1._domainkey.test.posthog.com", + "recordValue": "token1.dkim.amazonses.com", + "status": "pending", + }, + { + "type": "dkim", + "recordType": "CNAME", + "recordHostname": "token2._domainkey.test.posthog.com", + "recordValue": "token2.dkim.amazonses.com", + "status": "pending", + }, + { + "type": "dkim", + "recordType": "CNAME", + "recordHostname": "token3._domainkey.test.posthog.com", + "recordValue": "token3.dkim.amazonses.com", + "status": "pending", + }, + { + "type": "spf", + "recordType": "TXT", + "recordHostname": "@", + "recordValue": "v=spf1 include:amazonses.com ~all", + "status": "pending", + }, + ], + } + + def test_verify_email_domain_success(self): + provider = SESProvider() + + result = provider.verify_email_domain(TEST_DOMAIN, team_id=1) + # Should return verified status with no DNS records needed + assert result == {"status": "success", "dnsRecords": []} diff --git a/products/messaging/frontend/Channels/EmailSetup/EmailSetupModal.tsx b/products/messaging/frontend/Channels/EmailSetup/EmailSetupModal.tsx index 416d0305e0..d39d495cef 100644 --- a/products/messaging/frontend/Channels/EmailSetup/EmailSetupModal.tsx +++ b/products/messaging/frontend/Channels/EmailSetup/EmailSetupModal.tsx @@ -2,8 +2,9 @@ import { useActions, useValues } from 'kea' import { Form } from 'kea-forms' import { IconCheckCircle, IconCopy, IconWarning } from '@posthog/icons' -import { LemonButton, LemonInput, LemonModal, Spinner, lemonToast } from '@posthog/lemon-ui' +import { LemonButton, LemonInput, LemonModal, LemonSelect, Spinner, lemonToast } from '@posthog/lemon-ui' +import { FlaggedFeature } from 'lib/components/FlaggedFeature' import { LemonField } from 'lib/lemon-ui/LemonField' import { DnsRecord, EmailSetupModalLogicProps, emailSetupModalLogic } from './emailSetupModalLogic' @@ -20,6 +21,18 @@ export const EmailSetupModal = (props: EmailSetupModalLogicProps): JSX.Element = modalContent = (
+ + {/* NOTE: We probably dont want to actually give the options - this is just for our own testing */} + + + + diff --git a/products/messaging/frontend/Channels/EmailSetup/emailSetupModalLogic.ts b/products/messaging/frontend/Channels/EmailSetup/emailSetupModalLogic.ts index 8fff25b011..4b567dc850 100644 --- a/products/messaging/frontend/Channels/EmailSetup/emailSetupModalLogic.ts +++ b/products/messaging/frontend/Channels/EmailSetup/emailSetupModalLogic.ts @@ -26,6 +26,7 @@ export interface DnsRecord { export interface EmailSenderFormType { email: string name: string + provider: 'ses' | 'mailjet' | 'maildev' } const EMAIL_REGEX = /^[^\s@]+@[^\s@]+\.[^\s@]+$/i @@ -41,10 +42,11 @@ export const emailSetupModalLogic = kea([ forms(({ actions }) => ({ emailSender: { defaults: { + provider: 'mailjet', email: '', name: '', - }, - errors: ({ email, name }) => { + } as EmailSenderFormType, + errors: ({ email, name, provider }) => { let emailError = undefined if (!email) { emailError = 'Email is required' @@ -55,6 +57,7 @@ export const emailSetupModalLogic = kea([ return { email: emailError, name: !name ? 'Name is required' : undefined, + provider: !provider ? 'Provider is required' : undefined, } }, submit: async (config) => { diff --git a/products/messaging/frontend/Channels/messageChannelLogic.ts b/products/messaging/frontend/Channels/messageChannelLogic.ts index aeff3815a5..934289e14d 100644 --- a/products/messaging/frontend/Channels/messageChannelLogic.ts +++ b/products/messaging/frontend/Channels/messageChannelLogic.ts @@ -35,7 +35,7 @@ export const messageChannelLogic = kea([ ({ integration }): boolean => { switch (integration?.kind) { case 'email': - return integration.config.mailjet_verified === true + return integration.config.verified === true default: return true }