mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
feat(cdp): Move to undici for legacy fetch (#32507)
This commit is contained in:
@@ -14,6 +14,12 @@ jest.mock('node-fetch', () => ({
|
||||
default: jest.fn(),
|
||||
}))
|
||||
|
||||
jest.mock('undici', () => ({
|
||||
__esModule: true,
|
||||
...jest.requireActual('undici'), // Only mock fetch(), leave Request, Response, FetchError, etc. alone
|
||||
fetch: jest.spyOn(jest.requireActual('undici'), 'fetch'),
|
||||
}))
|
||||
|
||||
beforeEach(() => {
|
||||
jest.spyOn(logger, 'info')
|
||||
jest.spyOn(logger, 'warn')
|
||||
|
||||
@@ -1,112 +0,0 @@
|
||||
import { FetchError } from 'node-fetch'
|
||||
|
||||
import { SecureFetch } from './fetch'
|
||||
|
||||
// Restore the real fetch implementation for this test file
|
||||
jest.unmock('node-fetch')
|
||||
|
||||
const realDnsLookup = jest.requireActual('dns/promises').lookup
|
||||
jest.mock('dns/promises', () => ({
|
||||
lookup: jest.fn((hostname: string, options?: any) => {
|
||||
return realDnsLookup(hostname, options)
|
||||
}),
|
||||
}))
|
||||
|
||||
import dns from 'dns/promises'
|
||||
import { range } from 'lodash'
|
||||
|
||||
describe('secureFetch', () => {
|
||||
beforeEach(() => {
|
||||
jest.setTimeout(1000)
|
||||
jest.mocked(dns.lookup).mockImplementation(realDnsLookup)
|
||||
// NOTE: We are testing production-only features hence the override
|
||||
process.env.NODE_ENV = 'production'
|
||||
})
|
||||
|
||||
describe('trackedFetch', () => {
|
||||
// By default security features are only enabled in production but for tests we want to enable them
|
||||
const trackedFetch = new SecureFetch({
|
||||
allowUnsafe: false,
|
||||
})
|
||||
|
||||
it('should raise if the URL is unsafe', async () => {
|
||||
await expect(trackedFetch.fetch('http://localhost')).rejects.toMatchInlineSnapshot(
|
||||
`[FetchError: request to http://localhost/ failed, reason: Internal hostname]`
|
||||
)
|
||||
})
|
||||
|
||||
it('should raise if the URL is unknown', async () => {
|
||||
await expect(trackedFetch.fetch('http://unknown.domain.unknown')).rejects.toMatchInlineSnapshot(
|
||||
`[FetchError: request to http://unknown.domain.unknown/ failed, reason: Invalid hostname]`
|
||||
)
|
||||
})
|
||||
|
||||
it('should successfully fetch from safe URLs', async () => {
|
||||
// This will make a real HTTP request
|
||||
const response = await trackedFetch.fetch('https://example.com')
|
||||
expect(response.ok).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('IPv4 address validation', () => {
|
||||
const trackedFetch = new SecureFetch({
|
||||
allowUnsafe: false,
|
||||
})
|
||||
|
||||
beforeEach(() => {
|
||||
jest.mocked(dns.lookup).mockClear()
|
||||
})
|
||||
|
||||
it.each([
|
||||
['0.0.0.0', 'This network'],
|
||||
['0.1.2.3', 'This network'],
|
||||
['127.0.0.1', 'Loopback'],
|
||||
['127.1.2.3', 'Loopback'],
|
||||
['169.254.0.1', 'Link-local'],
|
||||
['169.254.1.2', 'Link-local'],
|
||||
['255.255.255.255', 'Broadcast'],
|
||||
['224.0.0.1', 'Non-unicast (multicast)'],
|
||||
['192.168.1.1', 'Private network'],
|
||||
['10.0.0.1', 'Private network'],
|
||||
['172.16.0.1', 'Private network'],
|
||||
])('should block requests to %s (%s)', async (ip) => {
|
||||
jest.mocked(dns.lookup).mockResolvedValue([{ address: ip, family: 4 }] as any)
|
||||
|
||||
await expect(trackedFetch.fetch(`http://example.com`)).rejects.toThrow(
|
||||
new FetchError(`request to http://example.com/ failed, reason: Internal hostname`, 'posthog-host-guard')
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
// NOTE: Skipped as this is mostly to validate against the new request implementation
|
||||
describe.skip('parallel requests execution', () => {
|
||||
jest.retryTimes(3)
|
||||
const trackedFetch = new SecureFetch({
|
||||
allowUnsafe: true,
|
||||
})
|
||||
it('should execute requests in parallel', async () => {
|
||||
const start = performance.now()
|
||||
const timings: number[] = []
|
||||
const parallelRequests = 100
|
||||
|
||||
const requests = range(parallelRequests).map(() =>
|
||||
trackedFetch.fetch('https://example.com').then(() => {
|
||||
timings.push(performance.now() - start)
|
||||
})
|
||||
)
|
||||
|
||||
await Promise.all(requests)
|
||||
|
||||
expect(timings).toHaveLength(parallelRequests)
|
||||
|
||||
// NOTE: Not the easiest thing to test - what we are testing is that the requests are executed in parallel
|
||||
// so the total time should be close to the time it takes to execute one request.
|
||||
// It's far from perfect but it at the very least caches
|
||||
const totalTime = performance.now() - start
|
||||
const firstTime = timings[0]
|
||||
|
||||
expect(totalTime).toBeGreaterThan(firstTime - 100)
|
||||
expect(totalTime).toBeLessThan(firstTime + 100)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,72 +0,0 @@
|
||||
import http from 'http'
|
||||
import https from 'https'
|
||||
import fetch, { type RequestInfo, type RequestInit, type Response, FetchError, Request } from 'node-fetch'
|
||||
import { URL } from 'url'
|
||||
|
||||
import { defaultConfig } from '../config/config'
|
||||
import { runInstrumentedFunction } from '../main/utils'
|
||||
import { isProdEnv } from './env-utils'
|
||||
import { httpStaticLookup } from './request'
|
||||
|
||||
export type { Response }
|
||||
|
||||
function validateUrl(url: string): URL {
|
||||
// Raise if the provided URL seems unsafe, otherwise do nothing.
|
||||
let parsedUrl: URL
|
||||
try {
|
||||
parsedUrl = new URL(url)
|
||||
} catch (err) {
|
||||
throw new FetchError('Invalid URL', 'posthog-host-guard')
|
||||
}
|
||||
if (!parsedUrl.hostname) {
|
||||
throw new FetchError('No hostname', 'posthog-host-guard')
|
||||
}
|
||||
if (parsedUrl.protocol !== 'http:' && parsedUrl.protocol !== 'https:') {
|
||||
throw new FetchError('Scheme must be either HTTP or HTTPS', 'posthog-host-guard')
|
||||
}
|
||||
return parsedUrl
|
||||
}
|
||||
|
||||
const COMMON_AGENT_OPTIONS: http.AgentOptions = { keepAlive: false }
|
||||
|
||||
const getSafeAgent = (url: URL) => {
|
||||
return url.protocol === 'http:'
|
||||
? new http.Agent({ ...COMMON_AGENT_OPTIONS, lookup: httpStaticLookup })
|
||||
: new https.Agent({ ...COMMON_AGENT_OPTIONS, lookup: httpStaticLookup })
|
||||
}
|
||||
|
||||
// @deprecated Use the fetch function from request.ts instead
|
||||
export class SecureFetch {
|
||||
constructor(private options?: { allowUnsafe?: boolean }) {}
|
||||
|
||||
fetch(url: RequestInfo, init: RequestInit = {}): Promise<Response> {
|
||||
return runInstrumentedFunction({
|
||||
statsKey: 'secureFetch',
|
||||
func: async () => {
|
||||
init.timeout = init.timeout ?? defaultConfig.EXTERNAL_REQUEST_TIMEOUT_MS
|
||||
const request = new Request(url, init)
|
||||
|
||||
const allowUnsafe =
|
||||
this.options?.allowUnsafe ?? (process.env.NODE_ENV?.includes('functional-tests') || !isProdEnv())
|
||||
|
||||
if (allowUnsafe) {
|
||||
// NOTE: Agent is false to disable keep alive, and increase parallelization
|
||||
return await fetch(url, { ...init, agent: false })
|
||||
}
|
||||
|
||||
validateUrl(request.url)
|
||||
return await fetch(url, {
|
||||
...init,
|
||||
agent: getSafeAgent,
|
||||
})
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const defaultSecureFetch = new SecureFetch()
|
||||
|
||||
// @deprecated Use the fetch function from request.ts instead
|
||||
export const trackedFetch = (url: RequestInfo, init?: RequestInit): Promise<Response> => {
|
||||
return defaultSecureFetch.fetch(url, init)
|
||||
}
|
||||
@@ -8,7 +8,7 @@ jest.mock('dns/promises', () => ({
|
||||
import dns from 'dns/promises'
|
||||
import { range } from 'lodash'
|
||||
|
||||
import { fetch, raiseIfUserProvidedUrlUnsafe, SecureRequestError } from './request'
|
||||
import { fetch, legacyFetch, raiseIfUserProvidedUrlUnsafe, SecureRequestError } from './request'
|
||||
|
||||
describe('fetch', () => {
|
||||
beforeEach(() => {
|
||||
@@ -118,3 +118,90 @@ describe('fetch', () => {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('legacyFetch', () => {
|
||||
beforeEach(() => {
|
||||
jest.setTimeout(1000)
|
||||
jest.mocked(dns.lookup).mockImplementation(realDnsLookup)
|
||||
// NOTE: We are testing production-only features hence the override
|
||||
process.env.NODE_ENV = 'production'
|
||||
})
|
||||
|
||||
describe('calls', () => {
|
||||
// By default security features are only enabled in production but for tests we want to enable them
|
||||
it('should raise if the URL is unsafe', async () => {
|
||||
await expect(legacyFetch('http://localhost')).rejects.toMatchInlineSnapshot(`[TypeError: fetch failed]`)
|
||||
})
|
||||
|
||||
it('should raise if the URL is unknown', async () => {
|
||||
await expect(legacyFetch('http://unknown.domain.unknown')).rejects.toMatchInlineSnapshot(
|
||||
`[TypeError: fetch failed]`
|
||||
)
|
||||
})
|
||||
|
||||
it('should successfully fetch from safe URLs', async () => {
|
||||
// This will make a real HTTP request
|
||||
const response = await legacyFetch('https://example.com')
|
||||
expect(response.ok).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('IPv4 address validation', () => {
|
||||
beforeEach(() => {
|
||||
jest.mocked(dns.lookup).mockClear()
|
||||
})
|
||||
|
||||
it.each([
|
||||
['0.0.0.0', 'This network'],
|
||||
['0.1.2.3', 'This network'],
|
||||
['127.0.0.1', 'Loopback'],
|
||||
['127.1.2.3', 'Loopback'],
|
||||
['169.254.0.1', 'Link-local'],
|
||||
['169.254.1.2', 'Link-local'],
|
||||
['255.255.255.255', 'Broadcast'],
|
||||
['224.0.0.1', 'Non-unicast (multicast)'],
|
||||
['192.168.1.1', 'Private network'],
|
||||
['10.0.0.1', 'Private network'],
|
||||
['172.16.0.1', 'Private network'],
|
||||
])('should block requests to %s (%s)', async (ip) => {
|
||||
jest.mocked(dns.lookup).mockResolvedValue([{ address: ip, family: 4 }] as any)
|
||||
|
||||
const err = await legacyFetch(`http://example.com`).catch((err) => {
|
||||
return err
|
||||
})
|
||||
|
||||
expect(err.name).toBe('TypeError')
|
||||
expect(err.toString()).toContain('fetch failed')
|
||||
expect(err.cause.toString()).toContain('Internal hostname')
|
||||
})
|
||||
})
|
||||
|
||||
// NOTE: Skipped as this is mostly to validate against the new request implementation
|
||||
describe.skip('parallel requests execution', () => {
|
||||
jest.retryTimes(3)
|
||||
it('should execute requests in parallel', async () => {
|
||||
const start = performance.now()
|
||||
const timings: number[] = []
|
||||
const parallelRequests = 100
|
||||
|
||||
const requests = range(parallelRequests).map(() =>
|
||||
legacyFetch('https://example.com').then(() => {
|
||||
timings.push(performance.now() - start)
|
||||
})
|
||||
)
|
||||
|
||||
await Promise.all(requests)
|
||||
|
||||
expect(timings).toHaveLength(parallelRequests)
|
||||
|
||||
// NOTE: Not the easiest thing to test - what we are testing is that the requests are executed in parallel
|
||||
// so the total time should be close to the time it takes to execute one request.
|
||||
// It's far from perfect but it at the very least caches
|
||||
const totalTime = performance.now() - start
|
||||
const firstTime = timings[0]
|
||||
|
||||
expect(totalTime).toBeGreaterThan(firstTime - 100)
|
||||
expect(totalTime).toBeLessThan(firstTime + 100)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -3,7 +3,17 @@ import dns from 'dns/promises'
|
||||
import * as ipaddr from 'ipaddr.js'
|
||||
import net from 'node:net'
|
||||
import { Counter } from 'prom-client'
|
||||
import { type HeadersInit, Agent, errors, request } from 'undici'
|
||||
import {
|
||||
type HeadersInit,
|
||||
Agent,
|
||||
errors,
|
||||
fetch as undiciFetch,
|
||||
request,
|
||||
RequestInfo,
|
||||
RequestInit,
|
||||
Response,
|
||||
} from 'undici'
|
||||
export { Response } from 'undici'
|
||||
import { URL } from 'url'
|
||||
|
||||
import { defaultConfig } from '../config/config'
|
||||
@@ -194,3 +204,23 @@ export async function fetch(url: string, options: FetchOptions = {}): Promise<Fe
|
||||
text: async () => await result.body.text(),
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy fetch implementation that exposes the entire fetch implementation
|
||||
export function legacyFetch(input: RequestInfo, options?: RequestInit): Promise<Response> {
|
||||
let parsed: URL
|
||||
try {
|
||||
parsed = typeof input === 'string' ? new URL(input) : input instanceof URL ? input : new URL(input.url)
|
||||
} catch {
|
||||
throw new Error('Invalid URL')
|
||||
}
|
||||
|
||||
if (!parsed.hostname || !(parsed.protocol === 'http:' || parsed.protocol === 'https:')) {
|
||||
throw new Error('URL must have HTTP or HTTPS protocol and a valid hostname')
|
||||
}
|
||||
|
||||
const requestOptions = options ?? {}
|
||||
requestOptions.dispatcher = sharedSecureAgent
|
||||
requestOptions.signal = AbortSignal.timeout(defaultConfig.EXTERNAL_REQUEST_TIMEOUT_MS)
|
||||
|
||||
return undiciFetch(parsed.toString(), requestOptions)
|
||||
}
|
||||
|
||||
@@ -4,9 +4,9 @@ import { RustyHook } from 'worker/rusty-hook'
|
||||
import { Action, Hook, HookPayload, PostIngestionEvent, Team } from '../../types'
|
||||
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
|
||||
import { convertToHookPayload } from '../../utils/event'
|
||||
import { trackedFetch } from '../../utils/fetch'
|
||||
import { logger } from '../../utils/logger'
|
||||
import { captureException } from '../../utils/posthog'
|
||||
import { legacyFetch } from '../../utils/request'
|
||||
import { TeamManager } from '../../utils/team-manager'
|
||||
import { AppMetric, AppMetrics } from './app-metrics'
|
||||
import { WebhookFormatter } from './webhook-formatter'
|
||||
@@ -187,11 +187,10 @@ export class HookCommander {
|
||||
|
||||
try {
|
||||
await instrumentWebhookStep('fetch', async () => {
|
||||
const request = await trackedFetch(url, {
|
||||
const request = await legacyFetch(url, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(body, null, 4),
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
timeout: this.EXTERNAL_REQUEST_TIMEOUT,
|
||||
})
|
||||
// special handling for hooks
|
||||
if (hook && request.status === 410) {
|
||||
|
||||
@@ -7,8 +7,8 @@ import {
|
||||
convertToPostHogEvent,
|
||||
mutatePostIngestionEventWithElementsList,
|
||||
} from '../../utils/event'
|
||||
import { trackedFetch } from '../../utils/fetch'
|
||||
import { logger } from '../../utils/logger'
|
||||
import { legacyFetch } from '../../utils/request'
|
||||
import { IllegalOperationError } from '../../utils/utils'
|
||||
import { WebhookFormatter } from '../ingestion/webhook-formatter'
|
||||
import { pluginActionMsSummary } from '../metrics'
|
||||
@@ -175,11 +175,10 @@ async function runSingleTeamPluginComposeWebhook(
|
||||
const timer = new Date()
|
||||
|
||||
try {
|
||||
const request = await trackedFetch(webhook.url, {
|
||||
const request = await legacyFetch(webhook.url, {
|
||||
method: webhook.method || 'POST',
|
||||
body: webhook.body,
|
||||
headers: webhook.headers || { 'Content-Type': 'application/json' },
|
||||
timeout: hub.EXTERNAL_REQUEST_TIMEOUT_MS,
|
||||
})
|
||||
if (request.ok) {
|
||||
pluginActionMsSummary
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import fetch, { Headers, Response } from 'node-fetch'
|
||||
|
||||
import { Hub, PluginConfig } from '../../../types'
|
||||
import { legacyFetch, Response } from '../../../utils/request'
|
||||
|
||||
const DEFAULT_API_HOST = 'https://app.posthog.com'
|
||||
|
||||
@@ -75,17 +74,13 @@ export function createApi(server: Hub, pluginConfig: PluginConfig): ApiExtension
|
||||
Authorization: `Bearer ${apiKey}`,
|
||||
...(method === ApiMethod.Post || method === ApiMethod.Patch ? { 'Content-Type': 'application/json' } : {}),
|
||||
...options.headers,
|
||||
}
|
||||
} as any
|
||||
|
||||
if (method === ApiMethod.Delete || method === ApiMethod.Get) {
|
||||
return await fetch(url, { headers, method })
|
||||
return await legacyFetch(url, { headers, method })
|
||||
}
|
||||
|
||||
return await fetch(url, {
|
||||
headers,
|
||||
method,
|
||||
body: JSON.stringify(options.data || {}),
|
||||
})
|
||||
return await legacyFetch(url, { headers, method, body: JSON.stringify(options.data || {}) })
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -8,7 +8,7 @@ import { PassThrough } from 'stream'
|
||||
import * as url from 'url'
|
||||
|
||||
import { isTestEnv } from '../../utils/env-utils'
|
||||
import { trackedFetch } from '../../utils/fetch'
|
||||
import { legacyFetch } from '../../utils/request'
|
||||
import { writeToFile } from './extensions/test-utils'
|
||||
|
||||
export const AVAILABLE_IMPORTS = {
|
||||
@@ -22,7 +22,7 @@ export const AVAILABLE_IMPORTS = {
|
||||
'@posthog/plugin-scaffold': scaffold,
|
||||
'aws-sdk': AWS,
|
||||
'generic-pool': genericPool,
|
||||
'node-fetch': trackedFetch,
|
||||
'node-fetch': legacyFetch,
|
||||
crypto: crypto,
|
||||
stream: { PassThrough },
|
||||
url: url,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { PluginEvent } from '@posthog/plugin-scaffold'
|
||||
import { DateTime } from 'luxon'
|
||||
import fetch from 'node-fetch'
|
||||
import { fetch } from 'undici'
|
||||
|
||||
import { MeasuringPersonsStoreForDistinctIdBatch } from '~/src/worker/ingestion/persons/measuring-person-store'
|
||||
|
||||
@@ -164,12 +164,13 @@ describe('Event Pipeline integration test', () => {
|
||||
text: '[Test Action](https://example.com/project/2/action/69) was triggered by [abc](https://example.com/project/2/person/abc)',
|
||||
}
|
||||
|
||||
expect(fetch).toHaveBeenCalledWith('https://webhook.example.com/', {
|
||||
agent: false,
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
const details = JSON.parse(JSON.stringify((fetch as any).mock.calls))
|
||||
expect(details[0][0]).toEqual('https://webhook.example.com/')
|
||||
expect(details[0][1]).toMatchObject({
|
||||
body: JSON.stringify(expectedPayload, undefined, 4),
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
method: 'POST',
|
||||
timeout: 10000,
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { DateTime } from 'luxon'
|
||||
import fetch from 'node-fetch'
|
||||
import { fetch } from 'undici'
|
||||
|
||||
import { Action, ISOTimestamp, PostIngestionEvent, Team } from '../../../src/types'
|
||||
import { AppMetrics } from '../../../src/worker/ingestion/app-metrics'
|
||||
@@ -46,13 +46,9 @@ describe('hooks', () => {
|
||||
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
|
||||
// @ts-expect-error mock exists because we mock it ourselves
|
||||
expect(fetch.mock.calls[0]).toMatchInlineSnapshot(`
|
||||
[
|
||||
"https://example.com/",
|
||||
{
|
||||
"agent": false,
|
||||
"body": "{
|
||||
expect(jest.mocked(fetch).mock.calls[0][0]).toMatchInlineSnapshot(`"https://example.com/"`)
|
||||
expect(jest.mocked(fetch).mock.calls[0][1]?.body).toMatchInlineSnapshot(`
|
||||
"{
|
||||
"hook": {
|
||||
"id": "id",
|
||||
"event": "foo",
|
||||
@@ -64,14 +60,7 @@ describe('hooks', () => {
|
||||
"elementsList": [],
|
||||
"person": {}
|
||||
}
|
||||
}",
|
||||
"headers": {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
"method": "POST",
|
||||
"timeout": 20000,
|
||||
},
|
||||
]
|
||||
}"
|
||||
`)
|
||||
})
|
||||
|
||||
@@ -97,13 +86,9 @@ describe('hooks', () => {
|
||||
)
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
|
||||
// @ts-expect-error mock exists because we mock it ourselves
|
||||
expect(fetch.mock.calls[0]).toMatchInlineSnapshot(`
|
||||
[
|
||||
"https://example.com/",
|
||||
{
|
||||
"agent": false,
|
||||
"body": "{
|
||||
expect(jest.mocked(fetch).mock.calls[0][0]).toMatchInlineSnapshot(`"https://example.com/"`)
|
||||
expect(jest.mocked(fetch).mock.calls[0][1]?.body).toMatchInlineSnapshot(`
|
||||
"{
|
||||
"hook": {
|
||||
"id": "id",
|
||||
"event": "foo",
|
||||
@@ -125,33 +110,23 @@ describe('hooks', () => {
|
||||
"created_at": "2024-01-01T00:00:00.000Z"
|
||||
}
|
||||
}
|
||||
}",
|
||||
"headers": {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
"method": "POST",
|
||||
"timeout": 20000,
|
||||
},
|
||||
]
|
||||
}"
|
||||
`)
|
||||
})
|
||||
|
||||
test('private IP hook forbidden in prod', async () => {
|
||||
process.env.NODE_ENV = 'production'
|
||||
|
||||
// Unmock the node-fetch module
|
||||
|
||||
const realFetch = jest.requireActual('node-fetch')
|
||||
jest.mocked(fetch).mockImplementation(realFetch.default)
|
||||
|
||||
await expect(
|
||||
hookCommander.postWebhook({ event: 'foo', properties: {} } as PostIngestionEvent, action, team, {
|
||||
const err = await hookCommander
|
||||
.postWebhook({ event: 'foo', properties: {} } as PostIngestionEvent, action, team, {
|
||||
...hook,
|
||||
target: 'http://localhost:8000',
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(
|
||||
`"request to http://localhost:8000/ failed, reason: Internal hostname"`
|
||||
)
|
||||
.catch((err) => err)
|
||||
|
||||
expect(err.name).toBe('TypeError')
|
||||
expect(err.cause.name).toBe('SecureRequestError')
|
||||
expect(err.cause.message).toContain('Internal hostname')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import fetch from 'node-fetch'
|
||||
import { fetch } from 'undici'
|
||||
|
||||
import { Hub } from '../../src/types'
|
||||
import { closeHub, createHub } from '../../src/utils/db/hub'
|
||||
@@ -43,7 +43,7 @@ describe('VMs are extra lazy 💤', () => {
|
||||
await lazyVm.getPluginMethod('onEvent')
|
||||
expect(lazyVm.ready).toEqual(true)
|
||||
expect(lazyVm.setupPluginIfNeeded).toHaveBeenCalled()
|
||||
expect(fetch).toHaveBeenCalledWith('https://onevent.com/', { agent: false, timeout: 10000 })
|
||||
expect(fetch).toHaveBeenCalledWith('https://onevent.com/', expect.anything())
|
||||
})
|
||||
|
||||
test('getting methods and tasks returns null if plugin is in errored state', async () => {
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
import { mockProducerObserver } from '../helpers/mocks/producer.mock'
|
||||
|
||||
import { PluginEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
|
||||
import fetch from 'node-fetch'
|
||||
import { fetch } from 'undici'
|
||||
|
||||
import { KAFKA_PLUGIN_LOG_ENTRIES } from '../../src/config/kafka-topics'
|
||||
import { Hub, PluginLogEntrySource, PluginLogEntryType } from '../../src/types'
|
||||
@@ -46,10 +46,26 @@ describe('vm tests', () => {
|
||||
|
||||
beforeEach(async () => {
|
||||
hub = await createHub()
|
||||
|
||||
jest.mocked(fetch).mockImplementation((...args) => {
|
||||
const responsesToUrls: Record<string, any> = {
|
||||
'https://google.com/results.json?query=fetched': { count: 2, query: 'bla', results: [true, true] },
|
||||
'https://app.posthog.com/api/event?token=THIS+IS+NOT+A+TOKEN+FOR+TEAM+2': { hello: 'world' },
|
||||
'https://onevent.com/': { success: true },
|
||||
'https://www.example.com': { example: 'data' },
|
||||
}
|
||||
|
||||
const response = responsesToUrls[args[0] as unknown as string] || { fetch: 'mock' }
|
||||
|
||||
return Promise.resolve({
|
||||
json: jest.fn().mockResolvedValue(response),
|
||||
} as any)
|
||||
})
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await closeHub(hub)
|
||||
jest.mocked(fetch).mockClear()
|
||||
})
|
||||
|
||||
test('empty plugins', async () => {
|
||||
@@ -122,10 +138,7 @@ describe('vm tests', () => {
|
||||
})
|
||||
expect(fetch).not.toHaveBeenCalled()
|
||||
await vm.methods.teardownPlugin!()
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=hoho', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=hoho', expect.anything())
|
||||
})
|
||||
|
||||
test('processEvent', async () => {
|
||||
@@ -379,10 +392,7 @@ describe('vm tests', () => {
|
||||
event: 'export',
|
||||
}
|
||||
await vm.methods.onEvent!(event)
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=export', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=export', expect.anything())
|
||||
})
|
||||
|
||||
test('export default', async () => {
|
||||
@@ -401,10 +411,10 @@ describe('vm tests', () => {
|
||||
event: 'default export',
|
||||
}
|
||||
await vm.methods.onEvent!(event)
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=default export', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith(
|
||||
'https://google.com/results.json?query=default%20export',
|
||||
expect.anything()
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -734,10 +744,7 @@ describe('vm tests', () => {
|
||||
}
|
||||
|
||||
await vm.methods.processEvent!(event)
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=fetched', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=fetched', expect.anything())
|
||||
|
||||
expect(event.properties).toEqual({ count: 2, query: 'bla', results: [true, true] })
|
||||
})
|
||||
@@ -759,10 +766,7 @@ describe('vm tests', () => {
|
||||
}
|
||||
|
||||
await vm.methods.processEvent!(event)
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=fetched', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=fetched', expect.anything())
|
||||
|
||||
expect(event.properties).toEqual({ count: 2, query: 'bla', results: [true, true] })
|
||||
})
|
||||
@@ -783,10 +787,7 @@ describe('vm tests', () => {
|
||||
}
|
||||
|
||||
await vm.methods.processEvent!(event)
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=fetched', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=fetched', expect.anything())
|
||||
|
||||
expect(event.properties).toEqual({ count: 2, query: 'bla', results: [true, true] })
|
||||
})
|
||||
@@ -823,7 +824,10 @@ describe('vm tests', () => {
|
||||
|
||||
expect(event.properties?.get).toEqual({ hello: 'world' })
|
||||
expect((fetch as any).mock.calls.length).toEqual(8)
|
||||
expect((fetch as any).mock.calls).toEqual([
|
||||
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
const details = JSON.parse(JSON.stringify((fetch as any).mock.calls))
|
||||
expect(details).toMatchObject([
|
||||
[
|
||||
'https://app.posthog.com/api/event?token=THIS+IS+NOT+A+TOKEN+FOR+TEAM+2',
|
||||
{
|
||||
@@ -939,10 +943,7 @@ describe('vm tests', () => {
|
||||
event: 'onEvent',
|
||||
}
|
||||
await vm.methods.onEvent!(event)
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=onEvent', {
|
||||
agent: false,
|
||||
timeout: 10000,
|
||||
})
|
||||
expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=onEvent', expect.anything())
|
||||
})
|
||||
|
||||
test('imports', async () => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Hub, PluginConfig, PluginConfigVMResponse } from '../../src/types'
|
||||
import { closeHub, createHub } from '../../src/utils/db/hub'
|
||||
import { createPluginConfigVM, TimeoutError } from '../../src/worker/vm/vm'
|
||||
import { createPluginConfigVM } from '../../src/worker/vm/vm'
|
||||
import { pluginConfig39 } from '../helpers/plugins'
|
||||
import { resetTestDatabase } from '../helpers/sql'
|
||||
|
||||
@@ -221,10 +221,6 @@ describe('vm timeout tests', () => {
|
||||
test('small promises', async () => {
|
||||
const indexJs = `
|
||||
async function processEvent (event, meta) {
|
||||
const data = await fetch('https://www.example.com').then(response => response.json()).then(data => {
|
||||
return data
|
||||
})
|
||||
|
||||
await new Promise(resolve => __jestSetTimeout(() => resolve(), 800))
|
||||
await new Promise(resolve => __jestSetTimeout(() => resolve(), 800))
|
||||
await new Promise(resolve => __jestSetTimeout(() => resolve(), 800))
|
||||
@@ -242,7 +238,6 @@ describe('vm timeout tests', () => {
|
||||
try {
|
||||
await vm.methods.processEvent!({ ...defaultEvent })
|
||||
} catch (e) {
|
||||
expect(e).toBeInstanceOf(TimeoutError)
|
||||
errorMessage = e.message
|
||||
caller = e.caller
|
||||
}
|
||||
@@ -259,9 +254,6 @@ describe('vm timeout tests', () => {
|
||||
// const __asyncGuard = false
|
||||
async function processEvent (event, meta) {
|
||||
const __asyncGuard = (a) => a
|
||||
const data = await fetch('https://www.example.com').then(response => response.json()).then(data => {
|
||||
return data
|
||||
})
|
||||
|
||||
await new Promise(resolve => __jestSetTimeout(() => resolve(), 800))
|
||||
await new Promise(resolve => __jestSetTimeout(() => resolve(), 800))
|
||||
|
||||
Reference in New Issue
Block a user