diff --git a/Cargo.lock b/Cargo.lock index 4d694d3..88e6964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "ahash" version = "0.8.11" @@ -137,6 +143,34 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "apache-avro" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" +dependencies = [ + "bzip2", + "crc32fast", + "digest", + "lazy_static", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_json", + "snap", + "strum 0.25.0", + "strum_macros 0.25.3", + "thiserror", + "typed-builder", + "uuid", + "xz2", + "zstd 0.12.4", +] + [[package]] name = "arrayref" version = "0.3.9" @@ -427,8 +461,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd", - "zstd-safe", + "zstd 0.13.2", + "zstd-safe 7.2.1", ] [[package]] @@ -837,8 +871,8 @@ version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", "unicode-width", ] @@ -910,6 +944,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.14" @@ -997,6 +1040,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "6.1.0" @@ -1018,6 +1067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4fd4a99fc70d40ef7e52b243b4a399c3f8d353a40d5ecb200deee05e49c61bb" dependencies = [ "ahash", + "apache-avro", "arrow", "arrow-array", "arrow-ipc", @@ -1050,6 +1100,7 @@ dependencies = [ "indexmap 2.5.0", "itertools 0.12.1", "log", + "num-traits", "num_cpus", "object_store", "parking_lot", @@ -1064,7 +1115,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd", + "zstd 0.13.2", ] [[package]] @@ -1088,6 +1139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fdbc877e3e40dcf88cc8f283d9f5c8851f0a3aa07fee657b1b75ac1ad49b9c" dependencies = [ "ahash", + "apache-avro", "arrow", "arrow-array", "arrow-buffer", @@ -1148,8 +1200,8 @@ dependencies = [ "paste", "serde_json", "sqlparser 0.49.0", - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", ] [[package]] @@ -1235,6 +1287,16 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-parquet" +version = "0.1.0" +dependencies = [ + "arrow", + "async-trait", + "datafusion", + "parquet", +] + [[package]] name = "datafusion-optimizer" version = "41.0.0" @@ -1388,7 +1450,7 @@ dependencies = [ "log", "regex", "sqlparser 0.49.0", - "strum", + "strum 0.26.3", ] [[package]] @@ -1417,7 +1479,7 @@ dependencies = [ "rustc_version", "serde", "serde_json", - "strum", + "strum 0.26.3", "thiserror", "tracing", "url", @@ -1525,6 +1587,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-functions-json", + 
"datafusion-functions-parquet", "deltalake", "directories", "env_logger", @@ -1541,7 +1604,7 @@ dependencies = [ "prost", "ratatui", "serde", - "strum", + "strum 0.26.3", "tempfile", "tokio", "tokio-stream", @@ -2374,6 +2437,30 @@ version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +[[package]] +name = "libflate" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +dependencies = [ + "core2", + "hashbrown 0.14.5", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.8" @@ -2745,7 +2832,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.13.2", "zstd-sys", ] @@ -2936,6 +3023,12 @@ dependencies = [ "prost", ] +[[package]] +name = "quad-rand" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" + [[package]] name = "quick-xml" version = "0.36.1" @@ -3053,8 +3146,8 @@ dependencies = [ "itertools 0.13.0", "lru", "paste", - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", "unicode-segmentation", "unicode-truncate", "unicode-width", @@ -3103,6 +3196,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.8.4" @@ -3169,6 +3268,12 @@ dependencies = [ "windows-sys 0.52.0", 
] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "roaring" version = "0.10.6" @@ -3573,13 +3678,32 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" + [[package]] name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros", + "strum_macros 0.26.4", +] + +[[package]] +name = "strum_macros" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.77", ] [[package]] @@ -4007,6 +4131,26 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typed-builder" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4521,13 +4665,32 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", +] + [[package]] name = "zstd" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ - "zstd-safe", + "zstd-safe 7.2.1", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 25ebeb5..1c9f48c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ crossterm = { version = "0.28.1", features = ["event-stream"] } datafusion = "41.0.0" datafusion-common = "41.0.0" datafusion-functions-json = { version = "0.41.0", optional = true } +datafusion-functions-parquet = { version = "0.1.0", path = "crates/datafusion-functions-parquet", optional = true } deltalake = { version = "0.19.0", features = ["datafusion"], optional = true } directories = "5.0.1" env_logger = "0.11.5" @@ -56,11 +57,13 @@ tempfile = "3.2.0" # When addding a new feature, also add it to the features tested list in CI (`.github/workflows/rust.yml`) [features] +default = ["functions-parquet"] deltalake = ["dep:deltalake"] flightsql = ["dep:arrow-flight", "dep:tonic"] experimental-flightsql-server = ["dep:arrow-flight", "dep:tonic"] s3 = ["object_store/aws", "url"] functions-json = ["dep:datafusion-functions-json"] +functions-parquet = ["dep:datafusion-functions-parquet"] [[bin]] name = "dft" diff --git a/README.md b/README.md index 302d0ed..05eb24f 100644 --- a/README.md +++ b/README.md @@ -116,9 +116,21 @@ Currently, the only supported packaging 
is on [crates.io](https://crates.io/sear Once installed you can run `dft` to start the application. -#### Optional Features (Rust Crate Features) +#### Internal Optional Features (Workspace Features) -`dft` has several optional (conditionally compiled features) integrations which are controlled by [Rust Crate Features] +`dft` incubates several optional features in its `crates` directory. This provides us with the ability to quickly iterate on new features and test them in the main application while at the same time making it easy to export them to their own crates when they are ready. + +#### Parquet Functions (`--features=functions-parquet`) + +Includes functions from [datafusion-functions-parquet] for querying Parquet files in DataFusion in `dft`. For example: + +```sql +SELECT * FROM parquet_metadata('my_parquet_file.parquet') +``` + +#### External Optional Features (Rust Crate Features) + +`dft` also has several external optional (conditionally compiled features) integrations which are controlled by [Rust Crate Features] To build with all features, you can run diff --git a/crates/datafusion-functions-parquet/Cargo.toml b/crates/datafusion-functions-parquet/Cargo.toml new file mode 100644 index 0000000..f2c247e --- /dev/null +++ b/crates/datafusion-functions-parquet/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "datafusion-functions-parquet" +version = "0.1.0" +edition = "2021" + +[dependencies] +arrow = { version = "52.2.0" } +async-trait = "0.1.41" +datafusion = { version = "41.0.0", features = [ + "avro", + "crypto_expressions", + "datetime_expressions", + "encoding_expressions", + "parquet", + "regex_expressions", + "unicode_expressions", + "compression", +] } +parquet = { version = "52.2.0", default-features = false } diff --git a/crates/datafusion-functions-parquet/src/lib.rs b/crates/datafusion-functions-parquet/src/lib.rs new file mode 100644 index 0000000..18de22a --- /dev/null +++ b/crates/datafusion-functions-parquet/src/lib.rs @@ -0,0 +1,263 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Int64Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; + +use datafusion::catalog::Session; +use datafusion::common::{plan_err, Column}; +use datafusion::datasource::function::TableFunctionImpl; +use datafusion::datasource::TableProvider; +use datafusion::error::Result; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::scalar::ScalarValue; +use parquet::basic::ConvertedType; +use parquet::file::reader::FileReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics; +use std::fs::File; +use std::sync::Arc; + +// Copied from https://github.com/apache/datafusion/blob/main/datafusion-cli/src/functions.rs + +/// PARQUET_META table function +struct ParquetMetadataTable { + schema: SchemaRef, + batch: RecordBatch, +} + +#[async_trait] +impl TableProvider for ParquetMetadataTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + 
self.schema.clone() + } + + fn table_type(&self) -> datafusion::logical_expr::TableType { + datafusion::logical_expr::TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(Arc::new(MemoryExec::try_new( + &[vec![self.batch.clone()]], + TableProvider::schema(self), + projection.cloned(), + )?)) + } +} + +fn convert_parquet_statistics( + value: &Statistics, + converted_type: ConvertedType, +) -> (String, String) { + match (value, converted_type) { + (Statistics::Boolean(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int32(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int64(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int96(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Float(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Double(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::ByteArray(val), ConvertedType::UTF8) => { + let min_bytes = val.min(); + let max_bytes = val.max(); + let min = min_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| min_bytes.to_string()); + + let max = max_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| max_bytes.to_string()); + (min, max) + } + (Statistics::ByteArray(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => { + let min_bytes = val.min(); + let max_bytes = val.max(); + let min = min_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| min_bytes.to_string()); + + let max = max_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| max_bytes.to_string()); + (min, max) + } + (Statistics::FixedLenByteArray(val), _) => (val.min().to_string(), val.max().to_string()), + } +} + +pub struct ParquetMetadataFunc {} + +impl TableFunctionImpl 
for ParquetMetadataFunc { + fn call(&self, exprs: &[Expr]) -> Result> { + let filename = match exprs.first() { + Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet') + Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet") + _ => { + return plan_err!("parquet_metadata requires string argument as its input"); + } + }; + + let file = File::open(filename.clone())?; + let reader = SerializedFileReader::new(file)?; + let metadata = reader.metadata(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("filename", DataType::Utf8, true), + Field::new("row_group_id", DataType::Int64, true), + Field::new("row_group_num_rows", DataType::Int64, true), + Field::new("row_group_num_columns", DataType::Int64, true), + Field::new("row_group_bytes", DataType::Int64, true), + Field::new("column_id", DataType::Int64, true), + Field::new("file_offset", DataType::Int64, true), + Field::new("num_values", DataType::Int64, true), + Field::new("path_in_schema", DataType::Utf8, true), + Field::new("type", DataType::Utf8, true), + Field::new("stats_min", DataType::Utf8, true), + Field::new("stats_max", DataType::Utf8, true), + Field::new("stats_null_count", DataType::Int64, true), + Field::new("stats_distinct_count", DataType::Int64, true), + Field::new("stats_min_value", DataType::Utf8, true), + Field::new("stats_max_value", DataType::Utf8, true), + Field::new("compression", DataType::Utf8, true), + Field::new("encodings", DataType::Utf8, true), + Field::new("index_page_offset", DataType::Int64, true), + Field::new("dictionary_page_offset", DataType::Int64, true), + Field::new("data_page_offset", DataType::Int64, true), + Field::new("total_compressed_size", DataType::Int64, true), + Field::new("total_uncompressed_size", DataType::Int64, true), + ])); + + // construct recordbatch from metadata + let mut filename_arr = vec![]; + let mut row_group_id_arr = vec![]; + let mut row_group_num_rows_arr = 
vec![]; + let mut row_group_num_columns_arr = vec![]; + let mut row_group_bytes_arr = vec![]; + let mut column_id_arr = vec![]; + let mut file_offset_arr = vec![]; + let mut num_values_arr = vec![]; + let mut path_in_schema_arr = vec![]; + let mut type_arr = vec![]; + let mut stats_min_arr = vec![]; + let mut stats_max_arr = vec![]; + let mut stats_null_count_arr = vec![]; + let mut stats_distinct_count_arr = vec![]; + let mut stats_min_value_arr = vec![]; + let mut stats_max_value_arr = vec![]; + let mut compression_arr = vec![]; + let mut encodings_arr = vec![]; + let mut index_page_offset_arr = vec![]; + let mut dictionary_page_offset_arr = vec![]; + let mut data_page_offset_arr = vec![]; + let mut total_compressed_size_arr = vec![]; + let mut total_uncompressed_size_arr = vec![]; + for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() { + for (col_idx, column) in row_group.columns().iter().enumerate() { + filename_arr.push(filename.clone()); + row_group_id_arr.push(rg_idx as i64); + row_group_num_rows_arr.push(row_group.num_rows()); + row_group_num_columns_arr.push(row_group.num_columns() as i64); + row_group_bytes_arr.push(row_group.total_byte_size()); + column_id_arr.push(col_idx as i64); + file_offset_arr.push(column.file_offset()); + num_values_arr.push(column.num_values()); + path_in_schema_arr.push(column.column_path().to_string()); + type_arr.push(column.column_type().to_string()); + let converted_type = column.column_descr().converted_type(); + + if let Some(s) = column.statistics() { + let (min_val, max_val) = if s.has_min_max_set() { + let (min_val, max_val) = convert_parquet_statistics(s, converted_type); + (Some(min_val), Some(max_val)) + } else { + (None, None) + }; + stats_min_arr.push(min_val.clone()); + stats_max_arr.push(max_val.clone()); + stats_null_count_arr.push(Some(s.null_count() as i64)); + stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64)); + stats_min_value_arr.push(min_val); + 
stats_max_value_arr.push(max_val); + } else { + stats_min_arr.push(None); + stats_max_arr.push(None); + stats_null_count_arr.push(None); + stats_distinct_count_arr.push(None); + stats_min_value_arr.push(None); + stats_max_value_arr.push(None); + }; + compression_arr.push(format!("{:?}", column.compression())); + encodings_arr.push(format!("{:?}", column.encodings())); + index_page_offset_arr.push(column.index_page_offset()); + dictionary_page_offset_arr.push(column.dictionary_page_offset()); + data_page_offset_arr.push(column.data_page_offset()); + total_compressed_size_arr.push(column.compressed_size()); + total_uncompressed_size_arr.push(column.uncompressed_size()); + } + } + + let rb = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(filename_arr)), + Arc::new(Int64Array::from(row_group_id_arr)), + Arc::new(Int64Array::from(row_group_num_rows_arr)), + Arc::new(Int64Array::from(row_group_num_columns_arr)), + Arc::new(Int64Array::from(row_group_bytes_arr)), + Arc::new(Int64Array::from(column_id_arr)), + Arc::new(Int64Array::from(file_offset_arr)), + Arc::new(Int64Array::from(num_values_arr)), + Arc::new(StringArray::from(path_in_schema_arr)), + Arc::new(StringArray::from(type_arr)), + Arc::new(StringArray::from(stats_min_arr)), + Arc::new(StringArray::from(stats_max_arr)), + Arc::new(Int64Array::from(stats_null_count_arr)), + Arc::new(Int64Array::from(stats_distinct_count_arr)), + Arc::new(StringArray::from(stats_min_value_arr)), + Arc::new(StringArray::from(stats_max_value_arr)), + Arc::new(StringArray::from(compression_arr)), + Arc::new(StringArray::from(encodings_arr)), + Arc::new(Int64Array::from(index_page_offset_arr)), + Arc::new(Int64Array::from(dictionary_page_offset_arr)), + Arc::new(Int64Array::from(data_page_offset_arr)), + Arc::new(Int64Array::from(total_compressed_size_arr)), + Arc::new(Int64Array::from(total_uncompressed_size_arr)), + ], + )?; + + let parquet_metadata = ParquetMetadataTable { schema, batch: rb }; + 
Ok(Arc::new(parquet_metadata)) + } +} diff --git a/src/execution/local.rs b/src/execution/local.rs index 7a01a14..deb65c8 100644 --- a/src/execution/local.rs +++ b/src/execution/local.rs @@ -90,6 +90,12 @@ impl ExecutionContext { extension.register_on_ctx(config, &mut session_ctx)?; } + // Register Parquet Metadata Function + session_ctx.register_udtf( + "parquet_metadata", + Arc::new(datafusion_functions_parquet::ParquetMetadataFunc {}), + ); + Ok(Self { config: config.clone(), session_ctx, diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 09eda4d..863367d 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -32,12 +32,6 @@ use crate::config::AppConfig; use color_eyre::eyre::Result; use datafusion::prelude::*; -// #[cfg(feature = "flightsql")] -// use { -// arrow_flight::sql::client::FlightSqlServiceClient, tokio::sync::Mutex, -// tonic::transport::Channel, -// }; - pub enum AppType { Cli, Tui, @@ -58,7 +52,6 @@ impl AppExecution { context, #[cfg(feature = "flightsql")] flightsql_context: FlightSQLContext::default(), - // flightsql_client: Mutex::new(None), } } @@ -95,27 +88,4 @@ impl AppExecution { pub fn with_flightsql_ctx(&mut self, flightsql_ctx: FlightSQLContext) { self.flightsql_context = flightsql_ctx; } - - // Create FlightSQL client from users FlightSQL config - // #[cfg(feature = "flightsql")] - // pub async fn create_flightsql_client(&self, config: FlightSQLConfig) -> Result<()> { - // use color_eyre::eyre::eyre; - // use log::info; - // - // let url = Box::leak(config.connection_url.into_boxed_str()); - // info!("Connecting to FlightSQL host: {}", url); - // let channel = Channel::from_static(url).connect().await; - // match channel { - // Ok(c) => { - // let client = FlightSqlServiceClient::new(c); - // let mut guard = self.flightsql_context.client().lock().await; - // *guard = Some(client); - // Ok(()) - // } - // Err(e) => Err(eyre!( - // "Error creating channel for FlightSQL client: {:?}", - // e - // )), - // } - // } }