feat(logs): refactor logs queries again (#40731)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Frank Hamand
2025-11-04 11:49:01 +00:00
committed by GitHub
parent bbe0f444d7
commit bfb4dbf014
6 changed files with 100 additions and 60 deletions


@@ -1,43 +1,80 @@
CREATE OR REPLACE FUNCTION extractIPv4Substrings AS
(
body -> extractAll(body, '(\d\.((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){2,2}([0-9]))')
);
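-- Illustrative usage (not part of the migration file; assumes only the function defined above).
-- The regex deliberately keeps just an inner slice of each IPv4-looking token, so the
-- materialized column and the search term are normalised the same way before comparison.
SELECT extractIPv4Substrings('refused connection from 10.0.0.12') AS matches;
-- based on the regex above this should return something like ['0.0.0.1'] for the embedded address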
-- temporary sql to initialise log tables for local development
-- will be removed once we have migrations set up
CREATE TABLE if not exists logs16
CREATE TABLE if not exists logs27
(
`uuid` String,
`team_id` Int32,
`trace_id` String,
`span_id` String,
`trace_flags` Int32,
`timestamp` DateTime64(6),
`observed_timestamp` DateTime64(6),
`created_at` DateTime64(6),
`body` String,
`severity_text` String,
`severity_number` Int32,
`service_name` String,
`resource_attributes` Map(String, String),
`resource_id` String,
`instrumentation_scope` String,
`event_name` String,
`attributes` Map(String, String),
`attributes_map_str` Map(String, String),
`attributes_map_float` Map(String, Float64),
`attributes_map_datetime` Map(String, DateTime64(6)),
`attribute_keys` Array(String),
`attribute_values` Array(String),
`time_bucket` DateTime MATERIALIZED toStartOfInterval(timestamp, toIntervalMinute(5)) CODEC(DoubleDelta, ZSTD(1)),
`uuid` String CODEC(ZSTD(1)),
`team_id` Int32 CODEC(ZSTD(1)),
`trace_id` String CODEC(ZSTD(1)),
`span_id` String CODEC(ZSTD(1)),
`trace_flags` Int32 CODEC(ZSTD(1)),
`timestamp` DateTime64(6) CODEC(DoubleDelta, ZSTD(1)),
`observed_timestamp` DateTime64(6) CODEC(DoubleDelta, ZSTD(1)),
`created_at` DateTime64(6) MATERIALIZED now() CODEC(DoubleDelta, ZSTD(1)),
`body` String CODEC(ZSTD(1)),
`severity_text` String CODEC(ZSTD(1)),
`severity_number` Int32 CODEC(ZSTD(1)),
`service_name` String CODEC(ZSTD(1)),
`resource_attributes` Map(String, String) CODEC(ZSTD(1)),
`resource_id` String CODEC(ZSTD(1)),
`instrumentation_scope` String CODEC(ZSTD(1)),
`event_name` String CODEC(ZSTD(1)),
`attributes_map_str` Map(String, String) CODEC(ZSTD(1)),
`attributes_map_float` Map(String, Float64) CODEC(ZSTD(1)),
`attributes_map_datetime` Map(String, DateTime64(6)) CODEC(ZSTD(1)),
`level` String ALIAS severity_text,
`mat_body_ipv4_matches` Array(String) ALIAS extractAll(body, '(\\d\\.((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){2,2}([0-9]))'),
`time_minute` DateTime ALIAS toStartOfMinute(timestamp),
`attributes` Map(String, String) ALIAS mapApply((k, v) -> (k, toJSONString(v)), attributes_map_str),
INDEX idx_severity_text_set severity_text TYPE set(10) GRANULARITY 1,
INDEX idx_attributes_str_keys mapKeys(attributes_map_str) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body_ngram body TYPE ngrambf_v1(3, 20000, 4, 0) GRANULARITY 1
INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_mat_body_ipv4_matches mat_body_ipv4_matches TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body_ngram3 body TYPE ngrambf_v1(3, 20000, 2, 0) GRANULARITY 1,
CONSTRAINT assume_time_bucket ASSUME toStartOfInterval(timestamp, toIntervalMinute(5)) = time_bucket,
PROJECTION projection_trace_span
(
SELECT
trace_id,
timestamp,
_part_offset
ORDER BY
trace_id,
timestamp
),
PROJECTION projection_aggregate_counts
(
SELECT
team_id,
time_bucket,
toStartOfMinute(timestamp),
service_name,
severity_text,
resource_attributes,
resource_id,
count() AS event_count
GROUP BY
team_id,
time_bucket,
toStartOfMinute(timestamp),
service_name,
severity_text,
resource_attributes,
resource_id
)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/logs16', '{replica}')
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/logs27', '{replica}')
PARTITION BY toDate(timestamp)
ORDER BY (team_id, toStartOfMinute(timestamp) DESC, service_name, severity_text, toUnixTimestamp(timestamp) DESC, trace_id, span_id)
SETTINGS
allow_remote_fs_zero_copy_replication = 1,
allow_experimental_reverse_key = 1;
ORDER BY (team_id, time_bucket DESC, service_name, resource_attributes, severity_text, timestamp DESC, uuid, trace_id, span_id)
SETTINGS allow_remote_fs_zero_copy_replication = 1,
allow_experimental_reverse_key = 1,
deduplicate_merge_projection_mode = 'ignore';
create or replace TABLE logs AS logs16 ENGINE = Distributed('posthog', 'default', 'logs16');
create or replace TABLE logs AS logs27 ENGINE = Distributed('posthog', 'default', 'logs27');
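-- Illustrative read-path sketch (not part of this file; team_id = 1 and the time window are placeholders).
-- Filtering on team_id plus time_bucket lines up with the new sort key, and `level`
-- resolves to severity_text through its ALIAS.
SELECT time_bucket, count() AS event_count
FROM logs27
WHERE team_id = 1
  AND time_bucket >= now() - INTERVAL 1 HOUR
  AND level = 'error'
GROUP BY time_bucket
ORDER BY time_bucket DESC;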
create table if not exists log_attributes
@@ -82,7 +119,7 @@ FROM (select
attribute.1 AS attribute_key,
CAST(JSONExtract(attribute.2, 'Dynamic'), 'String') AS attribute_value,
sumSimpleState(1) AS attribute_count
FROM logs16
FROM logs27
GROUP BY
team_id,
time_bucket,
@@ -98,7 +135,6 @@ CREATE OR REPLACE TABLE kafka_logs_avro
`trace_flags` Int32,
`timestamp` DateTime64(6),
`observed_timestamp` DateTime64(6),
`created_at` DateTime64(6),
`body` String,
`severity_text` String,
`severity_number` Int32,
@@ -107,12 +143,7 @@ CREATE OR REPLACE TABLE kafka_logs_avro
`resource_id` String,
`instrumentation_scope` String,
`event_name` String,
`attributes` Map(String, Nullable(String)),
`attributes_map_str` Map(String, Nullable(String)),
`attributes_map_float` Map(String, Nullable(Float64)),
`attributes_map_datetime` Map(String, Nullable(DateTime64(6))),
`attribute_keys` Array(Nullable(String)),
`attribute_values` Array(Nullable(String))
`attributes` Map(String, Nullable(String))
)
ENGINE = Kafka('kafka:9092', 'clickhouse_logs', 'clickhouse-logs-avro', 'Avro')
SETTINGS
@@ -121,12 +152,12 @@ SETTINGS
kafka_thread_per_consumer = 1,
kafka_num_consumers = 1,
kafka_poll_timeout_ms=15000,
kafka_poll_max_batch_size=100,
kafka_max_block_size=1000;
kafka_poll_max_batch_size=1,
kafka_max_block_size=1;
drop table if exists kafka_logs_avro_mv;
CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs16
CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs27
(
`uuid` String,
`team_id` Int32,
@@ -135,7 +166,6 @@ CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs16
`trace_flags` Int32,
`timestamp` DateTime64(6),
`observed_timestamp` DateTime64(6),
`created_at` DateTime64(6),
`body` String,
`severity_text` String,
`severity_number` Int32,
@@ -144,16 +174,15 @@ CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs16
`resource_id` String,
`instrumentation_scope` String,
`event_name` String,
`attributes` Map(String, Nullable(String)),
`attributes_map_str` Map(String, Nullable(String)),
`attributes_map_float` Map(String, Nullable(Float64)),
`attributes_map_datetime` Map(String, Nullable(DateTime64(6))),
`attribute_keys` Array(Nullable(String)),
`attribute_values` Array(Nullable(String))
`attributes` Map(String, Nullable(String))
)
AS SELECT
*,
* except (attributes, resource_attributes),
mapSort(mapApply((k,v) -> (concat(k, '__str'), JSONExtractString(v)), attributes)) as attributes_map_str,
mapSort(mapFilter((k, v) -> isNotNull(v), mapApply((k,v) -> (concat(k, '__float'), toFloat64OrNull(JSONExtract(v, 'String'))), attributes))) as attributes_map_float,
mapSort(mapFilter((k, v) -> isNotNull(v), mapApply((k,v) -> (concat(k, '__datetime'), parseDateTimeBestEffortOrNull(JSONExtract(v, 'String'), 6)), attributes))) as attributes_map_datetime,
mapSort(resource_attributes) as resource_attributes,
toInt32OrZero(_headers.value[indexOf(_headers.name, 'team_id')]) as team_id
FROM kafka_logs_avro settings materialize_skip_indexes_on_insert = 1, distributed_background_insert_sleep_time_ms=5000, distributed_background_insert_batch=true;
FROM kafka_logs_avro;
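-- Standalone sketch of the attribute-splitting expressions used by the materialized view above
-- (illustrative input map; not data from the pipeline).
SELECT
    mapApply((k, v) -> (concat(k, '__str'), JSONExtractString(v)), map('msg', '"hello"', 'duration_ms', '12.5')) AS attributes_map_str,
    mapFilter((k, v) -> isNotNull(v),
        mapApply((k, v) -> (concat(k, '__float'), toFloat64OrNull(JSONExtract(v, 'String'))), map('msg', '"hello"', 'duration_ms', '12.5'))) AS attributes_map_float;
-- keys gain a type suffix, and values that do not parse as floats drop out of the float map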
select 'clickhouse logs tables initialised successfully!';


@@ -17,6 +17,7 @@ class LogsTable(Table):
"body": StringDatabaseField(name="body", nullable=False),
"attributes": StringJSONDatabaseField(name="attributes", nullable=False),
"time_bucket": DateTimeDatabaseField(name="time_bucket", nullable=False),
"time_minute": DateTimeDatabaseField(name="time_minute", nullable=False),
"timestamp": DateTimeDatabaseField(name="timestamp", nullable=False),
"observed_timestamp": DateTimeDatabaseField(name="observed_timestamp", nullable=False),
"severity_text": StringDatabaseField(name="severity_text", nullable=False),
@@ -29,6 +30,7 @@ class LogsTable(Table):
# internal fields for query optimization
"_part_starting_offset": IntegerDatabaseField(name="_part_starting_offset", nullable=True, hidden=True),
"_part_offset": IntegerDatabaseField(name="_part_offset", nullable=True, hidden=True),
"mat_body_ipv4_matches": StringJSONDatabaseField(name="mat_body_ipv4_matches", nullable=True, hidden=True),
}
def to_printed_clickhouse(self, context):


@@ -168,6 +168,8 @@ HOGQL_CLICKHOUSE_FUNCTIONS: dict[str, HogQLFunctionMeta] = {
"formatReadableTimeDelta": HogQLFunctionMeta("formatReadableTimeDelta", 1, 2),
"least": HogQLFunctionMeta("least", 2, 2, case_sensitive=False),
"greatest": HogQLFunctionMeta("greatest", 2, 2, case_sensitive=False),
"indexHint": HogQLFunctionMeta("indexHint", 1, 1),
"extractIPv4Substrings": HogQLFunctionMeta("extractIPv4Substrings", 1, 1),
# time window
"tumble": HogQLFunctionMeta("tumble", 2, 2),
"hop": HogQLFunctionMeta("hop", 3, 3),

File diff suppressed because one or more lines are too long


@@ -1,4 +1,3 @@
import json
import datetime as dt
from zoneinfo import ZoneInfo
@@ -33,6 +32,7 @@ class LogsQueryRunner(AnalyticsQueryRunner[LogsQueryResponse]):
def __init__(self, query, *args, **kwargs):
# defensive copy of query because we mutate it
super().__init__(query.model_copy(deep=True), *args, **kwargs)
assert isinstance(self.query, LogsQuery)
self.paginator = HogQLHasMorePaginator.from_limit_context(
limit_context=LimitContext.QUERY,
@@ -109,7 +109,7 @@ class LogsQueryRunner(AnalyticsQueryRunner[LogsQueryResponse]):
"trace_id": result[1],
"span_id": result[2],
"body": result[3],
"attributes": {k: json.loads(v) for k, v in result[4].items()},
"attributes": result[4],
"timestamp": result[5].replace(tzinfo=ZoneInfo("UTC")),
"observed_timestamp": result[6].replace(tzinfo=ZoneInfo("UTC")),
"severity_text": result[7],
@@ -144,7 +144,6 @@ class LogsQueryRunner(AnalyticsQueryRunner[LogsQueryResponse]):
query.order_by = [
parse_order_expr("team_id"),
parse_order_expr(f"time_bucket {order_dir}"),
parse_order_expr(f"toUnixTimestamp(timestamp) {order_dir}"),
parse_order_expr(f"timestamp {order_dir}"),
]
final_query = parse_select(
@@ -163,11 +162,11 @@ class LogsQueryRunner(AnalyticsQueryRunner[LogsQueryResponse]):
resource_attributes,
instrumentation_scope,
event_name
FROM logs where (_part_starting_offset+_part_offset) in (select 1)
"""
FROM logs where (_part_starting_offset+_part_offset) in ({query})
""",
placeholders={"query": query},
)
assert isinstance(final_query, ast.SelectQuery)
final_query.where.right = query # type: ignore
final_query.order_by = [parse_order_expr(f"timestamp {order_dir}")]
return final_query
@@ -205,6 +204,14 @@ class LogsQueryRunner(AnalyticsQueryRunner[LogsQueryResponse]):
placeholders={"searchTerm": ast.Constant(value=f"%{self.query.searchTerm}%")},
)
)
# ip addresses are particularly bad at full text searches with our ngram 3 index
# match them separately against a materialized column of ip addresses
exprs.append(
parse_expr(
"indexHint(hasAll(mat_body_ipv4_matches, extractIPv4Substrings({searchTerm})))",
placeholders={"searchTerm": ast.Constant(value=f"{self.query.searchTerm}")},
)
)
if self.query.filterGroup:
exprs.append(property_to_expr(self.query.filterGroup, team=self.team))
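-- Rough ClickHouse-level shape of the predicate the runner appends for a search term like
-- '10.0.0.12' (hand-written sketch, not the generated SQL; the LIKE pattern is an assumption):
SELECT count()
FROM logs27
WHERE body ILIKE '%10.0.0.12%'
  AND indexHint(hasAll(mat_body_ipv4_matches, extractIPv4Substrings('10.0.0.12')));
-- indexHint() always evaluates to true but lets the bloom-filter skip index on
-- mat_body_ipv4_matches prune granules, so the extra clause never changes the result set.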
@@ -249,7 +256,7 @@ class LogsQueryRunner(AnalyticsQueryRunner[LogsQueryResponse]):
# the min interval is 1 minute and max interval is 1 day
interval_count = find_closest(
_step.total_seconds(),
[1, 5] + [x * 60 for x in [1, 2, 5, 10, 15, 30, 60, 120, 240, 360, 720, 1440]],
[1, 5, 10] + [x * 60 for x in [1, 2, 5, 10, 15, 30, 60, 120, 240, 360, 720, 1440]],
)
if _step >= dt.timedelta(minutes=1):


@@ -69,7 +69,7 @@ class SparklineQueryRunner(LogsQueryRunner):
""",
placeholders={
**self.query_date_range.to_placeholders(),
"time_field": ast.Field(chain=["time_bucket"])
"time_field": ast.Field(chain=["time_minute"])
if self.query_date_range.interval_name != "second"
else ast.Field(chain=["timestamp"]),
"where": self.where(),