Mirror of https://github.com/BillyOutlast/posthog.git (synced 2026-02-04 03:01:23 +01:00)
feat(logs): switch from gRPC to http for the log capture service (#40216)
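With this change the log capture service accepts OTLP logs over plain HTTP on port 4318 (the conventional OTLP/HTTP port; 4317 is its gRPC counterpart), authenticated with a project API key that is validated downstream by the logs ingestion consumer. A minimal client sketch of the new contract, assuming the `reqwest` crate (not a dependency of this PR) and the `/v1/logs` route wired up in `main.rs` below:

```rust
// Hedged sketch of a client POSTing OTLP logs to the new HTTP endpoint.
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use prost::Message;

async fn ship_logs(request: ExportLogsServiceRequest, token: &str) -> Result<(), reqwest::Error> {
    reqwest::Client::new()
        .post("http://log-capture:4318/v1/logs") // route added in main.rs below
        .header("Authorization", format!("Bearer {token}"))
        .header("Content-Type", "application/x-protobuf")
        .body(request.encode_to_vec()) // protobuf-encoded OTLP payload
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```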
@@ -93,7 +93,6 @@ GROUP BY
 CREATE OR REPLACE TABLE kafka_logs_avro
 (
     `uuid` String,
-    `team_id` Int32,
     `trace_id` String,
     `span_id` String,
     `trace_flags` Int32,
@@ -115,7 +114,7 @@ CREATE OR REPLACE TABLE kafka_logs_avro
     `attribute_keys` Array(Nullable(String)),
     `attribute_values` Array(Nullable(String))
 )
-ENGINE = Kafka('kafka:9092', 'logs_avro', 'clickhouse-logs-avro', 'Avro')
+ENGINE = Kafka('kafka:9092', 'clickhouse_logs', 'clickhouse-logs-avro', 'Avro')
 SETTINGS
     kafka_skip_broken_messages = 100,
     kafka_security_protocol = 'PLAINTEXT',
@@ -153,7 +152,8 @@ CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs16
     `attribute_values` Array(Nullable(String))
 )
 AS SELECT
-    *
+    *,
+    toInt32OrZero(_headers.value[indexOf(_headers.name, 'team_id')]) as team_id
 FROM kafka_logs_avro settings materialize_skip_indexes_on_insert = 1, distributed_background_insert_sleep_time_ms=5000, distributed_background_insert_batch=true;

 select 'clickhouse logs tables initialised successfully!';
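The materialized view now stamps `team_id` at insert time from the Kafka message headers instead of trusting a column inside the Avro payload (that column is dropped from the table and schema). For intuition, `toInt32OrZero(_headers.value[indexOf(_headers.name, 'team_id')])` behaves roughly like this Rust sketch (function name hypothetical):

```rust
// Kafka headers surface in ClickHouse as parallel name/value arrays;
// the view picks the value whose name is `team_id`, falling back to 0
// when the header is missing or non-numeric (toInt32OrZero semantics).
fn team_id_from_headers(names: &[String], values: &[String]) -> i32 {
    names
        .iter()
        .position(|name| name == "team_id")
        .and_then(|i| values.get(i))
        .and_then(|value| value.parse::<i32>().ok())
        .unwrap_or(0)
}
```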
@@ -255,12 +255,12 @@ services:
    restart: on-failure
    environment:
      BIND_HOST: '0.0.0.0'
-     BIND_PORT: '3308'
+     BIND_PORT: '4318'
      RUST_LOG: info,rdkafka=warn
      RUST_BACKTRACE: '1'
      KAFKA_HOSTS: kafka:9092
      JWT_SECRET: '<randomly generated secret key>'
-     KAFKA_TOPIC: logs_avro
+     KAFKA_TOPIC: logs_ingestion
    networks:
      - otel_network
      - default
@@ -27,15 +27,14 @@ exporters:
    endpoint: 'jaeger-local:4317' # Sending OTLP gRPC to Jaeger
    tls:
      insecure: true # For local communication to Jaeger
-  otlp/logs:
-    endpoint: 'log-capture:3308'
+  otlphttp/logs:
+    endpoint: 'http://log-capture:4318'
    compression: none
    tls:
      insecure: true
    headers:
-      # hardcoded JWT signed by the dev secret key ('<randomly generated secret key>') to ship logs to team 1
-      # (TODO: use normal capture tokens instead of these jwts)
-      authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZWFtX2lkIjoiMSIsImV4cCI6OTIyMzM3MjAzNjg1NDc3Nn0.M2CClSDIm9T6aDUjffL1BjI2lHEsXYLkt_pShIgItrU
+      # special "local" token is automatically mapped to team_id 1 in dev environments
+      authorization: Bearer phc_local

 extensions: # Declaring the extensions
   health_check: # Default configuration is usually fine
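For reference, the dev JWT being deleted here encodes the claims `{"team_id": "1", "exp": ...}` under the dev secret. A sketch of how the old gRPC path would have validated it with `jsonwebtoken` (the real logic lived in the crate's `auth::authenticate_request`; the claim struct below is inferred from the token payload, not copied from the source):

```rust
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde::Deserialize;

#[derive(Deserialize)]
struct Claims {
    team_id: String, // the deleted dev token carries team_id "1"
    exp: i64,
}

fn team_from_jwt(token: &str, secret: &str) -> Result<String, jsonwebtoken::errors::Error> {
    let data = decode::<Claims>(
        token,
        &DecodingKey::from_secret(secret.as_bytes()),
        &Validation::default(),
    )?;
    Ok(data.claims.team_id)
}
```

The `phc_local` replacement sidesteps all of this: it is an ordinary Bearer value that the plugin-server maps to team 1 in dev environments (see the consumer change below).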
@@ -52,7 +51,7 @@ service:
      exporters: [otlp]
    logs:
      exporters:
-        - otlp/logs
+        - otlphttp/logs
      processors:
        - batch
      receivers:
@@ -4,7 +4,6 @@ exports[`LogsIngestionConsumer general should process a valid log message 1`] =
 [
   {
     "headers": {
-      "distinct_id": "user-1",
       "team_id": "2",
       "token": "THIS IS NOT A TOKEN FOR TEAM 2",
     },
@@ -24,7 +23,6 @@ exports[`LogsIngestionConsumer general should process multiple log messages 1`]
 [
   {
     "headers": {
-      "distinct_id": "user-1",
       "team_id": "2",
       "token": "THIS IS NOT A TOKEN FOR TEAM 2",
     },
@@ -39,7 +37,6 @@ exports[`LogsIngestionConsumer general should process multiple log messages 1`]
   },
   {
     "headers": {
-      "distinct_id": "user-1",
       "team_id": "2",
       "token": "THIS IS NOT A TOKEN FOR TEAM 2",
     },
@@ -54,7 +51,6 @@ exports[`LogsIngestionConsumer general should process multiple log messages 1`]
   },
   {
     "headers": {
-      "distinct_id": "user-1",
       "team_id": "2",
       "token": "THIS IS NOT A TOKEN FOR TEAM 2",
     },
@@ -128,7 +128,6 @@ describe('LogsIngestionConsumer', () => {
         const logData = createLogMessage()
         const messages = createKafkaMessages([logData], {
             token: team.api_token,
-            distinct_id: 'user-1',
         })

         await consumer.processKafkaBatch(messages)
@@ -144,7 +143,6 @@ describe('LogsIngestionConsumer', () => {
         ]
         const messages = createKafkaMessages(logData, {
             token: team.api_token,
-            distinct_id: 'user-1',
         })

         await consumer.processKafkaBatch(messages)
@@ -159,7 +157,6 @@ describe('LogsIngestionConsumer', () => {
     it('should drop messages with missing token', async () => {
         const logData = createLogMessage()
         const messages = createKafkaMessages([logData], {
-            distinct_id: 'user-1',
             // missing token
         })

@@ -168,23 +165,10 @@ describe('LogsIngestionConsumer', () => {
         expect(mockProducerObserver.getProducedKafkaMessages()).toHaveLength(0)
     })

-    it('should drop messages with missing distinct_id', async () => {
-        const logData = createLogMessage()
-        const messages = createKafkaMessages([logData], {
-            token: team.api_token,
-            // missing distinct_id
-        })
-
-        await consumer.processKafkaBatch(messages)
-
-        expect(mockProducerObserver.getProducedKafkaMessages()).toHaveLength(0)
-    })
-
     it('should drop messages with invalid token', async () => {
         const logData = createLogMessage()
         const messages = createKafkaMessages([logData], {
             token: 'invalid-token',
-            distinct_id: 'user-1',
         })

         await consumer.processKafkaBatch(messages)
@@ -202,7 +186,7 @@ describe('LogsIngestionConsumer', () => {
                 offset: offsetIncrementer++,
                 timestamp: DateTime.now().toMillis(),
                 partition: 1,
-                headers: [{ token: Buffer.from('missing') }, { distinct_id: Buffer.from('user-1') }],
+                headers: [{ token: Buffer.from('missing') }],
             } as Message,
         ]

@@ -226,7 +210,6 @@ describe('LogsIngestionConsumer', () => {
         const logData = createLogMessage()
         const messages = createKafkaMessages([logData], {
             token: team.api_token,
-            distinct_id: 'user-1',
         })

         await consumer.processKafkaBatch(messages)
@@ -237,7 +220,6 @@ describe('LogsIngestionConsumer', () => {
         expect(producedMessages[0].headers).toEqual({
             token: team.api_token,
             team_id: team.id.toString(),
-            distinct_id: 'user-1',
         })
     })

@@ -245,7 +227,6 @@ describe('LogsIngestionConsumer', () => {
         const logData = createLogMessage({ level: 'error', message: 'Critical error' })
         const messages = createKafkaMessages([logData], {
             token: team.api_token,
-            distinct_id: 'user-123',
         })

         await consumer.processKafkaBatch(messages)
@@ -260,7 +241,6 @@ describe('LogsIngestionConsumer', () => {
         expect(message.messages[0].headers).toEqual({
             token: team.api_token,
             team_id: team.id.toString(),
-            distinct_id: 'user-123',
         })
     })
 })
@@ -286,7 +266,6 @@ describe('LogsIngestionConsumer', () => {
         const logData = createLogMessage()
         const messages = createKafkaMessages([logData], {
             token: team.api_token,
-            distinct_id: 'user-1',
         })

         // Mock producer to throw an error
@@ -305,7 +284,6 @@ describe('LogsIngestionConsumer', () => {
         const logData = createLogMessage()
         const messages = createKafkaMessages([logData], {
             token: team.api_token,
-            distinct_id: 'user-1',
         })

         await consumer.processKafkaBatch(messages)
@@ -322,11 +300,9 @@ describe('LogsIngestionConsumer', () => {
         const messages = [
             ...createKafkaMessages([logData1], {
                 token: team.api_token,
-                distinct_id: 'user-1',
             }),
             ...createKafkaMessages([logData2], {
                 token: team2.api_token,
-                distinct_id: 'user-2',
             }),
         ]

@@ -358,7 +334,6 @@ describe('LogsIngestionConsumer', () => {

         const messages = createKafkaMessages([logData], {
             token: team.api_token,
-            distinct_id: 'user-1',
         })

         await consumer.processKafkaBatch(messages)
@@ -380,7 +355,7 @@ describe('LogsIngestionConsumer', () => {
                 offset: offsetIncrementer++,
                 timestamp: DateTime.now().toMillis(),
                 partition: 1,
-                headers: [{ token: Buffer.from(team.api_token) }, { distinct_id: Buffer.from('user-1') }],
+                headers: [{ token: Buffer.from(team.api_token) }],
             } as Message,
         ]

@@ -6,6 +6,7 @@ import { KafkaProducerWrapper } from '~/kafka/producer'

 import { KafkaConsumer, parseKafkaHeaders } from '../kafka/consumer'
 import { HealthCheckResult, Hub, LogsIngestionConsumerConfig, PluginServerService } from '../types'
+import { isDevEnv } from '../utils/env-utils'
 import { logger } from '../utils/logger'

 export const logMessageDroppedCounter = new Counter({
@@ -17,7 +18,6 @@ export const logMessageDroppedCounter = new Counter({
 export type LogsIngestionMessage = {
     token: string
     teamId: number
-    distinctId: string
     message: Message
 }

@@ -82,7 +82,6 @@ export class LogsIngestionConsumer {
                 headers: {
                     token: message.token,
                     team_id: message.teamId.toString(),
-                    distinct_id: message.distinctId,
                 },
             })
         })
@@ -98,24 +97,29 @@ export class LogsIngestionConsumer {
             try {
                 const headers = parseKafkaHeaders(message.headers)
                 const token = headers.token
-                const distinctId = headers.distinct_id

-                if (!token || !distinctId) {
+                if (!token) {
                     logger.error('missing_token_or_distinct_id')
                     // Write to DLQ topic maybe?
                     logMessageDroppedCounter.inc({ reason: 'missing_token_or_distinct_id' })
                     return
                 }

-                const team = await this.hub.teamManager.getTeamByToken(token)
+                let team = await this.hub.teamManager.getTeamByToken(token)
+                if (isDevEnv() && token === 'phc_local') {
+                    // phc_local is a special token used in dev to refer to team 1
+                    team = await this.hub.teamManager.getTeam(1)
+                }

                 if (!team) {
                     // Write to DLQ topic maybe?
                     logger.error('team_not_found')
                     logMessageDroppedCounter.inc({ reason: 'team_not_found' })
                     return
                 }

                 events.push({
                     token,
                     message,
                     teamId: team.id,
                 })

rust/Cargo.lock (generated, 24 lines changed)
@@ -4575,6 +4575,7 @@ dependencies = [
  "apache-avro",
  "axum 0.7.5",
  "base64 0.22.1",
+ "bytes",
  "capture",
  "chrono",
  "clickhouse",
@@ -4585,6 +4586,7 @@ dependencies = [
  "jsonwebtoken",
  "metrics",
  "opentelemetry-proto 0.29.0",
+ "prost 0.13.5",
  "rdkafka",
  "serde",
  "serde_derive",
@@ -4593,8 +4595,6 @@ dependencies = [
  "thiserror 2.0.16",
  "tokio",
- "tonic 0.12.3",
- "tonic-web",
  "tower 0.4.13",
  "tracing",
  "tracing-subscriber",
  "uuid",
@@ -8465,26 +8465,6 @@ dependencies = [
  "tracing",
 ]

-[[package]]
-name = "tonic-web"
-version = "0.12.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5299dd20801ad736dccb4a5ea0da7376e59cd98f213bf1c3d478cf53f4834b58"
-dependencies = [
- "base64 0.22.1",
- "bytes",
- "http 1.1.0",
- "http-body 1.0.0",
- "http-body-util",
- "pin-project",
- "tokio-stream",
- "tonic 0.12.3",
- "tower-http",
- "tower-layer",
- "tower-service",
- "tracing",
-]
-
 [[package]]
 name = "tower"
 version = "0.4.13"
@@ -122,7 +122,6 @@ regex = "1.11.1"
 lz-str = "0.2.1"
 opentelemetry-proto = "0.29.0"
 tonic = "0.12.3"
-tonic-web = "0.12.3"
 clickhouse = { version = "0.13.2", features = [
     "uuid",
     "time",
@@ -16,8 +16,6 @@ common-alloc = { path = "../common/alloc" }
 capture = { path = "../capture" }
 opentelemetry-proto = { workspace = true }
-tonic = { workspace = true }
-tonic-web = { workspace = true }
 tower = { workspace = true }
 jsonwebtoken = "8.3"
 serde = { workspace = true }
 serde_derive = { workspace = true }
@@ -32,6 +30,8 @@ sha2 = "0.10.8"
 uuid = { workspace = true }
 chrono = { workspace = true }
 apache-avro = { version = "0.18.0", features = ["zstandard"] }
+prost = "0.13.5"
+bytes = { workspace = true }

 [dev-dependencies]
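`prost` and `bytes` enter the dependency list because the handler now decodes the OTLP protobuf body itself instead of letting tonic do it. A minimal sketch of that decode step (the same call appears in `service.rs` below):

```rust
use bytes::Bytes;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use prost::Message;

// Decode a raw OTLP/HTTP request body into the generated protobuf type.
fn decode_otlp_logs(body: &Bytes) -> Result<ExportLogsServiceRequest, prost::DecodeError> {
    ExportLogsServiceRequest::decode(body.as_ref())
}
```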
@@ -10,11 +10,6 @@ pub const AVRO_SCHEMA: &str = r#"
            "doc": "Unique identifier for the log record."
        },
-        {
-            "name": "team_id",
-            "type": ["null", "int"],
-            "doc": "Identifier for the team associated with this record."
-        },
        {
            "name": "trace_id",
            "type": ["null", "bytes"],
            "doc": "Identifier for the trace this log is a part of."
@@ -4,24 +4,19 @@ use capture::config::KafkaConfig;

 #[derive(Envconfig, Clone)]
 pub struct Config {
+    // management endpoint serves _readiness/_liveness/metrics
+    #[envconfig(from = "MANAGEMENT_BIND_HOST", default = "::")]
+    pub management_host: String,
+
+    #[envconfig(from = "MANAGEMENT_BIND_PORT", default = "8080")]
+    pub management_port: u16,
+
     #[envconfig(from = "BIND_HOST", default = "::")]
     pub host: String,

-    #[envconfig(from = "BIND_PORT", default = "4317")]
+    #[envconfig(from = "BIND_PORT", default = "4318")]
     pub port: u16,

-    #[envconfig(from = "JWT_SECRET")]
-    pub jwt_secret: String,
-
-    #[envconfig(from = "INSETER_PERIOD_MS", default = "1000")]
-    pub inserter_period_ms: u64,
-
-    #[envconfig(from = "INSETER_MAX_BYTES", default = "50000000")]
-    pub inserter_max_bytes: u64,
-
-    #[envconfig(from = "INSETER_MAX_ROWS", default = "10000")]
-    pub inserter_max_rows: u64,
-
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
 }
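A sketch of how this struct is presumably materialized at startup — standard `envconfig` usage, not shown in this diff:

```rust
use envconfig::Envconfig;

fn load_config() -> Config {
    // Reads the BIND_* and MANAGEMENT_BIND_* addresses plus the nested Kafka
    // settings from the process environment, applying the defaults above.
    Config::init_from_env().expect("invalid environment configuration")
}
```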
@@ -188,7 +188,7 @@ impl KafkaSink {
         self.producer.flush(Duration::new(30, 0))
     }

-    pub async fn write(&self, team_id: i32, rows: Vec<KafkaLogRow>) -> Result<(), anyhow::Error> {
+    pub async fn write(&self, token: &str, rows: Vec<KafkaLogRow>) -> Result<(), anyhow::Error> {
         let schema = Schema::parse_str(AVRO_SCHEMA)?;
         let mut writer = Writer::with_codec(
             &schema,
@@ -208,17 +208,10 @@ impl KafkaSink {
             partition: None,
             key: None::<Vec<u8>>.as_ref(),
             timestamp: None,
-            headers: Some(
-                OwnedHeaders::new()
-                    .insert(Header {
-                        key: "team_id",
-                        value: Some(&format!("{team_id}")),
-                    })
-                    .insert(Header {
-                        key: "schema.id",
-                        value: Some(&format!("{}", 1)),
-                    }),
-            ),
+            headers: Some(OwnedHeaders::new().insert(Header {
+                key: "token",
+                value: Some(&token.to_string()),
+            })),
         }) {
             Err((err, _)) => Err(anyhow!(format!("kafka error: {}", err))),
             Ok(delivery_future) => Ok(delivery_future),
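The sink now tags each message with the raw project `token` header only; team resolution happens downstream in the plugin-server. `write` still Avro-encodes the batch before producing — a compressed sketch of that path, with the codec simplified to `Codec::Null` (the crate's `zstandard` feature suggests the real sink may use zstd; `append_ser`/`into_inner` usage here is a sketch, not copied from the source):

```rust
use apache_avro::{Codec, Schema, Writer};

fn encode_rows(rows: Vec<KafkaLogRow>) -> anyhow::Result<Vec<u8>> {
    let schema = Schema::parse_str(AVRO_SCHEMA)?;
    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
    for row in rows {
        writer.append_ser(row)?; // serde-driven Avro serialization per row
    }
    Ok(writer.into_inner()?) // the bytes handed to the Kafka producer
}
```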
@@ -20,29 +20,9 @@ use sha2::{Digest, Sha512};
 use tracing::debug;
 use uuid::Uuid;

-#[derive(Row, Debug, Serialize, Deserialize)]
-pub struct LogRow {
-    team_id: i32,
-    trace_id: [u8; 16],
-    span_id: [u8; 8],
-    trace_flags: u32,
-    timestamp: u64,
-    body: String,
-    message: String,
-    attributes: Vec<(String, String)>,
-    severity_text: String,
-    severity_number: i32,
-    resource_attributes: Vec<(String, String)>,
-    resource_id: String,
-    instrumentation_scope: String,
-    service_name: String,
-    event_name: String,
-}
-
 #[derive(Row, Debug, Serialize, Deserialize)]
 pub struct KafkaLogRow {
     pub uuid: String,
-    pub team_id: i32,
     pub trace_id: String,
     pub span_id: String,
     pub trace_flags: u32,
@@ -70,7 +50,6 @@ pub struct KafkaLogRow {

 impl KafkaLogRow {
     pub fn new(
-        team_id: i32,
         record: LogRecord,
         resource: Option<Resource>,
         scope: Option<InstrumentationScope>,
@@ -119,7 +98,7 @@ impl KafkaLogRow {
                 )
             })
             .collect();
-        attributes.extend(resource_attributes);
+        attributes.extend(resource_attributes.clone());

         let instrumentation_scope = match scope {
             Some(s) => format!("{}@{}", s.name, s.version),
@@ -141,7 +120,6 @@ impl KafkaLogRow {

         let log_row = Self {
             uuid: Uuid::now_v7().to_string(),
-            team_id,
             trace_id: BASE64_STANDARD.encode(trace_id),
             span_id: BASE64_STANDARD.encode(span_id),
             trace_flags,
@@ -151,7 +129,7 @@ impl KafkaLogRow {
             body,
             severity_text,
             severity_number,
-            resource_attributes: HashMap::new(),
+            resource_attributes: resource_attributes.into_iter().collect(),
             instrumentation_scope,
             event_name,
             resource_id,
@@ -179,100 +157,6 @@ impl KafkaLogRow {
         }
     }
 }

-impl LogRow {
-    pub fn new(
-        team_id: i32,
-        record: LogRecord,
-        resource: Option<Resource>,
-        scope: Option<InstrumentationScope>,
-    ) -> Result<Self> {
-        // Extract body
-        let body = match record.body {
-            Some(body) => match body.value {
-                Some(value) => match value {
-                    opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s) => {
-                        s.clone()
-                    }
-                    _ => format!("{value:?}"),
-                },
-                None => "".to_string(),
-            },
-            None => "".to_string(),
-        };
-
-        let message = try_extract_message(&body).unwrap_or_default();
-
-        let mut severity_text = normalize_severity_text(record.severity_text);
-        let mut severity_number = record.severity_number;
-
-        if let Some(parsed_severity) = try_extract_severity(&body) {
-            severity_text = parsed_severity;
-            severity_number = convert_severity_text_to_number(&severity_text);
-        }
-
-        // severity_number takes priority if both provided
-        if record.severity_number > 0 {
-            severity_text = convert_severity_number_to_text(record.severity_number);
-        } else {
-            severity_number = convert_severity_text_to_number(&severity_text);
-        }
-
-        let resource_id = extract_resource_id(&resource);
-        let resource_attributes = extract_resource_attributes(resource);
-
-        let mut attributes: Vec<(String, String)> = record
-            .attributes
-            .into_iter()
-            .map(|kv| {
-                (
-                    kv.key,
-                    any_value_to_string(kv.value.unwrap_or(AnyValue {
-                        value: Some(Value::StringValue("".to_string())),
-                    })),
-                )
-            })
-            .collect();
-        attributes.extend(resource_attributes.clone());
-
-        let instrumentation_scope = match scope {
-            Some(s) => format!("{}@{}", s.name, s.version),
-            None => "".to_string(),
-        };
-
-        let event_name = extract_string(&attributes, "event.name");
-        let service_name = extract_string(&attributes, "service.name");
-
-        // Trace/span IDs
-        let trace_id = extract_trace_id(&record.trace_id);
-        let span_id = extract_span_id(&record.span_id);
-
-        // Trace flags
-        let trace_flags = record.flags;
-
-        let log_row = Self {
-            // uuid: Uuid::now_v7(),
-            team_id,
-            trace_id,
-            span_id,
-            trace_flags,
-            timestamp: record.time_unix_nano,
-            body,
-            message,
-            attributes,
-            severity_text,
-            severity_number,
-            resource_attributes,
-            instrumentation_scope,
-            event_name,
-            resource_id,
-            service_name,
-        };
-        debug!("log: {log_row:?}");
-
-        Ok(log_row)
-    }
-}

 // extract a JSON value as a string. If it's a string, strip the surrounding "quotes"
 fn extract_string_from_map(attributes: &HashMap<String, String>, key: &str) -> String {
     if let Some(value) = attributes.get(key) {
@@ -286,19 +170,6 @@ fn extract_string_from_map(attributes: &HashMap<String, String>, key: &str) -> String {
     }
 }

-fn extract_string(attributes: &[(String, String)], key: &str) -> String {
-    for (k, val) in attributes.iter() {
-        if k == key {
-            if let Ok(JsonValue::String(value)) = serde_json::from_str::<JsonValue>(val) {
-                return value.to_string();
-            } else {
-                return val.to_string();
-            }
-        }
-    }
-    "".to_string()
-}
-
 fn extract_json_string(value: String) -> String {
     match serde_json::from_str::<JsonValue>(&value) {
         Ok(JsonValue::String(value)) => value,
@@ -388,23 +259,6 @@ fn convert_severity_number_to_text(severity_number: i32) -> String {
     }
 }

-// TOOD - pull this from PG
-const MESSAGE_KEYS: [&str; 3] = ["message", "msg", "log.message"];
-
-fn try_extract_message(body: &str) -> Option<String> {
-    let Ok(value) = serde_json::from_str::<JsonValue>(body) else {
-        return None;
-    };
-
-    for key in MESSAGE_KEYS {
-        if let Some(JsonValue::String(s)) = value.get(key) {
-            return Some(s.clone());
-        }
-    }
-
-    None
-}
-
 fn extract_resource_attributes(resource: Option<Resource>) -> Vec<(String, String)> {
     let Some(resource) = resource else {
         return Vec::new();
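The severity helpers kept in this file (`convert_severity_number_to_text` and friends) follow the standard OTLP severity-number buckets. For orientation, a sketch of that mapping — the fallback arm is an assumption, not taken from this diff:

```rust
fn severity_number_to_text_sketch(n: i32) -> &'static str {
    match n {
        1..=4 => "trace",
        5..=8 => "debug",
        9..=12 => "info",
        13..=16 => "warn",
        17..=20 => "error",
        21..=24 => "fatal",
        _ => "info", // assumed default for out-of-range values
    }
}
```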
@@ -1,13 +1,11 @@
-use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
-use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
 use std::time::Duration;
-use tonic_web::GrpcWebLayer;
-use tower::Layer as TowerLayer;

-use axum::routing::get;
+use axum::{routing::get, routing::post, Router};
+use capture::metrics_middleware::track_metrics;
 use common_metrics::{serve, setup_metrics_routes};
 use log_capture::config::Config;
 use log_capture::kafka::KafkaSink;
+use log_capture::service::export_logs_http;
 use log_capture::service::Service;
 use std::future::ready;
@@ -52,30 +50,51 @@ async fn main() {
         .await
         .expect("failed to start Kafka sink");

-    let bind = format!("{}:{}", config.host, config.port);
+    let management_router = Router::new()
+        .route("/", get(index))
+        .route("/_readiness", get(index))
+        .route(
+            "/_liveness",
+            get(move || ready(health_registry.get_status())),
+        );
+    let management_router = setup_metrics_routes(management_router);
+    let management_bind = format!("{}:{}", config.management_host, config.management_port);
+    info!("Healthcheck and metrics listening on {}", management_bind);

-    // Initialize ClickHouse writer and logs service
-    let logs_service = match Service::new(config.clone(), kafka_sink).await {
+    let logs_service = match Service::new(kafka_sink).await {
         Ok(service) => service,
         Err(e) => {
            error!("Failed to initialize log service: {}", e);
            panic!("Could not start log capture service: {e}");
         }
     };
+    let http_bind = format!("{}:{}", config.host, config.port);
+    info!("Listening on {}", http_bind);

-    let router = tonic::service::Routes::new(GrpcWebLayer::new().layer(tonic_web::enable(
-        LogsServiceServer::new(logs_service.clone()),
-    )))
-    .add_service(tonic_web::enable(TraceServiceServer::new(logs_service)))
-    .prepare()
-    .into_axum_router()
-    .route("/", get(index))
-    .route("/_readiness", get(index))
-    .route(
-        "/_liveness",
-        get(move || ready(health_registry.get_status())),
-    );
-    let router = setup_metrics_routes(router);
+    let http_router = Router::new()
+        .route("/v1/logs", post(export_logs_http))
+        .with_state(logs_service)
+        .layer(axum::middleware::from_fn(track_metrics));

-    serve(router, &bind).await.unwrap();
+    let http_server = tokio::spawn(async move {
+        if let Err(e) = serve(http_router, &http_bind).await {
+            error!("HTTP server failed: {}", e);
+        }
+    });
+
+    let mgmt_server = tokio::spawn(async move {
+        if let Err(e) = serve(management_router, &management_bind).await {
+            error!("Management server failed: {}", e);
+        }
+    });
+
+    // Wait for any server to finish
+    tokio::select! {
+        _ = http_server => {
+            error!("HTTP server stopped unexpectedly");
+        }
+        _ = mgmt_server => {
+            error!("Management server stopped unexpectedly");
+        }
+    }
 }
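The single tonic/gRPC-web router is split into two plain axum servers: the OTLP ingest endpoint on `config.port` (4318) and the healthcheck/metrics endpoints on the new management port (8080). `serve` comes from `common-metrics`; assuming it behaves like the usual axum bootstrap, it is roughly:

```rust
use axum::Router;
use tokio::net::TcpListener;

// Assumed shape of common_metrics::serve — bind a listener and run the
// router on it. A sketch of the helper's contract, not its actual source.
async fn serve_sketch(router: Router, bind: &str) -> std::io::Result<()> {
    let listener = TcpListener::bind(bind).await?;
    axum::serve(listener, router).await
}
```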
@@ -1,102 +1,96 @@
 use crate::log_record::KafkaLogRow;
-use crate::{auth::authenticate_request, config::Config};
-use opentelemetry_proto::tonic::collector::logs::v1::{
-    logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse,
-};
-use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;
-use opentelemetry_proto::tonic::collector::trace::v1::{
-    ExportTraceServiceRequest, ExportTraceServiceResponse,
-};
+use axum::{
+    extract::State,
+    http::{HeaderMap, StatusCode},
+    response::Json,
+};
+use bytes::Bytes;
+use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
+use prost::Message;
+use serde_json::json;

 use crate::kafka::KafkaSink;

-use tonic::{Request, Response, Status};
 use tracing::{debug, error};

 #[derive(Clone)]
 pub struct Service {
-    config: Config,
     sink: KafkaSink,
 }

 impl Service {
-    pub async fn new(config: Config, kafka_sink: KafkaSink) -> Result<Self, anyhow::Error> {
-        Ok(Self {
-            config,
-            sink: kafka_sink,
-        })
+    pub async fn new(kafka_sink: KafkaSink) -> Result<Self, anyhow::Error> {
+        Ok(Self { sink: kafka_sink })
     }
 }

-#[tonic::async_trait]
-impl LogsService for Service {
-    async fn export(
-        &self,
-        request: Request<ExportLogsServiceRequest>,
-    ) -> Result<Response<ExportLogsServiceResponse>, Status> {
-        // Extract team_id from JWT token in the Authorization header
-        let team_id = match authenticate_request(&request, &self.config.jwt_secret) {
-            Ok(team_id) => team_id,
-            Err(status) => {
-                return Err(*status);
-            }
-        };
-
-        let team_id = match team_id.parse::<i32>() {
-            Ok(id) => id,
-            Err(e) => {
-                error!("Failed to parse team_id '{team_id}' as i32: {e}");
-                return Err(Status::invalid_argument(format!(
-                    "Invalid team_id format: {team_id}"
-                )));
-            }
-        };
+pub async fn export_logs_http(
+    State(service): State<Service>,
+    headers: HeaderMap,
+    body: Bytes,
+) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
+    // The Project API key must be passed in as a Bearer token in the Authorization header
+    if !headers.contains_key("Authorization") {
+        error!("No Authorization header");
+        return Err((
+            StatusCode::UNAUTHORIZED,
+            Json(json!({"error": format!("No Authorization header")})),
+        ));
+    }
+
+    let token = headers["Authorization"]
+        .to_str()
+        .unwrap_or("")
+        .split("Bearer ")
+        .last();
+    if token.is_none() || token == Some("") {
+        error!("No token provided");
+        return Err((
+            StatusCode::UNAUTHORIZED,
+            Json(json!({"error": format!("No token provided")})),
+        ));
+    }
+
+    let export_request = ExportLogsServiceRequest::decode(body.as_ref()).map_err(|e| {
+        error!("Failed to decode protobuf: {}", e);
+        (
+            StatusCode::BAD_REQUEST,
+            Json(json!({"error": format!("Failed to decode protobuf: {}", e)})),
+        )
+    })?;

-        let export_request = request.into_inner();
-        let mut rows: Vec<KafkaLogRow> = Vec::new();
-        for resource_logs in export_request.resource_logs {
-            for scope_logs in resource_logs.scope_logs {
-                for log_record in scope_logs.log_records {
-                    let row = match KafkaLogRow::new(
-                        team_id,
-                        log_record,
-                        resource_logs.resource.clone(),
-                        scope_logs.scope.clone(),
-                    ) {
-                        Ok(row) => row,
-                        Err(e) => {
-                            error!("Failed to create LogRow: {e}");
-                            continue;
-                        }
-                    };
-                    rows.push(row);
-                }
-            }
-        }
+    let mut rows: Vec<KafkaLogRow> = Vec::new();
+    for resource_logs in export_request.resource_logs {
+        for scope_logs in resource_logs.scope_logs {
+            for log_record in scope_logs.log_records {
+                let row = match KafkaLogRow::new(
+                    log_record,
+                    resource_logs.resource.clone(),
+                    scope_logs.scope.clone(),
+                ) {
+                    Ok(row) => row,
+                    Err(e) => {
+                        error!("Failed to create LogRow: {e}");
+                        return Err((
+                            StatusCode::BAD_REQUEST,
+                            Json(json!({"error": format!("Bad input format provided")})),
+                        ));
+                    }
+                };
+                rows.push(row);
+            }
+        }
+    }

-        if let Err(e) = self.sink.write(team_id, rows).await {
-            error!("Failed to send logs to Kafka: {}", e);
-        } else {
-            debug!("Successfully sent logs to Kafka");
-        }
-
-        // A successful OTLP export expects an ExportLogsServiceResponse.
-        let response = ExportLogsServiceResponse {
-            partial_success: None,
-        };
-        Ok(Response::new(response))
-    }
-}
-
-#[tonic::async_trait]
-impl TraceService for Service {
-    async fn export(
-        &self,
-        _request: Request<ExportTraceServiceRequest>,
-    ) -> Result<Response<ExportTraceServiceResponse>, Status> {
-        let response = ExportTraceServiceResponse {
-            partial_success: None,
-        };
-        Ok(Response::new(response))
-    }
-}
+    if let Err(e) = service.sink.write(token.unwrap(), rows).await {
+        error!("Failed to send logs to Kafka: {}", e);
+        return Err((
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({"error": format!("Internal server error")})),
+        ));
+    } else {
+        debug!("Successfully sent logs to Kafka");
+    }
+
+    // Return empty JSON object per OTLP spec
+    Ok(Json(json!({})))
+}
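One sharp edge worth noting in the new handler: `.split("Bearer ").last()` returns the whole header value when the `Bearer ` prefix is absent, so a malformed header still yields a "token". A stricter parse, sketched as a hypothetical hardening (not part of this commit):

```rust
// Reject Authorization values that don't actually start with "Bearer ",
// instead of falling through to the raw header string.
fn bearer_token(header_value: &str) -> Option<&str> {
    header_value
        .strip_prefix("Bearer ")
        .map(str::trim)
        .filter(|token| !token.is_empty())
}
```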