diff --git a/bin/clickhouse-logs.sql b/bin/clickhouse-logs.sql
index 26a0397d4e..a0bcddf557 100644
--- a/bin/clickhouse-logs.sql
+++ b/bin/clickhouse-logs.sql
@@ -1,12 +1,14 @@
+-- temporary sql to initialise log tables for local development
+-- will be removed once we have migrations set up
 CREATE OR REPLACE FUNCTION extractIPv4Substrings AS (
     body -> extractAll(body, '(\d\.((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){2,2}([0-9]))')
 );
 
--- temporary sql to initialise log tables for local development
--- will be removed once we have migrations set up
-CREATE TABLE if not exists logs27
+CREATE OR REPLACE TABLE logs31
 (
-    `time_bucket` DateTime MATERIALIZED toStartOfInterval(timestamp, toIntervalMinute(5)) CODEC(DoubleDelta, ZSTD(1)),
+    -- time bucket is set to day which means it's effectively not in the order by key (same as partition)
+    -- but gives us flexibility to add the bucket to the order key if needed
+    `time_bucket` DateTime MATERIALIZED toStartOfDay(timestamp) CODEC(DoubleDelta, ZSTD(1)),
     `uuid` String CODEC(ZSTD(1)),
     `team_id` Int32 CODEC(ZSTD(1)),
     `trace_id` String CODEC(ZSTD(1)),
@@ -20,32 +22,22 @@ CREATE TABLE if not exists logs27
     `severity_number` Int32 CODEC(ZSTD(1)),
     `service_name` String CODEC(ZSTD(1)),
     `resource_attributes` Map(String, String) CODEC(ZSTD(1)),
+    `resource_fingerprint` UInt64 MATERIALIZED cityHash64(resource_attributes) CODEC(DoubleDelta, ZSTD(1)),
     `resource_id` String CODEC(ZSTD(1)),
     `instrumentation_scope` String CODEC(ZSTD(1)),
     `event_name` String CODEC(ZSTD(1)),
-    `attributes_map_str` Map(String, String) CODEC(ZSTD(1)),
-    `attributes_map_float` Map(String, Float64) CODEC(ZSTD(1)),
-    `attributes_map_datetime` Map(String, DateTime64(6)) CODEC(ZSTD(1)),
+    `attributes_map_str` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+    `attributes_map_float` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)),
+    `attributes_map_datetime` Map(LowCardinality(String), DateTime64(6)) CODEC(ZSTD(1)),
     `level` String ALIAS severity_text,
     `mat_body_ipv4_matches` Array(String) ALIAS extractAll(body, '(\\d\\.((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){2,2}([0-9]))'),
     `time_minute` DateTime ALIAS toStartOfMinute(timestamp),
-    `attributes` Map(String, String) ALIAS mapApply((k, v) -> (k, toJSONString(v)), attributes_map_str),
+    `attributes` Map(String, String) ALIAS mapApply((k, v) -> (left(k, -5), v), attributes_map_str),
     INDEX idx_severity_text_set severity_text TYPE set(10) GRANULARITY 1,
     INDEX idx_attributes_str_keys mapKeys(attributes_map_str) TYPE bloom_filter(0.01) GRANULARITY 1,
     INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0.001) GRANULARITY 1,
     INDEX idx_mat_body_ipv4_matches mat_body_ipv4_matches TYPE bloom_filter(0.01) GRANULARITY 1,
-    INDEX idx_body_ngram3 body TYPE ngrambf_v1(3, 20000, 2, 0) GRANULARITY 1,
-    CONSTRAINT assume_time_bucket ASSUME toStartOfInterval(timestamp, toIntervalMinute(5)) = time_bucket,
-    PROJECTION projection_trace_span
-    (
-        SELECT
-            trace_id,
-            timestamp,
-            _part_offset
-        ORDER BY
-            trace_id,
-            timestamp
-    ),
+    INDEX idx_body_ngram3 body TYPE ngrambf_v1(3, 25000, 2, 0) GRANULARITY 1,
     PROJECTION projection_aggregate_counts
     (
         SELECT
@@ -54,8 +46,7 @@ CREATE TABLE if not exists logs27
             toStartOfMinute(timestamp),
             service_name,
             severity_text,
-            resource_attributes,
-            resource_id,
+            resource_fingerprint,
             count() AS event_count
         GROUP BY
             team_id,
@@ -63,43 +54,47 @@ CREATE TABLE if not exists logs27
             toStartOfMinute(timestamp),
             service_name,
             severity_text,
-            resource_attributes,
-            resource_id
+            resource_fingerprint
     )
 )
-ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/logs27', '{replica}')
+ENGINE = MergeTree
 PARTITION BY toDate(timestamp)
-ORDER BY (team_id, time_bucket DESC, service_name, resource_attributes, severity_text, timestamp DESC, uuid, trace_id, span_id)
-SETTINGS allow_remote_fs_zero_copy_replication = 1,
-allow_experimental_reverse_key = 1,
-deduplicate_merge_projection_mode = 'ignore';
+PRIMARY KEY (team_id, time_bucket, service_name, resource_fingerprint, severity_text, timestamp)
+ORDER BY (team_id, time_bucket, service_name, resource_fingerprint, severity_text, timestamp)
+SETTINGS
+    index_granularity_bytes = 104857600,
+    index_granularity = 8192,
+    ttl_only_drop_parts = 1;
 
-create or replace TABLE logs AS logs27 ENGINE = Distributed('posthog', 'default', 'logs27');
+create or replace TABLE logs AS logs31 ENGINE = Distributed('posthog', 'default', 'logs31');
 
-create table if not exists log_attributes
+create or replace table default.log_attributes
 (
     `team_id` Int32,
     `time_bucket` DateTime64(0),
     `service_name` LowCardinality(String),
+    `resource_id` String DEFAULT '',
+    `resource_fingerprint` UInt64 DEFAULT 0,
     `attribute_key` LowCardinality(String),
     `attribute_value` String,
     `attribute_count` SimpleAggregateFunction(sum, UInt64),
     INDEX idx_attribute_key attribute_key TYPE bloom_filter(0.01) GRANULARITY 1,
-    INDEX idx_attribute_value attribute_value TYPE bloom_filter(0.001) GRANULARITY 1,
+    INDEX idx_attribute_value attribute_value TYPE bloom_filter(0.01) GRANULARITY 1,
     INDEX idx_attribute_key_n3 attribute_key TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1,
     INDEX idx_attribute_value_n3 attribute_value TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1
 )
-ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/log_attributes', '{replica}')
+ENGINE = AggregatingMergeTree
 PARTITION BY toDate(time_bucket)
-ORDER BY (team_id, service_name, time_bucket, attribute_key, attribute_value);
+ORDER BY (team_id, time_bucket, resource_fingerprint, attribute_key, attribute_value);
 
-set enable_dynamic_type=1;
-CREATE MATERIALIZED VIEW if not exists log_to_log_attributes TO log_attributes
+drop view if exists log_to_log_attributes;
+CREATE MATERIALIZED VIEW log_to_log_attributes TO log_attributes
 (
     `team_id` Int32,
     `time_bucket` DateTime64(0),
     `service_name` LowCardinality(String),
+    `resource_fingerprint` UInt64,
     `attribute_key` LowCardinality(String),
     `attribute_value` String,
     `attribute_count` SimpleAggregateFunction(sum, UInt64)
@@ -108,23 +103,28 @@ AS SELECT
     team_id,
     time_bucket,
     service_name,
+    resource_fingerprint,
     attribute_key,
     attribute_value,
     attribute_count
-FROM (select
-    team_id AS team_id,
-    toStartOfInterval(timestamp, toIntervalMinute(10)) AS time_bucket,
-    service_name AS service_name,
-    arrayJoin(arrayMap((k, v) -> (k, if(length(v) > 256, '', v)), arrayFilter((k, v) -> (length(k) < 256), CAST(attributes, 'Array(Tuple(String, String))')))) AS attribute,
-    attribute.1 AS attribute_key,
-    CAST(JSONExtract(attribute.2, 'Dynamic'), 'String') AS attribute_value,
-    sumSimpleState(1) AS attribute_count
-FROM logs27
-GROUP BY
-    team_id,
-    time_bucket,
-    service_name,
-    attribute
+FROM
+(
+    SELECT
+        team_id AS team_id,
+        toStartOfInterval(timestamp, toIntervalMinute(10)) AS time_bucket,
+        service_name AS service_name,
+        resource_fingerprint,
+        arrayJoin(mapFilter((k, v) -> ((length(k) < 256) AND (length(v) < 256)), attributes)) AS attribute,
+        attribute.1 AS attribute_key,
+        attribute.2 AS attribute_value,
+        sumSimpleState(1) AS attribute_count
+    FROM logs31
+    GROUP BY
+        team_id,
+        time_bucket,
+        service_name,
+        resource_fingerprint,
+        attribute
 );
 
 CREATE OR REPLACE TABLE kafka_logs_avro
@@ -139,11 +139,10 @@ CREATE OR REPLACE TABLE kafka_logs_avro
     `severity_text` String,
     `severity_number` Int32,
     `service_name` String,
-    `resource_attributes` Map(String, String),
-    `resource_id` String,
+    `resource_attributes` Map(LowCardinality(String), String),
     `instrumentation_scope` String,
     `event_name` String,
-    `attributes` Map(String, Nullable(String))
+    `attributes` Map(LowCardinality(String), String)
 )
 ENGINE = Kafka('kafka:9092', 'clickhouse_logs', 'clickhouse-logs-avro', 'Avro')
 SETTINGS
@@ -152,15 +151,14 @@ SETTINGS
     kafka_thread_per_consumer = 1,
     kafka_num_consumers = 1,
     kafka_poll_timeout_ms=15000,
-    kafka_poll_max_batch_size=1,
-    kafka_max_block_size=1;
+    kafka_poll_max_batch_size=10,
+    kafka_max_block_size=10;
 
 
 drop table if exists kafka_logs_avro_mv;
-CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs27
+CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs31
 (
     `uuid` String,
-    `team_id` Int32,
     `trace_id` String,
     `span_id` String,
     `trace_flags` Int32,
@@ -170,11 +168,10 @@ CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs27
     `severity_text` String,
     `severity_number` Int32,
     `service_name` String,
-    `resource_attributes` Map(String, String),
-    `resource_id` String,
+    `resource_attributes` Map(LowCardinality(String), String),
     `instrumentation_scope` String,
     `event_name` String,
-    `attributes` Map(String, Nullable(String))
+    `attributes` Map(LowCardinality(String), String)
 )
 AS SELECT
     * except (attributes, resource_attributes),
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 24c61fedb9..bfb2e5fa0b 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -111,7 +111,7 @@ services:
             service: clickhouse
         hostname: clickhouse
         # Development performance optimizations
-        mem_limit: 4g
+        mem_limit: 6g
        cpus: 2
        environment:
            - AWS_ACCESS_KEY_ID=object_storage_root_user
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index af46ab1623..052c0ddf00 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -1574,7 +1574,6 @@ dependencies = [
  "serde",
  "serde_derive",
  "serde_json",
- "sha2",
  "thiserror 2.0.16",
  "tokio",
  "tonic 0.12.3",
diff --git a/rust/capture-logs/Cargo.toml b/rust/capture-logs/Cargo.toml
index 511ce68368..820ef459c4 100644
--- a/rust/capture-logs/Cargo.toml
+++ b/rust/capture-logs/Cargo.toml
@@ -27,7 +27,6 @@ serde_json = { workspace = true }
 metrics = { workspace = true }
 base64 = { workspace = true }
 rdkafka = { workspace = true }
-sha2 = "0.10.8"
 uuid = { workspace = true }
 chrono = { workspace = true }
 apache-avro = { version = "0.18.0", features = ["zstandard"] }
diff --git a/rust/capture-logs/src/avro_schema.rs b/rust/capture-logs/src/avro_schema.rs
index 89e52a6f47..f848a8fa7b 100644
--- a/rust/capture-logs/src/avro_schema.rs
+++ b/rust/capture-logs/src/avro_schema.rs
@@ -41,14 +41,6 @@ pub const AVRO_SCHEMA: &str = r#"
       "doc": "The timestamp when the event was observed or ingested, in microseconds since epoch."
     },
     {
-      "name": "created_at",
-      "type": ["null", {
-        "type": "long",
-        "logicalType": "timestamp-micros"
-      }],
-      "doc": "The timestamp when the record was created in the system, in microseconds since epoch."
-    },
-    {
       "name": "body",
       "type": ["null", "string"],
       "doc": "The main content or message of the log."
@@ -77,11 +69,6 @@ pub const AVRO_SCHEMA: &str = r#"
       "doc": "Attributes describing the resource that produced the log (e.g., host, region)."
     },
     {
-      "name": "resource_id",
-      "type": ["null", "string"],
-      "doc": "A unique identifier for the resource."
-    },
-    {
       "name": "instrumentation_scope",
       "type": ["null", "string"],
       "doc": "The name of the library or framework that captured the log."
@@ -98,33 +85,6 @@ pub const AVRO_SCHEMA: &str = r#"
         "values": "string"
       }],
       "doc": "A map of custom string-valued attributes associated with the log."
-    },
-    {
-      "name": "attributes_map_str",
-      "type": ["null", {
-        "type": "map",
-        "values": "string"
-      }],
-      "doc": "Additional map of string attributes."
-    },
-    {
-      "name": "attributes_map_float",
-      "type": ["null", {
-        "type": "map",
-        "values": "double"
-      }],
-      "doc": "Map of custom double-precision float attributes."
-    },
-    {
-      "name": "attributes_map_datetime",
-      "type": ["null", {
-        "type": "map",
-        "values": {
-          "type": "long",
-          "logicalType": "timestamp-millis"
-        }
-      }],
-      "doc": "Map of custom timestamp-valued attributes."
     }
   ]
 }"#;
diff --git a/rust/capture-logs/src/kafka.rs b/rust/capture-logs/src/kafka.rs
index 103efabe5f..131fb34960 100644
--- a/rust/capture-logs/src/kafka.rs
+++ b/rust/capture-logs/src/kafka.rs
@@ -201,7 +201,7 @@ impl KafkaSink {
             Codec::Zstandard(ZstandardSettings::default()),
         );
 
-        for row in rows {
+        for row in &rows {
             writer.append_ser(row)?;
         }
 
@@ -226,6 +226,10 @@ impl KafkaSink {
                     .insert(Header {
                         key: "bytes_compressed",
                         value: Some(&payload.len().to_string()),
+                    })
+                    .insert(Header {
+                        key: "record_count",
+                        value: Some(&rows.len().to_string()),
                     }),
             ),
         }) {
diff --git a/rust/capture-logs/src/log_record.rs b/rust/capture-logs/src/log_record.rs
index 6b6938aac3..0d9ce3b28a 100644
--- a/rust/capture-logs/src/log_record.rs
+++ b/rust/capture-logs/src/log_record.rs
@@ -16,7 +16,6 @@ use opentelemetry_proto::tonic::{
 };
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value as JsonValue};
-use sha2::{Digest, Sha512};
 use tracing::debug;
 use uuid::Uuid;
 
@@ -30,20 +29,14 @@ pub struct KafkaLogRow {
     pub timestamp: DateTime<Utc>,
     #[serde(with = "ts_microseconds")]
     pub observed_timestamp: DateTime<Utc>,
-    #[serde(with = "ts_microseconds")]
-    pub created_at: DateTime<Utc>,
     pub body: String,
     pub severity_text: String,
     pub severity_number: i32,
     pub service_name: String,
     pub resource_attributes: HashMap<String, String>,
-    pub resource_id: String,
     pub instrumentation_scope: String,
     pub event_name: String,
     pub attributes: HashMap<String, String>,
-    pub attributes_map_str: HashMap<String, String>,
-    pub attributes_map_float: HashMap<String, f64>,
-    pub attributes_map_datetime: HashMap<String, DateTime<Utc>>,
 }
 
 impl KafkaLogRow {
@@ -81,7 +74,6 @@ impl KafkaLogRow {
             severity_number = convert_severity_text_to_number(&severity_text);
         }
 
-        let resource_id = extract_resource_id(&resource);
         let resource_attributes = extract_resource_attributes(resource);
 
         let mut attributes: HashMap<String, String> = record
@@ -123,29 +115,14 @@ impl KafkaLogRow {
             trace_flags,
             timestamp,
             observed_timestamp,
-            created_at: observed_timestamp,
             body,
             severity_text,
             severity_number,
             resource_attributes: resource_attributes.into_iter().collect(),
             instrumentation_scope,
             event_name,
-            resource_id,
             service_name,
-            attributes: attributes.clone(),
-            attributes_map_str: attributes
-                .clone()
-                .into_iter()
-                .map(|(k, v)| (k + "__str", extract_json_string(v)))
-                .collect(),
-            attributes_map_float: attributes
-                .clone()
-                .into_iter()
-                .map(|(k, v)| (k, extract_json_float(v)))
-                .filter(|(_, v)| v.is_some())
-                .map(|(k, v)| (k + "__float", v.unwrap()))
-                .collect(),
-            attributes_map_datetime: HashMap::new(),
+            attributes,
         };
 
         debug!("log: {:?}", log_row);
@@ -166,20 +143,6 @@ fn extract_string_from_map(attributes: &HashMap<String, String>, key: &str) -> S
     }
 }
 
-fn extract_json_string(value: String) -> String {
-    match serde_json::from_str::<JsonValue>(&value) {
-        Ok(JsonValue::String(value)) => value,
-        _ => value,
-    }
-}
-
-fn extract_json_float(value: String) -> Option<f64> {
-    match serde_json::from_str::<JsonValue>(&value) {
-        Ok(JsonValue::Number(value)) => value.as_f64(),
-        _ => None,
-    }
-}
-
 fn extract_trace_id(input: &[u8]) -> [u8; 16] {
     if input.len() == 16 {
         let mut bytes = [0; 16];
@@ -274,26 +237,6 @@ fn extract_resource_attributes(resource: Option<Resource>) -> Vec<(String, Strin
         .collect()
 }
 
-fn extract_resource_id(resource: &Option<Resource>) -> String {
-    let Some(resource) = resource else {
-        return "".to_string();
-    };
-
-    let mut hasher = Sha512::new();
-    for pair in resource.attributes.iter() {
-        hasher.update(pair.key.as_bytes());
-        hasher.update(
-            pair.value
-                .clone()
-                .map(any_value_to_json)
-                .map(|v| v.to_string())
-                .unwrap_or_default(),
-        );
-    }
-
-    format!("{:x}", hasher.finalize())
-}
-
 // TODO - pull this from PG
 const SEVERITY_KEYS: [&str; 4] = ["level", "severity", "log.level", "config.log_level"];
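
For reference, a minimal sketch of the kind of aggregate query the new logs31 primary key and projection_aggregate_counts appear aimed at; this query is not part of the diff, and the team_id literal and time window are placeholders.

-- Illustrative only: per-minute severity counts for one team over the last hour.
-- team_id = 1 and the 1-hour window are made-up example values.
SELECT
    toStartOfMinute(timestamp) AS minute,
    service_name,
    severity_text,
    count() AS event_count
FROM logs31
WHERE team_id = 1
  AND timestamp >= now() - INTERVAL 1 HOUR
GROUP BY
    minute,
    service_name,
    severity_text
ORDER BY minute ASC;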
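Similarly, a hedged sketch of the attribute-value lookup the log_attributes rollup and its bloom/ngram indexes look intended to serve; the attribute key shown is hypothetical and the query is not part of the diff.

-- Illustrative only: top values for one attribute key, for autocomplete-style lookups.
-- 'http.status_code' is a hypothetical key; team_id = 1 is a placeholder.
SELECT
    attribute_value,
    sum(attribute_count) AS occurrences
FROM log_attributes
WHERE team_id = 1
  AND attribute_key = 'http.status_code'
  AND time_bucket >= now() - INTERVAL 1 DAY
GROUP BY attribute_value
ORDER BY occurrences DESC
LIMIT 20;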