feat(capture-logs): remove unused attributes from logs capture (#41379)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Frank Hamand, 2025-11-13 16:54:51 +00:00 (committed by GitHub)
parent 8895a68920
commit 9032507735
7 changed files with 64 additions and 162 deletions

View File

@@ -1,12 +1,14 @@
-- temporary sql to initialise log tables for local development
-- will be removed once we have migrations set up
CREATE OR REPLACE FUNCTION extractIPv4Substrings AS
(
body -> extractAll(body, '(\d\.((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){2,2}([0-9]))')
);
-- temporary sql to initialise log tables for local development
-- will be removed once we have migrations set up
CREATE TABLE if not exists logs27
CREATE OR REPLACE TABLE logs31
(
`time_bucket` DateTime MATERIALIZED toStartOfInterval(timestamp, toIntervalMinute(5)) CODEC(DoubleDelta, ZSTD(1)),
-- time bucket is set to day which means it's effectively not in the order by key (same as partition)
-- but gives us flexibility to add the bucket to the order key if needed
`time_bucket` DateTime MATERIALIZED toStartOfDay(timestamp) CODEC(DoubleDelta, ZSTD(1)),
`uuid` String CODEC(ZSTD(1)),
`team_id` Int32 CODEC(ZSTD(1)),
`trace_id` String CODEC(ZSTD(1)),
@@ -20,32 +22,22 @@ CREATE TABLE if not exists logs27
`severity_number` Int32 CODEC(ZSTD(1)),
`service_name` String CODEC(ZSTD(1)),
`resource_attributes` Map(String, String) CODEC(ZSTD(1)),
`resource_fingerprint` UInt64 MATERIALIZED cityHash64(resource_attributes) CODEC(DoubleDelta, ZSTD(1)),
`resource_id` String CODEC(ZSTD(1)),
`instrumentation_scope` String CODEC(ZSTD(1)),
`event_name` String CODEC(ZSTD(1)),
`attributes_map_str` Map(String, String) CODEC(ZSTD(1)),
`attributes_map_float` Map(String, Float64) CODEC(ZSTD(1)),
`attributes_map_datetime` Map(String, DateTime64(6)) CODEC(ZSTD(1)),
`attributes_map_str` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
`attributes_map_float` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)),
`attributes_map_datetime` Map(LowCardinality(String), DateTime64(6)) CODEC(ZSTD(1)),
`level` String ALIAS severity_text,
`mat_body_ipv4_matches` Array(String) ALIAS extractAll(body, '(\\d\\.((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){2,2}([0-9]))'),
`time_minute` DateTime ALIAS toStartOfMinute(timestamp),
`attributes` Map(String, String) ALIAS mapApply((k, v) -> (k, toJSONString(v)), attributes_map_str),
`attributes` Map(String, String) ALIAS mapApply((k, v) -> (left(k, -5), v), attributes_map_str),
INDEX idx_severity_text_set severity_text TYPE set(10) GRANULARITY 1,
INDEX idx_attributes_str_keys mapKeys(attributes_map_str) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_mat_body_ipv4_matches mat_body_ipv4_matches TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body_ngram3 body TYPE ngrambf_v1(3, 20000, 2, 0) GRANULARITY 1,
CONSTRAINT assume_time_bucket ASSUME toStartOfInterval(timestamp, toIntervalMinute(5)) = time_bucket,
PROJECTION projection_trace_span
(
SELECT
trace_id,
timestamp,
_part_offset
ORDER BY
trace_id,
timestamp
),
INDEX idx_body_ngram3 body TYPE ngrambf_v1(3, 25000, 2, 0) GRANULARITY 1,
PROJECTION projection_aggregate_counts
(
SELECT
@@ -54,8 +46,7 @@ CREATE TABLE if not exists logs27
toStartOfMinute(timestamp),
service_name,
severity_text,
resource_attributes,
resource_id,
resource_fingerprint,
count() AS event_count
GROUP BY
team_id,
@@ -63,43 +54,47 @@ CREATE TABLE if not exists logs27
toStartOfMinute(timestamp),
service_name,
severity_text,
resource_attributes,
resource_id
resource_fingerprint
)
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/logs27', '{replica}')
ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (team_id, time_bucket DESC, service_name, resource_attributes, severity_text, timestamp DESC, uuid, trace_id, span_id)
SETTINGS allow_remote_fs_zero_copy_replication = 1,
allow_experimental_reverse_key = 1,
deduplicate_merge_projection_mode = 'ignore';
PRIMARY KEY (team_id, time_bucket, service_name, resource_fingerprint, severity_text, timestamp)
ORDER BY (team_id, time_bucket, service_name, resource_fingerprint, severity_text, timestamp)
SETTINGS
index_granularity_bytes = 104857600,
index_granularity = 8192,
ttl_only_drop_parts = 1;
create or replace TABLE logs AS logs27 ENGINE = Distributed('posthog', 'default', 'logs27');
create or replace TABLE logs AS logs31 ENGINE = Distributed('posthog', 'default', 'logs31');
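-- (illustrative only, not part of this file) a minimal query sketch against the
-- schema above; the team id, date, severity value and attribute key are hypothetical
SELECT
    timestamp,
    level,                                    -- ALIAS for severity_text
    service_name,
    body,
    attributes['http.method'] AS http_method  -- read through the `attributes` ALIAS
FROM logs
WHERE team_id = 1
  AND time_bucket = toStartOfDay(yesterday())
  AND severity_text = 'error'
ORDER BY timestamp DESC
LIMIT 100;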
create table if not exists log_attributes
create or replace table default.log_attributes
(
`team_id` Int32,
`time_bucket` DateTime64(0),
`service_name` LowCardinality(String),
`resource_id` String DEFAULT '',
`resource_fingerprint` UInt64 DEFAULT 0,
`attribute_key` LowCardinality(String),
`attribute_value` String,
`attribute_count` SimpleAggregateFunction(sum, UInt64),
INDEX idx_attribute_key attribute_key TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attribute_value attribute_value TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_attribute_value attribute_value TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attribute_key_n3 attribute_key TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1,
INDEX idx_attribute_value_n3 attribute_value TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1
)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/log_attributes', '{replica}')
ENGINE = AggregatingMergeTree
PARTITION BY toDate(time_bucket)
ORDER BY (team_id, service_name, time_bucket, attribute_key, attribute_value);
ORDER BY (team_id, time_bucket, resource_fingerprint, attribute_key, attribute_value);
set enable_dynamic_type=1;
CREATE MATERIALIZED VIEW if not exists log_to_log_attributes TO log_attributes
drop view if exists log_to_log_attributes;
CREATE MATERIALIZED VIEW log_to_log_attributes TO log_attributes
(
`team_id` Int32,
`time_bucket` DateTime64(0),
`service_name` LowCardinality(String),
`resource_fingerprint` UInt64,
`attribute_key` LowCardinality(String),
`attribute_value` String,
`attribute_count` SimpleAggregateFunction(sum, UInt64)
@@ -108,23 +103,28 @@ AS SELECT
team_id,
time_bucket,
service_name,
resource_fingerprint,
attribute_key,
attribute_value,
attribute_count
FROM (select
team_id AS team_id,
toStartOfInterval(timestamp, toIntervalMinute(10)) AS time_bucket,
service_name AS service_name,
arrayJoin(arrayMap((k, v) -> (k, if(length(v) > 256, '', v)), arrayFilter((k, v) -> (length(k) < 256), CAST(attributes, 'Array(Tuple(String, String))')))) AS attribute,
attribute.1 AS attribute_key,
CAST(JSONExtract(attribute.2, 'Dynamic'), 'String') AS attribute_value,
sumSimpleState(1) AS attribute_count
FROM logs27
GROUP BY
team_id,
time_bucket,
service_name,
attribute
FROM
(
SELECT
team_id AS team_id,
toStartOfInterval(timestamp, toIntervalMinute(10)) AS time_bucket,
service_name AS service_name,
resource_fingerprint,
arrayJoin(mapFilter((k, v) -> ((length(k) < 256) AND (length(v) < 256)), attributes)) AS attribute,
attribute.1 AS attribute_key,
attribute.2 AS attribute_value,
sumSimpleState(1) AS attribute_count
FROM logs31
GROUP BY
team_id,
time_bucket,
service_name,
resource_fingerprint,
attribute
);
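-- (illustrative only, not part of this file) one way the log_attributes rollup
-- populated above could be queried, e.g. for attribute-key autocomplete;
-- the team id and key prefix are hypothetical
SELECT
    attribute_key,
    sum(attribute_count) AS times_seen
FROM log_attributes
WHERE team_id = 1
  AND time_bucket >= now() - INTERVAL 1 DAY
  AND attribute_key LIKE 'http%'
GROUP BY attribute_key
ORDER BY times_seen DESC
LIMIT 20;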
CREATE OR REPLACE TABLE kafka_logs_avro
@@ -139,11 +139,10 @@ CREATE OR REPLACE TABLE kafka_logs_avro
`severity_text` String,
`severity_number` Int32,
`service_name` String,
`resource_attributes` Map(String, String),
`resource_id` String,
`resource_attributes` Map(LowCardinality(String), String),
`instrumentation_scope` String,
`event_name` String,
`attributes` Map(String, Nullable(String))
`attributes` Map(LowCardinality(String), String)
)
ENGINE = Kafka('kafka:9092', 'clickhouse_logs', 'clickhouse-logs-avro', 'Avro')
SETTINGS
@@ -152,15 +151,14 @@ SETTINGS
kafka_thread_per_consumer = 1,
kafka_num_consumers = 1,
kafka_poll_timeout_ms=15000,
kafka_poll_max_batch_size=1,
kafka_max_block_size=1;
kafka_poll_max_batch_size=10,
kafka_max_block_size=10;
drop table if exists kafka_logs_avro_mv;
CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs27
CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs31
(
`uuid` String,
`team_id` Int32,
`trace_id` String,
`span_id` String,
`trace_flags` Int32,
@@ -170,11 +168,10 @@ CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs27
`severity_text` String,
`severity_number` Int32,
`service_name` String,
`resource_attributes` Map(String, String),
`resource_id` String,
`resource_attributes` Map(LowCardinality(String), String),
`instrumentation_scope` String,
`event_name` String,
`attributes` Map(String, Nullable(String))
`attributes` Map(LowCardinality(String), String)
)
AS SELECT
* except (attributes, resource_attributes),

View File

@@ -111,7 +111,7 @@ services:
service: clickhouse
hostname: clickhouse
# Development performance optimizations
mem_limit: 4g
mem_limit: 6g
cpus: 2
environment:
- AWS_ACCESS_KEY_ID=object_storage_root_user

rust/Cargo.lock generated
View File

@@ -1574,7 +1574,6 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"sha2",
"thiserror 2.0.16",
"tokio",
"tonic 0.12.3",

View File

@@ -27,7 +27,6 @@ serde_json = { workspace = true }
metrics = { workspace = true }
base64 = { workspace = true }
rdkafka = { workspace = true }
sha2 = "0.10.8"
uuid = { workspace = true }
chrono = { workspace = true }
apache-avro = { version = "0.18.0", features = ["zstandard"] }

View File

@@ -41,14 +41,6 @@ pub const AVRO_SCHEMA: &str = r#"
"doc": "The timestamp when the event was observed or ingested, in microseconds since epoch."
},
{
"name": "created_at",
"type": ["null", {
"type": "long",
"logicalType": "timestamp-micros"
}],
"doc": "The timestamp when the record was created in the system, in microseconds since epoch."
},
{
"name": "body",
"type": ["null", "string"],
"doc": "The main content or message of the log."
@@ -77,11 +69,6 @@ pub const AVRO_SCHEMA: &str = r#"
"doc": "Attributes describing the resource that produced the log (e.g., host, region)."
},
{
"name": "resource_id",
"type": ["null", "string"],
"doc": "A unique identifier for the resource."
},
{
"name": "instrumentation_scope",
"type": ["null", "string"],
"doc": "The name of the library or framework that captured the log."
@@ -98,33 +85,6 @@ pub const AVRO_SCHEMA: &str = r#"
"values": "string"
}],
"doc": "A map of custom string-valued attributes associated with the log."
},
{
"name": "attributes_map_str",
"type": ["null", {
"type": "map",
"values": "string"
}],
"doc": "Additional map of string attributes."
},
{
"name": "attributes_map_float",
"type": ["null", {
"type": "map",
"values": "double"
}],
"doc": "Map of custom double-precision float attributes."
},
{
"name": "attributes_map_datetime",
"type": ["null", {
"type": "map",
"values": {
"type": "long",
"logicalType": "timestamp-millis"
}
}],
"doc": "Map of custom timestamp-valued attributes."
}
]
}"#;
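With created_at, resource_id and the typed attributes_map_* fields dropped from the Avro schema, the capture payload now carries a single string-valued attributes map. As a rough sketch (not the project's canonical pattern), a numeric attribute can still be read back out at query time on the ClickHouse side; the table name comes from the first file, while the team id and the duration_ms key are hypothetical:

SELECT
    timestamp,
    toFloat64OrNull(attributes['duration_ms']) AS duration_ms
FROM logs
WHERE team_id = 1
  AND timestamp >= now() - INTERVAL 1 HOUR;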

View File

@@ -201,7 +201,7 @@ impl KafkaSink {
Codec::Zstandard(ZstandardSettings::default()),
);
for row in rows {
for row in &rows {
writer.append_ser(row)?;
}
@@ -226,6 +226,10 @@ impl KafkaSink {
.insert(Header {
key: "bytes_compressed",
value: Some(&payload.len().to_string()),
})
.insert(Header {
key: "record_count",
value: Some(&rows.len().to_string()),
}),
),
}) {

View File

@@ -16,7 +16,6 @@ use opentelemetry_proto::tonic::{
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use sha2::{Digest, Sha512};
use tracing::debug;
use uuid::Uuid;
@@ -30,20 +29,14 @@ pub struct KafkaLogRow {
pub timestamp: DateTime<Utc>,
#[serde(with = "ts_microseconds")]
pub observed_timestamp: DateTime<Utc>,
#[serde(with = "ts_microseconds")]
pub created_at: DateTime<Utc>,
pub body: String,
pub severity_text: String,
pub severity_number: i32,
pub service_name: String,
pub resource_attributes: HashMap<String, String>,
pub resource_id: String,
pub instrumentation_scope: String,
pub event_name: String,
pub attributes: HashMap<String, String>,
pub attributes_map_str: HashMap<String, String>,
pub attributes_map_float: HashMap<String, f64>,
pub attributes_map_datetime: HashMap<String, f64>,
}
impl KafkaLogRow {
@@ -81,7 +74,6 @@ impl KafkaLogRow {
severity_number = convert_severity_text_to_number(&severity_text);
}
let resource_id = extract_resource_id(&resource);
let resource_attributes = extract_resource_attributes(resource);
let mut attributes: HashMap<String, String> = record
@@ -123,29 +115,14 @@ impl KafkaLogRow {
trace_flags,
timestamp,
observed_timestamp,
created_at: observed_timestamp,
body,
severity_text,
severity_number,
resource_attributes: resource_attributes.into_iter().collect(),
instrumentation_scope,
event_name,
resource_id,
service_name,
attributes: attributes.clone(),
attributes_map_str: attributes
.clone()
.into_iter()
.map(|(k, v)| (k + "__str", extract_json_string(v)))
.collect(),
attributes_map_float: attributes
.clone()
.into_iter()
.map(|(k, v)| (k, extract_json_float(v)))
.filter(|(_, v)| v.is_some())
.map(|(k, v)| (k + "__float", v.unwrap()))
.collect(),
attributes_map_datetime: HashMap::new(),
attributes,
};
debug!("log: {:?}", log_row);
@@ -166,20 +143,6 @@ fn extract_string_from_map(attributes: &HashMap<String, String>, key: &str) -> S
}
}
fn extract_json_string(value: String) -> String {
match serde_json::from_str::<JsonValue>(&value) {
Ok(JsonValue::String(value)) => value,
_ => value,
}
}
fn extract_json_float(value: String) -> Option<f64> {
match serde_json::from_str::<JsonValue>(&value) {
Ok(JsonValue::Number(value)) => value.as_f64(),
_ => None,
}
}
fn extract_trace_id(input: &[u8]) -> [u8; 16] {
if input.len() == 16 {
let mut bytes = [0; 16];
@@ -274,26 +237,6 @@ fn extract_resource_attributes(resource: Option<Resource>) -> Vec<(String, Strin
.collect()
}
fn extract_resource_id(resource: &Option<Resource>) -> String {
let Some(resource) = resource else {
return "".to_string();
};
let mut hasher = Sha512::new();
for pair in resource.attributes.iter() {
hasher.update(pair.key.as_bytes());
hasher.update(
pair.value
.clone()
.map(any_value_to_json)
.map(|v| v.to_string())
.unwrap_or_default(),
);
}
format!("{:x}", hasher.finalize())
}
// TODO - pull this from PG
const SEVERITY_KEYS: [&str; 4] = ["level", "severity", "log.level", "config.log_level"];
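
With the SHA-512 based extract_resource_id removed from capture, a resource is now identified on the ClickHouse side by resource_fingerprint, materialized as cityHash64(resource_attributes) in the logs31 definition from the first file. A minimal sketch of listing the fingerprints behind one service; the team id and service name are hypothetical:

SELECT DISTINCT
    resource_fingerprint,
    resource_attributes
FROM logs
WHERE team_id = 1
  AND service_name = 'capture'
  AND timestamp >= now() - INTERVAL 1 HOUR;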