mirror of
https://github.com/langchain-ai/delta-rs.git
synced 2026-07-01 20:34:35 -04:00
chore: upgrade azurite and purge the need for a local az CLI to run tests
Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
This commit is contained in:
@@ -52,16 +52,6 @@ jobs:
|
||||
toolchain: stable
|
||||
cache: true
|
||||
|
||||
- name: Pin azure cli version
|
||||
if: matrix.target.name == 'azure'
|
||||
# as of 24-01-2026 we need to downgrade azure cli since the azuraite container
|
||||
# does not support the latest api versions used by the latest azure cli.
|
||||
run: |
|
||||
apt-cache policy azure-cli
|
||||
AZ_DIST=$(lsb_release -cs)
|
||||
AZ_VER=2.81.0
|
||||
sudo apt-get install azure-cli=${AZ_VER}-1~${AZ_DIST} --allow-downgrades
|
||||
|
||||
- name: Install cargo-llvm-cov
|
||||
uses: taiki-e/install-action@cargo-llvm-cov
|
||||
|
||||
|
||||
@@ -31,6 +31,9 @@ deltalake-test = { path = "../test" }
|
||||
pretty_env_logger = "0.5.0"
|
||||
rand = "0.8"
|
||||
serde_json = { workspace = true }
|
||||
azure_storage_blobs = { version = "0.21.0" }
|
||||
futures.workspace = true
|
||||
azure_storage = "0.21.0"
|
||||
|
||||
[features]
|
||||
integration_test = []
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use azure_storage_blobs::prelude::*;
|
||||
use chrono::Utc;
|
||||
use deltalake_azure::register_handlers;
|
||||
use deltalake_test::utils::*;
|
||||
use std::path::PathBuf;
|
||||
use std::process::ExitStatus;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
/// Kinds of storage integration
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -20,19 +23,36 @@ impl Default for MsftIntegration {
|
||||
|
||||
impl StorageIntegration for MsftIntegration {
|
||||
fn prepare_env(&self) {
|
||||
match self {
|
||||
Self::Azure(_) => az_cli::prepare_env(),
|
||||
Self::Onelake => onelake_cli::prepare_env(),
|
||||
Self::OnelakeAbfs => onelake_cli::prepare_env(),
|
||||
}
|
||||
set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "1");
|
||||
set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "devstoreaccount1");
|
||||
set_env_if_not_set(
|
||||
"AZURE_STORAGE_ACCOUNT_KEY",
|
||||
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
|
||||
);
|
||||
set_env_if_not_set(
|
||||
"AZURE_STORAGE_CONNECTION_STRING",
|
||||
"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;",
|
||||
);
|
||||
}
|
||||
|
||||
fn create_bucket(&self) -> std::io::Result<ExitStatus> {
|
||||
match self {
|
||||
Self::Azure(_) => az_cli::create_container(self.bucket_name()),
|
||||
Self::Onelake => Ok(ExitStatus::default()),
|
||||
Self::OnelakeAbfs => Ok(ExitStatus::default()),
|
||||
}
|
||||
let name = self.bucket_name();
|
||||
let container_client = ClientBuilder::emulator().container_client(name.clone());
|
||||
|
||||
let handle = Handle::current();
|
||||
tokio::task::block_in_place(move || {
|
||||
handle.block_on(async {
|
||||
if !container_client.exists().await.unwrap() {
|
||||
container_client
|
||||
.create()
|
||||
.await
|
||||
.expect("Failed to make the container");
|
||||
} else {
|
||||
println!("Container {name} already exists at start of test!");
|
||||
}
|
||||
})
|
||||
});
|
||||
Ok(ExitStatus::default())
|
||||
}
|
||||
|
||||
fn bucket_name(&self) -> String {
|
||||
@@ -60,99 +80,74 @@ impl StorageIntegration for MsftIntegration {
|
||||
}
|
||||
|
||||
fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
|
||||
let destination = format!("{}/{destination}", self.bucket_name());
|
||||
az_cli::copy_directory(source, destination)
|
||||
let name = self.bucket_name();
|
||||
println!("copy from {source} to {name}/{destination}");
|
||||
let container_client = ClientBuilder::emulator().container_client(name.clone());
|
||||
|
||||
async fn upload_folder(
|
||||
source: PathBuf,
|
||||
to: &str,
|
||||
client: &ContainerClient,
|
||||
) -> std::io::Result<()> {
|
||||
for entry in std::fs::read_dir(source).expect("Failed to read dir") {
|
||||
let entry = entry?;
|
||||
let file_type = entry.file_type()?;
|
||||
let destination = format!(
|
||||
"{to}/{}",
|
||||
entry
|
||||
.file_name()
|
||||
.into_string()
|
||||
.expect("Failed to turn file_name into a string")
|
||||
);
|
||||
|
||||
if file_type.is_dir() {
|
||||
println!("uploading folder {destination}");
|
||||
Box::pin(upload_folder(entry.path(), &destination, client))
|
||||
.await
|
||||
.expect("Failed to upload directory");
|
||||
} else if file_type.is_file() {
|
||||
let blob_client = client.blob_client(&destination);
|
||||
println!("putting entry: {:?} into {destination}", entry.file_name());
|
||||
let body = std::fs::read(entry.path()).expect("Failed to read file");
|
||||
let r = blob_client
|
||||
.put_block_blob(body)
|
||||
.await
|
||||
.expect("Failed to upload file");
|
||||
println!("upload result: {r:?}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let handle = Handle::current();
|
||||
tokio::task::block_in_place(move || {
|
||||
handle.block_on(async {
|
||||
upload_folder(source.into(), destination, &container_client)
|
||||
.await
|
||||
.expect("Failed to upload root");
|
||||
})
|
||||
});
|
||||
Ok(ExitStatus::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MsftIntegration {
|
||||
fn drop(&mut self) {
|
||||
az_cli::delete_container(self.bucket_name()).expect("Failed to drop bucket");
|
||||
}
|
||||
}
|
||||
|
||||
//cli for onelake
|
||||
mod onelake_cli {
|
||||
use super::set_env_if_not_set;
|
||||
/// prepare_env
|
||||
pub fn prepare_env() {
|
||||
let token = "jwt-token";
|
||||
set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0");
|
||||
set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake");
|
||||
set_env_if_not_set(
|
||||
"AZURE_STORAGE_CONTAINER_NAME",
|
||||
"86bc63cf-5086-42e0-b16d-6bc580d1dc87",
|
||||
);
|
||||
set_env_if_not_set("AZURE_STORAGE_TOKEN", token);
|
||||
}
|
||||
}
|
||||
|
||||
/// small wrapper around az cli
|
||||
mod az_cli {
|
||||
use super::set_env_if_not_set;
|
||||
use std::process::{Command, ExitStatus};
|
||||
|
||||
/// Create a new bucket
|
||||
pub fn create_container(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
|
||||
let mut child = Command::new("az")
|
||||
.args([
|
||||
"storage",
|
||||
"container",
|
||||
"create",
|
||||
"-n",
|
||||
container_name.as_ref(),
|
||||
])
|
||||
.spawn()
|
||||
.expect("az command is installed");
|
||||
child.wait()
|
||||
}
|
||||
|
||||
/// delete bucket
|
||||
pub fn delete_container(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
|
||||
let mut child = Command::new("az")
|
||||
.args([
|
||||
"storage",
|
||||
"container",
|
||||
"delete",
|
||||
"-n",
|
||||
container_name.as_ref(),
|
||||
])
|
||||
.spawn()
|
||||
.expect("az command is installed");
|
||||
child.wait()
|
||||
}
|
||||
|
||||
/// copy directory
|
||||
pub fn copy_directory(
|
||||
source: impl AsRef<str>,
|
||||
destination: impl AsRef<str>,
|
||||
) -> std::io::Result<ExitStatus> {
|
||||
let mut child = Command::new("az")
|
||||
.args([
|
||||
"storage",
|
||||
"blob",
|
||||
"upload-batch",
|
||||
"-s",
|
||||
source.as_ref(),
|
||||
"-d",
|
||||
destination.as_ref(),
|
||||
])
|
||||
.spawn()
|
||||
.expect("az command is installed");
|
||||
child.wait()
|
||||
}
|
||||
|
||||
/// prepare_env
|
||||
pub fn prepare_env() {
|
||||
set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "1");
|
||||
set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "devstoreaccount1");
|
||||
set_env_if_not_set(
|
||||
"AZURE_STORAGE_ACCOUNT_KEY",
|
||||
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
|
||||
);
|
||||
set_env_if_not_set(
|
||||
"AZURE_STORAGE_CONNECTION_STRING",
|
||||
"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;",
|
||||
);
|
||||
let name = self.bucket_name();
|
||||
let container_client = ClientBuilder::emulator().container_client(name.clone());
|
||||
|
||||
let handle = Handle::current();
|
||||
tokio::task::block_in_place(move || {
|
||||
handle.block_on(async {
|
||||
if container_client.exists().await.unwrap() {
|
||||
container_client
|
||||
.delete()
|
||||
.await
|
||||
.expect("Failed to delete the container");
|
||||
} else {
|
||||
println!("Container named {name} doesn't exist, so nothing to delete");
|
||||
}
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];
|
||||
/// TEST_PREFIXES as they should appear in object stores.
|
||||
static TEST_PREFIXES_ENCODED: &[&str] = &["my table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];
|
||||
|
||||
#[tokio::test]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_read_tables_azure() -> TestResult {
|
||||
let context = IntegrationContext::new(Box::new(MsftIntegration::default()))?;
|
||||
@@ -96,9 +96,9 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn list_delta_tables_using_listing_provider_with_missing_account_name() -> TestResult {
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn list_delta_tables_using_listing_provider_with_missing_account_name() -> TestResult {
|
||||
let context = IntegrationContext::new(Box::new(MsftIntegration::default()))?;
|
||||
unsafe {
|
||||
// Removing the envs set by the `IntegrationContext (az_cli::prepare_env())` to illustrate the issue if e.g. account_name is not set from custom `storage_options`, but still preserving the use of the `IntegrationContext`
|
||||
@@ -117,8 +117,8 @@ fn list_delta_tables_using_listing_provider_with_missing_account_name() -> TestR
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn list_delta_tables_using_listing_provider_with_account_name() -> TestResult {
|
||||
let context = IntegrationContext::new(Box::new(MsftIntegration::default()))?;
|
||||
unsafe {
|
||||
|
||||
@@ -87,7 +87,9 @@ impl IntegrationContext {
|
||||
|
||||
let tmp_dir = tempdir()?;
|
||||
// create a fresh bucket in every context. THis is done via CLI...
|
||||
integration.create_bucket()?;
|
||||
integration
|
||||
.create_bucket()
|
||||
.expect("Failed to create the bucket!");
|
||||
let store = integration.object_store()?;
|
||||
let bucket = integration.bucket_name();
|
||||
|
||||
|
||||
+1
-1
@@ -33,6 +33,6 @@ services:
|
||||
- 4443:4443
|
||||
|
||||
azurite:
|
||||
image: mcr.microsoft.com/azure-storage/azurite:3.35.0
|
||||
image: mcr.microsoft.com/azure-storage/azurite:latest
|
||||
ports:
|
||||
- 10000:10000
|
||||
|
||||
Reference in New Issue
Block a user