feat: rust kafka timestamp lag exporter (#29461)

Frank Hamand
2025-03-04 21:06:17 +00:00
committed by GitHub
parent c4e53550f6
commit 7fe44fd77e
10 changed files with 466 additions and 0 deletions


@@ -37,6 +37,8 @@ jobs:
dockerfile: ./rust/Dockerfile-feature-flags
- image: batch-import-worker
dockerfile: ./rust/Dockerfile
- image: e2e-lag-exporter
dockerfile: ./rust/Dockerfile
runs-on: depot-ubuntu-22.04-4
permissions:
id-token: write # allow issuing OIDC tokens for this workflow run
@@ -55,6 +57,7 @@ jobs:
hook-migrator_digest: ${{ steps.digest.outputs.hook-migrator_digest }}
cymbal_digest: ${{ steps.digest.outputs.cymbal_digest }}
feature-flags_digest: ${{ steps.digest.outputs.feature-flags_digest }}
e2e-lag-exporter_digest: ${{ steps.digest.outputs.e2e-lag-exporter_digest }}
defaults:
run:
working-directory: rust

rust/Cargo.lock generated

@@ -1849,6 +1849,21 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "e2e-lag-exporter"
version = "0.1.0"
dependencies = [
"anyhow",
"envconfig",
"futures",
"metrics",
"metrics-exporter-prometheus",
"rdkafka",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "ecdsa"
version = "0.14.8"


@@ -21,6 +21,7 @@ members = [
"cyclotron-fetch",
"cymbal",
"posthog-cli",
"e2e-lag-exporter",
]
[workspace.lints.rust]


@@ -0,0 +1,15 @@
[package]
name = "e2e-lag-exporter"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = { workspace = true }
envconfig = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
rdkafka = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }


@@ -0,0 +1,41 @@
# e2e-lag-exporter
A Rust service that monitors Kafka consumer group lag and exports metrics for Prometheus.
## Features
- Tracks how many messages a consumer group is behind (message count lag)
- Tracks time lag based on message timestamps, i.e. the age of the last message the group has processed (see the sketch below)
- Exports both as metrics in Prometheus format
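Both lag notions reduce to simple arithmetic over the partition high watermark, the group's committed offset, and the timestamp of the last consumed message. A minimal illustrative sketch (the function names and values below are invented for this example and are not part of the service):

```rust
/// Illustrative only: mirrors the arithmetic used by the exporter.
fn message_lag(high_watermark: i64, committed_offset: i64) -> i64 {
    // Messages the group still has to read; clamped at zero in case the
    // committed offset briefly runs ahead of the fetched watermark.
    (high_watermark - committed_offset).max(0)
}

fn time_lag_ms(now_ms: i64, last_consumed_timestamp_ms: i64) -> i64 {
    // Age of the most recently consumed message, in milliseconds.
    (now_ms - last_consumed_timestamp_ms).max(0)
}

fn main() {
    assert_eq!(message_lag(1_042, 1_000), 42);
    assert_eq!(time_lag_ms(1_700_000_060_000, 1_700_000_000_000), 60_000);
}
```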
## Configuration
Configuration is done via environment variables:
| Variable | Description | Default |
|----------|-------------|---------|
| `KAFKA_HOSTS` | Comma-separated list of Kafka brokers | `kafka:9092` |
| `KAFKA_CONSUMERGROUP` | Consumer group to monitor | _required_ |
| `KAFKA_TOPIC` | Kafka topic to monitor | _required_ |
| `KAFKA_TLS` | Connect to Kafka over SSL (certificate verification disabled) | `false` |
| `CHECK_INTERVAL_MS` | How often to check lag (milliseconds) | `20000` |
| `METRICS_PORT` | Port to expose Prometheus metrics | `9090` |
| `LOG_LEVEL` | Logging level | `info` |
## Metrics
The service exposes the following metrics:
- `consumer_lag` - Number of messages the consumer group is behind, labelled by topic, partition and consumer group
- `consumer_last_message_timestamp` - Timestamp (milliseconds since epoch) of the last message consumed by the group, labelled by topic, partition and consumer group; subtracting it from the current time gives the time lag
## Usage
```bash
# Build the service
cargo build --release
# Run (KAFKA_CONSUMERGROUP and KAFKA_TOPIC are required; everything else has a default)
KAFKA_CONSUMERGROUP=my-group KAFKA_TOPIC=my-topic ./target/release/e2e-lag-exporter
# Run with custom settings
KAFKA_HOSTS=localhost:9092 KAFKA_CONSUMERGROUP=my-group KAFKA_TOPIC=my-topic METRICS_PORT=9090 ./target/release/e2e-lag-exporter
```


@@ -0,0 +1,33 @@
use envconfig::Envconfig;
use std::time::Duration;
use tracing::Level;
#[derive(Envconfig, Debug, Clone)]
pub struct Config {
#[envconfig(from = "KAFKA_HOSTS", default = "kafka:9092")]
pub kafka_hosts: String,
#[envconfig(from = "KAFKA_CONSUMERGROUP")]
pub kafka_consumer_group: String,
#[envconfig(from = "KAFKA_TOPIC")]
pub kafka_topic: String,
#[envconfig(from = "KAFKA_TLS", default = "false")]
pub kafka_tls: bool,
#[envconfig(from = "LOG_LEVEL", default = "info")]
pub log_level: Level,
#[envconfig(from = "METRICS_PORT", default = "9090")]
pub metrics_port: u16,
#[envconfig(from = "CHECK_INTERVAL_MS", default = "20000")]
pub lag_check_interval_ms: u64,
}
impl Config {
pub fn lag_check_interval(&self) -> Duration {
Duration::from_millis(self.lag_check_interval_ms)
}
}
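For reference, a minimal sketch of how this struct is loaded (assuming the crate's library target is importable as `e2e_lag_exporter`; the environment variable values are invented so the example is self-contained):

```rust
use e2e_lag_exporter::Config;
use envconfig::Envconfig;

fn main() -> anyhow::Result<()> {
    // Hypothetical values; these two variables are required and have no defaults.
    std::env::set_var("KAFKA_CONSUMERGROUP", "my-group");
    std::env::set_var("KAFKA_TOPIC", "my-topic");

    // Any variable left unset falls back to the default declared on the struct.
    let config = Config::init_from_env()?;
    assert_eq!(config.kafka_hosts, "kafka:9092");
    assert_eq!(config.lag_check_interval().as_millis(), 20_000);
    Ok(())
}
```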


@@ -0,0 +1,246 @@
use anyhow::{Context, Result};
use futures::future::join_all;
use rdkafka::admin::AdminClient;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use rdkafka::util::Timeout;
use std::time::Duration;
use tracing::{debug, error, info};
use crate::config::Config;
use crate::metrics;
pub struct KafkaMonitor {
admin_client: AdminClient<rdkafka::client::DefaultClientContext>,
consumer: StreamConsumer,
config: Config, // Used for consumer group name and other config
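// Dedicated consumer, assigned to every partition of the topic, used only to read the message at each committed offset for its timestamp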
message_consumer: StreamConsumer,
}
impl KafkaMonitor {
pub fn new(config: Config) -> Result<Self> {
let mut common_config = ClientConfig::new();
common_config
.set("bootstrap.servers", &config.kafka_hosts)
.set("session.timeout.ms", "6000");
if config.kafka_tls {
common_config
.set("security.protocol", "ssl")
.set("enable.ssl.certificate.verification", "false");
};
let admin_client: AdminClient<_> = common_config
.clone()
.create()
.context("Failed to create Kafka admin client")?;
let consumer: StreamConsumer = common_config
.clone()
.set("group.id", &config.kafka_consumer_group)
.set("enable.auto.commit", "false")
.create()
.context("Failed to create Kafka consumer")?;
// Create a consumer to get the message at the offset
let message_consumer: StreamConsumer = common_config
.clone()
.set(
"group.id",
format!("{}-lag-checker", &config.kafka_consumer_group),
)
.set("enable.auto.commit", "false")
.create()
.context("Failed to create message fetcher consumer")?;
let metadata = admin_client
.inner()
.fetch_metadata(None, Timeout::from(Duration::from_secs(10)))
.context("Failed to fetch Kafka metadata")?;
let topic = metadata
.topics()
.iter()
.find(|t| t.name() == config.kafka_topic.as_str())
.with_context(|| format!("Topic {} not found in Kafka metadata", config.kafka_topic))?;
let topic_name = topic.name();
debug!("Checking lag for topic: {}", topic_name);
let mut tpl = TopicPartitionList::new();
for partition in topic.partitions() {
tpl.add_partition(topic_name, partition.id());
}
// Assign the consumer to the specific offset
message_consumer
.assign(&tpl)
.context("Failed to assign consumer to partition")?;
Ok(Self {
admin_client,
consumer,
config,
message_consumer,
})
}
pub async fn check_lag(&self) -> Result<()> {
let metadata = self
.admin_client
.inner()
.fetch_metadata(None, Timeout::from(Duration::from_secs(10)))
.context("Failed to fetch Kafka metadata")?;
let topic = metadata
.topics()
.iter()
.find(|t| t.name() == self.config.kafka_topic.as_str())
.with_context(|| format!("Topic {} not found in Kafka metadata", self.config.kafka_topic))?;
let topic_name = topic.name();
debug!("Checking lag for topic: {}", topic_name);
let mut tpl = TopicPartitionList::new();
for partition in topic.partitions() {
tpl.add_partition(topic_name, partition.id());
}
// Get consumer group offsets
match self
.consumer
.committed_offsets(tpl.clone(), Timeout::from(Duration::from_secs(10)))
{
Ok(committed_tpl) => {
let mut futes: Vec<_> = vec![];
// Process each partition
for tpl_elem in committed_tpl.elements() {
let partition_id = tpl_elem.partition();
let topic_name = tpl_elem.topic();
if let Offset::Offset(consumer_offset) = tpl_elem.offset() {
// Get high watermark (latest offset)
match self.consumer.fetch_watermarks(
topic_name,
partition_id,
Timeout::from(Duration::from_secs(10)),
) {
Ok((_, high_watermark)) => {
// Calculate lag in messages
let lag = if high_watermark >= consumer_offset {
high_watermark - consumer_offset
} else {
0
};
info!(
"Topic: {}, Partition: {}, Consumer offset: {}, High watermark: {}, Lag: {}",
topic_name, partition_id, consumer_offset, high_watermark, lag
);
// Record message count lag metric
metrics::record_lag_count(
topic_name,
partition_id,
&self.config.kafka_consumer_group,
lag,
);
let topic_name_owned = topic_name.to_owned();
// Fetch timestamps concurrently: one fetch per partition (e.g. 128 partitions means 128 fetches)
futes.push(async move {
// Fetch the last message the group consumed (one before the committed offset) to read its timestamp
match self.fetch_message_at_offset(
&topic_name_owned,
partition_id,
consumer_offset
).await {
Ok(Some((_, Some(timestamp)))) => {
metrics::record_timestamp(
&topic_name_owned,
partition_id,
&self.config.kafka_consumer_group,
timestamp
);
}
Ok(_) => {
debug!(
"Could not determine timestamp for offset {} in {}/{}",
consumer_offset, topic_name_owned, partition_id
);
}
Err(e) => {
error!(
"Error fetching message: {:?}", e
);
}
}
})
}
Err(e) => {
error!(
"Failed to fetch watermarks for {}/{}: {:?}",
topic_name, partition_id, e
);
}
}
} else {
debug!("No offset for {}/{}", topic_name, partition_id);
}
}
join_all(futes).await;
}
Err(e) => {
error!("Error fetching committed offsets: {:?}", e);
}
}
Ok(())
}
async fn fetch_message_at_offset(
&self,
topic: &str,
partition: i32,
consumer_offset: i64,
) -> Result<Option<(i64, Option<i64>)>> {
self.message_consumer
.seek(
topic,
partition,
Offset::Offset(consumer_offset.saturating_sub(1)),
Duration::from_secs(5),
)
.context("Failed to seek messageconsumer")?;
// Try to get a single message at the consumer offset
let timeout = Duration::from_secs(1);
match tokio::time::timeout(timeout, self.message_consumer.recv()).await {
Ok(result) => match result {
Ok(msg) => {
// Return the message offset and timestamp (if available)
let offset = msg.offset();
let timestamp = msg.timestamp().to_millis();
debug!(
"Message found at offset {} with timestamp {:?}",
offset, timestamp
);
Ok(Some((offset, timestamp)))
}
Err(e) => {
error!("Failed to get message: {:?}", e);
Ok(None)
}
},
Err(_) => {
debug!("Timeout waiting for message at offset {}", consumer_offset);
Ok(None)
}
}
}
}
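One detail worth spelling out from `fetch_message_at_offset`: a group's committed offset points at the next message it will read, so the exported timestamp is taken from the message one offset earlier. A tiny illustration (the helper name is invented for this sketch):

```rust
fn last_consumed_offset(committed_offset: i64) -> i64 {
    // The committed offset is the position of the *next* message the group
    // will read, so the most recently consumed message is one offset earlier.
    committed_offset.saturating_sub(1)
}

fn main() {
    // A group that has committed offset 42 has already consumed message 41,
    // and that message's timestamp is what gets exported.
    assert_eq!(last_consumed_offset(42), 41);
}
```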


@@ -0,0 +1,7 @@
pub mod config;
pub mod kafka;
pub mod metrics;
// Re-export main modules for library users
pub use config::Config;
pub use kafka::KafkaMonitor;


@@ -0,0 +1,60 @@
mod config;
mod kafka;
mod metrics;
use anyhow::{Context, Result};
use config::Config;
use envconfig::Envconfig;
use kafka::KafkaMonitor;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;
use tokio::time;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> Result<()> {
// Load configuration from environment variables
let config = Config::init_from_env().context("Failed to load configuration")?;
// Setup tracing: RUST_LOG takes precedence, otherwise fall back to LOG_LEVEL
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(config.log_level.to_string())),
)
.init();
info!("Starting e2e-lag-exporter with config: {:?}", config);
// Register metrics
metrics::register_metrics();
// Setup Prometheus metrics exporter
let metrics_addr = format!("0.0.0.0:{}", config.metrics_port).parse::<SocketAddr>()?;
PrometheusBuilder::new()
.with_http_listener(metrics_addr)
.install()
.context("Failed to install Prometheus metrics exporter")?;
info!("Metrics server listening on {}", metrics_addr);
// Create Kafka monitor
let kafka_monitor =
KafkaMonitor::new(config.clone()).context("Failed to create Kafka monitor")?;
info!("Monitoring consumer group: {}", config.kafka_consumer_group);
// Main loop: check lag at configured intervals
let mut interval = time::interval(config.lag_check_interval());
loop {
interval.tick().await;
match kafka_monitor.check_lag().await {
Ok(_) => {
info!("Successfully checked consumer lag");
}
Err(e) => {
error!("Error checking consumer lag: {:?}", e);
}
}
}
}


@@ -0,0 +1,45 @@
use metrics::{describe_gauge, gauge};
// Metrics constants
pub const METRIC_CONSUMER_LAG: &str = "consumer_lag";
pub const METRIC_CONSUMER_TIMESTAMP: &str = "consumer_last_message_timestamp";
/// Register all metrics with descriptions
pub fn register_metrics() {
describe_gauge!(
METRIC_CONSUMER_LAG,
"Number of messages behind for the consumer group"
);
describe_gauge!(
METRIC_CONSUMER_TIMESTAMP,
"Timestamp of the last message consumed by the consumer group"
);
}
/// Record the consumer lag count metric
pub fn record_lag_count(topic: &str, partition: i32, consumergroup: &str, lag: i64) {
let topic_owned = topic.to_string();
let partition_str = format!("{}", partition);
let consumergroup_owned = consumergroup.to_string();
gauge!(METRIC_CONSUMER_LAG,
"topic" => topic_owned,
"partition" => partition_str,
"consumergroup" => consumergroup_owned,
)
.set(lag as f64);
}
/// Record the timestamp (in milliseconds since epoch) of the last message consumed by the group
pub fn record_timestamp(topic: &str, partition: i32, consumergroup: &str, timestamp: i64) {
let topic_owned = topic.to_string();
let partition_str = format!("{}", partition);
let consumergroup_owned = consumergroup.to_string();
gauge!(METRIC_CONSUMER_TIMESTAMP,
"topic" => topic_owned,
"partition" => partition_str,
"consumergroup" => consumergroup_owned,
)
.set(timestamp as f64);
}
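A self-contained sketch of how these helpers feed the Prometheus exposition. It uses `install_recorder` (which returns a handle whose output can be rendered in-process) instead of the HTTP listener that main.rs installs, the label values are invented, and it assumes the library target is importable as `e2e_lag_exporter`:

```rust
use e2e_lag_exporter::metrics::{record_lag_count, record_timestamp, register_metrics};
use metrics_exporter_prometheus::PrometheusBuilder;

fn main() -> anyhow::Result<()> {
    // In-process recorder so the exposition text can be rendered as a string.
    let handle = PrometheusBuilder::new().install_recorder()?;

    register_metrics();
    record_lag_count("my-topic", 0, "my-group", 42);
    record_timestamp("my-topic", 0, "my-group", 1_700_000_000_000);

    // Prints exposition lines along the lines of:
    //   consumer_lag{topic="my-topic",partition="0",consumergroup="my-group"} 42
    println!("{}", handle.render());
    Ok(())
}
```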