feat(kafka-deduplicator): Implement incremental checkpointing (#39643)

Co-authored-by: Eli Reisman <eli.r@posthog.com>
José Sequeira
2025-10-15 00:54:46 +02:00
committed by GitHub
parent 06102dae27
commit 1fa8ca8707
17 changed files with 1282 additions and 1009 deletions
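Before the per-file diffs, a condensed sketch of the deduplication rule this commit introduces: a new checkpoint's metadata.json lists every live file, but files already present in the previous checkpoint are recorded as relative references of the form ../<previous_id>/<file> and are not re-uploaded. The real logic is plan_checkpoint further down; the helper below is an illustrative stand-in, not a crate API:

use std::collections::HashMap;

/// Split local checkpoint files into (manifest entries, files to upload),
/// reusing anything the previous checkpoint already holds.
fn plan(prev_id: &str, prev_files: &[String], local_files: &[String]) -> (Vec<String>, Vec<String>) {
    // bare filename -> path as recorded in the previous checkpoint's manifest
    let prev: HashMap<&str, &str> = prev_files
        .iter()
        .map(|p| (p.rsplit('/').next().unwrap_or(p.as_str()), p.as_str()))
        .collect();

    let mut manifest = Vec::new();  // becomes metadata.files
    let mut to_upload = Vec::new(); // uploaded under <topic>/<partition>/<new_id>/
    for f in local_files {
        match prev.get(f.as_str()) {
            // an existing "../..." reference is kept as-is; otherwise point at prev_id
            Some(p) if p.starts_with("../") => manifest.push((*p).to_string()),
            Some(_) => manifest.push(format!("../{prev_id}/{f}")),
            None => {
                manifest.push(f.clone());
                to_upload.push(f.clone());
            }
        }
    }
    (manifest, to_upload)
}

fn main() {
    let prev_files = vec![
        "000001.sst".to_string(),
        "../2025-10-14T14-00-00Z/000002.sst".to_string(),
    ];
    let local = vec![
        "000001.sst".to_string(),
        "000002.sst".to_string(),
        "000003.sst".to_string(),
    ];
    let (manifest, to_upload) = plan("2025-10-14T15-00-00Z", &prev_files, &local);
    assert_eq!(to_upload, vec!["000003.sst"]);
    assert_eq!(manifest[0], "../2025-10-14T15-00-00Z/000001.sst");
    assert_eq!(manifest[1], "../2025-10-14T14-00-00Z/000002.sst");
}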

View File

@@ -101,7 +101,7 @@ impl<C: ClientContext> TransactionalProducer<C> {
})
}
pub fn begin(&mut self) -> Result<KafkaTransaction<C>, KafkaError> {
pub fn begin(&mut self) -> Result<KafkaTransaction<'_, C>, KafkaError> {
self.inner.begin_transaction()?;
Ok(KafkaTransaction { producer: self })
}

View File

@@ -1,7 +1,7 @@
use std::path::Path;
use std::time::Instant;
use super::{CheckpointConfig, CheckpointMode, CheckpointUploader};
use super::{CheckpointConfig, CheckpointMode, CheckpointPlan, CheckpointUploader};
use anyhow::Result;
use metrics;
@@ -27,14 +27,7 @@ impl CheckpointExporter {
checkpoint_name: &str,
mode: CheckpointMode,
) -> Result<String> {
let remote_key_prefix = if mode == CheckpointMode::Full {
format!("{}/full/{}", self.config.s3_key_prefix, checkpoint_name)
} else {
format!(
"{}/incremental/{}",
self.config.s3_key_prefix, checkpoint_name
)
};
let remote_key_prefix = format!("{}/{}", self.config.s3_key_prefix, checkpoint_name);
let local_path_tag = local_checkpoint_path.to_string_lossy().to_string();
// Upload to remote storage in background
@@ -92,6 +85,72 @@ impl CheckpointExporter {
}
}
/// Export checkpoint using a plan with incremental deduplication
pub async fn export_checkpoint_with_plan(
&self,
plan: &CheckpointPlan,
checkpoint_name: &str,
mode: CheckpointMode,
) -> Result<String> {
let remote_key_prefix = format!("{}/{}", self.config.s3_key_prefix, checkpoint_name);
if !self.is_available().await {
let tags = [("mode", mode.as_str()), ("result", "unavailable")];
metrics::counter!(CHECKPOINT_UPLOADS_COUNTER, &tags).increment(1);
warn!(
remote_path = remote_key_prefix,
checkpoint_mode = mode.as_str(),
"Export failed: uploader not available"
);
return Err(anyhow::anyhow!("Uploader not available"));
}
let upload_start = Instant::now();
match self
.uploader
.upload_checkpoint_with_plan(plan, &remote_key_prefix)
.await
{
Ok(uploaded_files) => {
let upload_duration = upload_start.elapsed();
let total_files = plan.metadata.files.len();
let new_files = plan.files_to_upload.len();
let reused_files = total_files - new_files;
let tags = [("mode", mode.as_str()), ("result", "success")];
metrics::histogram!(CHECKPOINT_UPLOAD_DURATION_HISTOGRAM, &tags)
.record(upload_duration.as_secs_f64());
metrics::counter!(CHECKPOINT_UPLOADS_COUNTER, &tags).increment(1);
info!(
remote_path = remote_key_prefix,
uploaded_file_count = uploaded_files.len(),
total_files = total_files,
new_files = new_files,
reused_files = reused_files,
elapsed_seconds = upload_duration.as_secs_f64(),
checkpoint_mode = mode.as_str(),
"Export successful: checkpoint uploaded with deduplication",
);
Ok(remote_key_prefix.to_string())
}
Err(e) => {
let tags = [("mode", mode.as_str()), ("result", "error")];
metrics::counter!(CHECKPOINT_UPLOADS_COUNTER, &tags).increment(1);
error!(
remote_path = remote_key_prefix,
checkpoint_mode = mode.as_str(),
"Export failed: uploading checkpoint: {}",
e
);
Err(e)
}
}
}
pub async fn is_available(&self) -> bool {
self.uploader.is_available().await
}
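With the full/incremental sub-prefixes gone, keys compose identically for both modes. A worked example with made-up prefix and topic values (the composition itself mirrors export_checkpoint_with_plan above plus CheckpointTarget and S3Uploader later in this diff):

let s3_key_prefix = "dedup-checkpoints"; // CheckpointConfig::s3_key_prefix (illustrative)
let checkpoint_name = "events/7/2025-10-14T16-00-05Z"; // CheckpointTarget::remote_path
let remote_key_prefix = format!("{s3_key_prefix}/{checkpoint_name}");
assert_eq!(
    format!("{remote_key_prefix}/metadata.json"),
    "dedup-checkpoints/events/7/2025-10-14T16-00-05Z/metadata.json"
);
// Files reused from an earlier checkpoint are not uploaded again; metadata.json
// lists them as "../<previous_checkpoint_id>/<file>" relative to this prefix.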

View File

@@ -1,384 +0,0 @@
use anyhow::{Context, Result};
use std::path::PathBuf;
use tracing::{info, warn};
use super::client::CheckpointClient;
use super::metadata::CheckpointInfo;
/// Handles loading checkpoints from remote storage
#[derive(Debug)]
pub struct CheckpointLoader<C: CheckpointClient> {
client: C,
local_base_dir: String,
}
impl<C: CheckpointClient> CheckpointLoader<C> {
pub fn new(client: C, local_base_dir: String) -> Self {
Self {
client,
local_base_dir,
}
}
/// Load the latest checkpoint for a partition
/// Returns the loaded checkpoint info and local path, or None if no checkpoint exists
pub async fn load_latest_checkpoint(
&self,
topic: &str,
partition: i32,
) -> Result<Option<(CheckpointInfo, PathBuf)>> {
info!(
"Loading latest checkpoint for topic {} partition {}",
topic, partition
);
// List available checkpoints
let checkpoint_infos = self
.client
.list_checkpoint_metadata(topic, partition)
.await
.context("Failed to list checkpoint metadata")?;
if checkpoint_infos.is_empty() {
info!(
"No checkpoints found for topic {} partition {}",
topic, partition
);
return Ok(None);
}
// Get the latest checkpoint (list is already sorted newest first)
let latest_checkpoint = &checkpoint_infos[0];
info!(
"Found latest checkpoint: timestamp {}, type {:?}, {} files",
latest_checkpoint.metadata.timestamp,
latest_checkpoint.metadata.checkpoint_type,
latest_checkpoint.metadata.files.len()
);
// Create local directory for this checkpoint
let local_checkpoint_dir = PathBuf::from(&self.local_base_dir)
.join(topic)
.join(partition.to_string())
.join(latest_checkpoint.metadata.timestamp.to_string());
// Download the checkpoint
self.client
.download_checkpoint(latest_checkpoint, &local_checkpoint_dir)
.await
.context("Failed to download checkpoint")?;
info!(
"Successfully loaded checkpoint to {:?}",
local_checkpoint_dir
);
Ok(Some((latest_checkpoint.clone(), local_checkpoint_dir)))
}
/// Load a specific checkpoint by timestamp
pub async fn load_checkpoint_by_timestamp(
&self,
topic: &str,
partition: i32,
timestamp: u64,
) -> Result<Option<(CheckpointInfo, PathBuf)>> {
info!(
"Loading checkpoint for topic {} partition {} timestamp {}",
topic, partition, timestamp
);
// List available checkpoints
let checkpoint_infos = self
.client
.list_checkpoint_metadata(topic, partition)
.await
.context("Failed to list checkpoint metadata")?;
// Find the specific checkpoint
let target_checkpoint = checkpoint_infos
.into_iter()
.find(|cp| cp.metadata.timestamp == timestamp);
let Some(checkpoint_info) = target_checkpoint else {
warn!("Checkpoint not found for timestamp {}", timestamp);
return Ok(None);
};
// Create local directory for this checkpoint
let local_checkpoint_dir = PathBuf::from(&self.local_base_dir)
.join(topic)
.join(partition.to_string())
.join(timestamp.to_string());
// Download the checkpoint
self.client
.download_checkpoint(&checkpoint_info, &local_checkpoint_dir)
.await
.context("Failed to download checkpoint")?;
info!(
"Successfully loaded checkpoint timestamp {} to {:?}",
timestamp, local_checkpoint_dir
);
Ok(Some((checkpoint_info, local_checkpoint_dir)))
}
/// Check if a checkpoint exists locally and is complete
pub async fn is_checkpoint_complete(&self, checkpoint_info: &CheckpointInfo) -> Result<bool> {
let local_checkpoint_dir = PathBuf::from(&self.local_base_dir)
.join(&checkpoint_info.metadata.topic)
.join(checkpoint_info.metadata.partition.to_string())
.join(checkpoint_info.metadata.timestamp.to_string());
// Check if metadata file exists
let metadata_path = local_checkpoint_dir.join("metadata.json");
if !metadata_path.exists() {
return Ok(false);
}
// Check if all expected files exist
for file in &checkpoint_info.metadata.files {
let file_path = local_checkpoint_dir.join(&file.path);
if !file_path.exists() {
return Ok(false);
}
// Optionally verify file size
let file_metadata = tokio::fs::metadata(&file_path)
.await
.context("Failed to get file metadata")?;
if file_metadata.len() != file.size_bytes {
warn!(
"File size mismatch for {}: expected {}, got {}",
file.path,
file.size_bytes,
file_metadata.len()
);
return Ok(false);
}
}
Ok(true)
}
/// Get local checkpoint directory path for a checkpoint
pub fn get_local_checkpoint_path(&self, checkpoint_info: &CheckpointInfo) -> PathBuf {
PathBuf::from(&self.local_base_dir)
.join(&checkpoint_info.metadata.topic)
.join(checkpoint_info.metadata.partition.to_string())
.join(checkpoint_info.metadata.timestamp.to_string())
}
/// List all available checkpoints for a topic/partition
pub async fn list_available_checkpoints(
&self,
topic: &str,
partition: i32,
) -> Result<Vec<CheckpointInfo>> {
self.client
.list_checkpoint_metadata(topic, partition)
.await
.context("Failed to list checkpoint metadata")
}
/// Cleanup old local checkpoints, keeping only the specified count
pub async fn cleanup_local_checkpoints(
&self,
topic: &str,
partition: i32,
keep_count: usize,
) -> Result<()> {
let partition_dir = PathBuf::from(&self.local_base_dir)
.join(topic)
.join(partition.to_string());
if !partition_dir.exists() {
return Ok(());
}
// Read all checkpoint directories (they should be named by timestamp)
let mut entries = tokio::fs::read_dir(&partition_dir)
.await
.context("Failed to read partition directory")?;
let mut checkpoint_dirs = Vec::new();
while let Some(entry) = entries.next_entry().await? {
if entry.file_type().await?.is_dir() {
if let Some(name) = entry.file_name().to_str() {
if let Ok(timestamp) = name.parse::<u64>() {
checkpoint_dirs.push((timestamp, entry.path()));
}
}
}
}
// Sort by timestamp (newest first)
checkpoint_dirs.sort_by(|a, b| b.0.cmp(&a.0));
// Remove old directories
if checkpoint_dirs.len() > keep_count {
let dirs_to_remove: Vec<_> = checkpoint_dirs.into_iter().skip(keep_count).collect();
for (_timestamp, path) in dirs_to_remove {
info!("Removing old local checkpoint: {:?}", path);
if let Err(e) = tokio::fs::remove_dir_all(&path).await {
warn!("Failed to remove checkpoint directory {:?}: {}", path, e);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::checkpoint::metadata::{CheckpointMetadata, CheckpointType};
use crate::kafka::types::Partition;
use async_trait::async_trait;
use std::collections::HashMap;
use std::path::Path;
use tempfile::TempDir;
#[derive(Debug, Clone)]
struct MockClient {
checkpoints: HashMap<Partition, Vec<CheckpointInfo>>,
}
impl MockClient {
fn new() -> Self {
Self {
checkpoints: HashMap::new(),
}
}
fn add_checkpoint(&mut self, topic: &str, partition: i32, checkpoint: CheckpointInfo) {
self.checkpoints
.entry(Partition::new(topic.to_string(), partition))
.or_default()
.push(checkpoint);
}
}
#[async_trait]
impl CheckpointClient for MockClient {
async fn list_checkpoint_metadata(
&self,
topic: &str,
partition: i32,
) -> Result<Vec<CheckpointInfo>> {
let mut checkpoints = self
.checkpoints
.get(&Partition::new(topic.to_string(), partition))
.cloned()
.unwrap_or_default();
// Sort by timestamp (newest first)
checkpoints.sort_by(|a, b| b.metadata.timestamp.cmp(&a.metadata.timestamp));
Ok(checkpoints)
}
async fn download_checkpoint(
&self,
_checkpoint_info: &CheckpointInfo,
local_path: &Path,
) -> Result<()> {
tokio::fs::create_dir_all(local_path).await?;
// Create a dummy metadata.json file
let metadata_path = local_path.join("metadata.json");
tokio::fs::write(&metadata_path, "{}").await?;
Ok(())
}
async fn get_checkpoint_metadata(&self, _metadata_key: &str) -> Result<CheckpointMetadata> {
Ok(CheckpointMetadata::new(
CheckpointType::Full,
"test".to_string(),
0,
100,
50,
1000,
))
}
async fn checkpoint_exists(&self, _checkpoint_info: &CheckpointInfo) -> Result<bool> {
Ok(true)
}
async fn is_available(&self) -> bool {
true
}
}
#[tokio::test]
async fn test_load_latest_checkpoint() {
let mut mock_client = MockClient::new();
// Add some test checkpoints
let checkpoint1 = CheckpointInfo {
metadata: CheckpointMetadata {
timestamp: 1000,
topic: "test".to_string(),
partition: 0,
checkpoint_type: CheckpointType::Full,
consumer_offset: 100,
producer_offset: 50,
files: vec![],
previous_checkpoint: None,
total_size_bytes: 0,
key_count: 100,
},
s3_key_prefix: "test/0/1000".to_string(),
};
let checkpoint2 = CheckpointInfo {
metadata: CheckpointMetadata {
timestamp: 2000,
topic: "test".to_string(),
partition: 0,
checkpoint_type: CheckpointType::Partial,
consumer_offset: 200,
producer_offset: 100,
files: vec![],
previous_checkpoint: Some(1000),
total_size_bytes: 0,
key_count: 200,
},
s3_key_prefix: "test/0/2000".to_string(),
};
mock_client.add_checkpoint("test", 0, checkpoint1);
mock_client.add_checkpoint("test", 0, checkpoint2);
let temp_dir = TempDir::new().unwrap();
let loader =
CheckpointLoader::new(mock_client, temp_dir.path().to_string_lossy().to_string());
let result = loader.load_latest_checkpoint("test", 0).await.unwrap();
assert!(result.is_some());
let (checkpoint_info, local_path) = result.unwrap();
assert_eq!(checkpoint_info.metadata.timestamp, 2000); // Should get the newest
assert!(local_path.exists());
}
#[tokio::test]
async fn test_no_checkpoints_available() {
let mock_client = MockClient::new();
let temp_dir = TempDir::new().unwrap();
let loader =
CheckpointLoader::new(mock_client, temp_dir.path().to_string_lossy().to_string());
let result = loader
.load_latest_checkpoint("nonexistent", 0)
.await
.unwrap();
assert!(result.is_none());
}
}

View File

@@ -12,74 +12,49 @@ pub enum CheckpointType {
Partial,
}
/// Information about a checkpoint file
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointFile {
/// Path relative to checkpoint root
pub path: String,
/// Size of file in bytes
pub size_bytes: u64,
/// SHA256 hash of file contents
pub checksum: Option<String>,
}
/// Metadata about a checkpoint
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMetadata {
/// Unix timestamp when checkpoint was created
pub timestamp: u64,
/// Checkpoint ID (RFC3339-ish timestamp, e.g., "2025-10-14T16-00-05Z")
pub id: String,
/// Topic name
pub topic: String,
/// Partition number
pub partition: i32,
/// Type of checkpoint
pub checkpoint_type: CheckpointType,
/// RocksDB sequence number at checkpoint time
pub sequence: u64,
/// Consumer offset at time of checkpoint
pub consumer_offset: i64,
/// Producer offset at time of checkpoint
pub producer_offset: i64,
/// List of files in this checkpoint
pub files: Vec<CheckpointFile>,
/// Timestamp of previous checkpoint if this is a partial checkpoint
pub previous_checkpoint: Option<u64>,
/// Total size of all files in bytes
pub total_size_bytes: u64,
/// Number of keys in the checkpoint
pub key_count: u64,
/// Files with relative paths (can reference parent checkpoints)
pub files: Vec<String>,
}
impl CheckpointMetadata {
/// Create new checkpoint metadata
/// Create new checkpoint metadata with a given ID
pub fn new(
checkpoint_type: CheckpointType,
id: String,
topic: String,
partition: i32,
sequence: u64,
consumer_offset: i64,
producer_offset: i64,
key_count: u64,
) -> Self {
Self {
timestamp: chrono::Utc::now().timestamp() as u64,
id,
topic,
partition,
checkpoint_type,
sequence,
consumer_offset,
producer_offset,
files: Vec::new(),
previous_checkpoint: None,
total_size_bytes: 0,
key_count,
}
}
/// Add a file to the checkpoint metadata
pub fn add_file(&mut self, path: String, size_bytes: u64, checksum: Option<String>) {
self.total_size_bytes += size_bytes;
self.files.push(CheckpointFile {
path,
size_bytes,
checksum,
});
/// Generate a checkpoint ID from the current timestamp
pub fn generate_id() -> String {
chrono::Utc::now().format("%Y-%m-%dT%H-%M-%SZ").to_string()
}
/// Save metadata to a JSON file
@@ -97,14 +72,14 @@ impl CheckpointMetadata {
Ok(metadata)
}
/// Get S3 key prefix for this checkpoint
/// Get S3 key prefix for this checkpoint (<topic>/<partition>/<id>)
pub fn get_s3_key_prefix(&self) -> String {
format!("{}/{}/{}", self.topic, self.partition, self.timestamp)
format!("{}/{}/{}", self.topic, self.partition, self.id)
}
/// Get metadata filename
pub fn get_metadata_filename(&self) -> String {
format!("metadata-{}.json", self.timestamp)
"metadata.json".to_string()
}
}
@@ -145,49 +120,41 @@ mod tests {
#[tokio::test]
async fn test_checkpoint_metadata_creation() {
let metadata = CheckpointMetadata::new(
CheckpointType::Full,
"test-topic".to_string(),
0,
100,
50,
1000,
);
let id = "2025-10-14T16-00-05Z".to_string();
let metadata =
CheckpointMetadata::new(id.clone(), "test-topic".to_string(), 0, 1234567890, 100, 50);
assert_eq!(metadata.id, id);
assert_eq!(metadata.topic, "test-topic");
assert_eq!(metadata.partition, 0);
assert_eq!(metadata.checkpoint_type, CheckpointType::Full);
assert_eq!(metadata.sequence, 1234567890);
assert_eq!(metadata.consumer_offset, 100);
assert_eq!(metadata.producer_offset, 50);
assert_eq!(metadata.key_count, 1000);
assert_eq!(metadata.files.len(), 0);
assert_eq!(metadata.total_size_bytes, 0);
}
#[tokio::test]
async fn test_add_file_to_metadata() {
async fn test_add_files_to_metadata() {
let mut metadata = CheckpointMetadata::new(
CheckpointType::Full,
"2025-10-14T16-00-05Z".to_string(),
"test-topic".to_string(),
0,
1234567890,
100,
50,
1000,
);
metadata.add_file("sst/000001.sst".to_string(), 1024, None);
metadata.add_file(
"sst/000002.sst".to_string(),
2048,
Some("abcd1234".to_string()),
);
// Add files
metadata.files.push("000001.sst".to_string());
metadata
.files
.push("../2025-10-14T15-00-00Z/000002.sst".to_string());
metadata.files.push("MANIFEST-000123".to_string());
assert_eq!(metadata.files.len(), 2);
assert_eq!(metadata.total_size_bytes, 3072);
assert_eq!(metadata.files[0].path, "sst/000001.sst");
assert_eq!(metadata.files[0].size_bytes, 1024);
assert!(metadata.files[0].checksum.is_none());
assert_eq!(metadata.files[1].checksum, Some("abcd1234".to_string()));
assert_eq!(metadata.files.len(), 3);
assert_eq!(metadata.files[0], "000001.sst");
assert_eq!(metadata.files[1], "../2025-10-14T15-00-00Z/000002.sst");
assert_eq!(metadata.files[2], "MANIFEST-000123");
}
#[tokio::test]
@@ -196,15 +163,14 @@ mod tests {
let metadata_path = temp_dir.path().join("metadata.json");
let mut metadata = CheckpointMetadata::new(
CheckpointType::Partial,
"2025-10-14T16-00-05Z".to_string(),
"test-topic".to_string(),
1,
9876543210,
200,
150,
2000,
);
metadata.previous_checkpoint = Some(1000);
metadata.add_file("sst/000001.sst".to_string(), 1024, None);
metadata.files.push("000001.sst".to_string());
// Save metadata
metadata.save_to_file(&metadata_path).await.unwrap();
@@ -214,68 +180,74 @@ mod tests {
.await
.unwrap();
assert_eq!(loaded_metadata.id, metadata.id);
assert_eq!(loaded_metadata.topic, metadata.topic);
assert_eq!(loaded_metadata.partition, metadata.partition);
assert_eq!(loaded_metadata.checkpoint_type, CheckpointType::Partial);
assert_eq!(loaded_metadata.sequence, metadata.sequence);
assert_eq!(loaded_metadata.consumer_offset, metadata.consumer_offset);
assert_eq!(loaded_metadata.producer_offset, metadata.producer_offset);
assert_eq!(loaded_metadata.key_count, metadata.key_count);
assert_eq!(loaded_metadata.files.len(), 1);
assert_eq!(loaded_metadata.previous_checkpoint, Some(1000));
}
#[test]
fn test_s3_key_prefix() {
let metadata = CheckpointMetadata::new(
CheckpointType::Full,
"2025-10-14T16-00-05Z".to_string(),
"test-topic".to_string(),
2,
1234567890,
100,
50,
1000,
);
let prefix = metadata.get_s3_key_prefix();
assert!(prefix.starts_with("test-topic/2/"));
assert!(prefix.split('/').count() == 3);
assert_eq!(prefix, "test-topic/2/2025-10-14T16-00-05Z");
}
#[test]
fn test_checkpoint_info() {
let metadata = CheckpointMetadata::new(
CheckpointType::Full,
"2025-10-14T16-00-05Z".to_string(),
"test-topic".to_string(),
0,
1234567890,
100,
50,
1000,
);
let info = CheckpointInfo::new(metadata.clone());
assert_eq!(info.s3_key_prefix, metadata.get_s3_key_prefix());
assert!(info.get_metadata_key().ends_with("/metadata.json"));
assert_eq!(
info.get_file_key("sst/000001.sst"),
format!("{}/sst/000001.sst", info.s3_key_prefix)
info.get_metadata_key(),
"test-topic/0/2025-10-14T16-00-05Z/metadata.json"
);
assert_eq!(
info.get_file_key("000001.sst"),
"test-topic/0/2025-10-14T16-00-05Z/000001.sst"
);
}
#[test]
fn test_metadata_filename() {
let metadata = CheckpointMetadata {
timestamp: 1234567890,
topic: "test".to_string(),
partition: 0,
checkpoint_type: CheckpointType::Full,
consumer_offset: 100,
producer_offset: 50,
files: vec![],
previous_checkpoint: None,
total_size_bytes: 0,
key_count: 0,
};
let metadata = CheckpointMetadata::new(
"2025-10-14T16-00-05Z".to_string(),
"test".to_string(),
0,
1234567890,
100,
50,
);
assert_eq!(metadata.get_metadata_filename(), "metadata-1234567890.json");
assert_eq!(metadata.get_metadata_filename(), "metadata.json");
}
#[test]
fn test_generate_id() {
let id = CheckpointMetadata::generate_id();
// Should be in format YYYY-MM-DDTHH-MM-SSZ
assert!(id.contains('T'));
assert!(id.ends_with('Z'));
assert!(id.len() > 15); // Rough length check
}
}

View File

@@ -1,9 +1,8 @@
pub mod client;
pub mod config;
pub mod export;
pub mod loader;
pub mod metadata;
pub mod s3_client;
pub mod planner;
pub mod s3_uploader;
pub mod uploader;
pub mod worker;
@@ -11,9 +10,8 @@ pub mod worker;
pub use client::CheckpointClient;
pub use config::CheckpointConfig;
pub use export::CheckpointExporter;
pub use loader::CheckpointLoader;
pub use metadata::{CheckpointFile, CheckpointInfo, CheckpointMetadata, CheckpointType};
pub use s3_client::S3CheckpointClient;
pub use metadata::{CheckpointInfo, CheckpointMetadata, CheckpointType};
pub use planner::{plan_checkpoint, CheckpointPlan};
pub use s3_uploader::S3Uploader;
pub use uploader::CheckpointUploader;
pub use worker::{

View File

@@ -0,0 +1,286 @@
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::path::Path;
use tracing::{debug, info};
use super::CheckpointMetadata;
/// Result of checkpoint planning
#[derive(Debug)]
pub struct CheckpointPlan {
/// The new checkpoint metadata with files field populated
pub metadata: CheckpointMetadata,
/// Files that need to be uploaded to S3 (filename, local_full_path)
pub files_to_upload: Vec<(String, String)>,
}
/// Create a checkpoint plan with new metadata and list of files to upload
#[allow(clippy::too_many_arguments)]
pub fn plan_checkpoint(
local_checkpoint_dir: &Path,
new_checkpoint_id: String,
topic: String,
partition: i32,
sequence: u64,
consumer_offset: i64,
producer_offset: i64,
previous_metadata: Option<&CheckpointMetadata>,
) -> Result<CheckpointPlan> {
let mut metadata = CheckpointMetadata::new(
new_checkpoint_id,
topic,
partition,
sequence,
consumer_offset,
producer_offset,
);
let mut files_to_upload = Vec::new();
// Collect all files in local checkpoint directory
let local_files = collect_local_files(local_checkpoint_dir)?;
info!(
"Found {} files in local checkpoint directory",
local_files.len()
);
// If no previous metadata, upload everything
let Some(prev_meta) = previous_metadata else {
info!("No previous checkpoint metadata - uploading all files");
for (filename, local_path) in local_files {
metadata.files.push(filename.clone());
files_to_upload.push((filename, local_path));
}
return Ok(CheckpointPlan {
metadata,
files_to_upload,
});
};
// Build map of filename -> file_path from previous checkpoint
let mut file_map: HashMap<String, String> = HashMap::new();
for file_path in &prev_meta.files {
// Extract just the filename from the path
let filename = file_path
.rsplit('/')
.next()
.unwrap_or(file_path)
.to_string();
file_map.insert(filename.clone(), file_path.clone());
}
info!(
"Built file map with {} files from previous checkpoint {}",
file_map.len(),
prev_meta.id
);
// For each local file, check if it exists in previous metadata
for (filename, local_path) in local_files {
if let Some(prev_file_path) = file_map.get(&filename) {
// File exists in previous checkpoint - reuse it
let relative_path = if prev_file_path.starts_with("../") {
// Already a relative reference, keep it as is
prev_file_path.clone()
} else {
// Create relative path to previous checkpoint
format!("../{}/{}", prev_meta.id, filename)
};
metadata.files.push(relative_path);
debug!("Reusing file {} from previous checkpoint", filename);
} else {
// File is new - needs to be uploaded
metadata.files.push(filename.clone());
files_to_upload.push((filename.clone(), local_path));
debug!("New file {} will be uploaded", filename);
}
}
info!(
"Checkpoint plan: {} new files, {} reused files",
files_to_upload.len(),
metadata.files.len() - files_to_upload.len(),
);
Ok(CheckpointPlan {
metadata,
files_to_upload,
})
}
/// Collect all files in local checkpoint directory
fn collect_local_files(base_path: &Path) -> Result<Vec<(String, String)>> {
let mut files = Vec::new();
let mut stack = vec![base_path.to_path_buf()];
while let Some(current_path) = stack.pop() {
let entries = std::fs::read_dir(&current_path)
.with_context(|| format!("Failed to read directory: {current_path:?}"))?;
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
stack.push(path);
} else {
let relative_path = path
.strip_prefix(base_path)
.with_context(|| format!("Failed to get relative path for: {path:?}"))?;
let filename = relative_path.to_string_lossy().replace('\\', "/");
let local_path = path.to_string_lossy().to_string();
files.push((filename, local_path));
}
}
}
Ok(files)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_plan_checkpoint_no_previous() {
let temp_dir = TempDir::new().unwrap();
let checkpoint_dir = temp_dir.path();
// Create some test files
std::fs::write(checkpoint_dir.join("file1.sst"), b"data1").unwrap();
std::fs::write(checkpoint_dir.join("file2.sst"), b"data2").unwrap();
let plan = plan_checkpoint(
checkpoint_dir,
"2025-10-14T16-00-00Z".to_string(),
"test-topic".to_string(),
0,
1000,
100,
50,
None,
)
.unwrap();
// With no previous metadata, all files should be uploaded
assert_eq!(plan.files_to_upload.len(), 2);
assert_eq!(plan.metadata.files.len(), 2);
}
#[test]
fn test_plan_checkpoint_with_previous() {
let temp_dir = TempDir::new().unwrap();
let checkpoint_dir = temp_dir.path();
// Create test files
std::fs::write(checkpoint_dir.join("file1.sst"), b"data1").unwrap();
std::fs::write(checkpoint_dir.join("file2.sst"), b"data2").unwrap();
std::fs::write(checkpoint_dir.join("file3.sst"), b"data3").unwrap();
// Create previous metadata with file1 and file2
let mut prev_metadata = CheckpointMetadata::new(
"2025-10-14T15-00-00Z".to_string(),
"test-topic".to_string(),
0,
1000,
100,
50,
);
prev_metadata.files.push("file1.sst".to_string());
prev_metadata.files.push("file2.sst".to_string());
let plan = plan_checkpoint(
checkpoint_dir,
"2025-10-14T16-00-00Z".to_string(),
"test-topic".to_string(),
0,
2000,
200,
100,
Some(&prev_metadata),
)
.unwrap();
// file1 and file2 should be reused, file3 should be uploaded
assert_eq!(plan.files_to_upload.len(), 1);
assert_eq!(plan.metadata.files.len(), 3);
// Check that file3 is in upload list
assert!(plan
.files_to_upload
.iter()
.any(|(name, _)| name == "file3.sst"));
// Check that file1 and file2 are in metadata as references
assert!(plan.metadata.files.iter().any(|p| p.contains("file1.sst")));
assert!(plan.metadata.files.iter().any(|p| p.contains("file2.sst")));
assert!(plan.metadata.files.iter().any(|p| p == "file3.sst"));
}
#[test]
fn test_plan_checkpoint_with_already_referenced_files() {
let temp_dir = TempDir::new().unwrap();
let checkpoint_dir = temp_dir.path();
// Create test files
std::fs::write(checkpoint_dir.join("file1.sst"), b"data1").unwrap();
std::fs::write(checkpoint_dir.join("file2.sst"), b"data2").unwrap();
std::fs::write(checkpoint_dir.join("file3.sst"), b"data3").unwrap();
// Create previous metadata where file1 is already a reference to an older checkpoint
let mut prev_metadata = CheckpointMetadata::new(
"2025-10-14T16-00-00Z".to_string(),
"test-topic".to_string(),
0,
2000,
200,
100,
);
prev_metadata
.files
.push("../2025-10-14T14-00-00Z/file1.sst".to_string());
prev_metadata.files.push("file2.sst".to_string());
let plan = plan_checkpoint(
checkpoint_dir,
"2025-10-14T17-00-00Z".to_string(),
"test-topic".to_string(),
0,
3000,
300,
150,
Some(&prev_metadata),
)
.unwrap();
// file1 and file2 should be reused, file3 should be uploaded
assert_eq!(plan.files_to_upload.len(), 1);
assert_eq!(plan.metadata.files.len(), 3);
// Check that file3 is in upload list
assert!(plan
.files_to_upload
.iter()
.any(|(name, _)| name == "file3.sst"));
// Check that file1 keeps its existing relative reference
assert!(plan
.metadata
.files
.iter()
.any(|p| p == "../2025-10-14T14-00-00Z/file1.sst"));
// Check that file2 gets a new relative reference
assert!(plan
.metadata
.files
.iter()
.any(|p| p == "../2025-10-14T16-00-00Z/file2.sst"));
}
}
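Taken with the manager changes later in this diff, the intended call pattern feeds each plan's metadata into the next plan_checkpoint call. A hedged usage sketch, written as if it lived beside the planner tests above (so plan_checkpoint and tempfile are in scope); the offsets are placeholder values:

fn chained_plans_sketch() -> anyhow::Result<()> {
    let dir = tempfile::TempDir::new()?;
    std::fs::write(dir.path().join("000001.sst"), b"a")?;

    // First checkpoint: no previous metadata, so every local file is uploaded.
    let first = plan_checkpoint(
        dir.path(),
        "2025-10-14T15-00-00Z".to_string(),
        "topic".to_string(),
        0,   // partition
        100, // rocksdb sequence
        10,  // consumer offset (placeholder)
        5,   // producer offset (placeholder)
        None,
    )?;
    assert_eq!(first.files_to_upload.len(), 1);

    // Second checkpoint: the unchanged SST becomes "../<first_id>/000001.sst".
    std::fs::write(dir.path().join("000002.sst"), b"b")?;
    let second = plan_checkpoint(
        dir.path(),
        "2025-10-14T16-00-00Z".to_string(),
        "topic".to_string(),
        0,
        200,
        20,
        10,
        Some(&first.metadata),
    )?;
    assert_eq!(second.files_to_upload.len(), 1); // only 000002.sst is new
    assert!(second
        .metadata
        .files
        .iter()
        .any(|f| f == "../2025-10-14T15-00-00Z/000001.sst"));
    Ok(())
}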

View File

@@ -1,234 +0,0 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use aws_config::{meta::region::RegionProviderChain, Region};
use aws_sdk_s3::{Client, Config};
use std::path::Path;
use tokio::fs;
use tracing::{info, warn};
use super::client::CheckpointClient;
use super::config::CheckpointConfig;
use super::metadata::{CheckpointInfo, CheckpointMetadata};
#[derive(Debug, Clone)]
pub struct S3CheckpointClient {
client: Client,
config: CheckpointConfig,
}
impl S3CheckpointClient {
pub async fn new(config: CheckpointConfig) -> Result<Self> {
let region_provider =
RegionProviderChain::default_provider().or_else(Region::new(config.aws_region.clone()));
let aws_config = aws_config::from_env().region(region_provider).load().await;
let s3_config = Config::from(&aws_config);
let client = Client::from_conf(s3_config);
Ok(Self { client, config })
}
async fn download_file(&self, s3_key: &str, local_path: &Path) -> Result<()> {
let response = self
.client
.get_object()
.bucket(&self.config.s3_bucket)
.key(s3_key)
.send()
.await
.with_context(|| format!("Failed to download S3 object: {s3_key}"))?;
let body = response
.body
.collect()
.await
.context("Failed to read S3 object body")?;
if let Some(parent) = local_path.parent() {
fs::create_dir_all(parent).await.with_context(|| {
format!("Failed to create parent directories for: {local_path:?}")
})?;
}
fs::write(local_path, body.into_bytes())
.await
.with_context(|| format!("Failed to write file: {local_path:?}"))?;
info!(
"Downloaded s3://{}/{} to {:?}",
self.config.s3_bucket, s3_key, local_path
);
Ok(())
}
async fn list_objects_with_prefix(&self, prefix: &str) -> Result<Vec<String>> {
let response = self
.client
.list_objects_v2()
.bucket(&self.config.s3_bucket)
.prefix(prefix)
.send()
.await
.context("Failed to list S3 objects")?;
let keys = response
.contents()
.iter()
.filter_map(|obj| obj.key())
.map(|k| k.to_string())
.collect();
Ok(keys)
}
}
#[async_trait]
impl CheckpointClient for S3CheckpointClient {
async fn list_checkpoint_metadata(
&self,
topic: &str,
partition: i32,
) -> Result<Vec<CheckpointInfo>> {
let prefix = format!("{}/{}/{}", self.config.s3_key_prefix, topic, partition);
let keys = self.list_objects_with_prefix(&prefix).await?;
let mut checkpoint_infos = Vec::new();
for key in keys {
if key.ends_with("/metadata.json") {
// Parse the key to extract checkpoint info
if let Some((topic_parsed, partition_parsed, _timestamp)) =
CheckpointInfo::parse_s3_key(&key, &self.config.s3_key_prefix)
{
if topic_parsed == topic && partition_parsed == partition {
// Get the metadata
match self.get_checkpoint_metadata(&key).await {
Ok(metadata) => {
let s3_key_prefix = key.replace("/metadata.json", "");
checkpoint_infos.push(CheckpointInfo {
metadata,
s3_key_prefix,
});
}
Err(e) => {
warn!("Failed to get checkpoint metadata for key {}: {}", key, e);
}
}
}
}
}
}
// Sort by timestamp (newest first)
checkpoint_infos.sort_by(|a, b| b.metadata.timestamp.cmp(&a.metadata.timestamp));
Ok(checkpoint_infos)
}
async fn download_checkpoint(
&self,
checkpoint_info: &CheckpointInfo,
local_path: &Path,
) -> Result<()> {
info!(
"Downloading checkpoint {} to {:?} using transfer manager",
checkpoint_info.s3_key_prefix, local_path
);
// Create local directory
fs::create_dir_all(local_path)
.await
.with_context(|| format!("Failed to create checkpoint directory: {local_path:?}"))?;
// Download metadata.json first
let metadata_s3_key = format!("{}/metadata.json", checkpoint_info.s3_key_prefix);
let metadata_local_path = local_path.join("metadata.json");
self.download_file(&metadata_s3_key, &metadata_local_path)
.await?;
// Download all files concurrently
let download_tasks: Vec<_> = checkpoint_info
.metadata
.files
.iter()
.map(|file| {
let file_s3_key = format!("{}/{}", checkpoint_info.s3_key_prefix, file.path);
let file_local_path = local_path.join(&file.path);
async move { self.download_file(&file_s3_key, &file_local_path).await }
})
.collect();
// Execute all downloads concurrently
futures::future::try_join_all(download_tasks).await?;
info!(
"Successfully downloaded checkpoint with {} files to {:?}",
checkpoint_info.metadata.files.len(),
local_path
);
Ok(())
}
async fn get_checkpoint_metadata(&self, metadata_key: &str) -> Result<CheckpointMetadata> {
let response = self
.client
.get_object()
.bucket(&self.config.s3_bucket)
.key(metadata_key)
.send()
.await
.with_context(|| format!("Failed to get checkpoint metadata: {metadata_key}"))?;
let body = response
.body
.collect()
.await
.context("Failed to read metadata body")?;
let json =
String::from_utf8(body.into_bytes().to_vec()).context("Invalid UTF-8 in metadata")?;
serde_json::from_str(&json)
.with_context(|| format!("Failed to parse checkpoint metadata: {metadata_key}"))
}
async fn checkpoint_exists(&self, checkpoint_info: &CheckpointInfo) -> Result<bool> {
let metadata_key = format!("{}/metadata.json", checkpoint_info.s3_key_prefix);
let result = self
.client
.head_object()
.bucket(&self.config.s3_bucket)
.key(&metadata_key)
.send()
.await;
Ok(result.is_ok())
}
async fn is_available(&self) -> bool {
!self.config.s3_bucket.is_empty()
}
}
impl CheckpointInfo {
/// Parse S3 key to extract topic, partition, and timestamp
pub fn parse_s3_key(key: &str, prefix: &str) -> Option<(String, i32, u64)> {
// Remove prefix and split remaining path
let path = key.strip_prefix(prefix)?.trim_start_matches('/');
let parts: Vec<&str> = path.split('/').collect();
if parts.len() < 3 {
return None;
}
let topic = parts[0].to_string();
let partition = parts[1].parse().ok()?;
let timestamp = parts[2].parse().ok()?;
Some((topic, partition, timestamp))
}
}

View File

@@ -135,6 +135,63 @@ impl CheckpointUploader for S3Uploader {
Ok(uploaded_keys)
}
async fn upload_checkpoint_with_plan(
&self,
plan: &super::CheckpointPlan,
remote_key_prefix: &str,
) -> Result<Vec<String>> {
info!(
"Starting upload with plan: {} files to upload, {} files referenced from parents",
plan.files_to_upload.len(),
plan.metadata.files.len() - plan.files_to_upload.len()
);
// Upload all files concurrently
let upload_futures: Vec<_> = plan
.files_to_upload
.iter()
.map(|(filename, local_path)| {
let s3_key = format!("{remote_key_prefix}/{filename}");
let local_path = local_path.clone();
async move {
self.upload_file(Path::new(&local_path), &s3_key).await?;
Ok::<String, anyhow::Error>(s3_key)
}
})
.collect();
let uploaded_keys = futures::future::try_join_all(upload_futures).await?;
// Upload metadata.json
let metadata_json = serde_json::to_string_pretty(&plan.metadata)
.context("Failed to serialize checkpoint metadata")?;
let metadata_key = format!("{remote_key_prefix}/metadata.json");
let put_object = self
.client
.put_object()
.bucket(&self.config.s3_bucket)
.key(&metadata_key)
.body(metadata_json.into_bytes().into());
let result = tokio::time::timeout(self.config.s3_timeout, put_object.send())
.await
.with_context(|| format!("S3 upload timeout for key: {metadata_key}"))?;
result.with_context(|| format!("Failed to upload metadata to S3 key: {metadata_key}"))?;
info!(
"Uploaded {} files and metadata to s3://{}/{}",
plan.files_to_upload.len(),
self.config.s3_bucket,
remote_key_prefix
);
let mut all_keys = uploaded_keys;
all_keys.push(metadata_key);
Ok(all_keys)
}
async fn is_available(&self) -> bool {
!self.config.s3_bucket.is_empty()
}

View File

@@ -2,6 +2,8 @@ use anyhow::Result;
use async_trait::async_trait;
use std::path::Path;
use super::CheckpointPlan;
/// Trait for uploading checkpoints to remote storage
#[async_trait]
pub trait CheckpointUploader: Send + Sync + std::fmt::Debug {
@@ -12,6 +14,13 @@ pub trait CheckpointUploader: Send + Sync + std::fmt::Debug {
remote_key_prefix: &str,
) -> Result<Vec<String>>;
/// Upload checkpoint using a plan (specific files + metadata)
async fn upload_checkpoint_with_plan(
&self,
plan: &CheckpointPlan,
remote_key_prefix: &str,
) -> Result<Vec<String>>;
/// Check if the uploader is available/configured
async fn is_available(&self) -> bool;
}

View File

@@ -2,13 +2,13 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use super::CheckpointExporter;
use super::{plan_checkpoint, CheckpointExporter, CheckpointMetadata};
use crate::kafka::types::Partition;
use crate::metrics_const::{
CHECKPOINT_DURATION_HISTOGRAM, CHECKPOINT_FILE_COUNT_HISTOGRAM, CHECKPOINT_SIZE_HISTOGRAM,
CHECKPOINT_WORKER_STATUS_COUNTER,
};
use crate::store::DeduplicationStore;
use crate::store::{DeduplicationStore, LocalCheckpointInfo};
use anyhow::{Context, Result};
use metrics;
@@ -49,20 +49,21 @@ pub struct CheckpointTarget {
impl CheckpointTarget {
pub fn new(partition: Partition, local_checkpoint_base_dir: &Path) -> Result<Self> {
let cp_epoch_micros_str = Self::format_checkpoint_timestamp(SystemTime::now())?;
let cp_topic = format!("{}{}", CHECKPOINT_TOPIC_PREFIX, &partition.topic());
let cp_partition = format!(
"{}{}",
CHECKPOINT_PARTITION_PREFIX,
&partition.partition_number()
// Use RFC3339-ish timestamp format for checkpoint ID
let checkpoint_id = chrono::Utc::now().format("%Y-%m-%dT%H-%M-%SZ").to_string();
// S3 layout: <topic>/<partition>/<checkpoint_id>/
let remote_path = format!(
"{}/{}/{}",
partition.topic(),
partition.partition_number(),
checkpoint_id
);
let remote_path = format!("{}/{}/{}", &cp_topic, &cp_partition, cp_epoch_micros_str);
let local_path = PathBuf::from(local_checkpoint_base_dir)
.join(cp_topic)
.join(cp_partition)
.join(cp_epoch_micros_str);
.join(partition.topic())
.join(partition.partition_number().to_string())
.join(&checkpoint_id);
let local_path_tag = local_path.to_string_lossy().to_string();
Ok(Self {
@@ -94,6 +95,9 @@ pub struct CheckpointWorker {
/// Checkpoint export module
exporter: Option<Arc<CheckpointExporter>>,
/// Whether this is a test worker
test_mode: bool,
}
impl CheckpointWorker {
@@ -106,15 +110,54 @@ impl CheckpointWorker {
worker_id,
target,
exporter,
test_mode: false,
}
}
/// Perform a checkpoint for the given (assumed active) partition and store
pub fn new_for_testing(
worker_id: u32,
target: CheckpointTarget,
exporter: Option<Arc<CheckpointExporter>>,
) -> Self {
Self {
worker_id,
target,
exporter,
test_mode: true,
}
}
/// Perform a complete checkpoint operation: create, export, and cleanup
/// Returns (remote_path, metadata) on success
pub async fn checkpoint_partition(
&self,
mode: CheckpointMode,
store: &DeduplicationStore,
) -> Result<Option<String>> {
previous_metadata: Option<&CheckpointMetadata>,
) -> Result<Option<(String, CheckpointMetadata)>> {
// Create local checkpoint
let rocks_metadata = self.create_checkpoint(mode, store).await?;
// Export checkpoint
let result = self
.export_checkpoint(mode, &rocks_metadata, previous_metadata)
.await;
// Clean up temp checkpoint directory (skip in test mode to allow verification)
if !self.test_mode {
self.cleanup_checkpoint().await;
}
result
}
/// Create a local checkpoint (step 1 of checkpoint process)
/// Returns RocksDB metadata about the checkpoint
pub async fn create_checkpoint(
&self,
mode: CheckpointMode,
store: &DeduplicationStore,
) -> Result<LocalCheckpointInfo> {
info!(
self.worker_id,
local_path = self.target.local_path_tag,
@@ -126,7 +169,7 @@ impl CheckpointWorker {
self.create_partition_checkpoint_directory(mode).await?;
// this creates the local RocksDB checkpoint - results observed internally, safe to bubble up
self.create_local_partition_checkpoint(mode, store).await?;
let rocks_metadata = self.create_local_partition_checkpoint(mode, store).await?;
// update store metrics - this can fail without blocking the checkpoint attempt
if let Err(e) = store.update_metrics() {
@@ -139,8 +182,19 @@ impl CheckpointWorker {
);
}
// export the checkpoint - observed internally, safe to return result
self.export_checkpoint(mode).await
Ok(rocks_metadata)
}
/// Clean up the temporary checkpoint directory (step 3 of checkpoint process)
pub async fn cleanup_checkpoint(&self) {
if let Err(e) = tokio::fs::remove_dir_all(&self.target.local_path).await {
warn!(
self.worker_id,
local_path = self.target.local_path_tag,
"Failed to clean up temp checkpoint directory: {}",
e
);
}
}
async fn create_partition_checkpoint_directory(&self, mode: CheckpointMode) -> Result<()> {
@@ -179,7 +233,7 @@ impl CheckpointWorker {
&self,
mode: CheckpointMode,
store: &DeduplicationStore,
) -> Result<()> {
) -> Result<LocalCheckpointInfo> {
let start_time = Instant::now();
// TODO: this should accept CheckpointMode argument to implement incremental local checkpoint step
@@ -205,7 +259,7 @@ impl CheckpointWorker {
"Created local checkpoint",
);
Ok(())
Ok(metadata)
}
Err(e) => {
@@ -236,7 +290,12 @@ impl CheckpointWorker {
}
}
async fn export_checkpoint(&self, mode: CheckpointMode) -> Result<Option<String>> {
async fn export_checkpoint(
&self,
mode: CheckpointMode,
rocks_info: &LocalCheckpointInfo,
previous_metadata: Option<&CheckpointMetadata>,
) -> Result<Option<(String, CheckpointMetadata)>> {
info!(
self.worker_id,
local_path = self.target.local_path_tag,
@@ -246,8 +305,46 @@ impl CheckpointWorker {
match self.exporter.as_ref() {
Some(exporter) => {
// Extract checkpoint ID from remote_path (format: <topic>/<partition>/<checkpoint_id>)
let checkpoint_id = self
.target
.remote_path
.split('/')
.next_back()
.ok_or_else(|| {
anyhow::anyhow!("Failed to extract checkpoint ID from remote path")
})?
.to_string();
// TODO: Wire up actual consumer/producer offset tracking
// For now, using 0 as placeholder values
let consumer_offset = 0i64;
let producer_offset = 0i64;
// Create checkpoint plan
let plan = plan_checkpoint(
&self.target.local_path,
checkpoint_id,
self.target.partition.topic().to_string(),
self.target.partition.partition_number(),
rocks_info.sequence,
consumer_offset,
producer_offset,
previous_metadata,
)?;
info!(
self.worker_id,
local_path = self.target.local_path_tag,
total_files = plan.metadata.files.len(),
new_files = plan.files_to_upload.len(),
reused_files = plan.metadata.files.len() - plan.files_to_upload.len(),
"Checkpoint plan created"
);
// Export checkpoint using the plan
match exporter
.export_checkpoint(&self.target.local_path, &self.target.remote_path, mode)
.export_checkpoint_with_plan(&plan, &self.target.remote_path, mode)
.await
{
Ok(remote_key_prefix) => {
@@ -265,7 +362,7 @@ impl CheckpointWorker {
"Checkpoint exported successfully"
);
Ok(Some(remote_key_prefix))
Ok(Some((remote_key_prefix, plan.metadata)))
}
Err(e) => {
@@ -419,9 +516,7 @@ mod tests {
// simulate how the manager's checkpoint loop thread constructs workers
let worker = CheckpointWorker::new(1, target.clone(), None);
let result = worker
.checkpoint_partition(CheckpointMode::Full, &store)
.await;
let result = worker.create_checkpoint(CheckpointMode::Full, &store).await;
assert!(result.is_ok());
let expected_checkpoint_path = Path::new(&target.local_path);
@@ -480,8 +575,9 @@ mod tests {
// simulate how the manager's checkpoint loop thread constructs workers
let worker = CheckpointWorker::new(1, target.clone(), None);
// Use create_checkpoint to test without cleanup
let result = worker
.checkpoint_partition(CheckpointMode::Incremental, &store)
.create_checkpoint(CheckpointMode::Incremental, &store)
.await;
assert!(result.is_ok());

View File

@@ -7,8 +7,8 @@ use std::sync::{
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::checkpoint::{
CheckpointConfig, CheckpointExporter, CheckpointMode, CheckpointTarget, CheckpointWorker,
CHECKPOINT_PARTITION_PREFIX, CHECKPOINT_TOPIC_PREFIX,
CheckpointConfig, CheckpointExporter, CheckpointMetadata, CheckpointMode, CheckpointTarget,
CheckpointWorker, CHECKPOINT_PARTITION_PREFIX, CHECKPOINT_TOPIC_PREFIX,
};
use crate::kafka::types::Partition;
use crate::metrics_const::{
@@ -19,6 +19,7 @@ use crate::store::DeduplicationStore;
use crate::store_manager::StoreManager;
use anyhow::{Context, Result};
use dashmap::DashMap;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
@@ -112,11 +113,11 @@ impl CheckpointManager {
// loop-local counter for individual worker task logging
let mut worker_task_id = 1_u32;
// loop-local state variables. In the future, we can pass in
// last-known values for these as recorded in checkpoint metadata
// loop-local state variables
let is_checkpointing = self.is_checkpointing.clone();
let checkpoint_counters: Arc<Mutex<HashMap<Partition, u32>>> =
Arc::new(Mutex::new(HashMap::new()));
// Track checkpoint counter and metadata per partition in a single map for atomic updates
let checkpoint_state: Arc<DashMap<Partition, (u32, CheckpointMetadata)>> =
Arc::new(DashMap::new());
let checkpoint_health_reporter = health_reporter.clone();
let checkpoint_task_handle = tokio::spawn(async move {
@@ -193,7 +194,7 @@ impl CheckpointManager {
// to avoid race conditions - the worker must acquire protected values
// when the thread executes, and mark its own completion
let worker_is_checkpointing = is_checkpointing.clone();
let worker_checkpoint_counters = checkpoint_counters.clone();
let worker_checkpoint_state = checkpoint_state.clone();
let worker_store_manager = store_manager.clone();
let worker_exporter = exporter.as_ref().map(|e| e.clone());
let worker_cancel_token = cancel_submit_loop_token.child_token();
@@ -239,20 +240,23 @@ impl CheckpointManager {
}
};
// Determine if this should be a full checkpoint or incremental,
// and block while executing the operation
let mode = Self::get_checkpoint_mode(&partition, &worker_checkpoint_counters, worker_full_checkpoint_interval).await;
let result = worker.checkpoint_partition(mode, &target_store).await;
// Get previous checkpoint state (counter and metadata) for this partition
let (counter, prev_metadata) = worker_checkpoint_state
.get(&partition)
.map(|entry| (entry.0, Some(entry.1.clone())))
.unwrap_or((0, None));
// Determine if this should be a full checkpoint or incremental
let mode = Self::get_checkpoint_mode(counter, worker_full_checkpoint_interval);
// Execute checkpoint operation with previous metadata for deduplication
let result = worker.checkpoint_partition(mode, &target_store, prev_metadata.as_ref()).await;
// handle releasing locks and reporting outcome
let status = match result {
Ok(Some(_)) => {
// only update the counter for this partition/store if a checkpoint + export was successful
{
let mut counter_guard = worker_checkpoint_counters.lock().await;
let counter_for_partition = *counter_guard.get(&partition).unwrap_or(&0_u32);
counter_guard.insert(partition.clone(), counter_for_partition + 1);
}
let status = match &result {
Ok(Some((_, new_metadata))) => {
// Update counter and metadata atomically on success
worker_checkpoint_state.insert(partition.clone(), (counter + 1, new_metadata.clone()));
"success"
},
Ok(None) => "skipped",
@@ -386,38 +390,23 @@ impl CheckpointManager {
);
worker
.checkpoint_partition(CheckpointMode::Full, &store)
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await?;
}
Ok(())
}
// use the local atomic counter for the given partition to determine
// if this checkpoint should be full or incremental. CheckpointConfig
// specifies the interval at which full checkpoints should be performed
async fn get_checkpoint_mode(
partition: &Partition,
checkpoint_counters: &Arc<Mutex<HashMap<Partition, u32>>>,
full_checkpoint_interval: u32,
) -> CheckpointMode {
// Determine if this should be a full upload or incremental
// use the checkpoint counter to determine if this checkpoint should be full or incremental.
// CheckpointConfig specifies the interval at which full checkpoints should be performed
fn get_checkpoint_mode(counter: u32, full_checkpoint_interval: u32) -> CheckpointMode {
// if config.full_upload_interval is 0, then we should always do full uploads
if full_checkpoint_interval == 0 {
return CheckpointMode::Full;
}
// otherwise, use the atomic counter for this partition
// and decide based on the configured interval
let counter_for_partition: u32;
{
let counter_guard = checkpoint_counters.lock().await;
counter_for_partition = *counter_guard.get(partition).unwrap_or(&0_u32);
}
// when full_checkpoint_interval is 0, we default to always performing Full checkpoints
if counter_for_partition.is_multiple_of(full_checkpoint_interval) {
// when counter is a multiple of the interval, perform a full checkpoint
if counter.is_multiple_of(full_checkpoint_interval) {
CheckpointMode::Full
} else {
CheckpointMode::Incremental
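For reference, the resulting schedule (an illustrative test-style sketch, not part of the diff; it assumes get_checkpoint_mode stays a private associated fn on CheckpointManager as shown above):

#[test]
fn checkpoint_mode_schedule_sketch() {
    // counter = number of checkpoints already exported successfully for the partition
    assert!(matches!(CheckpointManager::get_checkpoint_mode(0, 3), CheckpointMode::Full));
    assert!(matches!(CheckpointManager::get_checkpoint_mode(1, 3), CheckpointMode::Incremental));
    assert!(matches!(CheckpointManager::get_checkpoint_mode(2, 3), CheckpointMode::Incremental));
    assert!(matches!(CheckpointManager::get_checkpoint_mode(3, 3), CheckpointMode::Full));
    // an interval of 0 forces a full upload every cycle
    assert!(matches!(CheckpointManager::get_checkpoint_mode(7, 0), CheckpointMode::Full));
}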
@@ -644,13 +633,104 @@ impl Drop for CheckpointManager {
mod tests {
use super::*;
use crate::checkpoint::worker::CheckpointTarget;
use crate::checkpoint::{CheckpointPlan, CheckpointUploader};
use crate::store::{
DeduplicationStore, DeduplicationStoreConfig, TimestampKey, TimestampMetadata,
};
use async_trait::async_trait;
use common_types::RawEvent;
use std::{collections::HashMap, path::PathBuf, time::Duration};
use tempfile::TempDir;
/// Filesystem-based uploader for testing - copies files to a local export directory
#[derive(Debug)]
struct FilesystemUploader {
export_base_dir: PathBuf,
}
impl FilesystemUploader {
fn new(export_base_dir: PathBuf) -> Self {
Self { export_base_dir }
}
async fn copy_dir_recursive(&self, src: &Path, dest: &Path) -> Result<Vec<String>> {
let mut files_to_copy = Vec::new();
let mut stack = vec![(src.to_path_buf(), dest.to_path_buf())];
while let Some((current_src, current_dest)) = stack.pop() {
let entries = std::fs::read_dir(&current_src)?;
for entry in entries {
let entry = entry?;
let path = entry.path();
let file_name = entry.file_name();
let dest_path = current_dest.join(&file_name);
if path.is_dir() {
stack.push((path, dest_path));
} else {
files_to_copy.push((path, dest_path));
}
}
}
let mut uploaded_files = Vec::new();
for (src_file, dest_file) in files_to_copy {
if let Some(parent) = dest_file.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::copy(&src_file, &dest_file).await?;
uploaded_files.push(dest_file.to_string_lossy().to_string());
}
Ok(uploaded_files)
}
}
#[async_trait]
impl CheckpointUploader for FilesystemUploader {
async fn upload_checkpoint_dir(
&self,
local_path: &Path,
remote_key_prefix: &str,
) -> Result<Vec<String>> {
let dest = self.export_base_dir.join(remote_key_prefix);
self.copy_dir_recursive(local_path, &dest).await
}
async fn upload_checkpoint_with_plan(
&self,
plan: &CheckpointPlan,
remote_key_prefix: &str,
) -> Result<Vec<String>> {
let dest_dir = self.export_base_dir.join(remote_key_prefix);
tokio::fs::create_dir_all(&dest_dir).await?;
let mut uploaded_files = Vec::new();
// Upload only new files
for (filename, local_path) in &plan.files_to_upload {
let dest_path = dest_dir.join(filename);
if let Some(parent) = dest_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::copy(local_path, &dest_path).await?;
uploaded_files.push(dest_path.to_string_lossy().to_string());
}
// Write metadata.json
let metadata_path = dest_dir.join("metadata.json");
let metadata_json = serde_json::to_string_pretty(&plan.metadata)?;
tokio::fs::write(&metadata_path, metadata_json).await?;
uploaded_files.push(metadata_path.to_string_lossy().to_string());
Ok(uploaded_files)
}
async fn is_available(&self) -> bool {
true
}
}
fn create_test_store_manager() -> Arc<StoreManager> {
let config = DeduplicationStoreConfig {
path: TempDir::new().unwrap().path().to_path_buf(),
@@ -692,14 +772,7 @@ mod tests {
if path.is_file() {
checkpoint_files.push(path);
} else if path.is_dir() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if name.starts_with(CHECKPOINT_TOPIC_PREFIX)
|| name.starts_with(CHECKPOINT_PARTITION_PREFIX)
|| name.chars().filter(|c| c.is_ascii_digit()).count() == name.len()
{
stack.push(path);
}
}
stack.push(path);
}
}
}
@@ -854,20 +927,26 @@ mod tests {
let metadata2 = TimestampMetadata::new(&event2);
store.put_timestamp_record(&key2, &metadata2).unwrap();
// Create manager with short interval for testing
// Create manager with short interval for testing and filesystem exporter
let tmp_checkpoint_dir = TempDir::new().unwrap();
let tmp_export_dir = TempDir::new().unwrap();
let uploader = Box::new(FilesystemUploader::new(tmp_export_dir.path().to_path_buf()));
let config = CheckpointConfig {
checkpoint_interval: Duration::from_millis(100),
cleanup_interval: Duration::from_secs(10),
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_key_prefix: "test".to_string(),
..Default::default()
};
let exporter = Arc::new(CheckpointExporter::new(config.clone(), uploader));
let partition = Partition::new("test_periodic_flush_task".to_string(), 0);
let stores = store_manager.stores();
stores.insert(partition.clone(), store);
let mut manager = CheckpointManager::new(config.clone(), store_manager.clone(), None);
let mut manager =
CheckpointManager::new(config.clone(), store_manager.clone(), Some(exporter));
// Start the manager
let health_reporter = manager.start();
@@ -882,41 +961,27 @@ mod tests {
// service task threads are still healthy and running
assert!(health_reporter.unwrap().load(Ordering::SeqCst));
// the local checkpoints dir for the target topic partition
// should have produced several checkpoints by now. The expected
// parent path for checkpoints of this topic partition is this:
let expected_checkpoint_dir = Path::new(&config.local_checkpoint_dir)
.join(format!("{CHECKPOINT_TOPIC_PREFIX}{}", partition.topic()))
.join(format!(
"{CHECKPOINT_PARTITION_PREFIX}{}",
partition.partition_number()
));
// there should be lots of checkpoint files collected from
// various attempt directories of form /<base_path>/topic/partition/timestamp
let checkpoint_files =
find_local_checkpoint_files(Path::new(&expected_checkpoint_dir)).unwrap();
assert!(!checkpoint_files.is_empty());
assert!(checkpoint_files
// Verify that files were exported to the export directory
let export_files = find_local_checkpoint_files(tmp_export_dir.path()).unwrap();
assert!(!export_files.is_empty());
assert!(export_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with("CURRENT")));
assert!(checkpoint_files
assert!(export_files
.iter()
.any(|p| p.to_string_lossy().to_string().contains("MANIFEST")));
assert!(checkpoint_files
assert!(export_files
.iter()
.any(|p| p.to_string_lossy().to_string().contains("OPTIONS")));
assert!(checkpoint_files
assert!(export_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with(".sst")));
assert!(checkpoint_files
assert!(export_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with(".log")));
// there should be one or more timestamp-based checkpoint attempt directories
// of the form /<base_path>/topic/partition/timestamp depending on how
// many times the task loop ran while the test slept
let checkpoint_attempts = checkpoint_files
// there should be one or more checkpoint attempt directories in the export directory
let checkpoint_attempts = export_files
.iter()
.map(|p| p.parent().unwrap())
.collect::<HashSet<_>>();
@@ -999,23 +1064,27 @@ mod tests {
);
let tmp_checkpoint_dir = TempDir::new().unwrap();
let tmp_export_dir = TempDir::new().unwrap();
// configure frequent checkpoints and long retention, cleanup interval
// configure frequent checkpoints and long retention, cleanup interval with filesystem exporter
let uploader = Box::new(FilesystemUploader::new(tmp_export_dir.path().to_path_buf()));
let config = CheckpointConfig {
checkpoint_interval: Duration::from_millis(50),
cleanup_interval: Duration::from_secs(120),
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_key_prefix: "test".to_string(),
..Default::default()
};
let exporter = Arc::new(CheckpointExporter::new(config.clone(), uploader));
// start the manager and produce some local checkpoint files
let mut manager = CheckpointManager::new(config.clone(), store_manager.clone(), None);
// start the manager and produce some exported checkpoint files
let mut manager =
CheckpointManager::new(config.clone(), store_manager.clone(), Some(exporter));
manager.start();
tokio::time::sleep(Duration::from_millis(200)).await;
manager.stop().await;
let found_files =
find_local_checkpoint_files(Path::new(&config.local_checkpoint_dir)).unwrap();
let found_files = find_local_checkpoint_files(tmp_export_dir.path()).unwrap();
assert!(!found_files.is_empty());
// reconfigure the manager to not run checkpoints, but to clean up immediately
@@ -1067,23 +1136,27 @@ mod tests {
);
let tmp_checkpoint_dir = TempDir::new().unwrap();
let tmp_export_dir = TempDir::new().unwrap();
// configure frequent checkpoints and long retention, cleanup interval
// configure frequent checkpoints and long retention, cleanup interval with filesystem exporter
let uploader = Box::new(FilesystemUploader::new(tmp_export_dir.path().to_path_buf()));
let config = CheckpointConfig {
checkpoint_interval: Duration::from_millis(50),
cleanup_interval: Duration::from_secs(120),
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_key_prefix: "test".to_string(),
..Default::default()
};
let exporter = Arc::new(CheckpointExporter::new(config.clone(), uploader));
// start the manager and produce some local checkpoint files
let mut manager = CheckpointManager::new(config.clone(), store_manager.clone(), None);
// start the manager and produce some exported checkpoint files
let mut manager =
CheckpointManager::new(config.clone(), store_manager.clone(), Some(exporter));
manager.start();
tokio::time::sleep(Duration::from_millis(200)).await;
manager.stop().await;
let found_files =
find_local_checkpoint_files(Path::new(&config.local_checkpoint_dir)).unwrap();
let found_files = find_local_checkpoint_files(tmp_export_dir.path()).unwrap();
assert!(!found_files.is_empty());
// reconfigure the manager to not run checkpoints, but to clean up immediately
@@ -1125,18 +1198,23 @@ mod tests {
}
let tmp_checkpoint_dir = TempDir::new().unwrap();
let tmp_export_dir = TempDir::new().unwrap();
// configure moderate checkpoints with reasonable intervals
// configure moderate checkpoints with reasonable intervals and filesystem exporter
let uploader = Box::new(FilesystemUploader::new(tmp_export_dir.path().to_path_buf()));
let config = CheckpointConfig {
checkpoint_interval: Duration::from_millis(50), // Submit frequent checkpoints during test run
cleanup_interval: Duration::from_secs(30),
max_concurrent_checkpoints: 2,
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_key_prefix: "test".to_string(),
..Default::default()
};
let exporter = Arc::new(CheckpointExporter::new(config.clone(), uploader));
// start the manager and produce some local checkpoint files
let mut manager = CheckpointManager::new(config.clone(), store_manager.clone(), None);
// start the manager and produce some exported checkpoint files
let mut manager =
CheckpointManager::new(config.clone(), store_manager.clone(), Some(exporter));
manager.start();
// Give the manager time to start checkpointing
@@ -1175,8 +1253,7 @@ mod tests {
manager.stop().await;
let found_files =
find_local_checkpoint_files(Path::new(&config.local_checkpoint_dir)).unwrap();
let found_files = find_local_checkpoint_files(tmp_export_dir.path()).unwrap();
assert!(!found_files.is_empty());
}

View File

@@ -71,68 +71,128 @@ impl StatefulKafkaConsumer {
let mut metrics_interval = tokio::time::interval(Duration::from_secs(10));
loop {
tokio::select! {
// Check for shutdown signal
_ = &mut self.shutdown_rx => {
info!("Shutdown signal received, starting graceful shutdown");
break;
}
// Check capacity BEFORE polling Kafka
let permits_available = self.tracker.available_permits();
// Poll for messages
msg_result = timeout(Duration::from_secs(1), self.consumer.recv()) => {
match msg_result {
Ok(Ok(msg)) => {
// Send message to processor pool
self.send_to_processor(msg).await?;
}
Ok(Err(e)) => {
error!("Error receiving message: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(_) => {
// Timeout - continue
debug!("Consumer poll timeout");
}
if permits_available == 0 {
// No capacity - don't poll Kafka, just handle control operations
tokio::select! {
// Check for shutdown signal
_ = &mut self.shutdown_rx => {
info!("Shutdown signal received, starting graceful shutdown");
break;
}
}
// Publish metrics every 10 seconds
_ = metrics_interval.tick() => {
info!("Starting metrics publication cycle");
// Wait briefly for capacity to become available
_ = tokio::time::sleep(Duration::from_millis(50)) => {
debug!("No permits available, waiting for capacity");
continue;
}
let stats = self.tracker.get_stats().await;
let available_permits = self.tracker.available_permits();
let partition_health = self.tracker.get_partition_health().await;
// Publish metrics every 10 seconds
_ = metrics_interval.tick() => {
info!("Starting metrics publication cycle");
info!(
"Global Metrics: in_flight={}, completed={}, failed={}, memory={}MB, available_permits={}",
stats.in_flight, stats.completed, stats.failed,
stats.memory_usage / (1024 * 1024),
available_permits
);
let stats = self.tracker.get_stats().await;
let available_permits = self.tracker.available_permits();
let partition_health = self.tracker.get_partition_health().await;
// Log partition health status
for health in &partition_health {
info!(
"Partition {}-{}: last_committed={}, in_flight={}",
health.topic, health.partition,
health.last_committed_offset, health.in_flight_count
"Global Metrics: in_flight={}, completed={}, failed={}, memory={}MB, available_permits={}",
stats.in_flight, stats.completed, stats.failed,
stats.memory_usage / (1024 * 1024),
available_permits
);
// Log partition health status
for health in &partition_health {
info!(
"Partition {}-{}: last_committed={}, in_flight={}",
health.topic, health.partition,
health.last_committed_offset, health.in_flight_count
);
}
stats.publish_metrics();
// Also publish semaphore permit metrics from the tracker
metrics::gauge!(KAFKA_CONSUMER_AVAILABLE_PERMITS)
.set(available_permits as f64);
info!("Metrics published successfully");
}
stats.publish_metrics();
// Also publish semaphore permit metrics from the tracker
metrics::gauge!(KAFKA_CONSUMER_AVAILABLE_PERMITS)
.set(available_permits as f64);
info!("Metrics published successfully");
// Commit offsets periodically
_ = commit_interval.tick() => {
if let Err(e) = self.commit_offsets().await {
error!("Failed to commit offsets: {}", e);
}
}
}
} else {
// We have capacity - poll Kafka with short timeout
tokio::select! {
// Check for shutdown signal
_ = &mut self.shutdown_rx => {
info!("Shutdown signal received, starting graceful shutdown");
break;
}
// Commit offsets periodically
_ = commit_interval.tick() => {
if let Err(e) = self.commit_offsets().await {
error!("Failed to commit offsets: {}", e);
// Poll for messages
msg_result = timeout(Duration::from_millis(10), self.consumer.recv()) => {
match msg_result {
Ok(Ok(msg)) => {
// Send message to processor pool
self.send_to_processor(msg).await?;
}
Ok(Err(e)) => {
error!("Error receiving message: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(_) => {
// Timeout - this is expected when no messages are available
}
}
}
// Publish metrics every 10 seconds
_ = metrics_interval.tick() => {
info!("Starting metrics publication cycle");
let stats = self.tracker.get_stats().await;
let available_permits = self.tracker.available_permits();
let partition_health = self.tracker.get_partition_health().await;
info!(
"Global Metrics: in_flight={}, completed={}, failed={}, memory={}MB, available_permits={}",
stats.in_flight, stats.completed, stats.failed,
stats.memory_usage / (1024 * 1024),
available_permits
);
// Log partition health status
for health in &partition_health {
info!(
"Partition {}-{}: last_committed={}, in_flight={}",
health.topic, health.partition,
health.last_committed_offset, health.in_flight_count
);
}
stats.publish_metrics();
// Also publish semaphore permit metrics from the tracker
metrics::gauge!(KAFKA_CONSUMER_AVAILABLE_PERMITS)
.set(available_permits as f64);
info!("Metrics published successfully");
}
// Commit offsets periodically
_ = commit_interval.tick() => {
if let Err(e) = self.commit_offsets().await {
error!("Failed to commit offsets: {}", e);
}
}
}
}
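In short, the restructured loop gates Kafka polling on processor capacity: with zero permits it only services shutdown, metrics, and commit ticks plus a short back-off, and with capacity available it polls with a 10ms timeout so the control arms still run promptly. A condensed sketch of that decision (illustrative only; the tracker and consumer types are the ones used above):

use std::time::Duration;

// Illustrative only: what the loop decides before touching the consumer.
enum PollDecision {
    // Don't call recv(); sleep briefly, service control arms, then re-check capacity.
    WaitForCapacity(Duration),
    // Poll the consumer, bounded by this timeout.
    Poll(Duration),
}

fn next_poll(available_permits: usize) -> PollDecision {
    if available_permits == 0 {
        PollDecision::WaitForCapacity(Duration::from_millis(50))
    } else {
        PollDecision::Poll(Duration::from_millis(10))
    }
}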

View File

@@ -8,7 +8,8 @@ use num_cpus;
use once_cell::sync::Lazy;
use rocksdb::{
checkpoint::Checkpoint, BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor,
DBWithThreadMode, MultiThreaded, Options, WriteBatch, WriteBufferManager, WriteOptions,
DBWithThreadMode, MultiThreaded, Options, SliceTransform, WriteBatch, WriteBufferManager,
WriteOptions,
};
use std::time::Instant;
@@ -76,6 +77,10 @@ fn rocksdb_options() -> Options {
opts.optimize_universal_style_compaction(512 * 1024 * 1024); // 512MB
let mut block_opts = block_based_table_factory();
// Timestamp CF
let mut ts_cf = Options::default();
ts_cf.set_block_based_table_factory(&block_opts);
ts_cf.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
// CRITICAL: Use shared block cache across all stores
block_opts.set_block_cache(&SHARED_BLOCK_CACHE);
@@ -88,7 +93,7 @@ fn rocksdb_options() -> Options {
// Reduced memory budget per store (with 50 partitions per pod)
opts.set_write_buffer_size(8 * 1024 * 1024); // Reduced to 8MB per memtable
opts.set_max_write_buffer_number(3); // Max 3 buffers = 24MB per partition
opts.set_target_file_size_base(32 * 1024 * 1024); // SST files ~32MB
opts.set_target_file_size_base(128 * 1024 * 1024); // SST files ~128MB
// Parallelism
opts.increase_parallelism(num_threads as i32);
@@ -98,6 +103,9 @@ fn rocksdb_options() -> Options {
opts.set_paranoid_checks(true);
opts.set_bytes_per_sync(1024 * 1024);
opts.set_wal_bytes_per_sync(1024 * 1024);
opts.set_use_direct_reads(true);
opts.set_use_direct_io_for_flush_and_compaction(true);
opts.set_compaction_readahead_size(2 * 1024 * 1024);
// Reduce background IO impact
opts.set_disable_auto_compactions(false);
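The new timestamp-CF options hinge on the fixed 8-byte prefix extractor: once a prefix extractor is set, RocksDB can build prefix bloom filters so prefix seeks over one timestamp bucket skip unrelated keys. A minimal sketch, assuming timestamp keys begin with an 8-byte encoded timestamp (see TimestampKey for the actual key encoding):

use rocksdb::{Options, SliceTransform};

// Illustrative only: options for a column family whose keys start with an
// 8-byte timestamp prefix.
fn timestamp_cf_options() -> Options {
    let mut opts = Options::default();
    opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
    opts
}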

View File

@@ -44,6 +44,24 @@ pub struct KafkaDeduplicatorService {
}
impl KafkaDeduplicatorService {
/// Reset the local checkpoint directory (remove if exists, then create fresh)
fn reset_checkpoint_directory(checkpoint_dir: &str) -> Result<()> {
let path = std::path::Path::new(checkpoint_dir);
if path.exists() {
info!("Resetting local checkpoint directory: {checkpoint_dir}");
std::fs::remove_dir_all(path).with_context(|| {
format!("Failed to remove existing checkpoint directory: {checkpoint_dir}",)
})?;
}
std::fs::create_dir_all(path)
.with_context(|| format!("Failed to create checkpoint directory: {checkpoint_dir}"))?;
info!("Local checkpoint directory ready: {checkpoint_dir}");
Ok(())
}
/// Create a new service from configuration
pub async fn new(config: Config, liveness: HealthRegistry) -> Result<Self> {
// Validate configuration
@@ -93,6 +111,9 @@ impl KafkaDeduplicatorService {
s3_timeout: config.s3_timeout(),
};
// Reset local checkpoint directory on startup (it's temporary storage)
Self::reset_checkpoint_directory(&checkpoint_config.local_checkpoint_dir)?;
// create exporter conditionally if S3 config is populated
let exporter = if !config.aws_region.is_empty() && config.s3_bucket.is_some() {
let uploader = Box::new(S3Uploader::new(checkpoint_config.clone()).await.unwrap());
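The startup ordering here is: reset the local checkpoint directory first (it is treated as scratch space), then build the exporter only when S3 is actually configured. A hedged sketch of that guard with the unwrap replaced by error propagation (field and constructor names follow the hunk above and the test setup; treat it as illustrative, not the exact committed code):

// Illustrative only.
Self::reset_checkpoint_directory(&checkpoint_config.local_checkpoint_dir)?;

let exporter = if !config.aws_region.is_empty() && config.s3_bucket.is_some() {
    let uploader = Box::new(S3Uploader::new(checkpoint_config.clone()).await?);
    Some(Arc::new(CheckpointExporter::new(checkpoint_config.clone(), uploader)))
} else {
    None
};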

View File

@@ -7,6 +7,7 @@ use common_types::RawEvent;
use rocksdb::{ColumnFamilyDescriptor, Options, SliceTransform};
use tracing::info;
use crate::metrics::MetricsHelper;
use crate::rocksdb::dedup_metadata::EventSimilarity;
use crate::rocksdb::store::{block_based_table_factory, RocksDbStore};
@@ -115,7 +116,7 @@ impl DeduplicationStore {
pub fn new(config: DeduplicationStoreConfig, topic: String, partition: i32) -> Result<Self> {
// Create metrics helper for the RocksDB store
let metrics = crate::metrics::MetricsHelper::with_partition(&topic, partition)
let metrics = MetricsHelper::with_partition(&topic, partition)
.with_label("service", "kafka-deduplicator");
let cf_descriptors = Self::get_cf_descriptors();
@@ -478,7 +479,7 @@ impl DeduplicationStore {
pub fn create_checkpoint_with_metadata<P: AsRef<std::path::Path>>(
&self,
checkpoint_path: P,
) -> Result<CheckpointMetadata> {
) -> Result<LocalCheckpointInfo> {
// Step 1: Flush WAL to ensure durability
self.store.flush_wal(true)?;
@@ -494,16 +495,16 @@ impl DeduplicationStore {
// Step 5: Create the checkpoint (RocksDB internally handles file deletion safety)
self.store.create_checkpoint(checkpoint_path)?;
Ok(CheckpointMetadata {
Ok(LocalCheckpointInfo {
sst_files,
sequence,
})
}
}
/// Metadata about a checkpoint
/// Information about a local RocksDB checkpoint
#[derive(Debug, Clone)]
pub struct CheckpointMetadata {
pub struct LocalCheckpointInfo {
/// SST files included in the checkpoint
pub sst_files: Vec<String>,
/// RocksDB sequence number at checkpoint time

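A hedged usage sketch of the renamed type (the store construction and error handling around it are assumed, not shown here):

// Illustrative only: a caller inspecting what the local checkpoint captured.
let info: LocalCheckpointInfo = store.create_checkpoint_with_metadata("/tmp/ckpt-attempt-0")?;
println!(
    "checkpoint at sequence {} containing {} SST files",
    info.sequence,
    info.sst_files.len()
);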
View File

@@ -2,6 +2,6 @@ pub mod deduplication_store;
pub mod keys;
pub mod metadata;
pub use deduplication_store::{DeduplicationStore, DeduplicationStoreConfig};
pub use deduplication_store::{DeduplicationStore, DeduplicationStoreConfig, LocalCheckpointInfo};
pub use keys::{TimestampKey, UuidKey};
pub use metadata::{TimestampMetadata, UuidMetadata};

View File

@@ -6,8 +6,8 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use kafka_deduplicator::checkpoint::{
CheckpointConfig, CheckpointExporter, CheckpointMode, CheckpointTarget, CheckpointUploader,
CheckpointWorker, CHECKPOINT_PARTITION_PREFIX, CHECKPOINT_TOPIC_PREFIX,
CheckpointConfig, CheckpointExporter, CheckpointMode, CheckpointPlan, CheckpointTarget,
CheckpointUploader, CheckpointWorker,
};
use kafka_deduplicator::checkpoint_manager::CheckpointManager;
use kafka_deduplicator::kafka::types::Partition;
@@ -184,6 +184,38 @@ impl CheckpointUploader for MockUploader {
Ok(uploaded_keys)
}
async fn upload_checkpoint_with_plan(
&self,
plan: &CheckpointPlan,
key_prefix: &str,
) -> Result<Vec<String>> {
info!(
"Mock uploading checkpoint with plan: {} files to upload, prefix {}",
plan.files_to_upload.len(),
key_prefix
);
let mut files_to_upload = Vec::new();
for (filename, local_path) in &plan.files_to_upload {
let key = format!("{key_prefix}/{filename}");
files_to_upload.push((PathBuf::from(local_path), key));
}
let mut uploaded_keys = self.upload_files(files_to_upload).await?;
let metadata_key = format!("{key_prefix}/metadata.json");
let metadata_path = self.upload_dir.join(&metadata_key);
if let Some(parent) = metadata_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let metadata_json = serde_json::to_string_pretty(&plan.metadata)?;
tokio::fs::write(&metadata_path, metadata_json).await?;
uploaded_keys.push(metadata_key);
info!("Mock uploaded {} files with plan", uploaded_keys.len());
Ok(uploaded_keys)
}
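The plan the mock consumes mirrors what the incremental tests below assert on: plan.metadata.files is the full manifest for the checkpoint, with entries reused from an earlier checkpoint recorded as relative "../" paths, while plan.files_to_upload maps only the new filenames to their local paths. A small illustrative split of those two groups:

// Illustrative only: reused vs newly uploaded entries in a checkpoint manifest.
let reused_count = plan
    .metadata
    .files
    .iter()
    .filter(|f| f.starts_with("../"))
    .count();
let new_count = plan.metadata.files.len() - reused_count;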
async fn is_available(&self) -> bool {
self.available
}
@@ -216,35 +248,6 @@ fn create_test_raw_event(distinct_id: &str, token: &str, event_name: &str) -> Ra
}
}
fn find_local_checkpoint_files(base_dir: &Path) -> Result<Vec<PathBuf>> {
let mut checkpoint_files = Vec::new();
let mut stack = vec![base_dir.to_path_buf()];
while let Some(current_path) = stack.pop() {
let entries = std::fs::read_dir(&current_path)?;
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.is_file() {
checkpoint_files.push(path);
} else if path.is_dir() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if name.starts_with(CHECKPOINT_TOPIC_PREFIX)
|| name.starts_with(CHECKPOINT_PARTITION_PREFIX)
|| name.chars().filter(|c| c.is_ascii_digit()).count() == name.len()
{
stack.push(path);
}
}
}
}
}
Ok(checkpoint_files)
}
#[tokio::test]
async fn test_checkpoint_exporter_creation() {
let temp_dir = TempDir::new().unwrap();
@@ -306,7 +309,7 @@ async fn test_manual_checkpoint_export_incremental() {
// Perform checkpoint
let result = worker
.checkpoint_partition(CheckpointMode::Incremental, &store)
.checkpoint_partition(CheckpointMode::Incremental, &store, None)
.await;
assert!(result.is_ok());
@@ -314,42 +317,31 @@ async fn test_manual_checkpoint_export_incremental() {
assert!(result.is_some());
// the expected remote path will include the bucket prefix
// and the checkpoint mode path element
let expected = format!("test-prefix/incremental/{}", &target.remote_path);
let expected = format!("test-prefix/{}", &target.remote_path);
let (remote_path, _metadata) = result.as_ref().unwrap();
assert!(
result.as_ref().unwrap() == &expected,
"remote path should match {}, got: {:?}",
expected,
result.unwrap()
remote_path == &expected,
"remote path should match {expected}, got: {remote_path:?}",
);
// there should be lots of checkpoint files collected from
let local_checkpoint_files = find_local_checkpoint_files(&target.local_path).unwrap();
assert!(!local_checkpoint_files.is_empty());
// there should be lots of checkpoint files collected from
// various attempt directories of form /<base_path>/topic/partition/timestamp
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with("CURRENT")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().contains("MANIFEST")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().contains("OPTIONS")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with(".sst")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with(".log")));
let remote_checkpoint_files = uploader.get_stored_files().await.unwrap();
assert!(!remote_checkpoint_files.is_empty());
assert!(remote_checkpoint_files
.keys()
.all(|k| k.contains("test-prefix/incremental/")));
.all(|k| k.contains("test-prefix/")));
// Verify exported files contain expected RocksDB checkpoint files
assert!(remote_checkpoint_files
.keys()
.any(|k| k.ends_with("CURRENT")));
assert!(remote_checkpoint_files
.keys()
.any(|k| k.contains("MANIFEST")));
assert!(remote_checkpoint_files
.keys()
.any(|k| k.contains("OPTIONS")));
assert!(remote_checkpoint_files.keys().any(|k| k.ends_with(".sst")));
assert!(remote_checkpoint_files.keys().any(|k| k.ends_with(".log")));
}
#[tokio::test]
@@ -394,7 +386,7 @@ async fn test_checkpoint_manual_export_full() {
let worker = CheckpointWorker::new(1, target.clone(), exporter.clone());
let result = worker
.checkpoint_partition(CheckpointMode::Full, &store)
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await;
assert!(
result.is_ok(),
@@ -402,33 +394,24 @@ async fn test_checkpoint_manual_export_full() {
result.err()
);
// there should be lots of checkpoint files collected from
let local_checkpoint_files = find_local_checkpoint_files(&target.local_path).unwrap();
assert!(!local_checkpoint_files.is_empty());
// there should be lots of checkpoint files collected from
// various attempt directories of form /<base_path>/topic/partition/timestamp
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with("CURRENT")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().contains("MANIFEST")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().contains("OPTIONS")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with(".sst")));
assert!(local_checkpoint_files
.iter()
.any(|p| p.to_string_lossy().to_string().ends_with(".log")));
let remote_checkpoint_files = uploader.get_stored_files().await.unwrap();
assert!(!remote_checkpoint_files.is_empty());
assert!(remote_checkpoint_files
.keys()
.all(|k| k.contains("test-prefix/full/")));
.all(|k| k.contains("test-prefix/")));
// Verify exported files contain expected RocksDB checkpoint files
assert!(remote_checkpoint_files
.keys()
.any(|k| k.ends_with("CURRENT")));
assert!(remote_checkpoint_files
.keys()
.any(|k| k.contains("MANIFEST")));
assert!(remote_checkpoint_files
.keys()
.any(|k| k.contains("OPTIONS")));
assert!(remote_checkpoint_files.keys().any(|k| k.ends_with(".sst")));
assert!(remote_checkpoint_files.keys().any(|k| k.ends_with(".log")));
}
// TODO: incremental snapshot and export is not implemented yet, but
@@ -487,26 +470,18 @@ async fn test_incremental_vs_full_upload_serial() {
tokio::time::sleep(Duration::from_millis(250)).await;
manager.stop().await;
// eval if some full and incremental uploads were performed
// verify that multiple uploads were performed
let stored_files = uploader.get_stored_files().await.unwrap();
// Check if this was a full upload (every 3rd checkpoint)
let full_upload_paths = stored_files
.keys()
.filter(|k| k.contains("test-prefix/full/"))
.collect::<Vec<_>>();
let incremental_upload_paths = stored_files
.keys()
.filter(|k| k.contains("test-prefix/incremental/"))
.collect::<Vec<_>>();
assert!(
stored_files.len() >= 4,
"Should have performed at least four checkpoints, got {}",
stored_files.len()
);
assert!(
full_upload_paths.len() >= 2,
"Should have performed at least two full uploads"
);
assert!(
incremental_upload_paths.len() >= 2,
"Should have performed at least two incremental uploads"
stored_files.keys().all(|k| k.contains("test-prefix/")),
"All uploads should be under test-prefix/"
);
}
@@ -554,7 +529,7 @@ async fn test_unavailable_uploader() {
// The wrapper thread closure that is spawned to run this in
// production will catch and log/stat these errors
let result = worker
.checkpoint_partition(CheckpointMode::Full, &store)
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await;
assert!(result.is_err());
@@ -605,8 +580,280 @@ async fn test_unpopulated_exporter() {
// Checkpoint should still succeed even if uploader is unavailable
let result = worker
.checkpoint_partition(CheckpointMode::Full, &store)
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await;
assert!(result.is_ok()); // Should return OK result
assert!(result.unwrap().is_none()); // Should be None since no remote upload was attempted
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}
#[tokio::test]
async fn test_incremental_checkpoint_with_no_changes() {
let tmp_store_dir = TempDir::new().unwrap();
let test_topic = "test_incremental_no_changes";
let test_partition = 0;
let store = create_test_dedup_store(&tmp_store_dir, test_topic, test_partition);
let events = vec![
create_test_raw_event("user1", "token1", "event1"),
create_test_raw_event("user2", "token1", "event2"),
];
for event in &events {
let key = event.into();
let metadata = TimestampMetadata::new(event);
store.put_timestamp_record(&key, &metadata).unwrap();
}
let tmp_checkpoint_dir = TempDir::new().unwrap();
let config = CheckpointConfig {
checkpoint_interval: Duration::from_secs(60),
cleanup_interval: Duration::from_secs(60),
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_bucket: "test-bucket".to_string(),
s3_key_prefix: "test-prefix".to_string(),
aws_region: "us-east-1".to_string(),
..Default::default()
};
let uploader = Box::new(MockUploader::new().unwrap());
let exporter = Some(Arc::new(CheckpointExporter::new(
config.clone(),
uploader.clone(),
)));
let partition = Partition::new(test_topic.to_string(), test_partition);
let target1 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker1 = CheckpointWorker::new(1, target1.clone(), exporter.clone());
let result1 = worker1
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await;
assert!(result1.is_ok());
let (remote_path1, metadata1) = result1.unwrap().unwrap();
let files_before = uploader.get_stored_files().await.unwrap();
let file_count_checkpoint1 = files_before.len();
tokio::time::sleep(Duration::from_millis(1100)).await;
let target2 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker2 = CheckpointWorker::new(2, target2.clone(), exporter.clone());
let result2 = worker2
.checkpoint_partition(CheckpointMode::Incremental, &store, Some(&metadata1))
.await;
assert!(result2.is_ok());
let (remote_path2, metadata2) = result2.unwrap().unwrap();
let files_after = uploader.get_stored_files().await.unwrap();
assert_ne!(remote_path1, remote_path2);
let new_files_count = files_after.len() - file_count_checkpoint1;
assert!(
new_files_count <= 2,
"Expected only metadata.json and maybe one other file, got {new_files_count} new files",
);
let reused_file_count = metadata2
.files
.iter()
.filter(|f| f.starts_with("../"))
.count();
assert!(
reused_file_count > 0,
"Expected some files to be reused with relative paths"
);
let total_files = metadata2.files.len();
assert!(
reused_file_count >= total_files - 5,
"Expected most files to be reused: {reused_file_count}/{total_files} reused",
);
}
#[tokio::test]
async fn test_incremental_checkpoint_with_new_data() {
let tmp_store_dir = TempDir::new().unwrap();
let test_topic = "test_incremental_with_changes";
let test_partition = 0;
let store = create_test_dedup_store(&tmp_store_dir, test_topic, test_partition);
let events1 = vec![create_test_raw_event("user1", "token1", "event1")];
for event in &events1 {
let key = event.into();
let metadata = TimestampMetadata::new(event);
store.put_timestamp_record(&key, &metadata).unwrap();
}
let tmp_checkpoint_dir = TempDir::new().unwrap();
let config = CheckpointConfig {
checkpoint_interval: Duration::from_secs(60),
cleanup_interval: Duration::from_secs(60),
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_bucket: "test-bucket".to_string(),
s3_key_prefix: "test-prefix".to_string(),
aws_region: "us-east-1".to_string(),
..Default::default()
};
let uploader = Box::new(MockUploader::new().unwrap());
let exporter = Some(Arc::new(CheckpointExporter::new(
config.clone(),
uploader.clone(),
)));
let partition = Partition::new(test_topic.to_string(), test_partition);
let target1 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker1 = CheckpointWorker::new(1, target1.clone(), exporter.clone());
let result1 = worker1
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await;
assert!(result1.is_ok());
let (remote_path1, metadata1) = result1.unwrap().unwrap();
let files_before = uploader.get_stored_files().await.unwrap();
let file_count_checkpoint1 = files_before.len();
let events2 = vec![
create_test_raw_event("user2", "token1", "event2"),
create_test_raw_event("user3", "token1", "event3"),
create_test_raw_event("user4", "token1", "event4"),
];
for event in &events2 {
let key = event.into();
let metadata = TimestampMetadata::new(event);
store.put_timestamp_record(&key, &metadata).unwrap();
}
tokio::time::sleep(Duration::from_millis(1100)).await;
let target2 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker2 = CheckpointWorker::new(2, target2.clone(), exporter.clone());
let result2 = worker2
.checkpoint_partition(CheckpointMode::Incremental, &store, Some(&metadata1))
.await;
assert!(result2.is_ok());
let (remote_path2, metadata2) = result2.unwrap().unwrap();
let files_after = uploader.get_stored_files().await.unwrap();
assert_ne!(remote_path1, remote_path2);
let new_files_count = files_after.len() - file_count_checkpoint1;
assert!(
new_files_count >= 1,
"Expected at least metadata.json to be uploaded, got {new_files_count} new files",
);
let reused_file_count = metadata2
.files
.iter()
.filter(|f| f.starts_with("../"))
.count();
assert!(
reused_file_count > 0,
"Expected some files to be reused with relative paths"
);
let new_file_count = metadata2
.files
.iter()
.filter(|f| !f.starts_with("../"))
.count();
assert!(
new_file_count > 0,
"Expected some new files to be created due to new data"
);
assert!(
files_after.keys().any(|k| k.contains("metadata.json")),
"Expected metadata.json to be uploaded"
);
}
#[tokio::test]
async fn test_chained_incremental_checkpoints() {
let tmp_store_dir = TempDir::new().unwrap();
let test_topic = "test_chained_incremental";
let test_partition = 0;
let store = create_test_dedup_store(&tmp_store_dir, test_topic, test_partition);
let events1 = vec![create_test_raw_event("user1", "token1", "event1")];
for event in &events1 {
let key = event.into();
let metadata = TimestampMetadata::new(event);
store.put_timestamp_record(&key, &metadata).unwrap();
}
let tmp_checkpoint_dir = TempDir::new().unwrap();
let config = CheckpointConfig {
checkpoint_interval: Duration::from_secs(60),
cleanup_interval: Duration::from_secs(60),
local_checkpoint_dir: tmp_checkpoint_dir.path().to_string_lossy().to_string(),
s3_bucket: "test-bucket".to_string(),
s3_key_prefix: "test-prefix".to_string(),
aws_region: "us-east-1".to_string(),
..Default::default()
};
let uploader = Box::new(MockUploader::new().unwrap());
let exporter = Some(Arc::new(CheckpointExporter::new(
config.clone(),
uploader.clone(),
)));
let partition = Partition::new(test_topic.to_string(), test_partition);
let target1 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker1 = CheckpointWorker::new(1, target1, exporter.clone());
let result1 = worker1
.checkpoint_partition(CheckpointMode::Full, &store, None)
.await;
assert!(result1.is_ok());
let (remote_path1, metadata1) = result1.unwrap().unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
let target2 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker2 = CheckpointWorker::new(2, target2, exporter.clone());
let result2 = worker2
.checkpoint_partition(CheckpointMode::Incremental, &store, Some(&metadata1))
.await;
assert!(result2.is_ok());
let (remote_path2, metadata2) = result2.unwrap().unwrap();
tokio::time::sleep(Duration::from_millis(1100)).await;
let target3 =
CheckpointTarget::new(partition.clone(), Path::new(&config.local_checkpoint_dir)).unwrap();
let worker3 = CheckpointWorker::new(3, target3, exporter.clone());
let result3 = worker3
.checkpoint_partition(CheckpointMode::Incremental, &store, Some(&metadata2))
.await;
assert!(result3.is_ok());
let (remote_path3, metadata3) = result3.unwrap().unwrap();
assert_ne!(remote_path1, remote_path2);
assert_ne!(remote_path2, remote_path3);
assert_ne!(remote_path1, remote_path3);
let files_referencing_checkpoint1 = metadata3
.files
.iter()
.filter(|f| f.contains(&metadata1.id))
.count();
assert!(
files_referencing_checkpoint1 > 0,
"Expected some files in checkpoint 3 to reference checkpoint 1"
);
}
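Taken together, these tests pin down the incremental contract: every checkpoint gets a distinct remote path, unchanged files are carried forward as relative "../" manifest entries instead of being re-uploaded, and a later incremental may still reference files first uploaded by a much earlier checkpoint. A small illustrative helper in the same spirit as the final assertion:

// Illustrative only: count manifest entries that point back into an earlier
// checkpoint, identified by that checkpoint's id appearing in the entry path.
fn files_referencing(manifest_files: &[String], earlier_checkpoint_id: &str) -> usize {
    manifest_files
        .iter()
        .filter(|f| f.contains(earlier_checkpoint_id))
        .count()
}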