[fix]: Fix azure connection and clean up test (#150)
issue: #148
shaoting-huang authored Sep 13, 2024
1 parent 0809937 commit 872debe
Showing 13 changed files with 145 additions and 200 deletions.
11 changes: 8 additions & 3 deletions cpp/include/milvus-storage/filesystem/azure/azure_fs.h
@@ -24,18 +24,23 @@ namespace milvus_storage {

 class AzureFileSystemProducer : public FileSystemProducer {
   public:
-  AzureFileSystemProducer() {};
+  AzureFileSystemProducer(){};
 
   Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const std::string& uri, std::string* out_path) override {
     arrow::util::Uri uri_parser;
     RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
 
     arrow::fs::AzureOptions options;
-    options.account_name = std::getenv("AZURE_STORAGE_ACCOUNT");
+    auto account = std::getenv("AZURE_STORAGE_ACCOUNT");
+    auto key = std::getenv("AZURE_SECRET_KEY");
+    if (account == nullptr || key == nullptr) {
+      return Status::InvalidArgument("Please provide azure storage account and azure secret key");
+    }
+    options.account_name = account;
     RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(std::getenv("AZURE_SECRET_KEY")));
     *out_path = std::getenv("FILE_PATH");
 
     ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options));
+    fs->CreateDir(*out_path);
     return std::shared_ptr<arrow::fs::FileSystem>(fs);
   }
 };
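For context, a minimal caller-side sketch of what the new checks mean in practice. This is an illustration, not part of the commit: it assumes the factory in fs.cpp dispatches azure URIs to AzureFileSystemProducer, that Result<T> exposes ok()/value(), and the URI is hypothetical.

#include <iostream>
#include <memory>
#include <string>
#include "filesystem/fs.h"  // FileSystemFactory, include path as used by the tests below

int main() {
  // AZURE_STORAGE_ACCOUNT, AZURE_SECRET_KEY, and FILE_PATH are read from the
  // environment. With this patch a missing account or key is reported as
  // Status::InvalidArgument instead of feeding nullptr into Arrow.
  auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
  std::string out_path;
  auto result = factory->BuildFileSystem("azure://hypothetical-container/data", &out_path);
  if (!result.ok()) {
    std::cerr << "azure connection failed: set AZURE_STORAGE_ACCOUNT and AZURE_SECRET_KEY\n";
    return 1;
  }
  auto fs = result.value();  // std::shared_ptr<arrow::fs::FileSystem>
  // out_path now holds FILE_PATH; the directory was pre-created by CreateDir().
  return 0;
}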
3 changes: 1 addition & 2 deletions cpp/include/milvus-storage/filesystem/s3/s3_fs.h
@@ -25,7 +25,7 @@ namespace milvus_storage {

 class S3FileSystemProducer : public FileSystemProducer {
   public:
-  S3FileSystemProducer() {};
+  S3FileSystemProducer(){};
 
   Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const std::string& uri, std::string* out_path) override {
     arrow::util::Uri uri_parser;
@@ -45,7 +45,6 @@ class S3FileSystemProducer : public FileSystemProducer {
     arrow::fs::S3Options options;
     options.endpoint_override = uri_parser.ToString();
     options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY"));
-    *out_path = std::getenv("FILE_PATH");
 
     if (std::getenv("REGION") != nullptr) {
       options.region = std::getenv("REGION");
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/filter/filter.h
@@ -46,7 +46,7 @@ class Filter {
     return Status::OK();
   }
 
-  virtual ~Filter() {};
+  virtual ~Filter(){};
 
   protected:
   std::string column_name_;
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/packed/column_group_writer.h
@@ -52,7 +52,7 @@ class ColumnGroupWriter {
   ColumnGroup column_group_;
   int flushed_batches_;
   int flushed_count_;
-  int64_t flushed_rows_;
+  int flushed_rows_;
 };
 
 }  // namespace milvus_storage
7 changes: 2 additions & 5 deletions cpp/include/milvus-storage/packed/reader.h
@@ -16,6 +16,7 @@

 #include <packed/chunk_manager.h>
 #include <packed/column_group.h>
+#include "packed/utils/config.h"
 #include <parquet/arrow/reader.h>
 #include <arrow/filesystem/filesystem.h>
 #include <arrow/record_batch.h>
@@ -37,18 +38,14 @@ struct RowOffsetComparator {
 using RowOffsetMinHeap =
     std::priority_queue<std::pair<int, int>, std::vector<std::pair<int, int>>, RowOffsetComparator>;
 
-// Default number of rows to read when using ::arrow::RecordBatchReader
-static constexpr int64_t DefaultBatchSize = 1024;
-static constexpr int64_t DefaultBufferSize = 16 * 1024 * 1024;
-
 class PackedRecordBatchReader : public arrow::RecordBatchReader {
   public:
   PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                           const std::vector<std::string>& paths,
                           const std::shared_ptr<arrow::Schema> schema,
                           const std::vector<ColumnOffset>& column_offsets,
                           const std::set<int>& needed_columns,
-                          const int64_t buffer_size = DefaultBufferSize);
+                          const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
 
   std::shared_ptr<arrow::Schema> schema() const override;
 
4 changes: 4 additions & 0 deletions cpp/include/milvus-storage/packed/utils/config.h
@@ -28,4 +28,8 @@ static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE +

 static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
 
+// Default number of rows to read when using ::arrow::RecordBatchReader
+static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
+static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;
+
 }  // namespace milvus_storage
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/reader/common/delete_reader.h
@@ -75,7 +75,7 @@ class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor {
   explicit DeleteFilterVisitor(DeleteFragmentVector delete_fragments,
                                std::shared_ptr<arrow::Int64Array> version_col = nullptr,
                                int64_t version = -1)
-      : version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)), version_(version) {};
+      : version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)), version_(version){};
 
   arrow::Status Visit(const arrow::Int64Array& array) override;
   arrow::Status Visit(const arrow::StringArray& array) override;
3 changes: 2 additions & 1 deletion cpp/src/filesystem/fs.cpp
@@ -22,7 +22,8 @@

 namespace milvus_storage {
 
-Result<std::shared_ptr<arrow::fs::FileSystem>> FileSystemFactory::BuildFileSystem(const std::string& uri, std::string* out_path) {
+Result<std::shared_ptr<arrow::fs::FileSystem>> FileSystemFactory::BuildFileSystem(const std::string& uri,
+                                                                                  std::string* out_path) {
   arrow::util::Uri uri_parser;
   RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
   auto scheme = uri_parser.scheme();
6 changes: 4 additions & 2 deletions cpp/src/packed/chunk_manager.cpp
@@ -17,8 +17,10 @@
 #include <arrow/table.h>
 #include <parquet/properties.h>
 #include "common/log.h"
-#include "packed/reader.h"
+#include "packed/utils/config.h"
 #include "packed/chunk_manager.h"
+#include <set>
+#include <queue>
 
 namespace milvus_storage {
 
@@ -30,7 +32,7 @@ ChunkManager::ChunkManager(const std::vector<ColumnOffset>& column_offsets, int6
 std::vector<std::shared_ptr<arrow::ArrayData>> ChunkManager::SliceChunksByMaxContiguousSlice(
     int64_t chunksize, std::vector<std::queue<std::shared_ptr<arrow::Table>>>& tables) {
   // Determine the maximum contiguous slice across all tables)
-  SetChunkSize(std::min(chunksize, DefaultBatchSize));
+  SetChunkSize(std::min(chunksize, DEFAULT_READ_BATCH_SIZE));
   std::vector<const arrow::Array*> chunks(column_offsets_.size());
   std::vector<int> chunk_sizes(column_offsets_.size());
   std::set<int> table_to_pop;
2 changes: 2 additions & 0 deletions cpp/src/packed/column_group_writer.cpp
@@ -42,6 +42,8 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
     : group_id_(group_id),
       writer_(schema, fs, file_path, props),
       column_group_(group_id, origin_column_indices),
+      flushed_batches_(0),
+      flushed_rows_(0),
       finished_(false) {}
 
 Status ColumnGroupWriter::Init() { return writer_.Init(); }
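Aside: the two new initializer-list entries fix counters that were previously never initialized. A standalone illustration of the difference (not repo code):

#include <iostream>

// Before the fix: a default-constructed counter member holds an
// indeterminate value, so reading it is undefined behavior.
struct BeforeFix {
  int flushed_batches_;
};

// After the fix: mirrors the initializer list added above.
struct AfterFix {
  int flushed_batches_;
  int flushed_rows_;
  AfterFix() : flushed_batches_(0), flushed_rows_(0) {}
};

int main() {
  AfterFix w;
  std::cout << w.flushed_batches_ << " " << w.flushed_rows_ << "\n";  // always "0 0"
  return 0;
}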
95 changes: 4 additions & 91 deletions cpp/test/packed/one_file_test.cpp
@@ -12,117 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/filesystem/s3fs.h>
-#include <arrow/filesystem/localfs.h>
-#include <gtest/gtest.h>
-#include <arrow/api.h>
-#include <packed/writer.h>
-#include <parquet/properties.h>
-#include <packed/reader.h>
-#include <memory>
-#include <arrow/type.h>
-#include <arrow/type_fwd.h>
-#include <arrow/array/builder_binary.h>
-#include <arrow/array/builder_primitive.h>
-#include <arrow/util/key_value_metadata.h>
-#include "test_util.h"
-#include "filesystem/fs.h"
 #include "packed_test_base.h"
 
 namespace milvus_storage {
 
-class OneFileTest : public PackedTestBase {
-  protected:
-  OneFileTest() : props_(*parquet::default_writer_properties()) {}
-
-  void SetUp() override {
-    const char* access_key = std::getenv("ACCESS_KEY");
-    const char* secret_key = std::getenv("SECRET_KEY");
-    const char* endpoint_url = std::getenv("S3_ENDPOINT_URL");
-    const char* file_path = std::getenv("FILE_PATH");
-    std::string uri = "file:///tmp/";
-    if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
-      uri = endpoint_url;
-    }
-    auto factory = std::make_shared<FileSystemFactory>();
-    ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(uri, &file_path_));
-
-    SetUpCommonData();
-    props_ = *parquet::default_writer_properties();
-    writer_memory_ = (22 + 16) * 1024 * 1024;  // 22 MB memory is for s3fs part upload
-    reader_memory_ = 16 * 1024 * 1024;         // 16 MB memory for reading
-  }
-
-  void TearDown() override { fs_->DeleteDir(file_path_); }
-
-  size_t writer_memory_;
-  size_t reader_memory_;
-  std::shared_ptr<arrow::fs::FileSystem> fs_;
-  std::string file_path_;
-  parquet::WriterProperties props_;
-  const int bath_size = 100;
-};
+class OneFileTest : public PackedTestBase {};
 
 TEST_F(OneFileTest, WriteAndRead) {
-  PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_);
-  for (int i = 0; i < bath_size; ++i) {
-    EXPECT_TRUE(writer.Write(record_batch_).ok());
-  }
-  EXPECT_TRUE(writer.Close().ok());
+  int batch_size = 100;
 
   std::set<int> needed_columns = {0, 1, 2};
   std::vector<ColumnOffset> column_offsets = {
       ColumnOffset(0, 0),
       ColumnOffset(0, 1),
       ColumnOffset(0, 2),
   };
 
-  auto paths = std::vector<std::string>{file_path_ + "/0"};
+  std::vector<std::string> paths = {file_path_ + "/0"};
 
   // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file
   std::vector<std::shared_ptr<arrow::Field>> fields = {
       arrow::field("int32", arrow::int32()),
       arrow::field("int64", arrow::int64()),
      arrow::field("str", arrow::utf8()),
   };
-  auto new_schema = arrow::schema(fields);
-
-  PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_);
-  ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
-  ASSERT_STATUS_OK(pr.Close());
-
-  int64_t total_rows = table->num_rows();
-
-  auto chunks = table->GetColumnByName("str");
-  int64_t count = 0;
-  for (int i = 0; i < chunks->num_chunks(); ++i) {
-    auto str_array = std::dynamic_pointer_cast<arrow::StringArray>(chunks->chunk(i));
-    for (int j = 0; j < str_array->length(); ++j) {
-      int expected_index = (count++) % str_values.size();
-      ASSERT_EQ(str_array->GetString(j), str_values[expected_index]);
-    }
-  }
-
-  chunks = table->GetColumnByName("int32");
-  count = 0;
-  for (int i = 0; i < chunks->num_chunks(); ++i) {
-    auto int32_array = std::dynamic_pointer_cast<arrow::Int32Array>(chunks->chunk(i));
-    for (int j = 0; j < int32_array->length(); ++j) {
-      int expected_index = (count++) % int32_values.size();
-      ASSERT_EQ(int32_array->Value(j), int32_values[expected_index]);
-    }
-  }
-
-  chunks = table->GetColumnByName("int64");
-  count = 0;
-  for (int i = 0; i < chunks->num_chunks(); ++i) {
-    auto int64_array = std::dynamic_pointer_cast<arrow::Int64Array>(chunks->chunk(i));
-    for (int j = 0; j < int64_array->length(); ++j) {
-      int expected_index = (count++) % int64_values.size();
-      ASSERT_EQ(int64_array->Value(j), int64_values[expected_index]);
-    }
-  }
+  TestWriteAndRead(batch_size, paths, fields, column_offsets);
 }
 
 }  // namespace milvus_storage
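Both rewritten tests delegate to a TestWriteAndRead helper that this diff does not show (it lives in packed_test_base.h, alongside the shared SetUp/TearDown). A plausible sketch of that helper, reconstructed from the setup and verification loops deleted above; the signature and fixture-member names are assumptions:

// Hypothetical reconstruction; the real helper is in cpp/test/packed/packed_test_base.h.
class PackedTestBase : public ::testing::Test {
 protected:
  void TestWriteAndRead(int batch_size,
                        const std::vector<std::string>& paths,
                        const std::vector<std::shared_ptr<arrow::Field>>& fields,
                        const std::vector<ColumnOffset>& column_offsets) {
    // Write the fixture batch batch_size times, as the deleted tests did inline.
    PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_);
    for (int i = 0; i < batch_size; ++i) {
      EXPECT_TRUE(writer.Write(record_batch_).ok());
    }
    EXPECT_TRUE(writer.Close().ok());

    // Read the packed files back under the caller-supplied schema and offsets.
    std::set<int> needed_columns = {0, 1, 2};
    auto schema = arrow::schema(fields);
    PackedRecordBatchReader pr(*fs_, paths, schema, column_offsets, needed_columns, reader_memory_);
    ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
    ASSERT_STATUS_OK(pr.Close());

    // Verify the "str" column cycles through the fixture values; the deleted
    // tests repeated the same loop for the int32 and int64 columns.
    auto chunks = table->GetColumnByName("str");
    int64_t count = 0;
    for (int i = 0; i < chunks->num_chunks(); ++i) {
      auto str_array = std::dynamic_pointer_cast<arrow::StringArray>(chunks->chunk(i));
      for (int j = 0; j < str_array->length(); ++j) {
        ASSERT_EQ(str_array->GetString(j), str_values[(count++) % str_values.size()]);
      }
    }
  }

  // Fixture members (fs_, file_path_, schema_, record_batch_, props_,
  // writer_memory_, reader_memory_, str_values, ...) as in the deleted SetUp().
};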
95 changes: 4 additions & 91 deletions cpp/test/packed/packed_integration_test.cpp
@@ -12,117 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/filesystem/s3fs.h>
-#include <arrow/filesystem/localfs.h>
-#include <gtest/gtest.h>
-#include <arrow/api.h>
-#include <packed/writer.h>
-#include <parquet/properties.h>
-#include <packed/reader.h>
-#include <memory>
-#include <arrow/type.h>
-#include <arrow/type_fwd.h>
-#include <arrow/array/builder_binary.h>
-#include <arrow/array/builder_primitive.h>
-#include <arrow/util/key_value_metadata.h>
-#include "test_util.h"
-#include "filesystem/fs.h"
 #include "packed_test_base.h"
 
 namespace milvus_storage {
 
-class PackedIntegrationTest : public PackedTestBase {
-  protected:
-  PackedIntegrationTest() : props_(*parquet::default_writer_properties()) {}
-
-  void SetUp() override {
-    const char* access_key = std::getenv("ACCESS_KEY");
-    const char* secret_key = std::getenv("SECRET_KEY");
-    const char* endpoint_url = std::getenv("S3_ENDPOINT_URL");
-    const char* file_path = std::getenv("FILE_PATH");
-    std::string uri = "file:///tmp/";
-    if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
-      uri = endpoint_url;
-    }
-    auto factory = std::make_shared<FileSystemFactory>();
-    ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(uri, &file_path_));
-
-    SetUpCommonData();
-    props_ = *parquet::default_writer_properties();
-    writer_memory_ = (22 + 16) * 1024 * 1024;  // 22 MB memory is for s3fs part upload
-    reader_memory_ = 16 * 1024 * 1024;         // 16 MB memory for reading
-  }
-
-  void TearDown() override { fs_->DeleteDir(file_path_); }
-
-  size_t writer_memory_;
-  size_t reader_memory_;
-  std::shared_ptr<arrow::fs::FileSystem> fs_;
-  std::string file_path_;
-  parquet::WriterProperties props_;
-  const int bath_size = 100000;
-};
+class PackedIntegrationTest : public PackedTestBase {};
 
 TEST_F(PackedIntegrationTest, WriteAndRead) {
-  PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_);
-  for (int i = 0; i < bath_size; ++i) {
-    EXPECT_TRUE(writer.Write(record_batch_).ok());
-  }
-  EXPECT_TRUE(writer.Close().ok());
+  int batch_size = 100000;
 
   std::set<int> needed_columns = {0, 1, 2};
   std::vector<ColumnOffset> column_offsets = {
       ColumnOffset(0, 0),
       ColumnOffset(1, 0),
       ColumnOffset(1, 1),
   };
 
-  auto paths = std::vector<std::string>{file_path_ + "/0", file_path_ + "/1"};
+  std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};
 
   // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file
   std::vector<std::shared_ptr<arrow::Field>> fields = {
       arrow::field("str", arrow::utf8()),
       arrow::field("int32", arrow::int32()),
       arrow::field("int64", arrow::int64()),
   };
-  auto new_schema = arrow::schema(fields);
-
-  PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_);
-  ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
-  ASSERT_STATUS_OK(pr.Close());
-
-  int64_t total_rows = table->num_rows();
-
-  auto chunks = table->GetColumnByName("str");
-  int64_t count = 0;
-  for (int i = 0; i < chunks->num_chunks(); ++i) {
-    auto str_array = std::dynamic_pointer_cast<arrow::StringArray>(chunks->chunk(i));
-    for (int j = 0; j < str_array->length(); ++j) {
-      int expected_index = (count++) % str_values.size();
-      ASSERT_EQ(str_array->GetString(j), str_values[expected_index]);
-    }
-  }
-
-  chunks = table->GetColumnByName("int32");
-  count = 0;
-  for (int i = 0; i < chunks->num_chunks(); ++i) {
-    auto int32_array = std::dynamic_pointer_cast<arrow::Int32Array>(chunks->chunk(i));
-    for (int j = 0; j < int32_array->length(); ++j) {
-      int expected_index = (count++) % int32_values.size();
-      ASSERT_EQ(int32_array->Value(j), int32_values[expected_index]);
-    }
-  }
-
-  chunks = table->GetColumnByName("int64");
-  count = 0;
-  for (int i = 0; i < chunks->num_chunks(); ++i) {
-    auto int64_array = std::dynamic_pointer_cast<arrow::Int64Array>(chunks->chunk(i));
-    for (int j = 0; j < int64_array->length(); ++j) {
-      int expected_index = (count++) % int64_values.size();
-      ASSERT_EQ(int64_array->Value(j), int64_values[expected_index]);
-    }
-  }
+  TestWriteAndRead(batch_size, paths, fields, column_offsets);
 }
 
 }  // namespace milvus_storage