feat(data-warehouse): external data resource (#17554)

* create airbyte source api

* comment test

* rename and add connection

* comment out test

* frontend updates and additional API calls on airbyte

* some more UI and retrieve endpoint

* restore lock file

* connecting the dots

* add destination logic

* make destinatino parquet

* ui updates

* rename data

* more renaming

* migration

* remove test

* rename

* Update UI snapshots for `chromium` (2)

* missing file

* typing

* remove unneeded field

* add rollback deletions if one fo the related resources fails

* add feature flag

* fix error handling

* adjust warning messages

* fix external link listener

* Update UI snapshots for `chromium` (2)

* format

* remove unnecessary env var

* adjustments

* add more env vars

* fix import

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Eric Duong
2023-11-02 15:25:16 -04:00
committed by GitHub
parent 90ca04b8f2
commit 199e91350f
30 changed files with 961 additions and 111 deletions

View File

@@ -0,0 +1 @@
<svg version="1.1" id="Layer_1" xmlns="http://www.w3.org/2000/svg" x="0" y="0" viewBox="0 0 468 222.5" style="enable-background:new 0 0 468 222.5" xml:space="preserve"><style>.st0{fill-rule:evenodd;clip-rule:evenodd;fill:#635bff}</style><path class="st0" d="M414 113.4c0-25.6-12.4-45.8-36.1-45.8-23.8 0-38.2 20.2-38.2 45.6 0 30.1 17 45.3 41.4 45.3 11.9 0 20.9-2.7 27.7-6.5v-20c-6.8 3.4-14.6 5.5-24.5 5.5-9.7 0-18.3-3.4-19.4-15.2h48.9c0-1.3.2-6.5.2-8.9zm-49.4-9.5c0-11.3 6.9-16 13.2-16 6.1 0 12.6 4.7 12.6 16h-25.8zM301.1 67.6c-9.8 0-16.1 4.6-19.6 7.8l-1.3-6.2h-22v116.6l25-5.3.1-28.3c3.6 2.6 8.9 6.3 17.7 6.3 17.9 0 34.2-14.4 34.2-46.1-.1-29-16.6-44.8-34.1-44.8zm-6 68.9c-5.9 0-9.4-2.1-11.8-4.7l-.1-37.1c2.6-2.9 6.2-4.9 11.9-4.9 9.1 0 15.4 10.2 15.4 23.3 0 13.4-6.2 23.4-15.4 23.4zM223.8 61.7l25.1-5.4V36l-25.1 5.3zM223.8 69.3h25.1v87.5h-25.1zM196.9 76.7l-1.6-7.4h-21.6v87.5h25V97.5c5.9-7.7 15.9-6.3 19-5.2v-23c-3.2-1.2-14.9-3.4-20.8 7.4zM146.9 47.6l-24.4 5.2-.1 80.1c0 14.8 11.1 25.7 25.9 25.7 8.2 0 14.2-1.5 17.5-3.3V135c-3.2 1.3-19 5.9-19-8.9V90.6h19V69.3h-19l.1-21.7zM79.3 94.7c0-3.9 3.2-5.4 8.5-5.4 7.6 0 17.2 2.3 24.8 6.4V72.2c-8.3-3.3-16.5-4.6-24.8-4.6C67.5 67.6 54 78.2 54 95.9c0 27.6 38 23.2 38 35.1 0 4.6-4 6.1-9.6 6.1-8.3 0-18.9-3.4-27.3-8v23.8c9.3 4 18.7 5.7 27.3 5.7 20.8 0 35.1-10.3 35.1-28.2-.1-29.8-38.2-24.5-38.2-35.7z"/></svg>

After

Width:  |  Height:  |  Size: 1.3 KiB

View File

@@ -50,6 +50,8 @@ import {
BatchExportRun,
UserBasicType,
NotebookNodeResource,
ExternalDataStripeSourceCreatePayload,
ExternalDataStripeSource,
} from '~/types'
import { getCurrentOrganizationId, getCurrentTeamId } from './utils/logics'
import { CheckboxValueType } from 'antd/lib/checkbox/Group'
@@ -566,6 +568,11 @@ class ApiRequest {
return this.batchExportRun(id, runId, teamId).addPathComponent('logs')
}
// External Data Source
public externalDataSources(teamId?: TeamType['id']): ApiRequest {
return this.projectsDetail(teamId).addPathComponent('external_data_sources')
}
// Request finalization
public async get(options?: ApiMethodOptions): Promise<any> {
return await api.get(this.assembleFullUrl(), options)
@@ -1571,6 +1578,17 @@ const api = {
},
},
externalDataSources: {
async list(): Promise<PaginatedResponse<ExternalDataStripeSource>> {
return await new ApiRequest().externalDataSources().get()
},
async create(
data: Partial<ExternalDataStripeSourceCreatePayload>
): Promise<ExternalDataStripeSourceCreatePayload> {
return await new ApiRequest().externalDataSources().create({ data })
},
},
dataWarehouseViewLinks: {
async list(): Promise<PaginatedResponse<DataWarehouseViewLink>> {
return await new ApiRequest().dataWarehouseViewLinks().get()

View File

@@ -151,6 +151,7 @@ export const FEATURE_FLAGS = {
EXCEPTION_AUTOCAPTURE: 'exception-autocapture',
DATA_WAREHOUSE: 'data-warehouse', // owner: @EDsCODE
DATA_WAREHOUSE_VIEWS: 'data-warehouse-views', // owner: @EDsCODE
DATA_WAREHOUSE_EXTERNAL_LINK: 'data-warehouse-external-link', // owner: @EDsCODE
FF_DASHBOARD_TEMPLATES: 'ff-dashboard-templates', // owner: @EDsCODE
SHOW_PRODUCT_INTRO_EXISTING_PRODUCTS: 'show-product-intro-existing-products', // owner: @raquelmsmith
ARTIFICIAL_HOG: 'artificial-hog', // owner: @Twixes

View File

@@ -45,7 +45,7 @@ export const appScenes: Record<Scene, () => any> = {
[Scene.DataWarehousePosthog]: () => import('./data-warehouse/posthog/DataWarehousePosthogScene'),
[Scene.DataWarehouseExternal]: () => import('./data-warehouse/external/DataWarehouseExternalScene'),
[Scene.DataWarehouseSavedQueries]: () => import('./data-warehouse/saved_queries/DataWarehouseSavedQueriesScene'),
[Scene.DataWarehouseTable]: () => import('./data-warehouse/DataWarehouseTable'),
[Scene.DataWarehouseSettings]: () => import('./data-warehouse/settings/DataWarehouseSettingsScene'),
[Scene.OrganizationSettings]: () => import('./organization/Settings'),
[Scene.OrganizationCreateFirst]: () => import('./organization/Create'),
[Scene.OrganizationCreationConfirm]: () => import('./organization/ConfirmOrganization/ConfirmOrganization'),

View File

@@ -1,76 +1,15 @@
import { SceneExport } from 'scenes/sceneTypes'
import { dataWarehouseTableLogic } from './dataWarehouseTableLogic'
import { useActions, useValues } from 'kea'
import { Form } from 'kea-forms'
import { PageHeader } from 'lib/components/PageHeader'
import { LemonSkeleton } from 'lib/lemon-ui/LemonSkeleton'
import { LemonButton, LemonDivider, LemonInput, LemonSelect, Link } from '@posthog/lemon-ui'
import { router } from 'kea-router'
import { urls } from 'scenes/urls'
import { LemonInput, LemonSelect } from '@posthog/lemon-ui'
import { Field } from 'lib/forms/Field'
export const scene: SceneExport = {
component: DataWarehousetTable,
logic: dataWarehouseTableLogic,
paramsToProps: ({ params: { id } }): (typeof dataWarehouseTableLogic)['props'] => ({
id: id,
}),
interface DataWarehouseTableFormProps {
footer?: JSX.Element
}
export function DataWarehousetTable({ id }: { id?: string } = {}): JSX.Element {
const { isEditingTable } = useValues(dataWarehouseTableLogic)
const showTableForm = id === 'new' || isEditingTable
return <div>{!id ? <LemonSkeleton /> : <>{showTableForm ? <TableForm id={id} /> : <></>}</>}</div>
}
export function TableForm({ id }: { id: string }): JSX.Element {
const { table, tableLoading, isEditingTable } = useValues(dataWarehouseTableLogic)
const { loadTable, editingTable } = useActions(dataWarehouseTableLogic)
export function DatawarehouseTableForm({ footer }: DataWarehouseTableFormProps): JSX.Element {
return (
<Form formKey="table" logic={dataWarehouseTableLogic} className="space-y-4" enableFormOnSubmit>
<PageHeader
title={id === 'new' ? 'New table' : table.name}
buttons={
<div className="flex items-center gap-2">
<LemonButton
data-attr="cancel-table"
type="secondary"
loading={tableLoading}
onClick={() => {
if (isEditingTable) {
editingTable(false)
loadTable()
} else {
router.actions.push(urls.dataWarehouse())
}
}}
>
Cancel
</LemonButton>
<LemonButton
type="primary"
data-attr="save-feature-flag"
htmlType="submit"
loading={tableLoading}
>
Save
</LemonButton>
</div>
}
caption={
<div>
External tables are supported through object storage systems like S3.{' '}
<Link
to="https://posthog.com/docs/data/data-warehouse#step-1-creating-a-bucket-in-s3"
target="_blank"
>
Learn how to set up your data
</Link>
</div>
}
/>
<LemonDivider />
<div className="flex flex-col gap-2 max-w-160">
<Field name="name" label="Table Name">
<LemonInput
@@ -136,6 +75,7 @@ export function TableForm({ id }: { id: string }): JSX.Element {
/>
</Field>
</div>
{footer}
</Form>
)
}

View File

@@ -1,14 +1,15 @@
import { lemonToast } from '@posthog/lemon-ui'
import { kea, path, props, key, listeners, afterMount, reducers, actions, selectors, connect } from 'kea'
import { kea, path, props, listeners, reducers, actions, selectors, connect } from 'kea'
import { forms } from 'kea-forms'
import { loaders } from 'kea-loaders'
import { router, urlToAction } from 'kea-router'
import { router } from 'kea-router'
import api from 'lib/api'
import { urls } from 'scenes/urls'
import { AnyPropertyFilter, Breadcrumb, DataWarehouseTable } from '~/types'
import { DataTableNode } from '~/queries/schema'
import { databaseSceneLogic } from 'scenes/data-management/database/databaseSceneLogic'
import type { dataWarehouseTableLogicType } from './dataWarehouseTableLogicType'
import { dataWarehouseSceneLogic } from './external/dataWarehouseSceneLogic'
export interface TableLogicProps {
id: string | 'new'
@@ -29,9 +30,13 @@ const NEW_WAREHOUSE_TABLE: DataWarehouseTable = {
export const dataWarehouseTableLogic = kea<dataWarehouseTableLogicType>([
path(['scenes', 'data-warehouse', 'tableLogic']),
props({} as TableLogicProps),
key(({ id }) => id),
connect(() => ({
actions: [databaseSceneLogic, ['loadDatabase']],
actions: [
databaseSceneLogic,
['loadDatabase'],
dataWarehouseSceneLogic,
['loadDataWarehouse', 'toggleSourceModal'],
],
})),
actions({
editingTable: (editing: boolean) => ({ editing }),
@@ -65,6 +70,8 @@ export const dataWarehouseTableLogic = kea<dataWarehouseTableLogicType>([
createTableSuccess: async ({ table }) => {
lemonToast.success(<>Table {table.name} created</>)
actions.loadDatabase()
actions.loadDataWarehouse()
actions.toggleSourceModal()
router.actions.replace(urls.dataWarehouse())
},
updateTableSuccess: async ({ table }) => {
@@ -122,25 +129,4 @@ export const dataWarehouseTableLogic = kea<dataWarehouseTableLogicType>([
},
},
})),
urlToAction(({ actions, props }) => ({
[urls.dataWarehouseTable(props.id ?? 'new')]: (_, __, ___, { method }) => {
// If the URL was pushed (user clicked on a link), reset the scene's data.
// This avoids resetting form fields if you click back/forward.
if (method === 'PUSH') {
if (props.id) {
actions.loadTable()
} else {
actions.resetTable()
}
}
},
})),
afterMount(async ({ props, actions }) => {
// if (props.id !== 'new') {
// await actions.loadTable()
// }
if (props.id === 'new') {
actions.resetTable()
}
}),
])

View File

@@ -1,14 +1,18 @@
import { LemonButton, LemonTag, Link } from '@posthog/lemon-ui'
import { LemonTag, Link, LemonButtonWithSideAction } from '@posthog/lemon-ui'
import { PageHeader } from 'lib/components/PageHeader'
import { SceneExport } from 'scenes/sceneTypes'
import { urls } from 'scenes/urls'
import { useValues } from 'kea'
import { useActions, useValues } from 'kea'
import { router } from 'kea-router'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
import { ProductKey } from '~/types'
import { DataWarehouseTablesContainer } from './DataWarehouseTables'
import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic'
import { DataWarehousePageTabs, DataWarehouseTab } from '../DataWarehousePageTabs'
import SourceModal from './SourceModal'
import { IconSettings } from 'lib/lemon-ui/icons'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'
export const scene: SceneExport = {
component: DataWarehouseExternalScene,
@@ -16,7 +20,10 @@ export const scene: SceneExport = {
}
export function DataWarehouseExternalScene(): JSX.Element {
const { shouldShowEmptyState, shouldShowProductIntroduction } = useValues(dataWarehouseSceneLogic)
const { shouldShowEmptyState, shouldShowProductIntroduction, isSourceModalOpen } =
useValues(dataWarehouseSceneLogic)
const { toggleSourceModal } = useActions(dataWarehouseSceneLogic)
const { featureFlags } = useValues(featureFlagLogic)
return (
<div>
@@ -30,14 +37,20 @@ export function DataWarehouseExternalScene(): JSX.Element {
</div>
}
buttons={
!shouldShowProductIntroduction ? (
<LemonButton
featureFlags[FEATURE_FLAGS.DATA_WAREHOUSE_EXTERNAL_LINK] ? (
<LemonButtonWithSideAction
type="primary"
to={urls.dataWarehouseTable('new')}
data-attr="new-data-warehouse-table"
sideAction={{
icon: <IconSettings />,
onClick: () => router.actions.push(urls.dataWarehouseSettings()),
'data-attr': 'saved-insights-new-insight-dropdown',
}}
data-attr="new-data-warehouse-easy-link"
key={'new-data-warehouse-easy-link'}
onClick={toggleSourceModal}
>
New Table
</LemonButton>
Link Source
</LemonButtonWithSideAction>
) : undefined
}
caption={
@@ -59,13 +72,14 @@ export function DataWarehouseExternalScene(): JSX.Element {
description={
'Bring your production database, revenue data, CRM contacts or any other data into PostHog.'
}
action={() => router.actions.push(urls.dataWarehouseTable('new'))}
action={() => toggleSourceModal()}
isEmpty={shouldShowEmptyState}
docsURL="https://posthog.com/docs/data/data-warehouse"
productKey={ProductKey.DATA_WAREHOUSE}
/>
)}
{!shouldShowEmptyState && <DataWarehouseTablesContainer />}
<SourceModal isOpen={isSourceModalOpen} onClose={toggleSourceModal} />
</div>
)
}

View File

@@ -0,0 +1,129 @@
import { LemonButton, LemonDivider, LemonInput, LemonModal, LemonModalProps } from '@posthog/lemon-ui'
import { Form } from 'kea-forms'
import { ConnectorConfigType, sourceModalLogic } from './sourceModalLogic'
import { useActions, useValues } from 'kea'
import { DatawarehouseTableForm } from '../DataWarehouseTableForm'
import { Field } from 'lib/forms/Field'
import stripeLogo from 'public/stripe-logo.svg'
interface SourceModalProps extends LemonModalProps {}
export default function SourceModal(props: SourceModalProps): JSX.Element {
const { tableLoading, isExternalDataSourceSubmitting, selectedConnector, isManualLinkFormVisible, connectors } =
useValues(sourceModalLogic)
const { selectConnector, toggleManualLinkFormVisible, resetExternalDataSource, resetTable } =
useActions(sourceModalLogic)
const MenuButton = (config: ConnectorConfigType): JSX.Element => {
const onClick = (): void => {
selectConnector(config)
}
return (
<LemonButton onClick={onClick} className="w-100" center type="secondary">
<img src={stripeLogo} alt={`stripe logo`} height={50} />
</LemonButton>
)
}
const onClear = (): void => {
selectConnector(null)
toggleManualLinkFormVisible(false)
resetExternalDataSource()
resetTable()
}
const onManualLinkClick = (): void => {
toggleManualLinkFormVisible(true)
}
const formToShow = (): JSX.Element => {
if (selectedConnector) {
return (
<Form logic={sourceModalLogic} formKey={'externalDataSource'} className="space-y-4" enableFormOnSubmit>
<Field name="account_id" label="Account Id">
<LemonInput className="ph-ignore-input" autoFocus data-attr="account-id" placeholder="acct_" />
</Field>
<Field name="client_secret" label="Client Secret">
<LemonInput
className="ph-ignore-input"
autoFocus
data-attr="client-secret"
placeholder="sklive"
/>
</Field>
<LemonDivider className="mt-4" />
<div className="mt-2 flex flex-row justify-end gap-2">
<LemonButton type="secondary" center data-attr="source-modal-back-button" onClick={onClear}>
Back
</LemonButton>
<LemonButton
type="primary"
center
htmlType="submit"
data-attr="source-link"
loading={isExternalDataSourceSubmitting}
>
Link
</LemonButton>
</div>
</Form>
)
}
if (isManualLinkFormVisible) {
return (
<div>
<DatawarehouseTableForm
footer={
<>
<LemonDivider className="mt-4" />
<div className="mt-2 flex flex-row justify-end gap-2">
<LemonButton
type="secondary"
center
data-attr="source-modal-back-button"
onClick={onClear}
>
Back
</LemonButton>
<LemonButton
type="primary"
center
htmlType="submit"
data-attr="source-link"
loading={tableLoading}
>
Link
</LemonButton>
</div>
</>
}
/>
</div>
)
}
return (
<div className="flex flex-col gap-2">
{connectors.map((config, index) => (
<MenuButton key={config.name + '_' + index} {...config} />
))}
<LemonButton onClick={onManualLinkClick} className="w-100" center type="secondary">
Manual Link
</LemonButton>
</div>
)
}
return (
<LemonModal
{...props}
onAfterClose={() => onClear()}
title="Data Sources"
description={selectedConnector ? selectedConnector.caption : null}
>
{formToShow()}
</LemonModal>
)
}

View File

@@ -1,4 +1,4 @@
import { afterMount, connect, kea, path, selectors } from 'kea'
import { actions, afterMount, connect, kea, path, reducers, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import api, { PaginatedResponse } from 'lib/api'
import { DataWarehouseTable, ProductKey } from '~/types'
@@ -12,6 +12,17 @@ export const dataWarehouseSceneLogic = kea<dataWarehouseSceneLogicType>([
connect(() => ({
values: [userLogic, ['user']],
})),
actions({
toggleSourceModal: true,
}),
reducers({
isSourceModalOpen: [
false,
{
toggleSourceModal: (state) => !state,
},
],
}),
loaders({
dataWarehouse: [
null as PaginatedResponse<DataWarehouseTable> | null,

View File

@@ -0,0 +1,107 @@
import { actions, connect, kea, path, reducers, selectors, listeners } from 'kea'
import type { sourceModalLogicType } from './sourceModalLogicType'
import { forms } from 'kea-forms'
import { ExternalDataStripeSourceCreatePayload } from '~/types'
import api from 'lib/api'
import { lemonToast } from '@posthog/lemon-ui'
import { dataWarehouseTableLogic } from '../dataWarehouseTableLogic'
import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic'
import { router } from 'kea-router'
import { urls } from 'scenes/urls'
import { dataWarehouseSettingsLogic } from '../settings/dataWarehouseSettingsLogic'
export interface ConnectorConfigType {
name: string
fields: string[]
caption: string
disabledReason: string | null
}
// TODO: add icon
export const CONNECTORS: ConnectorConfigType[] = [
{
name: 'Stripe',
fields: ['accound_id', 'client_secret'],
caption: 'Enter your Stripe credentials to link your Stripe to PostHog',
disabledReason: null,
},
]
export const sourceModalLogic = kea<sourceModalLogicType>([
path(['scenes', 'data-warehouse', 'external', 'sourceModalLogic']),
actions({
selectConnector: (connector: ConnectorConfigType | null) => ({ connector }),
toggleManualLinkFormVisible: (visible: boolean) => ({ visible }),
}),
connect({
values: [dataWarehouseTableLogic, ['tableLoading'], dataWarehouseSettingsLogic, ['dataWarehouseSources']],
actions: [
dataWarehouseSceneLogic,
['toggleSourceModal'],
dataWarehouseTableLogic,
['resetTable'],
dataWarehouseSettingsLogic,
['loadSources'],
],
}),
reducers({
selectedConnector: [
null as ConnectorConfigType | null,
{
selectConnector: (_, { connector }) => connector,
},
],
isManualLinkFormVisible: [
false,
{
toggleManualLinkFormVisible: (_, { visible }) => visible,
},
],
}),
selectors({
showFooter: [
(s) => [s.selectedConnector, s.isManualLinkFormVisible],
(selectedConnector, isManualLinkFormVisible) => selectedConnector || isManualLinkFormVisible,
],
connectors: [
(s) => [s.dataWarehouseSources],
(sources) => {
return CONNECTORS.map((connector) => ({
...connector,
disabledReason:
sources && sources.results.find((source) => source.source_type === connector.name)
? 'Already linked'
: null,
}))
},
],
}),
forms(() => ({
externalDataSource: {
defaults: { account_id: '', client_secret: '' } as ExternalDataStripeSourceCreatePayload,
errors: ({ account_id, client_secret }) => {
return {
account_id: !account_id && 'Please enter an account id.',
client_secret: !client_secret && 'Please enter a client secret.',
}
},
submit: async (payload: ExternalDataStripeSourceCreatePayload) => {
const newResource = await api.externalDataSources.create(payload)
return newResource
},
},
})),
listeners(({ actions }) => ({
submitExternalDataSourceSuccess: () => {
lemonToast.success('New Data Resource Created')
actions.toggleSourceModal()
actions.resetExternalDataSource()
actions.loadSources()
router.actions.push(urls.dataWarehouseSettings())
},
submitExternalDataSourceFailure: () => {
lemonToast.error('Error creating new Data Resource. Check that provided credentials are valid.')
},
})),
])

View File

@@ -0,0 +1,85 @@
import { LemonButton, LemonTable, LemonTag } from '@posthog/lemon-ui'
import { PageHeader } from 'lib/components/PageHeader'
import { SceneExport } from 'scenes/sceneTypes'
import { dataWarehouseSettingsLogic } from './dataWarehouseSettingsLogic'
import { useActions, useValues } from 'kea'
import { dataWarehouseSceneLogic } from '../external/dataWarehouseSceneLogic'
import SourceModal from '../external/SourceModal'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'
export const scene: SceneExport = {
component: DataWarehouseSettingsScene,
logic: dataWarehouseSettingsLogic,
}
const StatusTagSetting = {
running: 'default',
succeeded: 'primary',
error: 'danger',
}
export function DataWarehouseSettingsScene(): JSX.Element {
const { dataWarehouseSources, dataWarehouseSourcesLoading } = useValues(dataWarehouseSettingsLogic)
const { toggleSourceModal } = useActions(dataWarehouseSceneLogic)
const { isSourceModalOpen } = useValues(dataWarehouseSceneLogic)
const { featureFlags } = useValues(featureFlagLogic)
return (
<div>
<PageHeader
title={
<div className="flex items-center gap-2">
Data Warehouse
<LemonTag type="warning" className="uppercase">
Beta
</LemonTag>
</div>
}
buttons={
featureFlags[FEATURE_FLAGS.DATA_WAREHOUSE_EXTERNAL_LINK] ? (
<LemonButton
type="primary"
data-attr="new-data-warehouse-easy-link"
key={'new-data-warehouse-easy-link'}
onClick={toggleSourceModal}
>
Link Source
</LemonButton>
) : undefined
}
caption={
<div>
Linked data sources will appear here. Data sources can take a while to sync depending on the
size of the source.
</div>
}
/>
<LemonTable
dataSource={dataWarehouseSources?.results ?? []}
loading={dataWarehouseSourcesLoading}
columns={[
{
title: 'Source Type',
key: 'name',
width: 0,
render: function RenderName(_, source) {
return source.source_type
},
},
{
title: 'Status',
key: 'status',
width: 0,
render: function RenderStatus(_, source) {
return (
<LemonTag type={StatusTagSetting[source.status] || 'default'}>{source.status}</LemonTag>
)
},
},
]}
/>
<SourceModal isOpen={isSourceModalOpen} onClose={toggleSourceModal} />
</div>
)
}

View File

@@ -0,0 +1,41 @@
import { afterMount, kea, path, selectors } from 'kea'
import type { dataWarehouseSettingsLogicType } from './dataWarehouseSettingsLogicType'
import { loaders } from 'kea-loaders'
import api, { PaginatedResponse } from 'lib/api'
import { ExternalDataStripeSource, Breadcrumb } from '~/types'
import { urls } from 'scenes/urls'
export interface DataWarehouseSource {}
export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
path(['scenes', 'data-warehouse', 'settings', 'dataWarehouseSettingsLogic']),
loaders({
dataWarehouseSources: [
null as PaginatedResponse<ExternalDataStripeSource> | null,
{
loadSources: async () => {
return api.externalDataSources.list()
},
},
],
}),
selectors({
breadcrumbs: [
() => [],
(): Breadcrumb[] => [
{
name: `Data Warehouse`,
path: urls.dataWarehouseExternal(),
},
{
name: 'Data Warehouse Settings',
path: urls.dataWarehouseSettings(),
},
],
],
}),
afterMount(({ actions }) => {
actions.loadSources()
}),
])

View File

@@ -35,10 +35,10 @@ const sceneNavAlias: Partial<Record<Scene, Scene>> = {
[Scene.EarlyAccessFeature]: Scene.EarlyAccessFeatures,
[Scene.Survey]: Scene.Surveys,
[Scene.SurveyTemplates]: Scene.Surveys,
[Scene.DataWarehouseTable]: Scene.DataWarehouse,
[Scene.DataWarehousePosthog]: Scene.DataWarehouse,
[Scene.DataWarehouseExternal]: Scene.DataWarehouse,
[Scene.DataWarehouseSavedQueries]: Scene.DataWarehouse,
[Scene.DataWarehouseSettings]: Scene.DataWarehouse,
[Scene.AppMetrics]: Scene.Apps,
[Scene.ReplaySingle]: Scene.Replay,
[Scene.ReplayPlaylist]: Scene.ReplayPlaylist,

View File

@@ -48,7 +48,7 @@ export enum Scene {
DataWarehousePosthog = 'DataWarehousePosthog',
DataWarehouseExternal = 'DataWarehouseExternal',
DataWarehouseSavedQueries = 'DataWarehouseSavedQueries',
DataWarehouseTable = 'DataWarehouseTable',
DataWarehouseSettings = 'DataWarehouseSettings',
OrganizationSettings = 'OrganizationSettings',
OrganizationCreateFirst = 'OrganizationCreate',
ProjectHomepage = 'ProjectHomepage',

View File

@@ -187,9 +187,9 @@ export const sceneConfigurations: Partial<Record<Scene, SceneConfig>> = {
projectBased: true,
name: 'Data Warehouse',
},
[Scene.DataWarehouseTable]: {
[Scene.DataWarehouseSettings]: {
projectBased: true,
name: 'Data Warehouse Table',
name: 'Data Warehouse Settings',
},
[Scene.EarlyAccessFeatures]: {
projectBased: true,
@@ -455,10 +455,10 @@ export const routes: Record<string, Scene> = {
[urls.survey(':id')]: Scene.Survey,
[urls.surveyTemplates()]: Scene.SurveyTemplates,
[urls.dataWarehouse()]: Scene.DataWarehouse,
[urls.dataWarehouseTable(':id')]: Scene.DataWarehouseTable,
[urls.dataWarehousePosthog()]: Scene.DataWarehousePosthog,
[urls.dataWarehouseExternal()]: Scene.DataWarehouseExternal,
[urls.dataWarehouseSavedQueries()]: Scene.DataWarehouseSavedQueries,
[urls.dataWarehouseSettings()]: Scene.DataWarehouseSettings,
[urls.featureFlags()]: Scene.FeatureFlags,
[urls.featureFlag(':id')]: Scene.FeatureFlag,
[urls.annotations()]: Scene.Annotations,

View File

@@ -111,11 +111,11 @@ export const urls = {
surveys: (): string => '/surveys',
survey: (id: ':id' | 'new' | string): string => `/surveys/${id}`,
surveyTemplates: (): string => '/survey_templates',
dataWarehouse: (): string => '/warehouse',
dataWarehouseTable: (id: ':id' | 'new' | string): string => `/warehouse/${id}`,
dataWarehouse: (): string => '/data-warehouse',
dataWarehousePosthog: (): string => '/data-warehouse/posthog',
dataWarehouseExternal: (): string => '/data-warehouse/external',
dataWarehouseSavedQueries: (): string => '/data-warehouse/views',
dataWarehouseSettings: (): string => '/data-warehouse/settings',
annotations: (): string => '/annotations',
annotation: (id: AnnotationType['id'] | ':id'): string => `/annotations/${id}`,
projectApps: (tab?: PluginTab): string => `/project/apps${tab ? `?tab=${tab}` : ''}`,

View File

@@ -3141,6 +3141,19 @@ export interface DataWarehouseViewLink {
from_join_key?: string
}
export interface ExternalDataStripeSourceCreatePayload {
account_id: string
client_secret: string
}
export interface ExternalDataStripeSource {
id: string
source_id: string
connection_id: string
status: string
source_type: string
}
export type BatchExportDestinationS3 = {
type: 'S3'
config: {

View File

@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0357_add_redshift_batch_export_destination
posthog: 0358_externaldatasource
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019

View File

@@ -3,7 +3,7 @@ from rest_framework import decorators, exceptions
from posthog.api.routing import DefaultRouterPlusPlus
from posthog.batch_exports import http as batch_exports
from posthog.settings import EE_AVAILABLE
from posthog.warehouse.api import saved_query, table, view_link
from posthog.warehouse.api import external_data_source, saved_query, table, view_link
from ..session_recordings.session_recording_api import SessionRecordingViewSet
from . import (
activity_log,
@@ -210,6 +210,14 @@ projects_router.register(r"uploaded_media", uploaded_media.MediaViewSet, "projec
projects_router.register(r"tags", tagged_item.TaggedItemViewSet, "project_tags", ["team_id"])
projects_router.register(r"query", query.QueryViewSet, "project_query", ["team_id"])
# External data resources
projects_router.register(
r"external_data_sources",
external_data_source.ExternalDataSourceViewSet,
"project_external_data_sources",
["team_id"],
)
# General endpoints (shared across CH & PG)
router.register(r"login", authentication.LoginViewSet, "login")
router.register(r"login/token", authentication.TwoFactorViewSet)

View File

@@ -327,6 +327,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs):
name="delete expired exported assets",
)
sender.add_periodic_task(
crontab(minute="*/10"),
sync_datawarehouse_sources.s(),
name="sync datawarehouse sources that have settled in s3 bucket",
)
# Set up clickhouse query instrumentation
@task_prerun.connect
@@ -1081,3 +1087,13 @@ def ee_persist_finished_recordings():
pass
else:
persist_finished_recordings()
@app.task(ignore_result=True)
def sync_datawarehouse_sources():
try:
from posthog.warehouse.sync_resource import sync_resources
except ImportError:
pass
else:
sync_resources()

View File

@@ -0,0 +1,42 @@
# Generated by Django 3.2.19 on 2023-09-19 14:19
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import posthog.models.utils
class Migration(migrations.Migration):
dependencies = [
("posthog", "0357_add_redshift_batch_export_destination"),
]
operations = [
migrations.CreateModel(
name="ExternalDataSource",
fields=[
("created_at", models.DateTimeField(auto_now_add=True)),
(
"id",
models.UUIDField(
default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
),
),
("source_id", models.CharField(max_length=400)),
("connection_id", models.CharField(max_length=400)),
(
"created_by",
models.ForeignKey(
blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL
),
),
("status", models.CharField(max_length=400)),
("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")),
("source_type", models.CharField(choices=[("Stripe", "Stripe")], max_length=128)),
("are_tables_created", models.BooleanField(default=False)),
],
options={
"abstract": False,
},
),
]

View File

@@ -38,6 +38,7 @@ from posthog.settings.statsd import *
from posthog.settings.object_storage import *
from posthog.settings.temporal import *
from posthog.settings.web import *
from posthog.settings.airbyte import *
from posthog.settings.utils import get_from_env, str_to_bool

View File

@@ -0,0 +1,8 @@
import os
AIRBYTE_API_KEY = os.getenv("AIRBYTE_API_KEY", None)
AIRBYTE_WORKSPACE_ID = os.getenv("AIRBYTE_WORKSPACE_ID", None)
AIRBYTE_BUCKET_REGION = os.getenv("AIRBYTE_BUCKET_REGION", None)
AIRBYTE_BUCKET_KEY = os.getenv("AIRBYTE_BUCKET_KEY", None)
AIRBYTE_BUCKET_SECRET = os.getenv("AIRBYTE_BUCKET_SECRET", None)
AIRBYTE_BUCKET_URL = os.getenv("AIRBYTE_BUCKET_URL", None)

View File

@@ -0,0 +1,82 @@
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from posthog.permissions import OrganizationMemberPermissions
from rest_framework.exceptions import NotAuthenticated
from rest_framework.permissions import IsAuthenticated
from rest_framework import filters, serializers, viewsets
from posthog.warehouse.models import ExternalDataSource
from posthog.warehouse.external_data_source.source import StripeSourcePayload, create_stripe_source, delete_source
from posthog.warehouse.external_data_source.connection import create_connection, start_sync
from posthog.warehouse.external_data_source.destination import create_destination, delete_destination
from posthog.api.routing import StructuredViewSetMixin
from posthog.models import User
from typing import Any
class ExternalDataSourceSerializers(serializers.ModelSerializer):
account_id = serializers.CharField(write_only=True)
client_secret = serializers.CharField(write_only=True)
class Meta:
model = ExternalDataSource
fields = ["id", "source_id", "created_at", "created_by", "status", "client_secret", "account_id", "source_type"]
read_only_fields = ["id", "source_id", "created_by", "created_at", "status", "source_type"]
class ExternalDataSourceViewSet(StructuredViewSetMixin, viewsets.ModelViewSet):
"""
Create, Read, Update and Delete External data Sources.
"""
queryset = ExternalDataSource.objects.all()
serializer_class = ExternalDataSourceSerializers
permission_classes = [IsAuthenticated, OrganizationMemberPermissions]
filter_backends = [filters.SearchFilter]
search_fields = ["source_id"]
ordering = "-created_at"
def get_queryset(self):
if not isinstance(self.request.user, User) or self.request.user.current_team is None:
raise NotAuthenticated()
if self.action == "list":
return self.queryset.filter(team_id=self.team_id).prefetch_related("created_by").order_by(self.ordering)
return self.queryset.filter(team_id=self.team_id).prefetch_related("created_by").order_by(self.ordering)
def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
account_id = request.data["account_id"]
client_secret = request.data["client_secret"]
stripe_payload = StripeSourcePayload(
account_id=account_id,
client_secret=client_secret,
)
new_source = create_stripe_source(stripe_payload)
try:
new_destination = create_destination(self.team_id)
except Exception as e:
delete_source(new_source.source_id)
raise e
try:
new_connection = create_connection(new_source.source_id, new_destination.destination_id)
except Exception as e:
delete_source(new_source.source_id)
delete_destination(new_destination.destination_id)
raise e
ExternalDataSource.objects.create(
source_id=new_source.source_id,
connection_id=new_connection.connection_id,
team=self.team,
status="running",
source_type="Stripe",
)
start_sync(new_connection.connection_id)
return Response(status=status.HTTP_201_CREATED, data={"source_id": new_source.source_id})

View File

@@ -0,0 +1,106 @@
import requests
from django.conf import settings
from pydantic import BaseModel
from typing import Dict
AIRBYTE_CONNECTION_URL = "https://api.airbyte.com/v1/connections"
AIRBYTE_JOBS_URL = "https://api.airbyte.com/v1/jobs"
class ExternalDataConnection(BaseModel):
connection_id: str
source_id: str
destination_id: str
name: str
workspace_id: str
def create_connection(source_id: str, destination_id: str) -> ExternalDataConnection:
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.")
headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
payload = {
"schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"},
"namespaceFormat": None,
"sourceId": source_id,
"destinationId": destination_id,
}
response = requests.post(AIRBYTE_CONNECTION_URL, json=payload, headers=headers)
response_payload = response.json()
if not response.ok:
raise ValueError(response_payload["message"])
update_connection_stream(response_payload["connectionId"], headers)
return ExternalDataConnection(
source_id=response_payload["sourceId"],
name=response_payload["name"],
connection_id=response_payload["connectionId"],
workspace_id=response_payload["workspaceId"],
destination_id=response_payload["destinationId"],
)
def update_connection_stream(connection_id: str, headers: Dict):
connection_id_url = f"{AIRBYTE_CONNECTION_URL}/{connection_id}"
# TODO: hardcoded to stripe stream right now
payload = {
"configurations": {"streams": [{"name": "customers", "syncMode": "full_refresh_overwrite"}]},
"schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"},
"namespaceFormat": None,
}
response = requests.patch(connection_id_url, json=payload, headers=headers)
response_payload = response.json()
if not response.ok:
raise ValueError(response_payload["message"])
def delete_connection(connection_id: str) -> None:
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to delete a connection.")
headers = {"Authorization": f"Bearer {token}"}
response = requests.delete(AIRBYTE_CONNECTION_URL + "/" + connection_id, headers=headers)
if not response.ok:
raise ValueError(response.json()["message"])
# Fire and forget
def start_sync(connection_id: str):
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to start sync.")
headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
payload = {"jobType": "sync", "connectionId": connection_id}
requests.post(AIRBYTE_JOBS_URL, json=payload, headers=headers)
def retrieve_sync(connection_id: str):
token = settings.AIRBYTE_API_KEY
headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
params = {"connectionId": connection_id, "limit": 1}
response = requests.get(AIRBYTE_JOBS_URL, params=params, headers=headers)
response_payload = response.json()
if not response.ok:
raise ValueError(response_payload["message"])
data = response_payload.get("data", [])
if not data:
return None
latest_job = response_payload["data"][0]
return latest_job

View File

@@ -0,0 +1,52 @@
import requests
from django.conf import settings
from pydantic import BaseModel
AIRBYTE_DESTINATION_URL = "https://api.airbyte.com/v1/destinations"
class ExternalDataDestination(BaseModel):
destination_id: str
def create_destination(team_id: int) -> ExternalDataDestination:
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.")
payload = {
"configuration": {
"format": {"format_type": "Parquet", "compression_codec": "UNCOMPRESSED"},
"destinationType": "s3",
"s3_bucket_region": settings.AIRBYTE_BUCKET_REGION,
"access_key_id": settings.AIRBYTE_BUCKET_KEY,
"secret_access_key": settings.AIRBYTE_BUCKET_SECRET,
"s3_bucket_name": "databeach-hackathon",
"s3_bucket_path": f"airbyte/{team_id}",
},
"name": f"S3/{team_id}",
"workspaceId": settings.AIRBYTE_WORKSPACE_ID,
}
headers = {"accept": "application/json", "content-type": "application/json", "authorization": f"Bearer {token}"}
response = requests.post(AIRBYTE_DESTINATION_URL, json=payload, headers=headers)
response_payload = response.json()
if not response.ok:
raise ValueError(response_payload["message"])
return ExternalDataDestination(
destination_id=response_payload["destinationId"],
)
def delete_destination(destination_id: str) -> None:
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to delete a destiantion.")
headers = {"authorization": f"Bearer {token}"}
response = requests.delete(AIRBYTE_DESTINATION_URL + "/" + destination_id, headers=headers)
if not response.ok:
raise ValueError(response.json()["message"])

View File

@@ -0,0 +1,106 @@
import requests
from django.conf import settings
from posthog.models.utils import UUIDT
from pydantic import BaseModel, field_validator
from typing import Dict, Optional
import datetime as dt
AIRBYTE_SOURCE_URL = "https://api.airbyte.com/v1/sources"
class StripeSourcePayload(BaseModel):
account_id: str
client_secret: str
start_date: Optional[dt.datetime] = None
lookback_window_days: Optional[int] = None
slice_range: Optional[int] = None
@field_validator("account_id")
@classmethod
def account_id_is_valid_uuid(cls, v: str) -> str:
try:
UUIDT.is_valid_uuid(v)
except ValueError:
raise ValueError("account_id must be a valid UUID.")
return v
@field_validator("start_date")
@classmethod
def valid_iso_start_date(cls, v: Optional[str]) -> Optional[str]:
from posthog.batch_exports.http import validate_date_input
if not v:
return v
try:
validate_date_input(v)
except ValueError:
raise ValueError("start_date must be a valid ISO date string.")
return v
class ExternalDataSource(BaseModel):
source_id: str
name: str
source_type: str
workspace_id: str
def create_stripe_source(payload: StripeSourcePayload) -> ExternalDataSource:
workspace_id = settings.AIRBYTE_WORKSPACE_ID
if not workspace_id:
raise ValueError("AIRBYTE_WORKSPACE_ID must be set in order to create a source.")
optional_config = {}
if payload.start_date:
optional_config["start_date"] = payload.start_date.isoformat()
if payload.lookback_window_days:
optional_config["lookback_window_days"] = payload.lookback_window_days
if payload.slice_range:
optional_config["slice_range"] = payload.slice_range
payload = {
"configuration": {
"sourceType": "stripe",
"account_id": payload.account_id,
"client_secret": payload.client_secret,
**optional_config,
},
"name": "stripe source",
"workspaceId": workspace_id,
}
return _create_source(payload)
def _create_source(payload: Dict) -> ExternalDataSource:
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.")
headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
response = requests.post(AIRBYTE_SOURCE_URL, json=payload, headers=headers)
response_payload = response.json()
if not response.ok:
raise ValueError(response_payload["message"])
return ExternalDataSource(
source_id=response_payload["sourceId"],
name=response_payload["name"],
source_type=response_payload["sourceType"],
workspace_id=response_payload["workspaceId"],
)
def delete_source(source_id):
token = settings.AIRBYTE_API_KEY
if not token:
raise ValueError("AIRBYTE_API_KEY must be set in order to delete a source.")
headers = {"authorization": f"Bearer {token}"}
response = requests.delete(AIRBYTE_SOURCE_URL + "/" + source_id, headers=headers)
if not response.ok:
raise ValueError(response.json()["message"])

View File

@@ -2,3 +2,4 @@ from .table import *
from .credential import *
from .datawarehouse_saved_query import *
from .view_link import *
from .external_data_source import *

View File

@@ -0,0 +1,17 @@
from posthog.models.utils import UUIDModel, CreatedMetaFields, sane_repr
from django.db import models
from posthog.models.team import Team
class ExternalDataSource(CreatedMetaFields, UUIDModel):
class Type(models.TextChoices):
STRIPE = "Stripe", "Stripe"
source_id: models.CharField = models.CharField(max_length=400)
connection_id: models.CharField = models.CharField(max_length=400)
team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
status: models.CharField = models.CharField(max_length=400)
source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices)
are_tables_created: models.BooleanField = models.BooleanField(default=False)
__repr__ = sane_repr("source_id")

View File

@@ -0,0 +1,65 @@
from posthog.warehouse.models.external_data_source import ExternalDataSource
from posthog.warehouse.models import DataWarehouseCredential, DataWarehouseTable
from posthog.warehouse.external_data_source.connection import retrieve_sync
from posthog.celery import app
from django.conf import settings
import structlog
logger = structlog.get_logger(__name__)
def sync_resources():
resources = ExternalDataSource.objects.filter(are_tables_created=False, status__in=["running", "error"])
for resource in resources:
_sync_resource.delay(resource.pk)
@app.task(ignore_result=True)
def _sync_resource(resource_id):
resource = ExternalDataSource.objects.get(pk=resource_id)
try:
job = retrieve_sync(resource.connection_id)
except Exception as e:
logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e)
resource.status = "error"
resource.save()
return
if job is None:
logger.error(f"No jobs found for connection: {resource.connection_id}")
resource.status = "error"
resource.save()
if job["status"] == "succeeded":
resource = ExternalDataSource.objects.get(pk=resource_id)
credential, _ = DataWarehouseCredential.objects.get_or_create(
team_id=resource.team.pk,
access_key=settings.AIRBYTE_BUCKET_KEY,
access_secret=settings.AIRBYTE_BUCKET_SECRET,
)
data = {
"credential": credential,
"name": "stripe_customers",
"format": "Parquet",
"url_pattern": f"{settings.AIRBYTE_BUCKET_URL}/{resource.team.pk}/customers/*.parquet",
"team_id": resource.team.pk,
}
table = DataWarehouseTable(**data)
try:
table.columns = table.get_columns()
except Exception as e:
logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e)
else:
table.save()
resource.are_tables_created = True
resource.status = job["status"]
resource.save()
else:
resource.status = job["status"]
resource.save()