mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
feat(messaging): Added SES support for email integration (#38634)
This commit is contained in:
@@ -442,6 +442,17 @@ services:
|
||||
networks:
|
||||
- otel_network
|
||||
|
||||
localstack:
|
||||
container_name: '${LOCALSTACK_DOCKER_NAME:-localstack-main}'
|
||||
image: localstack/localstack
|
||||
ports:
|
||||
- '127.0.0.1:4566:4566' # LocalStack Gateway
|
||||
- '127.0.0.1:4510-4559:4510-4559' # external services port range
|
||||
environment:
|
||||
- DEBUG=${DEBUG:-0}
|
||||
volumes:
|
||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||
|
||||
networks:
|
||||
otel_network:
|
||||
driver: bridge
|
||||
|
||||
@@ -258,6 +258,11 @@ services:
|
||||
file: docker-compose.base.yml
|
||||
service: jaeger
|
||||
|
||||
localstack:
|
||||
extends:
|
||||
file: docker-compose.base.yml
|
||||
service: localstack
|
||||
|
||||
networks:
|
||||
otel_network:
|
||||
driver: bridge
|
||||
|
||||
@@ -248,6 +248,11 @@ services:
|
||||
file: docker-compose.base.yml
|
||||
service: jaeger
|
||||
|
||||
localstack:
|
||||
extends:
|
||||
file: docker-compose.base.yml
|
||||
service: localstack
|
||||
|
||||
networks:
|
||||
otel_network:
|
||||
driver: bridge
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 100 KiB After Width: | Height: | Size: 99 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 235 KiB After Width: | Height: | Size: 235 KiB |
@@ -209,6 +209,7 @@ export const FEATURE_FLAGS = {
|
||||
MESSAGING: 'messaging', // owner @haven #team-messaging
|
||||
MESSAGING_EARLY_ACCESS: 'messaging-product', // owner @haven #team-messaging
|
||||
MESSAGING_TRIGGER_WEBHOOK: 'messaging-trigger-webhook', // owner #team-messaging
|
||||
MESSAGING_SES: 'messaging-ses', // owner #team-messaging
|
||||
ENVIRONMENTS_ROLLBACK: 'environments-rollback', // owner: @yasen-posthog #team-platform-features
|
||||
SELF_SERVE_CREDIT_OVERRIDE: 'self-serve-credit-override', // owner: @zach
|
||||
CUSTOM_CSS_THEMES: 'custom-css-themes', // owner: @daibhin
|
||||
|
||||
@@ -16,7 +16,7 @@ const isVerificationRequired = (integration: IntegrationType): boolean => {
|
||||
const isVerified = (integration: IntegrationType): boolean => {
|
||||
switch (integration.kind) {
|
||||
case 'email':
|
||||
return integration.config.mailjet_verified === true
|
||||
return integration.config.verified === true
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -126,6 +126,7 @@ export class CdpApi {
|
||||
router.get('/public/webhooks/:webhook_id', asyncHandler(this.handleWebhook()))
|
||||
router.get('/public/m/pixel', asyncHandler(this.getEmailTrackingPixel()))
|
||||
router.post('/public/m/mailjet_webhook', asyncHandler(this.postMailjetWebhook()))
|
||||
router.post('/public/m/ses_webhook', asyncHandler(this.postSesWebhook()))
|
||||
router.get('/public/m/redirect', asyncHandler(this.getEmailTrackingRedirect()))
|
||||
|
||||
return router
|
||||
@@ -555,6 +556,17 @@ export class CdpApi {
|
||||
}
|
||||
}
|
||||
|
||||
private postSesWebhook =
|
||||
() =>
|
||||
async (req: ModifiedRequest, res: express.Response): Promise<any> => {
|
||||
try {
|
||||
const { status, message } = await this.emailTrackingService.handleSesWebhook(req)
|
||||
return res.status(status).json({ message })
|
||||
} catch (error) {
|
||||
return res.status(500).json({ error: 'Internal error' })
|
||||
}
|
||||
}
|
||||
|
||||
private getEmailTrackingPixel =
|
||||
() =>
|
||||
async (req: ModifiedRequest, res: express.Response): Promise<any> => {
|
||||
|
||||
@@ -18,7 +18,8 @@ import { closeHub, createHub } from '~/utils/db/hub'
|
||||
import { UUIDT } from '~/utils/utils'
|
||||
|
||||
import { Hub, Team } from '../../../types'
|
||||
import { PIXEL_GIF, generateEmailTrackingCode } from './email-tracking.service'
|
||||
import { PIXEL_GIF } from './email-tracking.service'
|
||||
import { generateEmailTrackingCode } from './helpers/tracking-code'
|
||||
import { MailjetEventBase, MailjetWebhookEvent } from './types'
|
||||
|
||||
describe('EmailTrackingService', () => {
|
||||
|
||||
@@ -2,7 +2,7 @@ import { Counter } from 'prom-client'
|
||||
import express from 'ultimate-express'
|
||||
|
||||
import { ModifiedRequest } from '~/api/router'
|
||||
import { AppMetricType, CyclotronJobInvocationHogFunction, MinimalAppMetric } from '~/cdp/types'
|
||||
import { CyclotronJobInvocationHogFunction, MinimalAppMetric } from '~/cdp/types'
|
||||
import { defaultConfig } from '~/config/config'
|
||||
import { captureException } from '~/utils/posthog'
|
||||
|
||||
@@ -11,6 +11,12 @@ import { logger } from '../../../utils/logger'
|
||||
import { HogFlowManagerService } from '../hogflows/hogflow-manager.service'
|
||||
import { HogFunctionManagerService } from '../managers/hog-function-manager.service'
|
||||
import { HogFunctionMonitoringService } from '../monitoring/hog-function-monitoring.service'
|
||||
import { SesWebhookHandler } from './helpers/ses'
|
||||
import {
|
||||
generateEmailTrackingCode,
|
||||
generateEmailTrackingPixelUrl,
|
||||
parseEmailTrackingCode,
|
||||
} from './helpers/tracking-code'
|
||||
import { MailjetEventType, MailjetWebhookEvent } from './types'
|
||||
|
||||
export const PIXEL_GIF = Buffer.from('R0lGODlhAQABAAAAACH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==', 'base64')
|
||||
@@ -39,38 +45,11 @@ const emailTrackingErrorsCounter = new Counter({
|
||||
labelNames: ['error_type', 'source'],
|
||||
})
|
||||
|
||||
export const parseEmailTrackingCode = (customId: string): { functionId: string; invocationId: string } | null => {
|
||||
// customId is like ph_fn_id=function-1&ph_inv_id=invocation-1
|
||||
try {
|
||||
const params = new URLSearchParams(customId)
|
||||
const functionId = params.get('ph_fn_id')
|
||||
const invocationId = params.get('ph_inv_id')
|
||||
if (!functionId || !invocationId) {
|
||||
return null
|
||||
}
|
||||
return { functionId, invocationId }
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export const generateEmailTrackingCode = (
|
||||
invocation: Pick<CyclotronJobInvocationHogFunction, 'functionId' | 'id'>
|
||||
): string => {
|
||||
return `ph_fn_id=${invocation.functionId}&ph_inv_id=${invocation.id}`
|
||||
}
|
||||
|
||||
export const generateEmailTrackingPixelUrl = (
|
||||
invocation: Pick<CyclotronJobInvocationHogFunction, 'functionId' | 'id'>
|
||||
): string => {
|
||||
return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/pixel?${generateEmailTrackingCode(invocation)}`
|
||||
}
|
||||
|
||||
export const generateTrackingRedirectUrl = (
|
||||
invocation: Pick<CyclotronJobInvocationHogFunction, 'functionId' | 'id'>,
|
||||
targetUrl: string
|
||||
): string => {
|
||||
return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/redirect?${generateEmailTrackingCode(invocation)}&target=${encodeURIComponent(targetUrl)}`
|
||||
return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/redirect?ph_id=${generateEmailTrackingCode(invocation)}&target=${encodeURIComponent(targetUrl)}`
|
||||
}
|
||||
|
||||
export const addTrackingToEmail = (html: string, invocation: CyclotronJobInvocationHogFunction): string => {
|
||||
@@ -90,12 +69,16 @@ export const addTrackingToEmail = (html: string, invocation: CyclotronJobInvocat
|
||||
}
|
||||
|
||||
export class EmailTrackingService {
|
||||
private sesWebhookHandler: SesWebhookHandler
|
||||
|
||||
constructor(
|
||||
private hub: Hub,
|
||||
private hogFunctionManager: HogFunctionManagerService,
|
||||
private hogFlowManager: HogFlowManagerService,
|
||||
private hogFunctionMonitoringService: HogFunctionMonitoringService
|
||||
) {}
|
||||
) {
|
||||
this.sesWebhookHandler = new SesWebhookHandler()
|
||||
}
|
||||
|
||||
private async trackMetric({
|
||||
functionId,
|
||||
@@ -106,7 +89,7 @@ export class EmailTrackingService {
|
||||
functionId?: string
|
||||
invocationId?: string
|
||||
metricName: MinimalAppMetric['metric_name']
|
||||
source: 'mailjet' | 'direct'
|
||||
source: 'mailjet' | 'direct' | 'ses'
|
||||
}): Promise<void> {
|
||||
if (!functionId || !invocationId) {
|
||||
logger.error('[EmailTrackingService] trackMetric: Invalid custom ID', {
|
||||
@@ -159,9 +142,7 @@ export class EmailTrackingService {
|
||||
})
|
||||
}
|
||||
|
||||
public async handleMailjetWebhook(
|
||||
req: ModifiedRequest
|
||||
): Promise<{ status: number; message?: string; metrics?: AppMetricType[] }> {
|
||||
public async handleMailjetWebhook(req: ModifiedRequest): Promise<{ status: number; message?: string }> {
|
||||
const okResponse = { status: 200, message: 'OK' }
|
||||
|
||||
if (!req.rawBody) {
|
||||
@@ -195,6 +176,35 @@ export class EmailTrackingService {
|
||||
}
|
||||
}
|
||||
|
||||
public async handleSesWebhook(req: ModifiedRequest): Promise<{ status: number; message?: string }> {
|
||||
if (!req.rawBody) {
|
||||
return { status: 403, message: 'Missing request body' }
|
||||
}
|
||||
|
||||
try {
|
||||
const { status, body, metrics } = await this.sesWebhookHandler.handleWebhook({
|
||||
body: req.rawBody,
|
||||
headers: req.headers,
|
||||
verifySignature: true,
|
||||
})
|
||||
|
||||
for (const metric of metrics || []) {
|
||||
await this.trackMetric({
|
||||
functionId: metric.functionId,
|
||||
invocationId: metric.invocationId,
|
||||
metricName: metric.metricName,
|
||||
source: 'ses',
|
||||
})
|
||||
}
|
||||
|
||||
return { status, message: body as string }
|
||||
} catch (error) {
|
||||
emailTrackingErrorsCounter.inc({ error_type: error.name || 'unknown' })
|
||||
logger.error('[EmailService] handleWebhook: SES webhook error', { error })
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
public async handleEmailTrackingPixel(req: ModifiedRequest, res: express.Response): Promise<void> {
|
||||
// NOTE: this is somewhat naieve. We should expand with UA checking for things like apple's tracking prevention etc.
|
||||
const { ph_fn_id, ph_inv_id } = req.query
|
||||
|
||||
@@ -45,11 +45,19 @@ describe('EmailService', () => {
|
||||
await insertIntegration(hub.postgres, team.id, {
|
||||
id: 1,
|
||||
kind: 'email',
|
||||
config: { email: 'test@posthog.com', name: 'Test User', domain: 'posthog.com', mailjet_verified: true },
|
||||
config: {
|
||||
email: 'test@posthog.com',
|
||||
name: 'Test User',
|
||||
domain: 'posthog.com',
|
||||
verified: true,
|
||||
provider: 'mailjet',
|
||||
},
|
||||
})
|
||||
invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' })
|
||||
invocation.id = 'invocation-1'
|
||||
invocation.state.vmState = { stack: [] } as any
|
||||
invocation.state.vmState = {
|
||||
stack: [],
|
||||
} as any
|
||||
invocation.queueParameters = createEmailParams({ from: { integrationId: 1, email: 'test@posthog.com' } })
|
||||
})
|
||||
describe('integration validation', () => {
|
||||
@@ -61,7 +69,7 @@ describe('EmailService', () => {
|
||||
email: 'test@other-domain.com',
|
||||
name: 'Test User',
|
||||
domain: 'other-domain.com',
|
||||
mailjet_verified: false,
|
||||
verified: false,
|
||||
},
|
||||
})
|
||||
await insertIntegration(hub.postgres, team.id, {
|
||||
@@ -131,7 +139,7 @@ describe('EmailService', () => {
|
||||
[
|
||||
"https://api.mailjet.com/v3.1/send",
|
||||
{
|
||||
"body": "{"Messages":[{"From":{"Email":"test@posthog.com","Name":"Test User"},"To":[{"Email":"test@example.com","Name":"Test User"}],"Subject":"Test Subject","TextPart":"Test Text","HTMLPart":"Test HTML","EventPayload":"ph_fn_id=function-1&ph_inv_id=invocation-1"}]}",
|
||||
"body": "{"Messages":[{"From":{"Email":"test@posthog.com","Name":"Test User"},"To":[{"Email":"test@example.com","Name":"Test User"}],"Subject":"Test Subject","TextPart":"Test Text","HTMLPart":"Test HTML","EventPayload":"ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE"}]}",
|
||||
"headers": {
|
||||
"Authorization": "Basic bWFpbGpldC1wdWJsaWMta2V5Om1haWxqZXQtc2VjcmV0LWtleQ==",
|
||||
"Content-Type": "application/json",
|
||||
@@ -144,10 +152,9 @@ describe('EmailService', () => {
|
||||
})
|
||||
})
|
||||
})
|
||||
describe('native email sending', () => {
|
||||
describe('native email sending with maildev', () => {
|
||||
let invocation: CyclotronJobInvocationHogFunction
|
||||
const mailDevAPI = new MailDevAPI()
|
||||
|
||||
beforeEach(async () => {
|
||||
const actualFetch = jest.requireActual('~/utils/request').fetch as jest.Mock
|
||||
mockFetch.mockImplementation((...args: any[]): Promise<any> => {
|
||||
@@ -158,41 +165,36 @@ describe('EmailService', () => {
|
||||
await insertIntegration(hub.postgres, team.id, {
|
||||
id: 1,
|
||||
kind: 'email',
|
||||
config: { email: 'test@posthog.com', name: 'Test User', domain: 'posthog.com', mailjet_verified: true },
|
||||
config: {
|
||||
email: 'test@posthog.com',
|
||||
name: 'Test User',
|
||||
domain: 'posthog.com',
|
||||
verified: true,
|
||||
provider: 'maildev',
|
||||
},
|
||||
})
|
||||
invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' })
|
||||
invocation.id = 'invocation-1'
|
||||
invocation.state.vmState = { stack: [] } as any
|
||||
invocation.state.vmState = {
|
||||
stack: [],
|
||||
} as any
|
||||
invocation.queueParameters = createEmailParams({ from: { integrationId: 1, email: 'test@posthog.com' } })
|
||||
await mailDevAPI.clearEmails()
|
||||
})
|
||||
|
||||
it('should send an email', async () => {
|
||||
const result = await service.executeSendEmail(invocation)
|
||||
expect(result.error).toBeUndefined()
|
||||
|
||||
await waitForExpect(async () => expect(mailDevAPI.getEmails()).resolves.toHaveLength(1))
|
||||
const emails = await mailDevAPI.getEmails()
|
||||
expect(emails).toHaveLength(1)
|
||||
expect(emails[0]).toMatchObject({
|
||||
from: [
|
||||
{
|
||||
address: 'test@posthog.com',
|
||||
name: 'Test User',
|
||||
},
|
||||
],
|
||||
from: [{ address: 'test@posthog.com', name: 'Test User' }],
|
||||
html: 'Test HTML',
|
||||
subject: 'Test Subject',
|
||||
text: 'Test Text',
|
||||
to: [
|
||||
{
|
||||
address: 'test@example.com',
|
||||
name: 'Test User',
|
||||
},
|
||||
],
|
||||
to: [{ address: 'test@example.com', name: 'Test User' }],
|
||||
})
|
||||
})
|
||||
|
||||
it('should include tracking code in the email', async () => {
|
||||
invocation.queueParameters = createEmailParams({
|
||||
html: '<body>Hi! <a href="https://example.com">Click me</a></body>',
|
||||
@@ -202,8 +204,93 @@ describe('EmailService', () => {
|
||||
const emails = await mailDevAPI.getEmails()
|
||||
expect(emails).toHaveLength(1)
|
||||
expect(emails[0].html).toEqual(
|
||||
`<body>Hi! <a href="http://localhost:8010/public/m/redirect?ph_fn_id=function-1&ph_inv_id=invocation-1&target=https%3A%2F%2Fexample.com">Click me</a><img src="http://localhost:8010/public/m/pixel?ph_fn_id=function-1&ph_inv_id=invocation-1" style="display: none;" /></body>`
|
||||
`<body>Hi! <a href="http://localhost:8010/public/m/redirect?ph_id=ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE&target=https%3A%2F%2Fexample.com">Click me</a><img src="http://localhost:8010/public/m/pixel?ph_id=ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE" style="display: none;" /></body>`
|
||||
)
|
||||
})
|
||||
})
|
||||
describe('native email sending with ses', () => {
|
||||
let invocation: CyclotronJobInvocationHogFunction
|
||||
let sendEmailSpy: jest.SpyInstance
|
||||
beforeEach(async () => {
|
||||
const actualFetch = jest.requireActual('~/utils/request').fetch as jest.Mock
|
||||
mockFetch.mockImplementation((...args: any[]): Promise<any> => {
|
||||
return actualFetch(...args) as any
|
||||
})
|
||||
hub.MAILJET_PUBLIC_KEY = ''
|
||||
hub.MAILJET_SECRET_KEY = ''
|
||||
await insertIntegration(hub.postgres, team.id, {
|
||||
id: 1,
|
||||
kind: 'email',
|
||||
config: {
|
||||
email: 'test@posthog-test.com',
|
||||
name: 'Test User',
|
||||
domain: 'posthog-test.com',
|
||||
verified: true,
|
||||
provider: 'ses',
|
||||
},
|
||||
})
|
||||
invocation = createExampleInvocation({ team_id: team.id, id: 'function-1' })
|
||||
invocation.id = 'invocation-1'
|
||||
invocation.state.vmState = {
|
||||
stack: [],
|
||||
} as any
|
||||
invocation.queueParameters = createEmailParams({
|
||||
from: { integrationId: 1, email: 'test@posthog-test.com' },
|
||||
})
|
||||
sendEmailSpy = jest.spyOn(service.ses, 'sendEmail')
|
||||
|
||||
// Check if identity exists before trying to delete it to avoid localstack bug
|
||||
await service.ses
|
||||
.deleteIdentity({ Identity: 'posthog-test.com' })
|
||||
.promise()
|
||||
.catch(() => {}) // Ensure the domain is deleted - we dont care if it fails
|
||||
})
|
||||
|
||||
it('should error if not verified', async () => {
|
||||
const result = await service.executeSendEmail(invocation)
|
||||
expect(result.error).toEqual(
|
||||
'Failed to send email via SES: Email address not verified "Test User" <test@posthog-test.com>'
|
||||
)
|
||||
})
|
||||
|
||||
it('should send an email if verified', async () => {
|
||||
// Localstack auto-approves verification
|
||||
await service.ses.verifyDomainIdentity({ Domain: 'posthog-test.com' }).promise()
|
||||
const result = await service.executeSendEmail(invocation)
|
||||
expect(result.error).toBeUndefined()
|
||||
expect(sendEmailSpy.mock.calls[0][0]).toMatchInlineSnapshot(`
|
||||
{
|
||||
"ConfigurationSetName": "posthog-messaging",
|
||||
"Destination": {
|
||||
"ToAddresses": [
|
||||
""Test User" <test@example.com>",
|
||||
],
|
||||
},
|
||||
"Message": {
|
||||
"Body": {
|
||||
"Html": {
|
||||
"Charset": "UTF-8",
|
||||
"Data": "Test HTML",
|
||||
},
|
||||
"Text": {
|
||||
"Charset": "UTF-8",
|
||||
"Data": "Test Text",
|
||||
},
|
||||
},
|
||||
"Subject": {
|
||||
"Charset": "UTF-8",
|
||||
"Data": "Test Subject",
|
||||
},
|
||||
},
|
||||
"Source": ""Test User" <test@posthog-test.com>",
|
||||
"Tags": [
|
||||
{
|
||||
"Name": "ph_id",
|
||||
"Value": "ZnVuY3Rpb24tMTppbnZvY2F0aW9uLTE",
|
||||
},
|
||||
],
|
||||
}
|
||||
`)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import AWS from 'aws-sdk'
|
||||
|
||||
import { CyclotronJobInvocationHogFunction, CyclotronJobInvocationResult, IntegrationType } from '~/cdp/types'
|
||||
import { createAddLogFunction, logEntry } from '~/cdp/utils'
|
||||
import { createInvocationResult } from '~/cdp/utils/invocation-utils'
|
||||
@@ -5,11 +7,21 @@ import { CyclotronInvocationQueueParametersEmailType } from '~/schema/cyclotron'
|
||||
import { fetch } from '~/utils/request'
|
||||
|
||||
import { Hub } from '../../../types'
|
||||
import { addTrackingToEmail, generateEmailTrackingCode } from './email-tracking.service'
|
||||
import { addTrackingToEmail } from './email-tracking.service'
|
||||
import { mailDevTransport, mailDevWebUrl } from './helpers/maildev'
|
||||
import { generateEmailTrackingCode } from './helpers/tracking-code'
|
||||
|
||||
export class EmailService {
|
||||
constructor(private hub: Hub) {}
|
||||
ses: AWS.SES
|
||||
|
||||
constructor(private hub: Hub) {
|
||||
this.ses = new AWS.SES({
|
||||
accessKeyId: this.hub.SES_ACCESS_KEY_ID,
|
||||
secretAccessKey: this.hub.SES_SECRET_ACCESS_KEY,
|
||||
region: this.hub.SES_REGION,
|
||||
endpoint: this.hub.SES_ENDPOINT || undefined,
|
||||
})
|
||||
}
|
||||
|
||||
// Send email
|
||||
public async executeSendEmail(
|
||||
@@ -40,13 +52,16 @@ export class EmailService {
|
||||
|
||||
this.validateEmailDomain(integration, params)
|
||||
|
||||
switch (this.getEmailDeliveryMode()) {
|
||||
switch (integration.config.provider ?? 'mailjet') {
|
||||
case 'maildev':
|
||||
await this.sendEmailWithMaildev(result, params)
|
||||
break
|
||||
case 'mailjet':
|
||||
await this.sendEmailWithMailjet(result, params)
|
||||
break
|
||||
case 'ses':
|
||||
await this.sendEmailWithSES(result, params)
|
||||
break
|
||||
case 'unsupported':
|
||||
throw new Error('Email delivery mode not supported')
|
||||
}
|
||||
@@ -82,7 +97,7 @@ export class EmailService {
|
||||
): void {
|
||||
// Currently we enforce using the name and email set on the integration
|
||||
|
||||
if (!integration.config.mailjet_verified) {
|
||||
if (!integration.config.verified) {
|
||||
throw new Error('The selected email integration domain is not verified')
|
||||
}
|
||||
|
||||
@@ -94,17 +109,6 @@ export class EmailService {
|
||||
params.from.name = integration.config.name
|
||||
}
|
||||
|
||||
private getEmailDeliveryMode(): 'mailjet' | 'maildev' | 'unsupported' {
|
||||
if (this.hub.MAILJET_PUBLIC_KEY && this.hub.MAILJET_SECRET_KEY) {
|
||||
return 'mailjet'
|
||||
}
|
||||
|
||||
if (mailDevTransport) {
|
||||
return 'maildev'
|
||||
}
|
||||
return 'unsupported'
|
||||
}
|
||||
|
||||
private async sendEmailWithMailjet(
|
||||
result: CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>,
|
||||
params: CyclotronInvocationQueueParametersEmailType
|
||||
@@ -168,4 +172,46 @@ export class EmailService {
|
||||
|
||||
result.logs.push(logEntry('debug', `Email sent to your local maildev server: ${mailDevWebUrl}`))
|
||||
}
|
||||
|
||||
private async sendEmailWithSES(
|
||||
result: CyclotronJobInvocationResult<CyclotronJobInvocationHogFunction>,
|
||||
params: CyclotronInvocationQueueParametersEmailType
|
||||
): Promise<void> {
|
||||
const trackingCode = generateEmailTrackingCode(result.invocation)
|
||||
const htmlWithTracking = addTrackingToEmail(params.html, result.invocation)
|
||||
|
||||
const sendEmailParams = {
|
||||
Source: params.from.name ? `"${params.from.name}" <${params.from.email}>` : params.from.email,
|
||||
Destination: {
|
||||
ToAddresses: [params.to.name ? `"${params.to.name}" <${params.to.email}>` : params.to.email],
|
||||
},
|
||||
Message: {
|
||||
Subject: {
|
||||
Data: params.subject,
|
||||
Charset: 'UTF-8',
|
||||
},
|
||||
Body: {
|
||||
Html: {
|
||||
Data: htmlWithTracking,
|
||||
Charset: 'UTF-8',
|
||||
},
|
||||
Text: {
|
||||
Data: params.text,
|
||||
Charset: 'UTF-8',
|
||||
},
|
||||
},
|
||||
},
|
||||
ConfigurationSetName: 'posthog-messaging', // This triggers the SNS notifications for email tracking
|
||||
Tags: [{ Name: 'ph_id', Value: trackingCode }],
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await this.ses.sendEmail(sendEmailParams).promise()
|
||||
if (!response.MessageId) {
|
||||
throw new Error('No messageId returned from SES')
|
||||
}
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to send email via SES: ${error.message}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
357
plugin-server/src/cdp/services/messaging/helpers/ses.ts
Normal file
357
plugin-server/src/cdp/services/messaging/helpers/ses.ts
Normal file
@@ -0,0 +1,357 @@
|
||||
import crypto from 'node:crypto'
|
||||
import { z } from 'zod'
|
||||
|
||||
import { MinimalAppMetric } from '~/cdp/types'
|
||||
import { parseJSON } from '~/utils/json-parse'
|
||||
import { logger } from '~/utils/logger'
|
||||
import { fetch } from '~/utils/request'
|
||||
|
||||
import { parseEmailTrackingCode } from './tracking-code'
|
||||
|
||||
/**
|
||||
* ---------- SNS envelope types ----------
|
||||
* If raw_message_delivery=false (default), SNS wraps your message in this envelope.
|
||||
*/
|
||||
const SnsEnvelopeSchema = z.object({
|
||||
Type: z.enum(['SubscriptionConfirmation', 'Notification', 'UnsubscribeConfirmation']),
|
||||
MessageId: z.string(),
|
||||
Token: z.string().optional(),
|
||||
TopicArn: z.string(),
|
||||
Subject: z.string().optional(),
|
||||
Message: z.string(), // either SES event JSON (Notification) or a confirmation message
|
||||
Timestamp: z.string(),
|
||||
SignatureVersion: z.enum(['1']),
|
||||
Signature: z.string(),
|
||||
SigningCertURL: z.string().url(),
|
||||
UnsubscribeURL: z.string().url().optional(),
|
||||
})
|
||||
|
||||
export type SnsEnvelope = z.infer<typeof SnsEnvelopeSchema>
|
||||
|
||||
/**
|
||||
* ---------- SES event types ----------
|
||||
* AWS posts an array of records in the "Message" (or directly as body if raw_message_delivery=true).
|
||||
* We model the common fields and create specific event detail types.
|
||||
*/
|
||||
const SesMailSchema = z.object({
|
||||
timestamp: z.string(),
|
||||
source: z.string(), // From address
|
||||
messageId: z.string(),
|
||||
destination: z.array(z.string()),
|
||||
headersTruncated: z.boolean().optional(),
|
||||
headers: z.array(z.object({ name: z.string(), value: z.string() })).optional(),
|
||||
tags: z.record(z.array(z.string())).optional(), // your message tags: { user_id: ["u_123"] }
|
||||
})
|
||||
|
||||
const SesCommonEventBase = z.object({
|
||||
eventType: z.enum([
|
||||
'Send',
|
||||
'Reject',
|
||||
'Bounce',
|
||||
'Complaint',
|
||||
'Delivery',
|
||||
'Open',
|
||||
'Click',
|
||||
'RenderingFailure',
|
||||
'DeliveryDelay',
|
||||
]),
|
||||
mail: SesMailSchema,
|
||||
})
|
||||
|
||||
const SesOpenEventSchema = SesCommonEventBase.extend({
|
||||
eventType: z.literal('Open'),
|
||||
open: z.object({
|
||||
ipAddress: z.string().optional(),
|
||||
userAgent: z.string().optional(),
|
||||
timestamp: z.string(),
|
||||
}),
|
||||
})
|
||||
|
||||
const SesClickEventSchema = SesCommonEventBase.extend({
|
||||
eventType: z.literal('Click'),
|
||||
click: z.object({
|
||||
ipAddress: z.string().optional(),
|
||||
link: z.string(),
|
||||
userAgent: z.string().optional(),
|
||||
timestamp: z.string(),
|
||||
}),
|
||||
})
|
||||
|
||||
const SesDeliveryEventSchema = SesCommonEventBase.extend({
|
||||
eventType: z.literal('Delivery'),
|
||||
delivery: z.object({
|
||||
processingTimeMillis: z.number().optional(),
|
||||
smtpResponse: z.string().optional(),
|
||||
reportingMTA: z.string().optional(),
|
||||
timestamp: z.string(),
|
||||
recipients: z.array(z.string()).optional(),
|
||||
}),
|
||||
})
|
||||
|
||||
const SesBounceEventSchema = SesCommonEventBase.extend({
|
||||
eventType: z.literal('Bounce'),
|
||||
bounce: z.object({
|
||||
bounceType: z.enum(['Undetermined', 'Permanent', 'Transient']),
|
||||
bounceSubType: z.string().optional(),
|
||||
bouncedRecipients: z.array(
|
||||
z.object({
|
||||
emailAddress: z.string(),
|
||||
action: z.string().optional(),
|
||||
status: z.string().optional(),
|
||||
diagnosticCode: z.string().optional(),
|
||||
})
|
||||
),
|
||||
timestamp: z.string(),
|
||||
reportingMTA: z.string().optional(),
|
||||
}),
|
||||
})
|
||||
|
||||
const SesComplaintEventSchema = SesCommonEventBase.extend({
|
||||
eventType: z.literal('Complaint'),
|
||||
complaint: z.object({
|
||||
complainedRecipients: z.array(z.object({ emailAddress: z.string() })),
|
||||
timestamp: z.string(),
|
||||
complaintFeedbackType: z.string().optional(),
|
||||
userAgent: z.string().optional(),
|
||||
feedbackId: z.string().optional(),
|
||||
}),
|
||||
})
|
||||
|
||||
const SesRenderingFailureSchema = SesCommonEventBase.extend({
|
||||
eventType: z.literal('RenderingFailure'),
|
||||
renderingFailure: z.object({
|
||||
errorMessage: z.string(),
|
||||
templateName: z.string().optional(),
|
||||
}),
|
||||
})
|
||||
|
||||
const SesSendEventSchema = SesCommonEventBase.extend({ eventType: z.literal('Send') })
|
||||
const SesRejectEventSchema = SesCommonEventBase.extend({ eventType: z.literal('Reject') })
|
||||
|
||||
const SesEventRecordSchema = z.union([
|
||||
SesOpenEventSchema,
|
||||
SesClickEventSchema,
|
||||
SesDeliveryEventSchema,
|
||||
SesBounceEventSchema,
|
||||
SesComplaintEventSchema,
|
||||
SesRenderingFailureSchema,
|
||||
SesSendEventSchema,
|
||||
SesRejectEventSchema,
|
||||
])
|
||||
|
||||
const SesEventBatchSchema = z.array(SesEventRecordSchema)
|
||||
|
||||
export type SesEventRecord = z.infer<typeof SesEventRecordSchema>
|
||||
|
||||
const EVENT_TYPE_TO_METRIC_NAME: Record<SesEventRecord['eventType'], MinimalAppMetric['metric_name']> = {
|
||||
Open: 'email_opened',
|
||||
Click: 'email_link_clicked',
|
||||
// Delivery: 'email_sent',
|
||||
Bounce: 'email_bounced',
|
||||
Complaint: 'email_blocked',
|
||||
RenderingFailure: 'email_failed',
|
||||
Send: 'email_sent',
|
||||
Reject: 'email_failed',
|
||||
Delivery: 'email_sent',
|
||||
}
|
||||
|
||||
export class SesWebhookHandler {
|
||||
certCache: Record<string, Promise<string> | undefined> = {}
|
||||
|
||||
private async fetchText(url: string): Promise<string> {
|
||||
const response = await fetch(url)
|
||||
if (response.status >= 400) {
|
||||
throw new Error(`Failed to fetch ${url}: ${response.status}`)
|
||||
}
|
||||
return await response.text()
|
||||
}
|
||||
|
||||
private async fetchCert(url: string): Promise<string> {
|
||||
// Validate that the URL is from AWS SNS
|
||||
if (!this.isValidSnsCertUrl(url)) {
|
||||
throw new Error(`Invalid SNS certificate URL: ${url}`)
|
||||
}
|
||||
|
||||
if (this.certCache[url]) {
|
||||
return await this.certCache[url]!
|
||||
}
|
||||
this.certCache[url] = this.fetchText(url)
|
||||
return this.certCache[url]!
|
||||
}
|
||||
|
||||
private isValidSnsCertUrl(url: string): boolean {
|
||||
try {
|
||||
const parsedUrl = new URL(url)
|
||||
|
||||
// Must be HTTPS
|
||||
if (parsedUrl.protocol !== 'https:') {
|
||||
return false
|
||||
}
|
||||
|
||||
// Must be from sns.{region}.amazonaws.com
|
||||
const hostname = parsedUrl.hostname
|
||||
if (!hostname.match(/^sns\.[a-z0-9-]+\.amazonaws\.com$/)) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Must end with .pem
|
||||
if (!parsedUrl.pathname.endsWith('.pem')) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse incoming body accounting for SNS raw vs envelope
|
||||
*/
|
||||
private parseIncomingBody(
|
||||
body: unknown
|
||||
):
|
||||
| { mode: 'raw' | 'sns'; records: SesEventRecord[] }
|
||||
| { mode: 'sns'; envelope: SnsEnvelope; records?: SesEventRecord[] } {
|
||||
// If it's already an object with "Type", it's probably the SNS envelope (raw=false)
|
||||
if (body && typeof body === 'object' && 'Type' in (body as any)) {
|
||||
const env = SnsEnvelopeSchema.parse(body)
|
||||
if (env.Type === 'Notification') {
|
||||
const inner = parseJSON(env.Message)
|
||||
const records = SesEventBatchSchema.parse(inner)
|
||||
return { mode: 'sns', envelope: env, records }
|
||||
}
|
||||
// For non-Notification, return envelope; caller decides how to handle
|
||||
return { mode: 'sns', envelope: env }
|
||||
}
|
||||
|
||||
// raw_message_delivery=true → body is already the SES array
|
||||
const records = SesEventBatchSchema.parse(body)
|
||||
return { mode: 'raw', records }
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify SNS signature
|
||||
* Best practice: verify the signature unless you're behind a trusted ALB / private VPC.
|
||||
*/
|
||||
private async verifySnsSignature(envelope: SnsEnvelope): Promise<boolean> {
|
||||
try {
|
||||
// 1) Fetch cert
|
||||
const cert = await this.fetchCert(envelope.SigningCertURL)
|
||||
// 2) Build string to sign (per SNS docs)
|
||||
const stringToSign = this.buildStringToSign(envelope)
|
||||
// 3) Verify
|
||||
const verifier = crypto.createVerify('RSA-SHA1') // SNS SignatureVersion=1 uses SHA1
|
||||
verifier.update(stringToSign, 'utf8')
|
||||
return verifier.verify(cert, envelope.Signature, 'base64')
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build string to sign for SNS signature verification
|
||||
*/
|
||||
private buildStringToSign(m: SnsEnvelope): string {
|
||||
// Follows AWS SNS docs for SignatureVersion=1
|
||||
// For Notification:
|
||||
// "Message\n{Message}\nMessageId\n{MessageId}\nSubject\n{Subject}\nTimestamp\n{Timestamp}\nTopicArn\n{TopicArn}\nType\n{Type}\n"
|
||||
// Subject line omitted if not present.
|
||||
const lines: string[] = []
|
||||
const pushKV = (k: string, v?: string) => {
|
||||
if (v !== undefined) {
|
||||
lines.push(k)
|
||||
lines.push(v)
|
||||
}
|
||||
}
|
||||
if (m.Type === 'Notification') {
|
||||
pushKV('Message', m.Message)
|
||||
pushKV('MessageId', m.MessageId)
|
||||
pushKV('Subject', m.Subject)
|
||||
pushKV('Timestamp', m.Timestamp)
|
||||
pushKV('TopicArn', m.TopicArn)
|
||||
pushKV('Type', m.Type)
|
||||
} else if (m.Type === 'SubscriptionConfirmation' || m.Type === 'UnsubscribeConfirmation') {
|
||||
pushKV('Message', m.Message)
|
||||
pushKV('MessageId', m.MessageId)
|
||||
pushKV('SubscribeURL', (m as any).SubscribeURL) // present in confirmation payload body, not in envelope schema
|
||||
pushKV('Timestamp', m.Timestamp)
|
||||
pushKV('Token', m.Token!)
|
||||
pushKV('TopicArn', m.TopicArn)
|
||||
pushKV('Type', m.Type)
|
||||
}
|
||||
return lines.join('\n') + '\n'
|
||||
}
|
||||
|
||||
async handleWebhook(opts: {
|
||||
body: any
|
||||
headers: Record<string, string | string[] | undefined>
|
||||
verifySignature?: boolean
|
||||
}): Promise<{
|
||||
status: number
|
||||
body: unknown
|
||||
metrics?: {
|
||||
functionId?: string
|
||||
invocationId?: string
|
||||
metricName: MinimalAppMetric['metric_name']
|
||||
}[]
|
||||
}> {
|
||||
logger.info('[SesWebhookHandler] handleWebhook', { body: opts.body, headers: opts.headers })
|
||||
const parsed = this.parseIncomingBody(opts.body)
|
||||
|
||||
logger.info('[SesWebhookHandler] parsed', { parsed })
|
||||
|
||||
// If SNS envelope present and verification requested, verify signature
|
||||
if ('envelope' in parsed && opts.verifySignature) {
|
||||
logger.info('[SesWebhookHandler] verifying signature', { envelope: parsed.envelope })
|
||||
const ok = await this.verifySnsSignature(parsed.envelope)
|
||||
logger.info('[SesWebhookHandler] signature verified', { ok })
|
||||
if (!ok) {
|
||||
return { status: 403, body: { error: 'Invalid SNS signature' } }
|
||||
}
|
||||
}
|
||||
|
||||
// Handle confirmation flow
|
||||
if (parsed.mode === 'sns' && 'envelope' in parsed && parsed.envelope?.Type === 'SubscriptionConfirmation') {
|
||||
logger.info('[SesWebhookHandler] confirming subscription', { envelope: parsed.envelope })
|
||||
// Confirm by visiting SubscribeURL (contained in the *message JSON*, not envelope.Message field here)
|
||||
// We need to fetch the inner message JSON to get SubscribeURL
|
||||
const env = parsed.envelope
|
||||
const inner = parseJSON(env.Message) as { SubscribeURL?: string }
|
||||
logger.info('[SesWebhookHandler] confirming subscription', { inner })
|
||||
if (inner.SubscribeURL) {
|
||||
await this.fetchText(inner.SubscribeURL)
|
||||
}
|
||||
return { status: 200, body: { ok: true } }
|
||||
}
|
||||
|
||||
if (parsed.mode === 'sns' && 'envelope' in parsed && parsed.envelope?.Type === 'UnsubscribeConfirmation') {
|
||||
logger.info('[SesWebhookHandler] confirming unsubscribe', { envelope: parsed.envelope })
|
||||
return { status: 200, body: { ok: true } }
|
||||
}
|
||||
|
||||
// Process SES events
|
||||
const records = parsed.mode === 'raw' ? parsed.records : parsed.records!
|
||||
const metrics: {
|
||||
functionId?: string
|
||||
invocationId?: string
|
||||
metricName: MinimalAppMetric['metric_name']
|
||||
}[] = []
|
||||
|
||||
for (const rec of records) {
|
||||
logger.info('[SesWebhookHandler] processing record', { rec })
|
||||
const tags = rec.mail.tags
|
||||
const { functionId, invocationId } = parseEmailTrackingCode(tags?.ph_id?.[0] || '') || {}
|
||||
|
||||
if (!functionId && !invocationId) {
|
||||
logger.error('[SesWebhookHandler] handleWebhook: No functionId or invocationId found', { rec })
|
||||
continue
|
||||
}
|
||||
|
||||
const metricName = EVENT_TYPE_TO_METRIC_NAME[rec.eventType]
|
||||
metrics.push({ functionId, invocationId, metricName })
|
||||
}
|
||||
|
||||
return { status: 200, body: { ok: true }, metrics }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
import { CyclotronJobInvocationHogFunction } from '~/cdp/types'
|
||||
import { defaultConfig } from '~/config/config'
|
||||
|
||||
function toBase64UrlSafe(input: string) {
|
||||
// Encode to normal base64
|
||||
const b64 = Buffer.from(input, 'utf8').toString('base64')
|
||||
// Make URL safe and strip padding
|
||||
return b64.replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '')
|
||||
}
|
||||
|
||||
function fromBase64UrlSafe(b64url: string) {
|
||||
// Restore base64 from URL-safe variant
|
||||
let b64 = b64url.replace(/-/g, '+').replace(/_/g, '/')
|
||||
// Pad to length multiple of 4
|
||||
while (b64.length % 4) {
|
||||
b64 += '='
|
||||
}
|
||||
return Buffer.from(b64, 'base64').toString('utf8')
|
||||
}
|
||||
|
||||
export const parseEmailTrackingCode = (
|
||||
encodedTrackingCode: string
|
||||
): { functionId: string; invocationId: string } | null => {
|
||||
// customId is like ph_fn_id=function-1&ph_inv_id=invocation-1
|
||||
const decodedTrackingCode = fromBase64UrlSafe(encodedTrackingCode)
|
||||
try {
|
||||
const [functionId, invocationId] = decodedTrackingCode.split(':')
|
||||
if (!functionId || !invocationId) {
|
||||
return null
|
||||
}
|
||||
return { functionId, invocationId }
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export const generateEmailTrackingCode = (
|
||||
invocation: Pick<CyclotronJobInvocationHogFunction, 'functionId' | 'id'>
|
||||
): string => {
|
||||
// Generate a base64 encoded string free of equal signs
|
||||
return toBase64UrlSafe(`${invocation.functionId}:${invocation.id}`)
|
||||
}
|
||||
|
||||
export const generateEmailTrackingPixelUrl = (
|
||||
invocation: Pick<CyclotronJobInvocationHogFunction, 'functionId' | 'id'>
|
||||
): string => {
|
||||
return `${defaultConfig.CDP_EMAIL_TRACKING_URL}/public/m/pixel?ph_id=${generateEmailTrackingCode(invocation)}`
|
||||
}
|
||||
@@ -318,6 +318,12 @@ export function getDefaultConfig(): PluginsServerConfig {
|
||||
// Messaging
|
||||
MAILJET_PUBLIC_KEY: '',
|
||||
MAILJET_SECRET_KEY: '',
|
||||
|
||||
// SES
|
||||
SES_ENDPOINT: isTestEnv() || isDevEnv() ? 'http://localhost:4566' : '',
|
||||
SES_ACCESS_KEY_ID: isTestEnv() || isDevEnv() ? 'test' : '',
|
||||
SES_SECRET_ACCESS_KEY: isTestEnv() || isDevEnv() ? 'test' : '',
|
||||
SES_REGION: 'us-east-1',
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -460,6 +460,12 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
|
||||
// Messaging
|
||||
MAILJET_PUBLIC_KEY: string
|
||||
MAILJET_SECRET_KEY: string
|
||||
|
||||
// SES
|
||||
SES_ENDPOINT: string
|
||||
SES_ACCESS_KEY_ID: string
|
||||
SES_SECRET_ACCESS_KEY: string
|
||||
SES_REGION: string
|
||||
}
|
||||
|
||||
export interface Hub extends PluginsServerConfig {
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpResponse
|
||||
from django.shortcuts import redirect
|
||||
@@ -35,6 +36,7 @@ from posthog.models.integration import (
|
||||
class NativeEmailIntegrationSerializer(serializers.Serializer):
|
||||
email = serializers.EmailField()
|
||||
name = serializers.CharField()
|
||||
provider = serializers.ChoiceField(choices=["ses", "mailjet", "maildev"] if settings.DEBUG else ["ses", "mailjet"])
|
||||
|
||||
|
||||
class IntegrationSerializer(serializers.ModelSerializer):
|
||||
|
||||
@@ -226,8 +226,8 @@ class TestEmailIntegration:
|
||||
"email": self.valid_config["email"],
|
||||
"name": self.valid_config["name"],
|
||||
"domain": "posthog.com",
|
||||
"mailjet_verified": False,
|
||||
"aws_ses_verified": False,
|
||||
"verified": False,
|
||||
"provider": "mailjet",
|
||||
}
|
||||
assert integration.sensitive_config == {}
|
||||
assert integration.created_by == self.user
|
||||
@@ -274,8 +274,8 @@ class TestEmailIntegration:
|
||||
"email": self.valid_config["email"],
|
||||
"name": self.valid_config["name"],
|
||||
"domain": "posthog.com",
|
||||
"mailjet_verified": False,
|
||||
"aws_ses_verified": False,
|
||||
"verified": False,
|
||||
"provider": "mailjet",
|
||||
}
|
||||
|
||||
@patch("posthog.models.integration.MailjetProvider")
|
||||
@@ -318,8 +318,8 @@ class TestEmailIntegration:
|
||||
"email": self.valid_config["email"],
|
||||
"name": self.valid_config["name"],
|
||||
"domain": "posthog.com",
|
||||
"mailjet_verified": True,
|
||||
"aws_ses_verified": False,
|
||||
"verified": True,
|
||||
"provider": "mailjet",
|
||||
}
|
||||
|
||||
@patch("posthog.models.integration.MailjetProvider")
|
||||
@@ -371,10 +371,10 @@ class TestEmailIntegration:
|
||||
self.user,
|
||||
)
|
||||
|
||||
assert not integration1.config["mailjet_verified"]
|
||||
assert not integration2.config["mailjet_verified"]
|
||||
assert not integrationOtherDomain.config["mailjet_verified"]
|
||||
assert not integrationOtherTeam.config["mailjet_verified"]
|
||||
assert not integration1.config["verified"]
|
||||
assert not integration2.config["verified"]
|
||||
assert not integrationOtherDomain.config["verified"]
|
||||
assert not integrationOtherTeam.config["verified"]
|
||||
|
||||
email_integration = EmailIntegration(integration1)
|
||||
verification_result = email_integration.verify()
|
||||
@@ -385,7 +385,7 @@ class TestEmailIntegration:
|
||||
integrationOtherDomain.refresh_from_db()
|
||||
integrationOtherTeam.refresh_from_db()
|
||||
|
||||
assert integration1.config["mailjet_verified"]
|
||||
assert integration2.config["mailjet_verified"]
|
||||
assert not integrationOtherDomain.config["mailjet_verified"]
|
||||
assert not integrationOtherTeam.config["mailjet_verified"]
|
||||
assert integration1.config["verified"]
|
||||
assert integration2.config["verified"]
|
||||
assert not integrationOtherDomain.config["verified"]
|
||||
assert not integrationOtherTeam.config["verified"]
|
||||
|
||||
@@ -33,7 +33,7 @@ from posthog.models.user import User
|
||||
from posthog.plugins.plugin_server_api import reload_integrations_on_workers
|
||||
from posthog.sync import database_sync_to_async
|
||||
|
||||
from products.messaging.backend.providers import MailjetProvider, TwilioProvider
|
||||
from products.messaging.backend.providers import MailjetProvider, SESProvider, TwilioProvider
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -1075,16 +1075,28 @@ class EmailIntegration:
|
||||
def mailjet_provider(self) -> MailjetProvider:
|
||||
return MailjetProvider()
|
||||
|
||||
@property
|
||||
def ses_provider(self) -> SESProvider:
|
||||
return SESProvider()
|
||||
|
||||
@classmethod
|
||||
def create_native_integration(cls, config: dict, team_id: int, created_by: User | None = None) -> Integration:
|
||||
email_address: str = config["email"]
|
||||
name: str = config["name"]
|
||||
domain: str = email_address.split("@")[1]
|
||||
provider: str = config.get("provider", "mailjet") # Default to mailjet for backward compatibility
|
||||
|
||||
mailjet = MailjetProvider()
|
||||
|
||||
# TODO: Look for integration belonging to the team with the same domain
|
||||
mailjet.create_email_domain(domain, team_id=team_id)
|
||||
# Create domain in the appropriate provider
|
||||
if provider == "ses":
|
||||
ses = SESProvider()
|
||||
ses.create_email_domain(domain, team_id=team_id)
|
||||
elif provider == "mailjet":
|
||||
mailjet = MailjetProvider()
|
||||
mailjet.create_email_domain(domain, team_id=team_id)
|
||||
elif provider == "maildev" and settings.DEBUG:
|
||||
pass
|
||||
else:
|
||||
raise ValueError(f"Invalid provider: must be either 'ses' or 'mailjet'")
|
||||
|
||||
integration, created = Integration.objects.update_or_create(
|
||||
team_id=team_id,
|
||||
@@ -1095,8 +1107,8 @@ class EmailIntegration:
|
||||
"email": email_address,
|
||||
"domain": domain,
|
||||
"name": name,
|
||||
"mailjet_verified": False,
|
||||
"aws_ses_verified": False,
|
||||
"provider": provider,
|
||||
"verified": True if provider == "maildev" else False,
|
||||
},
|
||||
"created_by": created_by,
|
||||
},
|
||||
@@ -1135,18 +1147,31 @@ class EmailIntegration:
|
||||
|
||||
def verify(self):
|
||||
domain = self.integration.config.get("domain")
|
||||
provider = self.integration.config.get("provider", "mailjet")
|
||||
|
||||
verification_result = self.mailjet_provider.verify_email_domain(domain, team_id=self.integration.team_id)
|
||||
# Use the appropriate provider for verification
|
||||
if provider == "ses":
|
||||
verification_result = self.ses_provider.verify_email_domain(domain, team_id=self.integration.team_id)
|
||||
elif provider == "mailjet":
|
||||
verification_result = self.mailjet_provider.verify_email_domain(domain, team_id=self.integration.team_id)
|
||||
elif provider == "maildev":
|
||||
verification_result = {
|
||||
"status": "success",
|
||||
"dnsRecords": [],
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"Invalid provider: {provider}")
|
||||
|
||||
if verification_result.get("status") == "success":
|
||||
# We can validate all other integrations with the same domain
|
||||
# We can validate all other integrations with the same domain and provider
|
||||
other_integrations = Integration.objects.filter(
|
||||
team_id=self.integration.team_id,
|
||||
kind="email",
|
||||
config__domain=domain,
|
||||
config__provider=provider,
|
||||
)
|
||||
for integration in other_integrations:
|
||||
integration.config["mailjet_verified"] = True
|
||||
integration.config["verified"] = True
|
||||
integration.save()
|
||||
|
||||
return verification_result
|
||||
|
||||
@@ -48,6 +48,7 @@ from posthog.settings.session_replay_v2 import *
|
||||
from posthog.settings.integrations import *
|
||||
from posthog.settings.pagerduty import *
|
||||
from posthog.settings.payments import *
|
||||
from posthog.settings.ses import *
|
||||
|
||||
from posthog.settings.utils import get_from_env, str_to_bool
|
||||
|
||||
|
||||
15
posthog/settings/ses.py
Normal file
15
posthog/settings/ses.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from posthog.settings.base_variables import DEBUG, TEST
|
||||
|
||||
if TEST or DEBUG:
|
||||
SES_ENDPOINT = os.getenv("SES_ENDPOINT", "http://localhost:4566")
|
||||
SES_ACCESS_KEY_ID: Optional[str] = os.getenv("SES_ACCESS_KEY_ID", "test")
|
||||
SES_SECRET_ACCESS_KEY: Optional[str] = os.getenv("SES_SECRET_ACCESS_KEY", "test")
|
||||
else:
|
||||
SES_ENDPOINT = os.getenv("SES_ENDPOINT", "")
|
||||
SES_ACCESS_KEY_ID = os.getenv("SES_ACCESS_KEY_ID", "") or None
|
||||
SES_SECRET_ACCESS_KEY = os.getenv("SES_SECRET_ACCESS_KEY", "") or None
|
||||
|
||||
SES_REGION = os.getenv("SES_REGION", "us-east-1")
|
||||
@@ -1,4 +1,5 @@
|
||||
from .mailjet import MailjetProvider
|
||||
from .ses import SESProvider
|
||||
from .twilio import TwilioProvider
|
||||
|
||||
__all__ = ["MailjetProvider", "TwilioProvider"]
|
||||
__all__ = ["MailjetProvider", "TwilioProvider", "SESProvider"]
|
||||
|
||||
165
products/messaging/backend/providers/ses.py
Normal file
165
products/messaging/backend/providers/ses.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import re
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
import boto3
|
||||
from botocore.exceptions import BotoCoreError, ClientError
|
||||
from rest_framework import exceptions
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SESProvider:
|
||||
def __init__(self):
|
||||
self.access_key_id = self.get_access_key_id()
|
||||
self.secret_access_key = self.get_secret_access_key()
|
||||
self.region = self.get_region()
|
||||
self.endpoint_url = self.get_endpoint_url()
|
||||
|
||||
# Initialize SES client
|
||||
self.client = boto3.client(
|
||||
"ses",
|
||||
aws_access_key_id=self.access_key_id,
|
||||
aws_secret_access_key=self.secret_access_key,
|
||||
region_name=self.region,
|
||||
endpoint_url=self.endpoint_url if self.endpoint_url else None,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_access_key_id(cls) -> str:
|
||||
access_key_id = settings.SES_ACCESS_KEY_ID
|
||||
if not access_key_id:
|
||||
raise ValueError("SES_ACCESS_KEY_ID is not set in environment or settings")
|
||||
return access_key_id
|
||||
|
||||
@classmethod
|
||||
def get_secret_access_key(cls) -> str:
|
||||
secret_access_key = settings.SES_SECRET_ACCESS_KEY
|
||||
if not secret_access_key:
|
||||
raise ValueError("SES_SECRET_ACCESS_KEY is not set in environment or settings")
|
||||
return secret_access_key
|
||||
|
||||
@classmethod
|
||||
def get_region(cls) -> str:
|
||||
return settings.SES_REGION
|
||||
|
||||
@classmethod
|
||||
def get_endpoint_url(cls) -> str | None:
|
||||
return settings.SES_ENDPOINT
|
||||
|
||||
def create_email_domain(self, domain: str, team_id: int):
|
||||
# NOTE: For sesv1 creation is done through verification
|
||||
self.verify_email_domain(domain, team_id)
|
||||
|
||||
def verify_email_domain(self, domain: str, team_id: int):
|
||||
# Validate the domain contains valid characters for a domain name
|
||||
DOMAIN_REGEX = r"(?i)^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$"
|
||||
if not re.match(DOMAIN_REGEX, domain):
|
||||
raise exceptions.ValidationError("Please enter a valid domain or subdomain name.")
|
||||
|
||||
dns_records = []
|
||||
|
||||
# Start/ensure domain verification (TXT at _amazonses.domain) ---
|
||||
verification_token = None
|
||||
try:
|
||||
resp = self.client.verify_domain_identity(Domain=domain)
|
||||
verification_token = resp.get("VerificationToken")
|
||||
except ClientError as e:
|
||||
# If already requested/exists, carry on; SES v1 is idempotent-ish here
|
||||
if e.response["Error"]["Code"] not in ("InvalidParameterValue",):
|
||||
raise
|
||||
|
||||
if verification_token:
|
||||
dns_records.append(
|
||||
{
|
||||
"type": "verification",
|
||||
"recordType": "TXT",
|
||||
"recordHostname": f"_amazonses.{domain}",
|
||||
"recordValue": verification_token,
|
||||
"status": "pending",
|
||||
}
|
||||
)
|
||||
|
||||
# Start/ensure DKIM (three CNAMEs) ---
|
||||
dkim_tokens: list[str] = []
|
||||
try:
|
||||
resp = self.client.verify_domain_dkim(Domain=domain)
|
||||
dkim_tokens = resp.get("DkimTokens", []) or []
|
||||
except ClientError as e:
|
||||
if e.response["Error"]["Code"] not in ("InvalidParameterValue",):
|
||||
raise
|
||||
|
||||
for t in dkim_tokens:
|
||||
dns_records.append(
|
||||
{
|
||||
"type": "dkim",
|
||||
"recordType": "CNAME",
|
||||
"recordHostname": f"{t}._domainkey.{domain}",
|
||||
"recordValue": f"{t}.dkim.amazonses.com",
|
||||
"status": "pending",
|
||||
}
|
||||
)
|
||||
|
||||
dns_records.append(
|
||||
{
|
||||
"type": "spf",
|
||||
"recordType": "TXT",
|
||||
"recordHostname": "@",
|
||||
"recordValue": "v=spf1 include:amazonses.com ~all",
|
||||
"status": "pending",
|
||||
}
|
||||
)
|
||||
|
||||
# Current verification / DKIM statuses to compute overall status & per-record statuses ---
|
||||
try:
|
||||
id_attrs = self.client.get_identity_verification_attributes(Identities=[domain])
|
||||
verification_status = (
|
||||
id_attrs["VerificationAttributes"].get(domain, {}).get("VerificationStatus", "Unknown")
|
||||
)
|
||||
except ClientError:
|
||||
verification_status = "Unknown"
|
||||
|
||||
try:
|
||||
dkim_attrs = self.client.get_identity_dkim_attributes(Identities=[domain])
|
||||
dkim_status = dkim_attrs["DkimAttributes"].get(domain, {}).get("DkimVerificationStatus", "Unknown")
|
||||
except ClientError:
|
||||
dkim_status = "Unknown"
|
||||
|
||||
# Normalize overall status
|
||||
if verification_status == "Success" and dkim_status == "Success":
|
||||
overall = "success"
|
||||
elif "Failed" in (verification_status, dkim_status):
|
||||
overall = "failed"
|
||||
else:
|
||||
overall = "pending"
|
||||
|
||||
# Upgrade per-record statuses if SES reports success
|
||||
# - Domain verification TXT is considered verified when VerificationStatus == Success
|
||||
# - DKIM CNAMEs considered verified when DkimVerificationStatus == Success
|
||||
if verification_status == "Success":
|
||||
for r in dns_records:
|
||||
if r["type"] == "verification":
|
||||
r["status"] = "success"
|
||||
if dkim_status == "Success":
|
||||
for r in dns_records:
|
||||
if r["type"] == "dkim":
|
||||
r["status"] = "success"
|
||||
|
||||
# If MAIL FROM attrs said Success earlier, MX already marked verified
|
||||
|
||||
return {
|
||||
"status": overall,
|
||||
"dnsRecords": dns_records if overall != "success" else [],
|
||||
}
|
||||
|
||||
def delete_identity(self, identity: str):
|
||||
"""
|
||||
Delete an identity from SES
|
||||
"""
|
||||
try:
|
||||
self.client.delete_identity(Identity=identity)
|
||||
logger.info(f"Identity {identity} deleted from SES")
|
||||
except (ClientError, BotoCoreError) as e:
|
||||
logger.exception(f"SES API error deleting identity: {e}")
|
||||
raise
|
||||
131
products/messaging/backend/test/test_ses_provider.py
Normal file
131
products/messaging/backend/test/test_ses_provider.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import pytest
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.test import override_settings
|
||||
|
||||
from products.messaging.backend.providers.ses import SESProvider
|
||||
|
||||
TEST_DOMAIN = "test.posthog.com"
|
||||
|
||||
|
||||
class TestSESProvider(TestCase):
|
||||
def setUp(self):
|
||||
# Remove all domains from SES
|
||||
ses_provider = SESProvider()
|
||||
if TEST_DOMAIN in ses_provider.client.list_identities()["Identities"]:
|
||||
ses_provider.delete_identity(TEST_DOMAIN)
|
||||
|
||||
def test_init_with_valid_credentials(self):
|
||||
with override_settings(
|
||||
SES_ACCESS_KEY_ID="test_access_key",
|
||||
SES_SECRET_ACCESS_KEY="test_secret_key",
|
||||
SES_REGION="us-east-1",
|
||||
SES_ENDPOINT="",
|
||||
):
|
||||
provider = SESProvider()
|
||||
assert provider.access_key_id == "test_access_key"
|
||||
assert provider.secret_access_key == "test_secret_key"
|
||||
assert provider.region == "us-east-1"
|
||||
|
||||
def test_init_missing_access_key(self):
|
||||
with override_settings(SES_ACCESS_KEY_ID="", SES_SECRET_ACCESS_KEY="test_secret_key"):
|
||||
with pytest.raises(ValueError, match="SES_ACCESS_KEY_ID is not set"):
|
||||
SESProvider()
|
||||
|
||||
def test_init_missing_secret_key(self):
|
||||
with override_settings(SES_ACCESS_KEY_ID="test_access_key", SES_SECRET_ACCESS_KEY=""):
|
||||
with pytest.raises(ValueError, match="SES_SECRET_ACCESS_KEY is not set"):
|
||||
SESProvider()
|
||||
|
||||
def test_create_email_domain_success(self):
|
||||
provider = SESProvider()
|
||||
provider.create_email_domain(TEST_DOMAIN, team_id=1)
|
||||
|
||||
@patch("products.messaging.backend.providers.ses.boto3.client")
|
||||
def test_create_email_domain_invalid_domain(self, mock_boto_client):
|
||||
with override_settings(
|
||||
SES_ACCESS_KEY_ID="test_access_key", SES_SECRET_ACCESS_KEY="test_secret_key", SES_REGION="us-east-1"
|
||||
):
|
||||
provider = SESProvider()
|
||||
with pytest.raises(Exception, match="Please enter a valid domain"):
|
||||
provider.create_email_domain("invalid-domain", team_id=1)
|
||||
|
||||
def test_verify_email_domain_initial_setup(self):
|
||||
provider = SESProvider()
|
||||
|
||||
# Mock the client on the provider instance
|
||||
with patch.object(provider, "client") as mock_client:
|
||||
# Mock the verification attributes to return a non-success status
|
||||
mock_client.get_identity_verification_attributes.return_value = {
|
||||
"VerificationAttributes": {
|
||||
TEST_DOMAIN: {
|
||||
"VerificationStatus": "Pending", # Non-success status
|
||||
"VerificationToken": "test-token-123",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Mock DKIM attributes to return a non-success status
|
||||
mock_client.get_identity_dkim_attributes.return_value = {
|
||||
"DkimAttributes": {
|
||||
TEST_DOMAIN: {
|
||||
"DkimVerificationStatus": "Pending" # Non-success status
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Mock the domain verification and DKIM setup calls
|
||||
mock_client.verify_domain_identity.return_value = {"VerificationToken": "test-token-123"}
|
||||
mock_client.verify_domain_dkim.return_value = {"DkimTokens": ["token1", "token2", "token3"]}
|
||||
|
||||
result = provider.verify_email_domain(TEST_DOMAIN, team_id=1)
|
||||
|
||||
# Should return pending status with DNS records
|
||||
assert result == {
|
||||
"status": "pending",
|
||||
"dnsRecords": [
|
||||
{
|
||||
"type": "verification",
|
||||
"recordType": "TXT",
|
||||
"recordHostname": "_amazonses.test.posthog.com",
|
||||
"recordValue": "test-token-123",
|
||||
"status": "pending",
|
||||
},
|
||||
{
|
||||
"type": "dkim",
|
||||
"recordType": "CNAME",
|
||||
"recordHostname": "token1._domainkey.test.posthog.com",
|
||||
"recordValue": "token1.dkim.amazonses.com",
|
||||
"status": "pending",
|
||||
},
|
||||
{
|
||||
"type": "dkim",
|
||||
"recordType": "CNAME",
|
||||
"recordHostname": "token2._domainkey.test.posthog.com",
|
||||
"recordValue": "token2.dkim.amazonses.com",
|
||||
"status": "pending",
|
||||
},
|
||||
{
|
||||
"type": "dkim",
|
||||
"recordType": "CNAME",
|
||||
"recordHostname": "token3._domainkey.test.posthog.com",
|
||||
"recordValue": "token3.dkim.amazonses.com",
|
||||
"status": "pending",
|
||||
},
|
||||
{
|
||||
"type": "spf",
|
||||
"recordType": "TXT",
|
||||
"recordHostname": "@",
|
||||
"recordValue": "v=spf1 include:amazonses.com ~all",
|
||||
"status": "pending",
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
def test_verify_email_domain_success(self):
|
||||
provider = SESProvider()
|
||||
|
||||
result = provider.verify_email_domain(TEST_DOMAIN, team_id=1)
|
||||
# Should return verified status with no DNS records needed
|
||||
assert result == {"status": "success", "dnsRecords": []}
|
||||
@@ -2,8 +2,9 @@ import { useActions, useValues } from 'kea'
|
||||
import { Form } from 'kea-forms'
|
||||
|
||||
import { IconCheckCircle, IconCopy, IconWarning } from '@posthog/icons'
|
||||
import { LemonButton, LemonInput, LemonModal, Spinner, lemonToast } from '@posthog/lemon-ui'
|
||||
import { LemonButton, LemonInput, LemonModal, LemonSelect, Spinner, lemonToast } from '@posthog/lemon-ui'
|
||||
|
||||
import { FlaggedFeature } from 'lib/components/FlaggedFeature'
|
||||
import { LemonField } from 'lib/lemon-ui/LemonField'
|
||||
|
||||
import { DnsRecord, EmailSetupModalLogicProps, emailSetupModalLogic } from './emailSetupModalLogic'
|
||||
@@ -20,6 +21,18 @@ export const EmailSetupModal = (props: EmailSetupModalLogicProps): JSX.Element =
|
||||
modalContent = (
|
||||
<Form logic={emailSetupModalLogic} formKey="emailSender">
|
||||
<div className="space-y-4">
|
||||
<FlaggedFeature flag="messaging-ses">
|
||||
{/* NOTE: We probably dont want to actually give the options - this is just for our own testing */}
|
||||
<LemonField name="provider" label="Provider">
|
||||
<LemonSelect
|
||||
options={[
|
||||
{ value: 'ses', label: 'AWS SES' },
|
||||
{ value: 'mailjet', label: 'Mailjet' },
|
||||
{ value: 'maildev', label: 'Maildev' },
|
||||
]}
|
||||
/>
|
||||
</LemonField>
|
||||
</FlaggedFeature>
|
||||
<LemonField name="name" label="Name">
|
||||
<LemonInput type="text" placeholder="John Doe" disabled={integrationLoading} />
|
||||
</LemonField>
|
||||
|
||||
@@ -26,6 +26,7 @@ export interface DnsRecord {
|
||||
export interface EmailSenderFormType {
|
||||
email: string
|
||||
name: string
|
||||
provider: 'ses' | 'mailjet' | 'maildev'
|
||||
}
|
||||
|
||||
const EMAIL_REGEX = /^[^\s@]+@[^\s@]+\.[^\s@]+$/i
|
||||
@@ -41,10 +42,11 @@ export const emailSetupModalLogic = kea<emailSetupModalLogicType>([
|
||||
forms(({ actions }) => ({
|
||||
emailSender: {
|
||||
defaults: {
|
||||
provider: 'mailjet',
|
||||
email: '',
|
||||
name: '',
|
||||
},
|
||||
errors: ({ email, name }) => {
|
||||
} as EmailSenderFormType,
|
||||
errors: ({ email, name, provider }) => {
|
||||
let emailError = undefined
|
||||
if (!email) {
|
||||
emailError = 'Email is required'
|
||||
@@ -55,6 +57,7 @@ export const emailSetupModalLogic = kea<emailSetupModalLogicType>([
|
||||
return {
|
||||
email: emailError,
|
||||
name: !name ? 'Name is required' : undefined,
|
||||
provider: !provider ? 'Provider is required' : undefined,
|
||||
}
|
||||
},
|
||||
submit: async (config) => {
|
||||
|
||||
@@ -35,7 +35,7 @@ export const messageChannelLogic = kea<messageChannelLogicType>([
|
||||
({ integration }): boolean => {
|
||||
switch (integration?.kind) {
|
||||
case 'email':
|
||||
return integration.config.mailjet_verified === true
|
||||
return integration.config.verified === true
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user