feat(batch-exports): Add UI for Databricks batch exports destination (#39233)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Ross
2025-10-07 15:19:20 +01:00
committed by GitHub
parent f79d61899e
commit 3a673b937d
17 changed files with 362 additions and 52 deletions

Binary image files not shown: two existing icons updated (11 KiB → 17 KiB and 11 KiB → 16 KiB) and one new icon added (33 KiB).

View File

@@ -7,6 +7,7 @@ import api from 'lib/api'
import { IntegrationView } from 'lib/integrations/IntegrationView'
import { integrationsLogic } from 'lib/integrations/integrationsLogic'
import { getIntegrationNameFromKind } from 'lib/integrations/utils'
import { DatabricksSetupModal } from 'scenes/integrations/databricks/DatabricksSetupModal'
import { urls } from 'scenes/urls'
import { CyclotronJobInputSchemaType } from '~/types'
@@ -61,6 +62,13 @@ export function IntegrationChoice({
input.click()
}
const handleNewDatabricksIntegration = (integrationId: number | undefined): void => {
if (integrationId) {
onChange?.(integrationId)
}
closeNewIntegrationModal()
}
const button = (
<LemonMenu
items={[
@@ -103,21 +111,30 @@ export function IntegrationChoice({
},
],
}
: {
items: [
{
to: api.integrations.authorizeUrl({
kind,
next: redirectUrl,
}),
disableClientSideRouting: true,
onClick: beforeRedirect,
label: integrationsOfKind?.length
? `Connect to a different integration for ${kindName}`
: `Connect to ${kindName}`,
},
],
},
: ['databricks'].includes(kind)
? {
items: [
{
label: 'Configure new Databricks account',
onClick: () => openNewIntegrationModal('databricks'),
},
],
}
: {
items: [
{
to: api.integrations.authorizeUrl({
kind,
next: redirectUrl,
}),
disableClientSideRouting: true,
onClick: beforeRedirect,
label: integrationsOfKind?.length
? `Connect to a different integration for ${kindName}`
: `Connect to ${kindName}`,
},
],
},
{
items: [
{
@@ -158,6 +175,11 @@ export function IntegrationChoice({
integration={integrationKind || undefined}
onComplete={closeNewIntegrationModal}
/>
<DatabricksSetupModal
isOpen={newIntegrationModalKind === 'databricks'}
integration={integrationKind || undefined}
onComplete={handleNewDatabricksIntegration}
/>
</>
)
}

View File

@@ -201,6 +201,7 @@ export const FEATURE_FLAGS = {
QUERY_CACHE_USE_S3: 'query-cache-use-s3', // owner: @aspicer #team-product-analytics
DASHBOARD_THREADS: 'dashboard-threads', // owner: @aspicer #team-product-analytics
BATCH_EXPORTS_POSTHOG_HTTP: 'posthog-http-batch-exports',
BATCH_EXPORTS_DATABRICKS: 'databricks-batch-exports', // owner: @rossgray #team-batch-exports
HEDGEHOG_SKIN_SPIDERHOG: 'hedgehog-skin-spiderhog', // owner: @benjackwhite
WEB_EXPERIMENTS: 'web-experiments', // owner: @team-feature-success
ENVIRONMENTS: 'environments', // owner: #team-platform-features

View File

@@ -3,6 +3,7 @@ import { capitalizeFirstLetter } from 'lib/utils'
import { IntegrationKind } from '~/types'
import IconClickUp from 'public/services/clickup.svg'
import IconDatabricks from 'public/services/databricks.png'
import IconGitHub from 'public/services/github.png'
import IconGoogleAds from 'public/services/google-ads.png'
import IconGoogleCloudStorage from 'public/services/google-cloud-storage.png'
@@ -38,6 +39,7 @@ export const ICONS: Record<IntegrationKind, any> = {
twilio: IconTwilio,
clickup: IconClickUp,
'reddit-ads': IconReddit,
databricks: IconDatabricks,
}
export const getIntegrationNameFromKind = (kind: string): string => {

View File

@@ -1246,6 +1246,17 @@ export function IconTwilio(props: LemonIconProps): JSX.Element {
)
}
export function IconDatabricks(props: LemonIconProps): JSX.Element {
return (
<LemonIconBase viewBox="0 0 300 325" {...props}>
<path
fill="currentColor"
d="M283.923 136.449L150.144 213.624L6.88995 131.168L0 134.982V194.844L150.144 281.115L283.923 204.234V235.926L150.144 313.1L6.88995 230.644L0 234.458V244.729L150.144 331L300 244.729V184.867L293.11 181.052L150.144 263.215L16.0766 186.334V154.643L150.144 231.524L300 145.253V86.2713L292.536 81.8697L150.144 163.739L22.9665 90.9663L150.144 17.8998L254.641 78.055L263.828 72.773V65.4371L150.144 0L0 86.2713V95.6613L150.144 181.933L283.923 104.758V136.449Z"
/>
</LemonIconBase>
)
}
export function IconChrome(props: LemonIconProps): JSX.Element {
return (
<LemonIconBase {...props}>

View File

@@ -15259,7 +15259,8 @@
"github",
"meta-ads",
"clickup",
"reddit-ads"
"reddit-ads",
"databricks"
],
"type": "string"
},

View File

@@ -12,6 +12,7 @@ import {
Tooltip,
} from '@posthog/lemon-ui'
import { IntegrationChoice } from 'lib/components/CyclotronJob/integrations/IntegrationChoice'
import { LemonField } from 'lib/lemon-ui/LemonField'
import { BatchExportConfigurationForm } from './types'
@@ -493,6 +494,102 @@ export function BatchExportsEditFields({
</LemonField>
) : null}
</>
) : batchExportConfigForm.destination === 'Databricks' ? (
<>
<LemonField name="integration_id" label="Integration">
{({ value, onChange }) => (
<IntegrationChoice integration="databricks" value={value} onChange={onChange} />
)}
</LemonField>
<LemonField
name="http_path"
label="HTTP Path"
info={<>HTTP Path value for your all-purpose compute or SQL warehouse.</>}
>
<LemonInput placeholder="/sql/1.0/warehouses/my-warehouse" />
</LemonField>
<LemonField name="catalog" label="Catalog">
<LemonInput placeholder="workspace" />
</LemonField>
<LemonField name="schema" label="Schema">
<LemonInput placeholder="default" />
</LemonField>
<LemonField name="table_name" label="Table name">
<LemonInput placeholder="my-table" />
</LemonField>
{isNew ? (
<LemonField
name="table_partition_field"
label="Table partition field"
showOptional
info={
<>
The field to partition the table by. If left empty, the default partition by
field for the model will be used (if applicable). For more information, refer to
the{' '}
<Link
to="https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-partition"
target="_blank"
>
Databricks documentation
</Link>
.
<br />
<strong>
This setting cannot be changed after the batch export is created.
</strong>
</>
}
>
<LemonInput placeholder="my-partition-field" />
</LemonField>
) : null}
{isNew ? (
<LemonField name="use_variant_type">
{({ value, onChange }) => (
<LemonCheckbox
checked={!!value}
onChange={onChange}
bordered
label={
<span className="flex gap-2 items-center">
Use VARIANT type for storing JSON data
<Tooltip
title={
<>
Using VARIANT for storing JSON data is{' '}
<Link
to="https://docs.databricks.com/aws/en/semi-structured/variant"
target="_blank"
>
recommended by Databricks
</Link>
; however, the VARIANT data type is only available in
Databricks Runtime 15.3 and above. If left unchecked, JSON
data will be stored using the STRING type.
<br />
<strong>
This setting cannot be changed after the batch export is
created.
</strong>
</>
}
>
<IconInfo className="text-lg text-secondary" />
</Tooltip>
</span>
}
/>
)}
</LemonField>
) : null}
</>
) : batchExportConfigForm.destination === 'HTTP' ? (
<>
<LemonField name="url" label="PostHog region">

View File

@@ -5,6 +5,7 @@ import { BatchExportService } from '~/types'
import IconHTTP from 'public/hedgehog/running-hog.png'
import IconS3 from 'public/services/aws-s3.png'
import IconBigQuery from 'public/services/bigquery.png'
import IconDatabricks from 'public/services/databricks.png'
import IconPostgres from 'public/services/postgres.png'
import IconRedshift from 'public/services/redshift.png'
import IconSnowflake from 'public/services/snowflake.png'
@@ -20,6 +21,7 @@ export const BATCH_EXPORT_ICON_MAP: Record<BatchExportService['type'], string> =
S3: IconS3,
Snowflake: IconSnowflake,
HTTP: IconHTTP,
Databricks: IconDatabricks,
}
export function RenderBatchExportIcon({

View File

@@ -32,6 +32,8 @@ function getConfigurationFromBatchExportConfig(batchExportConfig: BatchExportCon
interval: batchExportConfig.interval,
model: batchExportConfig.model,
filters: batchExportConfig.filters,
integration_id:
batchExportConfig.destination.type === 'Databricks' ? batchExportConfig.destination.integration : undefined,
...batchExportConfig.destination.config,
}
}
@@ -49,6 +51,11 @@ export function getDefaultConfiguration(service: string): Record<string, any> {
file_format: 'Parquet',
compression: 'zstd',
}),
...(service === 'Databricks' && {
use_variant_type: true,
// prefill prefix for http path
http_path: '/sql/1.0/warehouses/',
}),
}
}
@@ -108,44 +115,59 @@ function getEventTable(service: BatchExportService['type']): DatabaseSchemaBatch
schema_valid: true,
},
}),
...(service != 'S3' && {
...(service == 'Databricks' && {
team_id: {
name: 'team_id',
hogql_value: service == 'Postgres' || service == 'Redshift' ? 'toInt32(team_id)' : 'team_id',
hogql_value: 'team_id',
type: 'integer',
schema_valid: true,
},
set: {
name: service == 'Snowflake' ? 'people_set' : 'set',
hogql_value: "nullIf(JSONExtractString(properties, '$set'), '')",
type: 'string',
schema_valid: true,
},
set_once: {
name: service == 'Snowflake' ? 'people_set_once' : 'set_once',
hogql_value: "nullIf(JSONExtractString(properties, '$set_once'), '')",
type: 'string',
schema_valid: true,
},
site_url: {
name: 'site_url',
hogql_value: "''",
type: 'string',
schema_valid: true,
},
ip: {
name: 'ip',
hogql_value: "nullIf(JSONExtractString(properties, '$ip'), '')",
type: 'string',
schema_valid: true,
},
elements_chain: {
name: 'elements',
hogql_value: 'toJSONString(elements_chain)',
type: 'string',
databricks_ingested_timestamp: {
name: 'databricks_ingested_timestamp',
hogql_value: 'NOW64()',
type: 'datetime',
schema_valid: true,
},
}),
...(service != 'S3' &&
service != 'Databricks' && {
team_id: {
name: 'team_id',
hogql_value: service == 'Postgres' || service == 'Redshift' ? 'toInt32(team_id)' : 'team_id',
type: 'integer',
schema_valid: true,
},
set: {
name: service == 'Snowflake' ? 'people_set' : 'set',
hogql_value: "nullIf(JSONExtractString(properties, '$set'), '')",
type: 'string',
schema_valid: true,
},
set_once: {
name: service == 'Snowflake' ? 'people_set_once' : 'set_once',
hogql_value: "nullIf(JSONExtractString(properties, '$set_once'), '')",
type: 'string',
schema_valid: true,
},
site_url: {
name: 'site_url',
hogql_value: "''",
type: 'string',
schema_valid: true,
},
ip: {
name: 'ip',
hogql_value: "nullIf(JSONExtractString(properties, '$ip'), '')",
type: 'string',
schema_valid: true,
},
elements_chain: {
name: 'elements',
hogql_value: 'toJSONString(elements_chain)',
type: 'string',
schema_valid: true,
},
}),
...(service == 'BigQuery' && {
bq_ingested_timestamp: {
name: 'bq_ingested_timestamp',
@@ -543,10 +565,12 @@ export const batchExportConfigurationLogic = kea<batchExportConfigurationLogicTy
model,
filters,
json_config_file,
integration_id,
...config
} = formdata
const destinationObj = {
type: destination,
integration: integration_id,
config: config,
}
const data: Omit<
@@ -795,6 +819,16 @@ export const batchExportConfigurationLogic = kea<batchExportConfigurationLogicTy
'schema',
'table_name',
]
} else if (service === 'Databricks') {
return [
...generalRequiredFields,
'integration_id',
'http_path',
'catalog',
'schema',
'table_name',
'use_variant_type',
]
}
return generalRequiredFields
},
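For illustration, a sketch of how a submitted Databricks form would be split by the destructuring above into the destination payload. The concrete values are hypothetical, and the general fields (name, destination, interval) are assumed to be destructured in the part of the hunk not shown here:

// Hypothetical form values for a new Databricks batch export (sketch only)
const formdata = {
    destination: 'Databricks',
    interval: 'hour',
    model: 'events',
    filters: null,
    integration_id: 42, // id returned by the Databricks setup modal
    http_path: '/sql/1.0/warehouses/my-warehouse',
    catalog: 'workspace',
    schema: 'default',
    table_name: 'posthog_events',
    use_variant_type: true,
}

// Based on the logic above, `integration_id` is lifted onto the destination itself and the
// remaining service-specific fields end up in `config`, giving roughly:
// {
//     type: 'Databricks',
//     integration: 42,
//     config: {
//         http_path: '/sql/1.0/warehouses/my-warehouse',
//         catalog: 'workspace',
//         schema: 'default',
//         table_name: 'posthog_events',
//         use_variant_type: true,
//     },
// }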

View File

@@ -3,6 +3,7 @@ import { Dayjs } from 'lib/dayjs'
import {
BatchExportConfiguration,
BatchExportServiceBigQuery,
BatchExportServiceDatabricks,
BatchExportServiceHTTP,
BatchExportServicePostgres,
BatchExportServiceRedshift,
@@ -19,8 +20,9 @@ export type BatchExportConfigurationForm = Omit<
Partial<BatchExportServiceBigQuery['config']> &
Partial<BatchExportServiceS3['config']> &
Partial<BatchExportServiceSnowflake['config']> &
Partial<BatchExportServiceDatabricks['config']> &
Partial<BatchExportServiceHTTP['config']> & {
destination: 'S3' | 'Snowflake' | 'Postgres' | 'BigQuery' | 'Redshift' | 'HTTP'
destination: 'S3' | 'Snowflake' | 'Postgres' | 'BigQuery' | 'Redshift' | 'Databricks' | 'HTTP'
start_at: Dayjs | null
end_at: Dayjs | null
json_config_file?: File[] | null

View File

@@ -92,11 +92,16 @@ export const nonHogFunctionTemplatesLogic = kea<nonHogFunctionTemplatesLogicType
hogFunctionTemplatesBatchExports: [
(s) => [s.featureFlags, s.user],
(featureFlags, user): HogFunctionTemplateType[] => {
// HTTP is currently only used for Cloud to Cloud migrations and shouldn't be accessible to users
const httpEnabled =
featureFlags[FEATURE_FLAGS.BATCH_EXPORTS_POSTHOG_HTTP] || user?.is_impersonated || user?.is_staff
// HTTP is currently only used for Cloud to Cloud migrations and shouldn't be accessible to users
const services = BATCH_EXPORT_SERVICE_NAMES.filter((service) =>
httpEnabled ? true : service !== ('HTTP' as const)
// Databricks is currently behind a feature flag
const databricksEnabled = featureFlags[FEATURE_FLAGS.BATCH_EXPORTS_DATABRICKS]
const services = BATCH_EXPORT_SERVICE_NAMES.filter(
(service) =>
(httpEnabled ? true : service !== ('HTTP' as const)) &&
(databricksEnabled ? true : service !== ('Databricks' as const))
)
return services.map(
@@ -105,7 +110,7 @@ export const nonHogFunctionTemplatesLogic = kea<nonHogFunctionTemplatesLogicType
type: 'destination',
name: humanizeBatchExportName(service),
icon_url: BATCH_EXPORT_ICON_MAP[service],
status: 'stable',
status: service === 'Databricks' ? 'beta' : 'stable',
code: '',
code_language: 'hog',
inputs_schema: [],

View File

@@ -0,0 +1,51 @@
import { useActions, useValues } from 'kea'
import { Form } from 'kea-forms'
import { LemonButton, LemonInput, LemonModal } from '@posthog/lemon-ui'
import { LemonField } from 'lib/lemon-ui/LemonField'
import { IconDatabricks } from 'lib/lemon-ui/icons'
import { DatabricksSetupModalLogicProps, databricksSetupModalLogic } from './databricksSetupModalLogic'
export const DatabricksSetupModal = (props: DatabricksSetupModalLogicProps): JSX.Element => {
const { isDatabricksIntegrationSubmitting } = useValues(databricksSetupModalLogic(props))
const { submitDatabricksIntegration } = useActions(databricksSetupModalLogic(props))
return (
<LemonModal
isOpen={props.isOpen}
title={
<div className="flex items-center gap-2">
<IconDatabricks />
<span>Configure Databricks integration</span>
</div>
}
onClose={props.onComplete}
>
<Form logic={databricksSetupModalLogic} formKey="databricksIntegration">
<div className="gap-4 flex flex-col">
<LemonField name="serverHostname" label="Server Hostname">
<LemonInput type="text" placeholder="dbc-xxxxxxxxx-xxxx.cloud.databricks.com" />
</LemonField>
<LemonField name="clientId" label="Client ID">
<LemonInput type="text" placeholder="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" />
</LemonField>
<LemonField name="clientSecret" label="Client Secret">
<LemonInput type="password" />
</LemonField>
<div className="flex justify-end">
<LemonButton
type="primary"
htmlType="submit"
loading={isDatabricksIntegrationSubmitting}
onClick={submitDatabricksIntegration}
>
Connect
</LemonButton>
</div>
</div>
</Form>
</LemonModal>
)
}

View File

@@ -0,0 +1,63 @@
import { connect, kea, path, props } from 'kea'
import { forms } from 'kea-forms'
import api from 'lib/api'
import { integrationsLogic } from 'lib/integrations/integrationsLogic'
import { lemonToast } from 'lib/lemon-ui/LemonToast'
import { IntegrationType } from '~/types'
import type { databricksSetupModalLogicType } from './databricksSetupModalLogicType'
export interface DatabricksSetupModalLogicProps {
isOpen: boolean
integration?: IntegrationType | null
onComplete: (integrationId?: number) => void
}
export interface DatabricksFormType {
serverHostname: string
clientId: string
clientSecret: string
}
export const databricksSetupModalLogic = kea<databricksSetupModalLogicType>([
path(['integrations', 'databricks', 'databricksSetupModalLogic']),
props({} as DatabricksSetupModalLogicProps),
connect(() => ({
values: [integrationsLogic, ['integrations', 'integrationsLoading']],
actions: [integrationsLogic, ['loadIntegrations']],
})),
forms(({ props, actions, values }) => ({
databricksIntegration: {
defaults: {
serverHostname: '',
clientId: '',
clientSecret: '',
},
errors: ({ serverHostname, clientId, clientSecret }) => ({
serverHostname: serverHostname.trim() ? undefined : 'Server Hostname is required',
clientId: clientId.trim() ? undefined : 'Client ID is required',
clientSecret: clientSecret.trim() ? undefined : 'Client Secret is required',
}),
submit: async () => {
try {
const integration = await api.integrations.create({
kind: 'databricks',
config: {
server_hostname: values.databricksIntegration.serverHostname,
client_id: values.databricksIntegration.clientId,
client_secret: values.databricksIntegration.clientSecret,
},
})
actions.loadIntegrations()
lemonToast.success('Databricks integration created successfully!')
props.onComplete(integration.id)
} catch (error: any) {
lemonToast.error(error.detail || 'Failed to create Databricks integration')
throw error
}
},
},
})),
])

View File

@@ -4333,6 +4333,7 @@ export const INTEGRATION_KINDS = [
'meta-ads',
'clickup',
'reddit-ads',
'databricks',
] as const
export type IntegrationKind = (typeof INTEGRATION_KINDS)[number]
@@ -4966,8 +4967,23 @@ export type BatchExportServiceRedshift = {
}
}
export type BatchExportServiceDatabricks = {
type: 'Databricks'
integration: number
config: {
http_path: string
catalog: string
schema: string
table_name: string
use_variant_type: boolean
table_partition_field: string | null
exclude_events: string[]
include_events: string[]
}
}
// When adding a new option here also add a icon for it to
// src/scenes/pipeline/icons/
// frontend/public/services/
// and update RenderBatchExportIcon
export const BATCH_EXPORT_SERVICE_NAMES: BatchExportService['type'][] = [
'S3',
@@ -4976,6 +4992,7 @@ export const BATCH_EXPORT_SERVICE_NAMES: BatchExportService['type'][] = [
'BigQuery',
'Redshift',
'HTTP',
'Databricks',
]
export type BatchExportService =
| BatchExportServiceS3
@@ -4984,6 +5001,7 @@ export type BatchExportService =
| BatchExportServiceBigQuery
| BatchExportServiceRedshift
| BatchExportServiceHTTP
| BatchExportServiceDatabricks
export type PipelineInterval = 'hour' | 'day' | 'every 5 minutes'
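As a rough usage example, a value conforming to the new BatchExportServiceDatabricks type might look like the following (all values hypothetical):

// Hypothetical example of a BatchExportServiceDatabricks value (sketch only)
const exampleDatabricksDestination: BatchExportServiceDatabricks = {
    type: 'Databricks',
    integration: 42, // id of the Databricks integration created via the setup modal
    config: {
        http_path: '/sql/1.0/warehouses/my-warehouse',
        catalog: 'workspace',
        schema: 'default',
        table_name: 'posthog_events',
        use_variant_type: true,
        table_partition_field: null,
        exclude_events: [],
        include_events: [],
    },
}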

View File

@@ -1662,6 +1662,7 @@ class IntegrationKind(StrEnum):
META_ADS = "meta-ads"
CLICKUP = "clickup"
REDDIT_ADS = "reddit-ads"
DATABRICKS = "databricks"
class IntervalType(StrEnum):