feat(logs): add bytes headers to log capture producer (#40675)

Frank Hamand
2025-10-31 10:37:56 +00:00
committed by GitHub
parent 2dbf18cadf
commit 08b6045212
4 changed files with 26 additions and 26 deletions
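
Each message produced to the logs Kafka topic now carries two additional headers alongside the existing token header: bytes_uncompressed, the size of the raw HTTP request body as seen by the capture handler, and bytes_compressed, the length of the encoded Avro payload actually written to Kafka. The redundant attribute_keys / attribute_values fields are dropped from the Avro schema and from KafkaLogRow, since the same keys and values already appear in the typed attribute maps.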

@@ -125,22 +125,6 @@ pub const AVRO_SCHEMA: &str = r#"
                 }
             }],
             "doc": "Map of custom timestamp-valued attributes."
-        },
-        {
-            "name": "attribute_keys",
-            "type": ["null", {
-                "type": "array",
-                "items": "string"
-            }],
-            "doc": "An ordered list of attribute keys."
-        },
-        {
-            "name": "attribute_values",
-            "type": ["null", {
-                "type": "array",
-                "items": "string"
-            }],
-            "doc": "An ordered list of attribute values, corresponding to attribute_keys."
         }
     ]
 }"#;

@@ -188,7 +188,12 @@ impl KafkaSink
         self.producer.flush(Duration::new(30, 0))
     }
 
-    pub async fn write(&self, token: &str, rows: Vec<KafkaLogRow>) -> Result<(), anyhow::Error> {
+    pub async fn write(
+        &self,
+        token: &str,
+        rows: Vec<KafkaLogRow>,
+        uncompressed_bytes: u64,
+    ) -> Result<(), anyhow::Error> {
         let schema = Schema::parse_str(AVRO_SCHEMA)?;
         let mut writer = Writer::with_codec(
             &schema,
@@ -208,10 +213,21 @@
             partition: None,
             key: None::<Vec<u8>>.as_ref(),
             timestamp: None,
-            headers: Some(OwnedHeaders::new().insert(Header {
-                key: "token",
-                value: Some(&token.to_string()),
-            })),
+            headers: Some(
+                OwnedHeaders::new()
+                    .insert(Header {
+                        key: "token",
+                        value: Some(&token.to_string()),
+                    })
+                    .insert(Header {
+                        key: "bytes_uncompressed",
+                        value: Some(&uncompressed_bytes.to_string()),
+                    })
+                    .insert(Header {
+                        key: "bytes_compressed",
+                        value: Some(&payload.len().to_string()),
+                    }),
+            ),
         }) {
             Err((err, _)) => Err(anyhow!(format!("kafka error: {}", err))),
             Ok(delivery_future) => Ok(delivery_future),
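
On the consumer side, the header values arrive as UTF-8 decimal strings, since both counts are written with to_string() above. A sketch of how a downstream consumer might read them back, assuming rdkafka's Headers trait (rdkafka 0.29+); the helper name is illustrative:

use rdkafka::message::{BorrowedMessage, Headers, Message};

// Extract the byte-count headers added by this commit from a consumed
// message. Missing or non-numeric headers yield None.
fn bytes_headers(msg: &BorrowedMessage<'_>) -> (Option<u64>, Option<u64>) {
    let mut uncompressed = None;
    let mut compressed = None;
    if let Some(headers) = msg.headers() {
        for header in headers.iter() {
            // Header values are raw bytes; decode UTF-8, then parse the decimal.
            let parsed = header
                .value
                .and_then(|v| std::str::from_utf8(v).ok())
                .and_then(|s| s.parse::<u64>().ok());
            match header.key {
                "bytes_uncompressed" => uncompressed = parsed,
                "bytes_compressed" => compressed = parsed,
                _ => {}
            }
        }
    }
    (uncompressed, compressed)
}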

@@ -44,8 +44,6 @@ pub struct KafkaLogRow {
     pub attributes_map_str: HashMap<String, String>,
     pub attributes_map_float: HashMap<String, f64>,
     pub attributes_map_datetime: HashMap<String, f64>,
-    pub attribute_keys: Vec<String>,
-    pub attribute_values: Vec<String>,
 }
 
 impl KafkaLogRow {
@@ -148,8 +146,6 @@ impl KafkaLogRow {
                 .map(|(k, v)| (k + "__float", v.unwrap()))
                 .collect(),
             attributes_map_datetime: HashMap::new(),
-            attribute_keys: attributes.keys().cloned().collect(),
-            attribute_values: attributes.values().cloned().collect(),
         };
 
         debug!("log: {:?}", log_row);

@@ -81,7 +81,11 @@ pub async fn export_logs_http(
         }
     }
 
-    if let Err(e) = service.sink.write(token.unwrap(), rows).await {
+    if let Err(e) = service
+        .sink
+        .write(token.unwrap(), rows, body.len() as u64)
+        .await
+    {
         error!("Failed to send logs to Kafka: {}", e);
         return Err((
             StatusCode::INTERNAL_SERVER_ERROR,
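
The handler forwards body.len() as the uncompressed count, so bytes_uncompressed measures the request body the capture endpoint received, while bytes_compressed (set in the sink) measures the encoded Avro payload. A consumer could combine the two for a rough per-message size ratio; a minimal sketch:

// Derive a size ratio from the two headers added in this commit.
// Returns None when the compressed size is zero.
fn size_ratio(bytes_uncompressed: u64, bytes_compressed: u64) -> Option<f64> {
    (bytes_compressed > 0).then(|| bytes_uncompressed as f64 / bytes_compressed as f64)
}

fn main() {
    // e.g. a 10_240-byte request body produced as a 2_048-byte payload
    assert_eq!(size_ratio(10_240, 2_048), Some(5.0));
}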