feat(cdp): add cdp destination APIs (#14994)

This commit adds the CDP destination APIs. Key changes include:

 - use `db-migrate` for migrations
 - jest for functional_tests (although I would be happy to use vitest or
   alternatives if we want to, I didn't want to change too much at once)
 - pnpm for package management
 - koajs for the server
 - Ajv for validation
 - A separate PostgreSQL logical database for the destination APIs
   persistence.

Things still to do:

 - add some delivery mechanism that takes events from Kafka and puts
   them to the destinations.
 - add CI
 - add to Helm Chart
 - add some method of authentication. I've added the API here but it
   might be that I just end up putting that in the main app in the end,
   depending on how much momentum there is to try out separating the API
   a bit, and the logistics of that.
This commit is contained in:
Harry Waye
2023-04-12 12:47:50 +01:00
committed by GitHub
parent 5b0b74863d
commit 9529cdd443
16 changed files with 6455 additions and 0 deletions

128
.github/workflows/cdp.yml vendored Normal file
View File

@@ -0,0 +1,128 @@
#
# Build and test the Docker image for the CDP service found in the cdp/
# directory.
#
# This job is triggered by pushes to the master branch and by pull requests that
# touch the cdp/ directory.
#
# Once built we run the functional tests against the running image.
name: CDP CI

on:
  push:
    branches:
      - master
    paths:
      - 'cdp/**'
  pull_request:
    branches:
      - master
    paths:
      - 'cdp/**'

jobs:
  build:
    # Build the CDP image from cdp/Dockerfile and push it to GHCR so that the
    # test job can pull and run exactly what was built.
    runs-on: ubuntu-latest
    outputs:
      # Every tag that was pushed, newline separated (kept for consumers that
      # want the full list).
      tags: ${{ steps.meta.outputs.tags }}
      # A single image reference that is safe to pass to `docker run`. The
      # `tags` output above holds more than one entry on master (the branch
      # tag plus `latest`), which would otherwise be word-split into an
      # image name followed by a bogus command.
      image: ${{ fromJSON(steps.meta.outputs.json).tags[0] }}
    steps:
      - uses: actions/checkout@v3

      - uses: docker/setup-buildx-action@v2

      - uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - uses: docker/metadata-action@v4
        id: meta
        with:
          images: ghcr.io/${{ github.repository }}/cdp
          flavor: |
            latest=${{ github.ref == 'refs/heads/master' }}

      # Make the image tags used for the docker cache. We take the tag from
      # the metadata action rather than building it from
      # ${{ github.repository }} by hand because the repository organization
      # name has upper case characters, which are not allowed in docker image
      # names.
      - uses: docker/metadata-action@v4
        id: meta-cache
        with:
          images: ghcr.io/${{ github.repository }}/cdp
          tags: |
            type=raw,value=cache

      - uses: docker/build-push-action@v4
        with:
          context: cdp
          file: cdp/Dockerfile
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=registry,ref=${{ steps.meta-cache.outputs.tags }}
          cache-to: type=registry,ref=${{ steps.meta-cache.outputs.tags }},mode=max

  test:
    # Run the functional tests against the CDP service. We pull the image
    # from GHCR and run it locally. The only dependency we need is the `db`
    # service from the main docker-compose.dev.yml file, so that is the only
    # service we start.
    runs-on: ubuntu-latest
    needs: build
    steps:
      - uses: actions/checkout@v3

      - uses: docker/setup-buildx-action@v2

      - uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # pnpm has to be installed before setup-node so that `cache: pnpm` can
      # locate the pnpm store.
      - name: Install PNPM
        run: |
          npm install -g pnpm

      - name: Setup node
        uses: actions/setup-node@v3
        with:
          node-version: '18'
          cache: 'pnpm'
          cache-dependency-path: cdp/pnpm-lock.yaml

      - name: Install test dependencies
        working-directory: cdp
        run: |
          pnpm install --frozen-lockfile

      - name: Start CDP
        working-directory: cdp
        run: |
          mkdir -p /tmp/logs

          docker compose -f ../docker-compose.dev.yml up -d db >> /tmp/logs/db.txt

          # Wait for the db service to be ready, up to 30 seconds.
          SECONDS=0
          until docker compose -f ../docker-compose.dev.yml exec -T db pg_isready; do
            if [ $SECONDS -gt 30 ]; then
              echo "Timed out waiting for db service to be ready."
              exit 1
            fi
            sleep 1
          done

          # Build the `docker run` command line for the image the build job
          # pushed. SECRET_KEY is exported so that jest signs its tokens with
          # the same key the service verifies them with.
          export SECRET_KEY=$(openssl rand -hex 32)
          CDP_RUN="docker run -e SECRET_KEY=$SECRET_KEY -e DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog --rm --network=host ${{ needs.build.outputs.image }}"

          # Run the migrations.
          $CDP_RUN sqlx migrate run

          # Start the CDP service.
          $CDP_RUN &> /tmp/logs/cdp.txt &

          # Wait for the CDP service to answer HTTP requests on the address
          # the functional tests use, up to 30 seconds. Any HTTP response
          # counts as ready; only a failed connection keeps us waiting.
          SECONDS=0
          until curl --silent --output /dev/null http://localhost:3000; do
            if [ $SECONDS -gt 30 ]; then
              echo "Timed out waiting for the CDP service to be ready."
              cat /tmp/logs/cdp.txt
              exit 1
            fi
            sleep 1
          done

          # Run the functional tests.
          pnpm jest

1
cdp/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
dist/

15
cdp/.swcrc Normal file
View File

@@ -0,0 +1,15 @@
{
"jsc": {
"parser": {
"syntax": "typescript",
"tsx": false,
"decorators": false,
"dynamicImport": false
},
"target": "es2020",
"baseUrl": "."
},
"module": {
"type": "commonjs"
}
}

82
cdp/Dockerfile Normal file
View File

@@ -0,0 +1,82 @@
# Build the CDP server image. We use a multi-stage build to first build the CDP
# node application, then copy the built files to the final image.
#
# Note: separately we bundle the resulting dist folder into the
# production.Dockerfile image such that the main image can run the entire
# application without needing to build the CDP server.
#
# We also need to copy the migrations folder as the CDP server needs it to
# run the migrations. The migrations use the Rust application sqlx-cli to
# run the migrations, so we need to copy the compiled binary from the Rust
# image. I'm sure there's a better way to do this, but this works for now.
# Stage 1 (sqlx-cli-build): compile the sqlx CLI binary. Only the resulting
# binary is copied into the final image.
FROM rust:1.68.2-slim-bullseye AS sqlx-cli-build
WORKDIR /code
# Run every RUN instruction through bash with pipefail enabled.
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
# Since we are using the slim image, we need to install `pkg-config` and
# `libssl-dev` so that `cargo install` completes successfully.
RUN apt-get update && \
apt-get install -y --no-install-recommends \
"pkg-config" \
"libssl-dev" \
&& \
rm -rf /var/lib/apt/lists/*
# Install the SQLx CLI, pinned to 0.6.3 and built with only the features
# listed on the command line (native-tls and postgres).
RUN cargo install --version 0.6.3 sqlx-cli --no-default-features --features native-tls,postgres
# Stage 2 (cdp-build): install dependencies, build the CDP server into dist/
# and then cut node_modules down to production dependencies.
FROM node:18.12.1-bullseye-slim AS cdp-build
WORKDIR /code
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
# Install Node.js dependencies. Only the manifest and lockfile are copied at
# this point so that this layer can be reused when just the sources change.
# The pnpm store lives in a temporary directory that is removed within the
# same RUN instruction.
COPY package.json pnpm-lock.yaml ./
RUN corepack enable && \
mkdir /tmp/pnpm-store && \
pnpm install --frozen-lockfile --store-dir /tmp/pnpm-store && \
rm -rf /tmp/pnpm-store
# Build the CDP server.
#
# Note: we run the build as a separate action to increase
# the cache hit ratio of the layers above.
COPY ./src/ ./src/
COPY tsconfig.json .swcrc ./
RUN pnpm build
# As the CDP server is now built, let's keep only prod dependencies in the
# node_modules folder as we will copy it to the last image. We remove all
# dependencies first to ensure we end up with the smallest possible image.
# NOTE(review): `corepack enable` already ran in an earlier layer of this
# stage, so the call below looks redundant - confirm before removing it.
RUN rm -rf node_modules && \
corepack enable && \
mkdir /tmp/pnpm-store && \
pnpm install --frozen-lockfile --store-dir /tmp/pnpm-store --prod && \
rm -rf /tmp/pnpm-store
# Stage 3: build the final image.
FROM node:18.12.1-bullseye-slim
WORKDIR /code
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
# Install tini.
RUN apt-get update && \
apt-get install -y --no-install-recommends \
"tini" \
&& \
rm -rf /var/lib/apt/lists/*
# Copy the SQLx CLI binary from the sqlx-cli-build stage so that
# `sqlx migrate run` can be executed inside this image.
COPY --from=sqlx-cli-build --link /usr/local/cargo/bin/sqlx /usr/local/bin/sqlx
# Copy the production node_modules and the built CDP server from the
# cdp-build stage, and the migrations from the build context.
COPY --from=cdp-build --link /code/node_modules/ ./node_modules/
COPY --from=cdp-build --link /code/dist/ ./dist/
COPY --link ./migrations/ ./migrations/
# Set [Tini](https://github.com/krallin/tini) as the entrypoint.
ENTRYPOINT ["/usr/bin/tini", "--"]
# Default command: start the REST server. CI overrides it with
# `sqlx migrate run` to apply the migrations.
CMD ["node", "dist/rest.js"]

5
cdp/README.md Normal file
View File

@@ -0,0 +1,5 @@
# Customer Data Pipeline
Handles delivering event streams to destinations.
TODO: fill this in a bit more. Very much a work in progress at the moment.

View File

@@ -0,0 +1,498 @@
/*
Tests for a basic CRUD API for destinations supporting GET, POST, PUT, and
DELETE, corresponding to creating, reading, updating, and deleting destinations
as well as other similar operations.
We also have an API for the list of destination types, which provides a list of
types along with the schema for the configuration for each type. This is used
to validate the configuration for each destination.
We do not attempt to handle e.g. idempotency of requests although that may be a
good idea if we hit issues with e.g. retry logic and concurrency. See for
example https://www.mscharhag.com/api-design/rest-making-post-patch-idempotent
for an example way to implement this.
*/
import { describe, test, expect } from '@jest/globals'
import jwt from 'jsonwebtoken'
describe('DestinationType API', () => {
    describe('GET destination types', () => {
        test.concurrent('should be able to retrieve a list of destination types', async () => {
            const { id: projectId } = await createProjectOk()
            const token = await generateJwt({ projectIds: [projectId], userId: 1 })

            const listedTypes = await listDestinationTypesOk(token, projectId)

            // The webhook type must be present and must advertise a schema
            // for its configuration.
            const webhookType = expect.objectContaining({
                type: 'webhook',
                configSchema: expect.any(Object),
            })
            expect(listedTypes).toEqual(expect.arrayContaining([webhookType]))
        })

        test.concurrent('project id must be a number', async () => {
            const { id: projectId } = await createProjectOk()
            const token = await generateJwt({ projectIds: [projectId], userId: 1 })

            // The token is valid for a real project, but the path carries a
            // non-numeric project id.
            const { status } = await listDestinationTypes(token, 'invalid')

            expect(status).toEqual(400)
        })

        test.concurrent(
            "should not be able to retrieve a list of destination types if you don't have access to the project",
            async () => {
                const { id: projectId } = await createProjectOk()
                // The token grants access to no projects at all.
                const token = await generateJwt({ projectIds: [], userId: 1 })

                const { status } = await listDestinationTypes(token, projectId)

                expect(status).toEqual(403)
            }
        )
    })
})
describe('Destination API', () => {
    // Request bodies reused by the tests below. They are only ever read
    // (serialised into request bodies), so sharing them between concurrently
    // running tests is safe.
    const newDestination: DestinationCreate = {
        name: 'Test Destination',
        description: 'Test Description',
        type: 'webhook',
        config: { url: 'https://example.com' },
    }
    const destinationUpdate: DestinationUpdate = {
        name: 'Updated Destination',
        description: 'Updated Description',
        type: 'webhook',
        config: { url: 'https://example.com' },
    }
    // A config that does not satisfy the webhook schema (no `url`).
    const invalidConfig = { invalid: 'config' }

    // Creates a project id together with a token that grants access to it.
    const createProjectWithToken = async (): Promise<{ projectId: number; token: string }> => {
        const projectId = (await createProjectOk()).id
        const token = await generateJwt({ projectIds: [projectId], userId: 1 })
        return { projectId, token }
    }

    // Creates two project ids and one token that grants access to both.
    const createTwoProjectsWithToken = async (): Promise<{
        projectId: number
        otherProjectId: number
        token: string
    }> => {
        const projectId = (await createProjectOk()).id
        const otherProjectId = (await createProjectOk()).id
        const token = await generateJwt({ projectIds: [projectId, otherProjectId], userId: 1 })
        return { projectId, otherProjectId, token }
    }

    // Generates a token for the same user that grants access to no projects.
    const generateTokenWithoutAccess = (): Promise<string> => generateJwt({ projectIds: [], userId: 1 })

    // Creates the standard webhook destination, checks that the API assigned
    // it an id, and returns that id.
    const createDestinationId = async (token: string, projectId: number): Promise<string> => {
        const { id } = await postDestinationOk(token, projectId, newDestination)
        expect(id).toBeDefined()
        return id
    }

    describe('POST destination', () => {
        test.concurrent('should be able to create a destination', async () => {
            const { projectId, token } = await createProjectWithToken()

            const { status } = await postDestination(token, projectId, newDestination)

            expect(status).toEqual(201)
        })

        test.concurrent('should not be able to create a destination with an invalid config schema', async () => {
            const { projectId, token } = await createProjectWithToken()

            const { status } = await postDestination(token, projectId, {
                ...newDestination,
                config: invalidConfig,
            })

            expect(status).toEqual(400)
        })

        test.concurrent(
            "should not be able to create a destination if you don't have access to the project",
            async () => {
                const projectId = (await createProjectOk()).id
                const token = await generateTokenWithoutAccess()

                const { status } = await postDestination(token, projectId, newDestination)

                expect(status).toEqual(403)
            }
        )
    })

    describe('GET destination', () => {
        test.concurrent('should be able to retrieve a destination', async () => {
            const { projectId, token } = await createProjectWithToken()
            const created = await postDestinationOk(token, projectId, newDestination)

            const retrieved = await getDestinationOk(token, projectId, created.id)

            expect(retrieved).toEqual(expect.objectContaining(created))
        })

        test.concurrent('should not be able to retrieve a destination from another project', async () => {
            const { projectId, otherProjectId, token } = await createTwoProjectsWithToken()
            const destinationId = await createDestinationId(token, projectId)

            // The token covers both projects, so a 404 here is about the
            // destination not belonging to the project in the path.
            const { status } = await getDestination(token, otherProjectId, destinationId)

            expect(status).toEqual(404)
        })

        test.concurrent(
            "should not be able to retrieve a destination if you don't have access to the project",
            async () => {
                const { projectId, token } = await createProjectWithToken()
                const destinationId = await createDestinationId(token, projectId)

                const unauthorizedToken = await generateTokenWithoutAccess()
                const { status } = await getDestination(unauthorizedToken, projectId, destinationId)

                expect(status).toEqual(403)
            }
        )
    })

    describe('PUT destination', () => {
        test.concurrent('should be able to update a destination', async () => {
            const { projectId, token } = await createProjectWithToken()
            const destinationId = await createDestinationId(token, projectId)

            const updated = await putDestinationOk(token, projectId, destinationId, destinationUpdate)

            expect(updated).toEqual(
                expect.objectContaining({
                    id: destinationId,
                    name: 'Updated Destination',
                    description: 'Updated Description',
                })
            )
        })

        test.concurrent('should not be able to update a destination with an invalid config schema', async () => {
            const { projectId, token } = await createProjectWithToken()
            const destinationId = await createDestinationId(token, projectId)

            const { status } = await putDestination(token, projectId, destinationId, {
                ...destinationUpdate,
                config: invalidConfig,
            })

            expect(status).toEqual(400)
        })

        test.concurrent('should not be able to change the destination type', async () => {
            // To keep handling of e.g. the schema of `config` simple, we do
            // not want to allow changing the destination type; the user
            // should delete and recreate the destination instead.
            const { projectId, token } = await createProjectWithToken()
            const destinationId = await createDestinationId(token, projectId)

            const { status } = await putDestination(token, projectId, destinationId, {
                ...destinationUpdate,
                type: 'email',
            })

            expect(status).toEqual(400)
        })

        test.concurrent('should not be able to update a destination with an invalid id', async () => {
            const { projectId, token } = await createProjectWithToken()

            const { status } = await putDestination(token, projectId, 'invalid', destinationUpdate)

            expect(status).toEqual(400)
        })

        test.concurrent('should not be able to update a destination from another project', async () => {
            const { projectId, otherProjectId, token } = await createTwoProjectsWithToken()
            const destinationId = await createDestinationId(token, projectId)

            const { status } = await putDestination(token, otherProjectId, destinationId, destinationUpdate)

            expect(status).toEqual(404)
        })

        test.concurrent(
            "should not be able to update a destination if you don't have access to the project",
            async () => {
                const { projectId, token } = await createProjectWithToken()
                const destinationId = await createDestinationId(token, projectId)

                const unauthorizedToken = await generateTokenWithoutAccess()
                const { status } = await putDestination(
                    unauthorizedToken,
                    projectId,
                    destinationId,
                    destinationUpdate
                )

                expect(status).toEqual(403)
            }
        )
    })

    describe('DELETE destination', () => {
        test.concurrent('should be able to delete a destination', async () => {
            const { projectId, token } = await createProjectWithToken()
            const destinationId = await createDestinationId(token, projectId)

            const deletion = await deleteDestination(token, projectId, destinationId)
            expect(deletion.status).toEqual(204)

            // Check that the destination is no longer retrievable
            const lookup = await getDestination(token, projectId, destinationId)
            expect(lookup.status).toEqual(404)
        })

        test.concurrent('should not be able to delete a destination with an invalid id', async () => {
            const id = 'invalid'
            const { projectId, token } = await createProjectWithToken()

            const { status } = await deleteDestination(token, projectId, id)

            expect(status).toEqual(400)
        })

        test.concurrent('should not be able to delete a destination from another project', async () => {
            const { projectId, otherProjectId, token } = await createTwoProjectsWithToken()
            const destinationId = await createDestinationId(token, projectId)

            const deletion = await deleteDestination(token, otherProjectId, destinationId)
            expect(deletion.status).toEqual(404)

            // Check that the destination is still retrievable
            const lookup = await getDestination(token, projectId, destinationId)
            expect(lookup.status).toEqual(200)
        })

        test.concurrent(
            "should not be able to delete a destination if you don't have access to the project",
            async () => {
                const { projectId, token } = await createProjectWithToken()
                const destinationId = await createDestinationId(token, projectId)

                const unauthorizedToken = await generateTokenWithoutAccess()
                const deletion = await deleteDestination(unauthorizedToken, projectId, destinationId)
                expect(deletion.status).toEqual(403)

                // Check that the destination is still retrievable
                const lookup = await getDestination(token, projectId, destinationId)
                expect(lookup.status).toEqual(200)
            }
        )
    })
})
// Base URL of the CDP service under test. Defaults to http://localhost:3000;
// set the CDP_API_URL environment variable to run the suite against a service
// listening somewhere else.
const CDP_API_URL = process.env.CDP_API_URL ?? 'http://localhost:3000'

// Builds the URL of a resource nested under the given project, e.g.
// projectApiUrl(1, 'destinations', id) -> <base>/api/projects/1/destinations/<id>
const projectApiUrl = (projectId: number | string, ...segments: string[]): string =>
    [`${CDP_API_URL}/api/projects/${projectId}`, ...segments].join('/')

// Headers that authenticate a request with the given JWT.
const authorizationHeaders = (token: string): Record<string, string> => ({
    Authorization: `Bearer ${token}`,
})

// Headers for an authenticated request that carries a JSON body.
const jsonRequestHeaders = (token: string): Record<string, string> => ({
    'Content-Type': 'application/json',
    ...authorizationHeaders(token),
})

// Returns the parsed JSON body of a successful response, or throws an error
// of the form "<failure>: <status text>" when the response is not ok.
const parseJsonOrThrow = async <T>(response: Response, failure: string): Promise<T> => {
    if (!response.ok) {
        throw new Error(`${failure}: ${response.statusText}`)
    }
    return await response.json()
}

// GET the destination types of a project. `projectId` is deliberately loosely
// typed so that tests can pass invalid values. Returns the raw response so
// that callers can assert on the status code.
const listDestinationTypes = async (token: string, projectId: any): Promise<Response> => {
    return await fetch(projectApiUrl(projectId, 'destination-types'), {
        headers: authorizationHeaders(token),
    })
}

// Like listDestinationTypes, but throws unless the request succeeded and
// returns the parsed body.
const listDestinationTypesOk = async (token: string, projectId: number): Promise<DestinationType[]> => {
    const response = await listDestinationTypes(token, projectId)
    return await parseJsonOrThrow<DestinationType[]>(response, 'Failed to list destination types')
}

// POST a new destination to a project. Returns the raw response.
const postDestination = async (
    token: string,
    projectId: number,
    destinationData: DestinationCreate
): Promise<Response> => {
    return await fetch(projectApiUrl(projectId, 'destinations'), {
        method: 'POST',
        headers: jsonRequestHeaders(token),
        body: JSON.stringify(destinationData),
    })
}

// Like postDestination, but throws unless the request succeeded and returns
// the created destination.
const postDestinationOk = async (
    token: string,
    projectId: number,
    destinationData: DestinationCreate
): Promise<Destination> => {
    const response = await postDestination(token, projectId, destinationData)
    return await parseJsonOrThrow<Destination>(response, 'Failed to create destination')
}

// PUT new data for an existing destination. Returns the raw response.
const putDestination = async (
    token: string,
    projectId: number,
    id: string,
    destinationData: DestinationUpdate
): Promise<Response> => {
    return await fetch(projectApiUrl(projectId, 'destinations', id), {
        method: 'PUT',
        headers: jsonRequestHeaders(token),
        body: JSON.stringify(destinationData),
    })
}

// Like putDestination, but throws unless the request succeeded and returns
// the updated destination.
const putDestinationOk = async (
    token: string,
    projectId: number,
    id: string,
    destinationData: DestinationUpdate
): Promise<Destination> => {
    const response = await putDestination(token, projectId, id, destinationData)
    return await parseJsonOrThrow<Destination>(response, 'Failed to update destination')
}

// GET a single destination. Returns the raw response.
const getDestination = async (token: string, projectId: number, id: string): Promise<Response> => {
    return await fetch(projectApiUrl(projectId, 'destinations', id), {
        headers: authorizationHeaders(token),
    })
}

// Like getDestination, but throws unless the request succeeded and returns
// the destination.
const getDestinationOk = async (token: string, projectId: number, id: string): Promise<Destination> => {
    const response = await getDestination(token, projectId, id)
    return await parseJsonOrThrow<Destination>(response, 'Failed to retrieve destination')
}

// DELETE a destination. Returns the raw response.
const deleteDestination = async (token: string, projectId: number, id: string): Promise<Response> => {
    return await fetch(projectApiUrl(projectId, 'destinations', id), {
        method: 'DELETE',
        headers: authorizationHeaders(token),
    })
}
// Not really an API call: there is no project endpoint to hit, so this helper
// just makes up a project id, a random integer in the range [0, 100000).
const createProjectOk = async (): Promise<{ id: number }> => {
    const id = Math.floor(Math.random() * 100000)
    return { id }
}
// Signs a token carrying the given claims, for use in the Authorization
// header of HTTP requests. The token is signed with HS256 via the
// jsonwebtoken library, using the SECRET_KEY environment variable as the key;
// an error is thrown when that variable is unset or empty.
const generateJwt = async (claims: Record<string, unknown>): Promise<string> => {
    const { SECRET_KEY: signingKey } = process.env
    if (signingKey) {
        return jwt.sign(claims, signingKey, { algorithm: 'HS256' })
    }
    throw new Error('Missing SECRET_KEY environment variable')
}
// Shape of an entry returned by the destination types API.
type DestinationType = {
    type: string // Identifier of the destination type, e.g. webhook
    name: string
    description: string
    // A JSONSchema describing the configuration. The API returns this field
    // as `configSchema`, which is also what the tests above assert on.
    configSchema: Record<string, unknown>
}

// Request body accepted when creating a destination.
type DestinationCreate = {
    name: string // Name displayed to the user
    description: string // Description displayed to the user
    type: string // Type of destination, e.g. webhook, email, Stripe etc.
    config: Record<string, unknown> // Configuration for the destination, e.g. webhook URL, email address, Stripe API key etc.
}

// Request body accepted when updating a destination; same shape as creation.
type DestinationUpdate = DestinationCreate

// A destination as returned by the API.
type Destination = DestinationCreate & {
    id: string
    created_at: string // ISO 8601 timestamp
    updated_at: string // ISO 8601 timestamp
}

5
cdp/jest.config.json Normal file
View File

@@ -0,0 +1,5 @@
{
"roots": ["functional_tests"],
"preset": "ts-jest",
"testEnvironment": "node"
}

View File

@@ -0,0 +1,23 @@
-- Initial schema for the destinations API: a single `destinations` table.
-- uuid-ossp provides uuid_generate_v4(), used below as the default for `id`.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE destinations (
-- Internal surrogate key. The API handlers look rows up by `id`, not by this
-- column.
primary_key integer PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
-- A unique identifier for this destination that does not expose
-- cardinality.
id uuid NOT NULL DEFAULT uuid_generate_v4() UNIQUE,
-- NOTE: we use team_id here to be consistent with the rest of the app,
-- but this is the id of a project.
team_id bigint NOT NULL,
-- Name and description displayed to the user.
name text NOT NULL,
description text NOT NULL,
-- The type of destination. This is used to determine which
-- destination-specific configuration to use.
type text NOT NULL,
-- The destination-specific configuration. This is a JSON object
-- that is specific to the destination type.
config jsonb NOT NULL,
-- Metadata about the destination.
-- NOTE(review): `timestamp` is "without time zone" in PostgreSQL; confirm
-- that is intended rather than `timestamptz`.
created_at timestamp NOT NULL DEFAULT now(),
created_by_id bigint NOT NULL,
-- Only defaulted at insert time; nothing in this migration keeps it current
-- on later updates.
updated_at timestamp NOT NULL DEFAULT now(),
-- Soft-delete flag: rows are marked as deleted rather than removed.
is_deleted boolean NOT NULL DEFAULT false
);

52
cdp/package.json Normal file
View File

@@ -0,0 +1,52 @@
{
"name": "cdp",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"dev": "NODE_ENV=dev nodemon -w src/ src/rest.ts | pino-pretty",
"test": "jest",
"build": "swc ./src/ --out-dir ./dist/",
"migrate": "sqlx migrate run"
},
"keywords": [],
"author": "",
"license": "ISC",
"devDependencies": {
"@jest/globals": "^29.5.0",
"@swc/cli": "^0.1.62",
"@swc/core": "^1.3.46",
"@swc/helpers": "^0.5.0",
"@types/jest": "^29.5.0",
"@types/jsonwebtoken": "^9.0.1",
"@types/koa": "^2.13.6",
"@types/koa-bodyparser": "^4.3.10",
"@types/koa-pino-logger": "^3.0.1",
"@types/koa-router": "^7.4.4",
"@types/node": "^18.15.11",
"@types/pg": "^8.6.6",
"jest": "^29.5.0",
"nodemon": "^2.0.22",
"pino-pretty": "^10.0.0",
"swc-node": "^1.0.0",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"typescript": "^5.0.3"
},
"dependencies": {
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/instrumentation-pg": "^0.35.0",
"@opentelemetry/instrumentation-pino": "^0.33.1",
"@opentelemetry/sdk-node": "^0.37.0",
"@opentelemetry/sdk-trace-base": "^1.11.0",
"@opentelemetry/sdk-trace-node": "^1.11.0",
"ajv": "^8.12.0",
"jsonwebtoken": "^9.0.0",
"koa": "^2.14.1",
"koa-bodyparser": "^4.4.0",
"koa-jwt": "^4.0.4",
"koa-pino-logger": "^4.0.0",
"koa-router": "^12.0.0",
"pg": "^8.10.0"
}
}

4837
cdp/pnpm-lock.yaml generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,42 @@
// This file is responsible for handling the destination types API. It's a
// simple API that returns a list of all the available destination types.
//
// The destination types are defined in code for now, but it's possible that we
// will want to move them to the database in the future to allow dynamic
// addition of new destination types.
import Koa from 'koa'
// Description of a kind of destination that events can be delivered to.
type DestinationType = {
type: string
name: string
description: string
configSchema: Record<string, unknown> // A JSONSchema describing the configuration
}
// Registry of the available destination types, keyed by their `type`
// identifier. Currently only the webhook type is defined.
const destinationTypes: { [type: string]: DestinationType } = {
webhook: {
type: 'webhook',
name: 'Webhook',
description: 'Send events to a webhook',
// A webhook is configured with a single required string property, `url`.
configSchema: {
type: 'object',
properties: {
url: {
type: 'string',
description: 'The URL to send the webhook to',
},
},
required: ['url'],
},
},
}
// Returns every destination type held in the in-code registry. The function
// is async even though the registry lives in memory, so callers await it.
export const listDestinationTypes = async (): Promise<DestinationType[]> => Object.values(destinationTypes)

// Koa handler that responds with 200 and the list of destination types.
export const listDestinationTypesHandler = async (ctx: Koa.Context): Promise<void> => {
    ctx.status = 200
    const availableTypes = await listDestinationTypes()
    ctx.body = availableTypes
}

View File

@@ -0,0 +1,245 @@
/*
*
* This file is responsible for handling the destination API. It provides
* handlers for creating, updating, and deleting destinations, as well as
* listing destinations.
*
* Note that we do not delete destinations, but instead mark them as deleted. This
* is to ensure that we can keep a history of destinations that have been used
* in the past.
*
*/
import { randomUUID } from 'crypto'
import Koa from 'koa'
import pg from 'pg'
import Ajv, { JSONSchemaType } from 'ajv'
import { SQL } from '../sql-template-string'
import { listDestinationTypes } from '../destination-types/handlers'
// The user-editable fields of a destination, shared by the create and update
// request bodies.
type DestinationData = {
name: string // Name displayed to the user
description: string // Description displayed to the user
type: string // Type of destination, e.g. webhook, email, Stripe etc.
config: Record<string, unknown> // Configuration for the destination, e.g. webhook URL, email address, Stripe API key etc.
}
type DestinationCreateRequest = DestinationData
// Ajv instance shared by all validators in this file, both the request body
// validators and the per-destination-type config validators.
const ajv = new Ajv()
// JSON schema for the create request body. It only checks the overall shape:
// `config` merely has to be an object here, and is validated against the
// schema of the chosen destination type inside the handler.
const createDestinationRequestSchema: JSONSchemaType<DestinationCreateRequest> = {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Name displayed to the user',
},
description: {
type: 'string',
description: 'Description displayed to the user',
},
type: {
type: 'string',
description: 'Type of destination, e.g. webhook, email, Stripe etc.',
},
config: {
type: 'object',
description: 'Configuration for the destination, e.g. webhook URL, email address, Stripe API key etc.',
},
},
required: ['name', 'description', 'type', 'config'],
}
// Compiled once at module load and reused for every create request.
const createDestinationRequestValidator = ajv.compile(createDestinationRequestSchema)
// Builds the Koa handler that creates a destination in the project named by
// the `projectId` route parameter.
//
// Responds with:
//  - 400 and the Ajv errors if the request body has the wrong shape
//  - 400 with no body if the destination type is unknown
//  - 400 and the Ajv errors if `config` does not match the type's schema
//  - 201 and the inserted row otherwise
export const createDestinationHandler =
    (database: pg.Client) =>
    async (ctx: Koa.Context): Promise<void> => {
        const payload = ctx.request.body

        // Check the overall shape of the request body.
        if (!createDestinationRequestValidator(payload)) {
            ctx.status = 400
            ctx.body = createDestinationRequestValidator.errors
            return
        }

        // Look up the requested destination type; unknown types are rejected.
        const knownTypes = await listDestinationTypes()
        const requestedType = knownTypes.find((candidate) => candidate.type === payload.type)
        if (requestedType === undefined) {
            ctx.status = 400
            return
        }

        // Check the config against the schema of the destination type.
        const validateConfig = ajv.compile(requestedType.configSchema)
        if (!validateConfig(payload.config)) {
            ctx.status = 400
            ctx.body = validateConfig.errors
            return
        }

        // The id is generated here rather than left to the column default.
        // The creator is taken from the JWT data stored on the Koa state.
        const id = randomUUID()
        const { rows } = await database.query(
            SQL`
                INSERT INTO destinations (
                    id,
                    team_id,
                    name,
                    description,
                    type,
                    config,
                    created_by_id
                ) VALUES (
                    ${id},
                    ${ctx.params.projectId},
                    ${payload.name},
                    ${payload.description},
                    ${payload.type},
                    ${payload.config},
                    ${ctx.state.jwtData.userId}
                ) RETURNING *
            `
        )

        ctx.status = 201
        ctx.body = rows[0]
    }
// Builds the Koa handler that returns a single destination.
//
// Responds with:
//  - 400 if the `destinationId` route parameter is not a UUID
//  - 404 if no non-deleted destination with that id exists in the project
//  - 200 and the row otherwise
export const getDestinationHandler =
    (database: pg.Client) =>
    async (ctx: Koa.Context): Promise<void> => {
        const { destinationId, projectId } = ctx.params

        // Reject anything that is not a UUID before it reaches the query.
        const uuidMatch = destinationId.match(
            /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i
        )
        if (uuidMatch === null) {
            ctx.status = 400
            return
        }

        // Scope the lookup to the project and hide soft-deleted rows.
        const lookup = await database.query(
            SQL`
                SELECT *
                FROM destinations
                WHERE
                    id = ${destinationId} AND
                    team_id = ${projectId} AND
                    is_deleted = false
            `
        )

        if (lookup.rowCount === 0) {
            ctx.status = 404
            return
        }

        ctx.status = 200
        ctx.body = lookup.rows[0]
    }
// The update request body has the same shape as the create request body, so
// it reuses the create schema.
type DestinationUpdateRequest = DestinationData

const updateDestinationRequestSchema: JSONSchemaType<DestinationUpdateRequest> = createDestinationRequestSchema
const updateDestinationRequestValidator = ajv.compile(updateDestinationRequestSchema)

// Builds the Koa handler that updates the name, description and config of a
// destination. The type of a destination cannot be changed: the user should
// delete and recreate the destination instead.
//
// Responds with:
//  - 400 and the Ajv errors if the request body has the wrong shape
//  - 400 if the `destinationId` route parameter is not a UUID
//  - 400 if the destination type is unknown
//  - 400 and the Ajv errors if `config` does not match the type's schema
//  - 400 if the request tries to change the type of the destination
//  - 404 if no non-deleted destination with that id exists in the project
//  - 200 and the updated row otherwise
export const updateDestinationHandler =
    (database: pg.Client) =>
    async (ctx: Koa.Context): Promise<void> => {
        const destination = ctx.request.body

        // Validate the request body using Ajv
        const requestValid = updateDestinationRequestValidator(destination)
        if (!requestValid) {
            ctx.status = 400
            ctx.body = updateDestinationRequestValidator.errors
            return
        }

        const id = ctx.params.destinationId

        // Validate id is a uuid
        if (!id.match(/^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i)) {
            ctx.status = 400
            return
        }

        // Validate the config against the destination type schema
        const config = destination.config
        const destinationType = (await listDestinationTypes()).find(
            (destinationType) => destinationType.type === destination.type
        )

        // If the destination type doesn't exist, return a 400
        if (!destinationType) {
            ctx.status = 400
            return
        }

        // If the config doesn't match the schema, return a 400. We use AJV to
        // perform validation.
        const typeValidator = ajv.compile(destinationType.configSchema)
        const typeValid = typeValidator(config)
        if (!typeValid) {
            ctx.status = 400
            ctx.body = typeValidator.errors
            return
        }

        // NOTE: you cannot update a deleted destination, and you cannot
        // change the type of a destination. Both conditions are part of the
        // WHERE clause so that the check and the write happen in a single
        // statement; `type` is deliberately absent from the SET list. The
        // row's `updated_at` is refreshed explicitly because the column only
        // has an insert-time default.
        const result = await database.query(
            SQL`
                UPDATE destinations
                SET
                    name = ${destination.name},
                    description = ${destination.description},
                    config = ${destination.config},
                    updated_at = now()
                WHERE
                    id = ${id} AND
                    team_id = ${ctx.params.projectId} AND
                    type = ${destination.type} AND
                    is_deleted = false
                RETURNING *
            `
        )

        if (result.rowCount === 0) {
            // Nothing was updated. Work out whether the destination is
            // missing (404) or exists with a different type, i.e. the
            // request tried to change the type (400).
            const existing = await database.query(
                SQL`
                    SELECT type
                    FROM destinations
                    WHERE
                        id = ${id} AND
                        team_id = ${ctx.params.projectId} AND
                        is_deleted = false
                `
            )
            ctx.status = existing.rowCount === 0 ? 404 : 400
            return
        }

        ctx.status = 200
        ctx.body = result.rows[0]
    }
// Builds the Koa handler that soft-deletes a destination: the row is kept and
// only its `is_deleted` flag is set.
//
// Responses:
//   400 - `destinationId` is not a well-formed UUID
//   404 - no destination with that id exists in the project
//   204 - the destination was marked as deleted
//
// The statement does not filter on `is_deleted`, so deleting a destination
// that is already marked as deleted still matches the row and answers 204.
export const deleteDestinationHandler =
    (database: pg.Client) =>
    async (ctx: Koa.Context): Promise<void> => {
        const { destinationId, projectId } = ctx.params
        const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i

        // Malformed ids are rejected before any query is issued.
        if (destinationId.match(uuidPattern) === null) {
            ctx.status = 400
            return
        }

        const softDelete = SQL`
            UPDATE destinations
            SET is_deleted = true
            WHERE
                id = ${destinationId} AND
                team_id = ${projectId}
        `
        const { rowCount } = await database.query(softDelete)

        if (rowCount === 0) {
            ctx.status = 404
            return
        }

        ctx.status = 204
    }

121
cdp/src/rest.ts Normal file
View File

@@ -0,0 +1,121 @@
/*
REST API for adding, updating, removing, and listing Destinations. A Destination
represents a remote location that events should be sent to. The REST API tries
to be as simple as possible, and only supports JSON. It also tries to be of the
same response style as the API from the Django part of the application which
uses Django REST Framework. `Destination`s are stored in a separate logical
PostgreSQL database to the main application database to provide a clear
separation of concerns and limit the impact of e.g. heavy usage of the database
from the main application.
We also provide a read only DestinationType resource, which is used to list
the available DestinationTypes. This is used to retrieve the available
DestinationTypes for use as `Destination.type` as well as the schema for the
`Destination.config` field. These types are defined in code for now, but it's
possible that we will want to move them to the database in the future to allow
dynamic addition of new `DestinationType`s.
The implementation is based on Koajs, which is a popular Node.js web
application framework. Below we define the Koa application and the routes for
the REST API, using handlers defined in the `handlers.ts` files.
We do not at this point separate out the implementation
into Services, Repositories, and Controllers, but instead keep it all in one
file, although that could be an improvement in the future if we find ourselves
using the destinations API in other parts of the CDP application.
*/
import assert from 'assert'
import Koa from 'koa'
import Router from 'koa-router'
import bodyParser from 'koa-bodyparser'
import logger from 'koa-pino-logger'
import pg from 'pg'
import jwt from 'koa-jwt'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base'
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg'
import { PinoInstrumentation } from '@opentelemetry/instrumentation-pino'
import { listDestinationTypesHandler } from './destination-types/handlers'
import {
createDestinationHandler,
deleteDestinationHandler,
getDestinationHandler,
updateDestinationHandler,
} from './destinations/handlers'
// Builds the Koa application that serves the Destination REST API.
//
// The function connects to PostgreSQL, starts the OpenTelemetry SDK and then
// wires up JWT authentication, the per-project authorization guard and the
// destination routes.
//
// `config` is an environment-style map. `DATABASE_URL` and `SECRET_KEY` must
// both be set, otherwise an assertion error is thrown. The returned promise
// resolves with the configured app once the database connection is
// established; calling `listen` is left to the caller.
const getApp = async (config: NodeJS.ProcessEnv): Promise<Koa> => {
    const app = new Koa()
    const router = new Router()

    assert(config.DATABASE_URL, 'DATABASE_URL environment variable must be set')
    assert(config.SECRET_KEY, 'SECRET_KEY environment variable must be set')

    // A single client is shared by every request handler. Both timeouts are
    // expressed in milliseconds.
    const database = new pg.Client({
        connectionString: config.DATABASE_URL,
        statement_timeout: 1000,
        connectionTimeoutMillis: 1000,
    })
    await database.connect()

    // NOTE(review): the SDK is started after `pg` has already been imported at
    // the top of this file; confirm that the pg instrumentation still patches
    // the client when it is registered this late.
    const opentelemetry = new NodeSDK({
        traceExporter: new ConsoleSpanExporter(),
        instrumentations: [new PgInstrumentation(), new PinoInstrumentation()],
    })
    opentelemetry.start()

    // Every request must carry a valid JWT; the decoded payload is exposed as
    // `ctx.state.jwtData`.
    app.use(jwt({ secret: config.SECRET_KEY, key: 'jwtData' }))

    // For any route matching /api/projects/:projectId/... we want to make sure
    // that the JWT token contains the projectId as a claim. If it doesn't we
    // return a 403 Forbidden response.
    router.use('/api/projects/:projectId', async (ctx, next) => {
        const projectId = Number.parseInt(ctx.params.projectId, 10)
        const jwtData = ctx.state.jwtData

        // A token without a `projectIds` array grants access to no project.
        // Treating it as an empty list yields the documented 403 rather than
        // an unhandled TypeError (and therefore a 500).
        const permittedProjectIds = jwtData && Array.isArray(jwtData.projectIds) ? jwtData.projectIds : []

        if (permittedProjectIds.indexOf(projectId) === -1) {
            ctx.status = 403
            ctx.body = {
                detail: 'You do not have permission to perform this action.',
            }
            return
        }

        await next()
    })

    // Project ids must consist of decimal digits only; anything else is a 400.
    router.param('projectId', (projectId, ctx, next) => {
        if (projectId.match(/^[0-9]+$/)) {
            return next()
        }

        ctx.status = 400
        ctx.body = {
            detail: 'Invalid project ID.',
        }
    })

    router.get('/api/projects/:projectId/destination-types', listDestinationTypesHandler)
    router.post('/api/projects/:projectId/destinations', createDestinationHandler(database))
    router.get('/api/projects/:projectId/destinations/:destinationId', getDestinationHandler(database))
    router.put('/api/projects/:projectId/destinations/:destinationId', updateDestinationHandler(database))
    router.delete('/api/projects/:projectId/destinations/:destinationId', deleteDestinationHandler(database))

    // Request logging and body parsing run after authentication and before
    // the routes.
    app.use(logger())
    app.use(bodyParser())
    app.use(router.routes())
    app.use(router.allowedMethods())

    return app
}
// Placeholder secret used when SECRET_KEY is not provided by the environment.
const DEFAULT_SECRET_KEY = '<randomly generated secret key>'

// Local-development defaults; any variable present in the real environment
// takes precedence because `process.env` is spread last.
const config = {
    DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/cdp',
    SECRET_KEY: DEFAULT_SECRET_KEY,
    ...process.env,
}

// The default above always satisfies the SECRET_KEY assertion in `getApp`, so
// a missing variable would otherwise go unnoticed. Make the fallback visible.
if (config.SECRET_KEY === DEFAULT_SECRET_KEY) {
    console.warn(
        'SECRET_KEY is not set; falling back to the built-in placeholder secret. Do not use this outside of local development.'
    )
}

getApp(config)
    .then((app) => {
        app.listen(3000)
    })
    .catch((error) => {
        // Startup can fail on a missing setting or an unreachable database.
        // Report it and exit non-zero instead of leaving an unhandled rejection.
        console.error('Failed to start the CDP REST API', error)
        process.exit(1)
    })

View File

@@ -0,0 +1,9 @@
// Template tag that turns a tagged template literal into a node-postgres
// query object. Each interpolated value is replaced in the query text by the
// positional placeholder `$1`, `$2`, ... (numbered in order of appearance) and
// the values themselves are returned alongside, so call sites never have to
// keep placeholders and arguments in sync by hand.
//
//   SQL`SELECT * FROM t WHERE a = ${x} AND b = ${y}`
//   // => { text: 'SELECT * FROM t WHERE a = $1 AND b = $2', values: [x, y] }
export const SQL = (sqlParts: TemplateStringsArray, ...args: any[]): { text: string; values: any[] } => {
    // A tagged template always supplies at least one literal part, and exactly
    // one more literal part than it has interpolated values.
    let text = sqlParts[0]
    for (let position = 1; position < sqlParts.length; position++) {
        text += '$' + position + sqlParts[position]
    }
    return { text, values: args }
}

20
cdp/tsconfig.json Normal file
View File

@@ -0,0 +1,20 @@
{
"ts-node": {
"transpileOnly": true,
"transpiler": "ts-node/transpilers/swc-experimental"
},
"compilerOptions": {
"module": "commonjs",
"esModuleInterop": true,
"target": "es6",
"noImplicitAny": true,
"moduleResolution": "node",
"sourceMap": true,
"outDir": "dist",
"baseUrl": ".",
"paths": {
"*": ["node_modules/*"]
}
},
"include": ["src/**/*"]
}

View File

@@ -0,0 +1,372 @@
openapi: 3.0.0
info:
  title: Customer Data Pipeline
  # `version` is a required field of the OpenAPI Info Object; without it the
  # document does not validate.
  # NOTE(review): '0.1.0' is a placeholder - set it to the real API version.
  version: '0.1.0'
  description: |
    The Customer Data Pipeline (CDP) is a service that allows users to
    upload data to the platform. It allows users to specify `Destination`s
    to which the data should be sent, and `Source`s from which the data is
    sent from the client.
    Destinations can be of type 'webhook' only currently. The CDP service
    will send a POST request to the webhook URL with the data in the body
    of the request. A basic transformation can be specified to transform
    event data into the format required by the destination.
# Path parameters are typed to match what the server accepts: `project_id`
# must consist of decimal digits only and `destination_id` must be a UUID.
# Requests that violate either rule are answered with a 400.
paths:
  /api/projects/{project_id}/destination-types:
    get:
      summary: List destination types
      description: |
        List all destination types.
      operationId: listDestinationTypes
      parameters:
        - name: project_id
          in: path
          description: ID of the project
          required: true
          schema:
            type: integer
      responses:
        '200':
          description: List of destination types
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/DestinationType'
        '401':
          description: Unauthorized
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '403':
          description: Forbidden
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '404':
          description: Project not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
  /api/projects/{project_id}/destinations:
    # NOTE(review): the Koa router in cdp/src/rest.ts registers POST for this
    # collection but no GET - confirm the list endpoint is implemented.
    get:
      summary: List destinations
      description: |
        List all destinations for a project.
      operationId: listDestinations
      parameters:
        - name: project_id
          in: path
          description: ID of the project
          required: true
          schema:
            type: integer
      responses:
        '200':
          description: List of destinations
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/Destination'
        '401':
          description: Unauthorized
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '403':
          description: Forbidden
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '404':
          description: Project not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
    post:
      summary: Create a destination
      description: |
        Create a destination for a project.
      operationId: createDestination
      parameters:
        - name: project_id
          in: path
          description: ID of the project
          required: true
          schema:
            type: integer
      requestBody:
        description: Destination to create
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/Destination'
      responses:
        '201':
          description: Destination created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Destination'
        '400':
          description: Bad request
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '401':
          description: Unauthorized
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '403':
          description: Forbidden
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '404':
          description: Project not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
  /api/projects/{project_id}/destinations/{destination_id}:
    get:
      summary: Get a destination
      description: |
        Get a destination for a project.
      operationId: getDestination
      parameters:
        - name: project_id
          in: path
          description: ID of the project
          required: true
          schema:
            type: integer
        - name: destination_id
          in: path
          description: ID of the destination
          required: true
          schema:
            type: string
            format: uuid
      responses:
        '200':
          description: Destination
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Destination'
        '400':
          description: Bad request, e.g. destination_id is not a valid UUID
        '401':
          description: Unauthorized
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '403':
          description: Forbidden
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '404':
          description: Project or destination not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
    put:
      summary: Update a destination
      description: |
        Update a destination for a project.
      operationId: updateDestination
      parameters:
        - name: project_id
          in: path
          description: ID of the project
          required: true
          schema:
            type: integer
        - name: destination_id
          in: path
          description: ID of the destination
          required: true
          schema:
            type: string
            format: uuid
      requestBody:
        description: Destination to update
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/Destination'
      responses:
        '200':
          description: Destination updated
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Destination'
        '400':
          description: Bad request
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '401':
          description: Unauthorized
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '403':
          description: Forbidden
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '404':
          description: Project or destination not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
    delete:
      summary: Delete a destination
      description: |
        Delete a destination for a project.
      operationId: deleteDestination
      parameters:
        - name: project_id
          in: path
          description: ID of the project
          required: true
          schema:
            type: integer
        - name: destination_id
          in: path
          description: ID of the destination
          required: true
          schema:
            type: string
            format: uuid
      responses:
        '204':
          description: Destination deleted
        '400':
          description: Bad request, e.g. destination_id is not a valid UUID
        '401':
          description: Unauthorized
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '403':
          description: Forbidden
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '404':
          description: Project or destination not found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Error'
# Reusable schemas referenced by the operations of this API.
components:
  schemas:
    # NOTE(review): the destination handlers read `type` and `configSchema`
    # from each destination type; confirm whether those fields should be
    # listed here as well.
    DestinationType:
      type: object
      properties:
        id:
          type: string
          description: ID of the destination type
        name:
          type: string
          description: Name of the destination type
        description:
          type: string
          description: Description of the destination type
        created_at:
          type: string
          format: date-time
          description: Date and time when the destination type was created
        updated_at:
          type: string
          format: date-time
          description: Date and time when the destination type was updated
    Destination:
      type: object
      properties:
        id:
          type: string
          description: ID of the destination
        type:
          type: string
          description: Type of the destination
        name:
          type: string
          description: Name of the destination
        description:
          type: string
          description: Description of the destination
        # The create and update handlers validate this object against the
        # config schema of the chosen destination type before storing it.
        config:
          type: object
          description: Type-specific configuration of the destination
        created_at:
          type: string
          format: date-time
          description: Date and time when the destination was created
        updated_at:
          type: string
          format: date-time
          description: Date and time when the destination was updated
    # Error bodies produced by the server carry the message under `detail`,
    # e.g. `{"detail": "Invalid project ID."}`.
    Error:
      type: object
      properties:
        detail:
          type: string
          description: Error message