From 872debeaeb4b6b361e5c20e45a80a22b37f0eaa2 Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Fri, 13 Sep 2024 18:11:01 +0800 Subject: [PATCH] [fix]: Fix azure connection and clean up test (#150) issue: #148 --- .../filesystem/azure/azure_fs.h | 11 +- .../milvus-storage/filesystem/s3/s3_fs.h | 3 +- cpp/include/milvus-storage/filter/filter.h | 2 +- .../packed/column_group_writer.h | 2 +- cpp/include/milvus-storage/packed/reader.h | 7 +- .../milvus-storage/packed/utils/config.h | 4 + .../reader/common/delete_reader.h | 2 +- cpp/src/filesystem/fs.cpp | 3 +- cpp/src/packed/chunk_manager.cpp | 6 +- cpp/src/packed/column_group_writer.cpp | 2 + cpp/test/packed/one_file_test.cpp | 95 +-------------- cpp/test/packed/packed_integration_test.cpp | 95 +-------------- cpp/test/packed/packed_test_base.h | 113 +++++++++++++++++- 13 files changed, 145 insertions(+), 200 deletions(-) diff --git a/cpp/include/milvus-storage/filesystem/azure/azure_fs.h b/cpp/include/milvus-storage/filesystem/azure/azure_fs.h index df2a41c1..9f113314 100644 --- a/cpp/include/milvus-storage/filesystem/azure/azure_fs.h +++ b/cpp/include/milvus-storage/filesystem/azure/azure_fs.h @@ -24,18 +24,23 @@ namespace milvus_storage { class AzureFileSystemProducer : public FileSystemProducer { public: - AzureFileSystemProducer() {}; + AzureFileSystemProducer(){}; Result> Make(const std::string& uri, std::string* out_path) override { arrow::util::Uri uri_parser; RETURN_ARROW_NOT_OK(uri_parser.Parse(uri)); arrow::fs::AzureOptions options; - options.account_name = std::getenv("AZURE_STORAGE_ACCOUNT"); + auto account = std::getenv("AZURE_STORAGE_ACCOUNT"); + auto key = std::getenv("AZURE_SECRET_KEY"); + if (account == nullptr || key == nullptr) { + return Status::InvalidArgument("Please provide azure storage account and azure secret key"); + } + options.account_name = account; RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(std::getenv("AZURE_SECRET_KEY"))); - *out_path = std::getenv("FILE_PATH"); ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options)); + fs->CreateDir(*out_path); return std::shared_ptr(fs); } }; diff --git a/cpp/include/milvus-storage/filesystem/s3/s3_fs.h b/cpp/include/milvus-storage/filesystem/s3/s3_fs.h index a3d642a4..baa85050 100644 --- a/cpp/include/milvus-storage/filesystem/s3/s3_fs.h +++ b/cpp/include/milvus-storage/filesystem/s3/s3_fs.h @@ -25,7 +25,7 @@ namespace milvus_storage { class S3FileSystemProducer : public FileSystemProducer { public: - S3FileSystemProducer() {}; + S3FileSystemProducer(){}; Result> Make(const std::string& uri, std::string* out_path) override { arrow::util::Uri uri_parser; @@ -45,7 +45,6 @@ class S3FileSystemProducer : public FileSystemProducer { arrow::fs::S3Options options; options.endpoint_override = uri_parser.ToString(); options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY")); - *out_path = std::getenv("FILE_PATH"); if (std::getenv("REGION") != nullptr) { options.region = std::getenv("REGION"); diff --git a/cpp/include/milvus-storage/filter/filter.h b/cpp/include/milvus-storage/filter/filter.h index 65681798..7f9b184a 100644 --- a/cpp/include/milvus-storage/filter/filter.h +++ b/cpp/include/milvus-storage/filter/filter.h @@ -46,7 +46,7 @@ class Filter { return Status::OK(); } - virtual ~Filter() {}; + virtual ~Filter(){}; protected: std::string column_name_; diff --git a/cpp/include/milvus-storage/packed/column_group_writer.h b/cpp/include/milvus-storage/packed/column_group_writer.h index f24196db..478fefea 100644 --- a/cpp/include/milvus-storage/packed/column_group_writer.h +++ b/cpp/include/milvus-storage/packed/column_group_writer.h @@ -52,7 +52,7 @@ class ColumnGroupWriter { ColumnGroup column_group_; int flushed_batches_; int flushed_count_; - int64_t flushed_rows_; + int flushed_rows_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index 3aab931f..d67137f5 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -16,6 +16,7 @@ #include #include +#include "packed/utils/config.h" #include #include #include @@ -37,10 +38,6 @@ struct RowOffsetComparator { using RowOffsetMinHeap = std::priority_queue, std::vector>, RowOffsetComparator>; -// Default number of rows to read when using ::arrow::RecordBatchReader -static constexpr int64_t DefaultBatchSize = 1024; -static constexpr int64_t DefaultBufferSize = 16 * 1024 * 1024; - class PackedRecordBatchReader : public arrow::RecordBatchReader { public: PackedRecordBatchReader(arrow::fs::FileSystem& fs, @@ -48,7 +45,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { const std::shared_ptr schema, const std::vector& column_offsets, const std::set& needed_columns, - const int64_t buffer_size = DefaultBufferSize); + const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); std::shared_ptr schema() const override; diff --git a/cpp/include/milvus-storage/packed/utils/config.h b/cpp/include/milvus-storage/packed/utils/config.h index 127eb06f..c6cab2da 100644 --- a/cpp/include/milvus-storage/packed/utils/config.h +++ b/cpp/include/milvus-storage/packed/utils/config.h @@ -28,4 +28,8 @@ static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size"; +// Default number of rows to read when using ::arrow::RecordBatchReader +static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024; +static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024; + } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/reader/common/delete_reader.h b/cpp/include/milvus-storage/reader/common/delete_reader.h index 2f5d00ad..f3b82fc4 100644 --- a/cpp/include/milvus-storage/reader/common/delete_reader.h +++ b/cpp/include/milvus-storage/reader/common/delete_reader.h @@ -75,7 +75,7 @@ class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor { explicit DeleteFilterVisitor(DeleteFragmentVector delete_fragments, std::shared_ptr version_col = nullptr, int64_t version = -1) - : version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)), version_(version) {}; + : version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)), version_(version){}; arrow::Status Visit(const arrow::Int64Array& array) override; arrow::Status Visit(const arrow::StringArray& array) override; diff --git a/cpp/src/filesystem/fs.cpp b/cpp/src/filesystem/fs.cpp index 11256c74..8b0586f9 100644 --- a/cpp/src/filesystem/fs.cpp +++ b/cpp/src/filesystem/fs.cpp @@ -22,7 +22,8 @@ namespace milvus_storage { -Result> FileSystemFactory::BuildFileSystem(const std::string& uri, std::string* out_path) { +Result> FileSystemFactory::BuildFileSystem(const std::string& uri, + std::string* out_path) { arrow::util::Uri uri_parser; RETURN_ARROW_NOT_OK(uri_parser.Parse(uri)); auto scheme = uri_parser.scheme(); diff --git a/cpp/src/packed/chunk_manager.cpp b/cpp/src/packed/chunk_manager.cpp index f0c4ec83..d3ddc2aa 100644 --- a/cpp/src/packed/chunk_manager.cpp +++ b/cpp/src/packed/chunk_manager.cpp @@ -17,8 +17,10 @@ #include #include #include "common/log.h" -#include "packed/reader.h" +#include "packed/utils/config.h" #include "packed/chunk_manager.h" +#include +#include namespace milvus_storage { @@ -30,7 +32,7 @@ ChunkManager::ChunkManager(const std::vector& column_offsets, int6 std::vector> ChunkManager::SliceChunksByMaxContiguousSlice( int64_t chunksize, std::vector>>& tables) { // Determine the maximum contiguous slice across all tables) - SetChunkSize(std::min(chunksize, DefaultBatchSize)); + SetChunkSize(std::min(chunksize, DEFAULT_READ_BATCH_SIZE)); std::vector chunks(column_offsets_.size()); std::vector chunk_sizes(column_offsets_.size()); std::set table_to_pop; diff --git a/cpp/src/packed/column_group_writer.cpp b/cpp/src/packed/column_group_writer.cpp index 3812ad0d..ff073216 100644 --- a/cpp/src/packed/column_group_writer.cpp +++ b/cpp/src/packed/column_group_writer.cpp @@ -42,6 +42,8 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, : group_id_(group_id), writer_(schema, fs, file_path, props), column_group_(group_id, origin_column_indices), + flushed_batches_(0), + flushed_rows_(0), finished_(false) {} Status ColumnGroupWriter::Init() { return writer_.Init(); } diff --git a/cpp/test/packed/one_file_test.cpp b/cpp/test/packed/one_file_test.cpp index 16704343..c0d6cc7d 100644 --- a/cpp/test/packed/one_file_test.cpp +++ b/cpp/test/packed/one_file_test.cpp @@ -12,117 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "test_util.h" -#include "filesystem/fs.h" #include "packed_test_base.h" namespace milvus_storage { -class OneFileTest : public PackedTestBase { - protected: - OneFileTest() : props_(*parquet::default_writer_properties()) {} - - void SetUp() override { - const char* access_key = std::getenv("ACCESS_KEY"); - const char* secret_key = std::getenv("SECRET_KEY"); - const char* endpoint_url = std::getenv("S3_ENDPOINT_URL"); - const char* file_path = std::getenv("FILE_PATH"); - std::string uri = "file:///tmp/"; - if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) { - uri = endpoint_url; - } - auto factory = std::make_shared(); - ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(uri, &file_path_)); - - SetUpCommonData(); - props_ = *parquet::default_writer_properties(); - writer_memory_ = (22 + 16) * 1024 * 1024; // 22 MB memory is for s3fs part upload - reader_memory_ = 16 * 1024 * 1024; // 16 MB memory for reading - } - - void TearDown() override { fs_->DeleteDir(file_path_); } - - size_t writer_memory_; - size_t reader_memory_; - std::shared_ptr fs_; - std::string file_path_; - parquet::WriterProperties props_; - const int bath_size = 100; -}; +class OneFileTest : public PackedTestBase {}; TEST_F(OneFileTest, WriteAndRead) { - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_); - for (int i = 0; i < bath_size; ++i) { - EXPECT_TRUE(writer.Write(record_batch_).ok()); - } - EXPECT_TRUE(writer.Close().ok()); + int batch_size = 100; - std::set needed_columns = {0, 1, 2}; std::vector column_offsets = { ColumnOffset(0, 0), ColumnOffset(0, 1), ColumnOffset(0, 2), }; - auto paths = std::vector{file_path_ + "/0"}; + std::vector paths = {file_path_ + "/0"}; - // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file std::vector> fields = { arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()), arrow::field("str", arrow::utf8()), }; - auto new_schema = arrow::schema(fields); - - PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_); - ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); - ASSERT_STATUS_OK(pr.Close()); - - int64_t total_rows = table->num_rows(); - - auto chunks = table->GetColumnByName("str"); - int64_t count = 0; - for (int i = 0; i < chunks->num_chunks(); ++i) { - auto str_array = std::dynamic_pointer_cast(chunks->chunk(i)); - for (int j = 0; j < str_array->length(); ++j) { - int expected_index = (count++) % str_values.size(); - ASSERT_EQ(str_array->GetString(j), str_values[expected_index]); - } - } - - chunks = table->GetColumnByName("int32"); - count = 0; - for (int i = 0; i < chunks->num_chunks(); ++i) { - auto int32_array = std::dynamic_pointer_cast(chunks->chunk(i)); - for (int j = 0; j < int32_array->length(); ++j) { - int expected_index = (count++) % int32_values.size(); - ASSERT_EQ(int32_array->Value(j), int32_values[expected_index]); - } - } - chunks = table->GetColumnByName("int64"); - count = 0; - for (int i = 0; i < chunks->num_chunks(); ++i) { - auto int64_array = std::dynamic_pointer_cast(chunks->chunk(i)); - for (int j = 0; j < int64_array->length(); ++j) { - int expected_index = (count++) % int64_values.size(); - ASSERT_EQ(int64_array->Value(j), int64_values[expected_index]); - } - } + TestWriteAndRead(batch_size, paths, fields, column_offsets); } } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp index f047372e..e6daf9c8 100644 --- a/cpp/test/packed/packed_integration_test.cpp +++ b/cpp/test/packed/packed_integration_test.cpp @@ -12,117 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "test_util.h" -#include "filesystem/fs.h" #include "packed_test_base.h" namespace milvus_storage { -class PackedIntegrationTest : public PackedTestBase { - protected: - PackedIntegrationTest() : props_(*parquet::default_writer_properties()) {} - - void SetUp() override { - const char* access_key = std::getenv("ACCESS_KEY"); - const char* secret_key = std::getenv("SECRET_KEY"); - const char* endpoint_url = std::getenv("S3_ENDPOINT_URL"); - const char* file_path = std::getenv("FILE_PATH"); - std::string uri = "file:///tmp/"; - if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) { - uri = endpoint_url; - } - auto factory = std::make_shared(); - ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(uri, &file_path_)); - - SetUpCommonData(); - props_ = *parquet::default_writer_properties(); - writer_memory_ = (22 + 16) * 1024 * 1024; // 22 MB memory is for s3fs part upload - reader_memory_ = 16 * 1024 * 1024; // 16 MB memory for reading - } - - void TearDown() override { fs_->DeleteDir(file_path_); } - - size_t writer_memory_; - size_t reader_memory_; - std::shared_ptr fs_; - std::string file_path_; - parquet::WriterProperties props_; - const int bath_size = 100000; -}; +class PackedIntegrationTest : public PackedTestBase {}; TEST_F(PackedIntegrationTest, WriteAndRead) { - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_); - for (int i = 0; i < bath_size; ++i) { - EXPECT_TRUE(writer.Write(record_batch_).ok()); - } - EXPECT_TRUE(writer.Close().ok()); + int batch_size = 100000; - std::set needed_columns = {0, 1, 2}; std::vector column_offsets = { ColumnOffset(0, 0), ColumnOffset(1, 0), ColumnOffset(1, 1), }; - auto paths = std::vector{file_path_ + "/0", file_path_ + "/1"}; + std::vector paths = {file_path_ + "/0", file_path_ + "/1"}; - // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file std::vector> fields = { arrow::field("str", arrow::utf8()), arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()), }; - auto new_schema = arrow::schema(fields); - - PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_); - ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); - ASSERT_STATUS_OK(pr.Close()); - - int64_t total_rows = table->num_rows(); - - auto chunks = table->GetColumnByName("str"); - int64_t count = 0; - for (int i = 0; i < chunks->num_chunks(); ++i) { - auto str_array = std::dynamic_pointer_cast(chunks->chunk(i)); - for (int j = 0; j < str_array->length(); ++j) { - int expected_index = (count++) % str_values.size(); - ASSERT_EQ(str_array->GetString(j), str_values[expected_index]); - } - } - - chunks = table->GetColumnByName("int32"); - count = 0; - for (int i = 0; i < chunks->num_chunks(); ++i) { - auto int32_array = std::dynamic_pointer_cast(chunks->chunk(i)); - for (int j = 0; j < int32_array->length(); ++j) { - int expected_index = (count++) % int32_values.size(); - ASSERT_EQ(int32_array->Value(j), int32_values[expected_index]); - } - } - chunks = table->GetColumnByName("int64"); - count = 0; - for (int i = 0; i < chunks->num_chunks(); ++i) { - auto int64_array = std::dynamic_pointer_cast(chunks->chunk(i)); - for (int j = 0; j < int64_array->length(); ++j) { - int expected_index = (count++) % int64_values.size(); - ASSERT_EQ(int64_array->Value(j), int64_values[expected_index]); - } - } + TestWriteAndRead(batch_size, paths, fields, column_offsets); } } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index 485478d0..ef5b9de1 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -12,16 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include #include #include #include -#include #include #include #include -#include +#include +#include #include "arrow/table.h" + +#include "test_util.h" +#include "filesystem/fs.h" #include "common/log.h" + +#include +#include +#include +#include +#include #include #include @@ -31,6 +44,96 @@ namespace milvus_storage { class PackedTestBase : public ::testing::Test { protected: + PackedTestBase() : props_(*parquet::default_writer_properties()) {} + + void SetUp() override { + const char* access_key = std::getenv("ACCESS_KEY"); + const char* secret_key = std::getenv("SECRET_KEY"); + const char* endpoint_url = std::getenv("S3_ENDPOINT_URL"); + const char* env_file_path = std::getenv("FILE_PATH"); + + std::string uri = "file:///tmp/"; + if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && env_file_path != nullptr) { + uri = endpoint_url; + } + + file_path_ = GenerateUniqueFilePath(env_file_path); + + auto factory = std::make_shared(); + ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(uri, &file_path_)); + + SetUpCommonData(); + props_ = *parquet::default_writer_properties(); + writer_memory_ = (22 + 16) * 1024 * 1024; // 22 MB for S3FS part upload + reader_memory_ = 16 * 1024 * 1024; // 16 MB for reading + } + + void TearDown() override { + if (fs_ != nullptr) { + fs_->DeleteDir(file_path_); + } + } + + std::string GenerateUniqueFilePath(const char* base_path) { + std::string path = base_path != nullptr ? base_path : "/tmp/default_path"; + path += "-" + std::to_string(std::time(nullptr)); + return path; + } + + void TestWriteAndRead(int batch_size, + const std::vector& paths, + const std::vector>& fields, + const std::vector& column_offsets) { + PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_TRUE(writer.Write(record_batch_).ok()); + } + EXPECT_TRUE(writer.Close().ok()); + + std::set needed_columns = {0, 1, 2}; + auto new_schema = arrow::schema(fields); + + PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); + ASSERT_STATUS_OK(pr.Close()); + + ValidateTableData(table); + } + + void ValidateTableData(const std::shared_ptr& table) { + int64_t total_rows = table->num_rows(); + + auto chunks = table->GetColumnByName("str"); + int64_t count = 0; + for (int i = 0; i < chunks->num_chunks(); ++i) { + auto str_array = std::dynamic_pointer_cast(chunks->chunk(i)); + for (int j = 0; j < str_array->length(); ++j) { + int expected_index = (count++) % str_values.size(); + ASSERT_EQ(str_array->GetString(j), str_values[expected_index]); + } + } + + chunks = table->GetColumnByName("int32"); + count = 0; + for (int i = 0; i < chunks->num_chunks(); ++i) { + auto int32_array = std::dynamic_pointer_cast(chunks->chunk(i)); + for (int j = 0; j < int32_array->length(); ++j) { + int expected_index = (count++) % int32_values.size(); + ASSERT_EQ(int32_array->Value(j), int32_values[expected_index]); + } + } + + chunks = table->GetColumnByName("int64"); + count = 0; + for (int i = 0; i < chunks->num_chunks(); ++i) { + auto int64_array = std::dynamic_pointer_cast(chunks->chunk(i)); + for (int j = 0; j < int64_array->length(); ++j) { + int expected_index = (count++) % int64_values.size(); + ASSERT_EQ(int64_array->Value(j), int64_values[expected_index]); + } + } + } + void SetUpCommonData() { record_batch_ = randomRecordBatch(); table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie(); @@ -79,6 +182,12 @@ class PackedTestBase : public ::testing::Test { return str; } + size_t writer_memory_; + size_t reader_memory_; + std::shared_ptr fs_; + std::string file_path_; + parquet::WriterProperties props_; + std::shared_ptr schema_; std::shared_ptr record_batch_; std::shared_ptr table_;