diff --git a/Cargo.lock b/Cargo.lock index 5103deb..8046c17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,20 +218,6 @@ dependencies = [ "num", ] -[[package]] -name = "arrow-ipc" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" -dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-schema", - "flatbuffers", -] - [[package]] name = "arrow-ord" version = "49.0.0" @@ -519,10 +505,8 @@ dependencies = [ "fallible-streaming-iterator", "half", "hex", - "insta", "itertools", "log", - "parquet", "postgres", "postgres-protocol", "rand", @@ -746,16 +730,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" -[[package]] -name = "flatbuffers" -version = "23.5.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" -dependencies = [ - "bitflags 1.2.1", - "rustc_version", -] - [[package]] name = "flate2" version = "1.0.28" @@ -952,25 +926,6 @@ dependencies = [ "hashbrown 0.14.3", ] -[[package]] -name = "insta" -version = "1.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d64600be34b2fcfc267740a243fa7744441bb4947a619ac4e5bb6507f35fbfc" -dependencies = [ - "console", - "lazy_static", - "linked-hash-map", - "similar", - "yaml-rust", -] - -[[package]] -name = "integer-encoding" -version = "3.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" - [[package]] name = "itertools" version = "0.10.5" @@ -1102,12 +1057,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1258,15 +1207,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "ordered-float" -version = "2.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" -dependencies = [ - "num-traits", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -1290,38 +1230,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "parquet" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4" -dependencies = [ - "ahash 0.8.7", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-ipc", - "arrow-schema", - "arrow-select", - "base64", - "bytes", - "chrono", - "hashbrown 0.14.3", - "num", - "num-bigint", - "paste", - "seq-macro", - "thrift", - "twox-hash", -] - -[[package]] -name = "paste" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" - [[package]] name = "percent-encoding" version = "2.3.1" @@ -1652,15 +1560,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustc_version" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" -dependencies = [ - "semver", -] - [[package]] name = "rustix" version = "0.38.30" @@ -1698,18 +1597,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "semver" -version = "1.0.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" - -[[package]] -name = "seq-macro" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" - [[package]] name = "serde" version = "1.0.196" @@ -1925,17 +1812,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "thrift" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" -dependencies = [ - "byteorder", - "integer-encoding", - "ordered-float", -] - [[package]] name = "tiny-keccak" version = "2.0.2" @@ -2051,16 +1927,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "twox-hash" -version = "1.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" -dependencies = [ - "cfg-if", - "static_assertions", -] - [[package]] name = "typenum" version = "1.17.0" @@ -2386,15 +2252,6 @@ dependencies = [ "rustix", ] -[[package]] -name = "yaml-rust" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" -dependencies = [ - "linked-hash-map", -] - [[package]] name = "zerocopy" version = "0.7.32" diff --git a/connector_arrow/Cargo.toml b/connector_arrow/Cargo.toml index e6930d1..97574cd 100644 --- a/connector_arrow/Cargo.toml +++ b/connector_arrow/Cargo.toml @@ -57,9 +57,7 @@ optional = true [dev-dependencies] env_logger = "0.11" -arrow = { version = "49", features = ["prettyprint"], default-features = false } -parquet = { version = "49", features = ["arrow"], default-features = false } -insta = { version = "1.34.0" } +arrow = { version = "49", default-features = false } similar-asserts = { version = "1.5.0" } half = "2.3.1" rand = { version = "0.8.5", default-features = false } diff --git a/connector_arrow/src/util/coerce.rs b/connector_arrow/src/util/coerce.rs index c4690fb..19b51f2 100644 --- a/connector_arrow/src/util/coerce.rs +++ b/connector_arrow/src/util/coerce.rs @@ -6,23 +6,32 @@ use arrow::record_batch::RecordBatch; use itertools::Itertools; pub fn coerce_batches( + schema: SchemaRef, batches: &[RecordBatch], coerce_fn: F, -) -> Result, arrow::error::ArrowError> + override_nullable: Option, +) -> Result<(SchemaRef, Vec), arrow::error::ArrowError> where F: Fn(&DataType) -> Option + Copy, { - batches.iter().map(|b| coerce_batch(b, coerce_fn)).collect() + let batches = batches + .iter() + .map(|b| coerce_batch(b, coerce_fn, override_nullable)) + .collect::, _>>()?; + + let schema = coerce_schema(schema, coerce_fn, override_nullable); + Ok((schema, batches)) } pub fn coerce_batch( batch: &RecordBatch, coerce_fn: F, + override_nullable: Option, ) -> Result where F: Fn(&DataType) -> Option + Copy, { - let new_schema = coerce_schema(batch.schema(), coerce_fn); + let new_schema = coerce_schema(batch.schema(), coerce_fn, override_nullable); let new_columns = batch .columns() @@ -47,7 +56,11 @@ where } } -pub fn coerce_schema(schema: SchemaRef, coerce_fn: F) -> SchemaRef +pub fn coerce_schema( + schema: SchemaRef, + coerce_fn: F, + override_nullable: Option, +) -> SchemaRef where F: Fn(&DataType) -> Option + Copy, { @@ -55,9 +68,19 @@ where schema .fields() .iter() - .map(|f| match coerce_fn(f.data_type()) { - Some(new_ty) => Field::new(f.name(), new_ty, true), - None => Field::clone(f).with_nullable(true), + .map(|f| { + let field = match coerce_fn(f.data_type()) { + Some(new_ty) => { + let nullable = f.is_nullable() || matches!(f.data_type(), DataType::Null); + Field::new(f.name(), new_ty, nullable) + } + None => Field::clone(f), + }; + if let Some(nullable) = &override_nullable { + field.with_nullable(*nullable) + } else { + field + } }) .collect_vec(), )) diff --git a/connector_arrow/tests/it/generator.rs b/connector_arrow/tests/it/generator.rs index 26e6c6b..0002383 100644 --- a/connector_arrow/tests/it/generator.rs +++ b/connector_arrow/tests/it/generator.rs @@ -4,7 +4,10 @@ use half::f16; use rand::Rng; use std::sync::Arc; -pub fn generate_batch(column_specs: Vec, rng: &mut R) -> RecordBatch { +pub fn generate_batch( + column_specs: Vec, + rng: &mut R, +) -> (SchemaRef, Vec) { let mut arrays = Vec::new(); let mut fields = Vec::new(); for column in column_specs { @@ -15,11 +18,70 @@ pub fn generate_batch(column_specs: Vec, rng: &mut R) -> Rec fields.push(field); } let schema = Arc::new(Schema::new(fields)); - RecordBatch::try_new(schema, arrays).unwrap() + + let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); + if batch.num_rows() == 0 { + (schema, vec![]) + } else { + (schema, vec![batch]) + } } -pub fn spec_simple() -> Vec { - domains_to_batch_spec(&[DataType::Null, DataType::Boolean], &[false, true]) +pub fn spec_all_types() -> Vec { + domains_to_batch_spec( + &[ + DataType::Null, + DataType::Boolean, + // DataType::Int8, + DataType::Int16, + DataType::Int32, + DataType::Int64, + // DataType::UInt8, + // DataType::UInt16, + // DataType::UInt32, + // DataType::UInt64, + // DataType::Float16, + DataType::Float32, + DataType::Float64, + // DataType::Timestamp(TimeUnit::Nanosecond, None), + // DataType::Timestamp(TimeUnit::Microsecond, None), + // DataType::Timestamp(TimeUnit::Millisecond, None), + // DataType::Timestamp(TimeUnit::Second, None), + // DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("+07:30"))), + // DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("+07:30"))), + // DataType::Timestamp(TimeUnit::Millisecond, Some(Arc::from("+07:30"))), + // DataType::Timestamp(TimeUnit::Second, Some(Arc::from("+07:30"))), + // DataType::Time32(TimeUnit::Millisecond), + // DataType::Time32(TimeUnit::Second), + // DataType::Time64(TimeUnit::Nanosecond), + // DataType::Time64(TimeUnit::Microsecond), + // DataType::Duration(TimeUnit::Nanosecond), + // DataType::Duration(TimeUnit::Microsecond), + // DataType::Duration(TimeUnit::Millisecond), + // DataType::Duration(TimeUnit::Second), + // DataType::Interval(IntervalUnit::YearMonth), + // DataType::Interval(IntervalUnit::MonthDayNano), + // DataType::Interval(IntervalUnit::DayTime), + ], + &[false, true], + &[ValueGenProcess::High], + ) +} + +pub fn spec_empty() -> Vec { + domains_to_batch_spec( + &[DataType::Null, DataType::Int64, DataType::Float64], + &[false, true], + &[], + ) +} + +pub fn spec_null_bool() -> Vec { + domains_to_batch_spec( + &[DataType::Null, DataType::Boolean], + &[false, true], + &VALUE_GEN_PROCESS_ALL, + ) } pub fn spec_numeric() -> Vec { @@ -38,6 +100,7 @@ pub fn spec_numeric() -> Vec { DataType::Float64, ], &[false, true], + &VALUE_GEN_PROCESS_ALL, ) } @@ -54,10 +117,15 @@ pub fn spec_timestamp() -> Vec { DataType::Timestamp(TimeUnit::Second, Some(Arc::from("+07:30"))), ], &[true], + &VALUE_GEN_PROCESS_ALL, ) } pub fn spec_date() -> Vec { - domains_to_batch_spec(&[DataType::Date32, DataType::Date64], &[true]) + domains_to_batch_spec( + &[DataType::Date32, DataType::Date64], + &[true], + &VALUE_GEN_PROCESS_ALL, + ) } pub fn spec_time() -> Vec { domains_to_batch_spec( @@ -68,6 +136,7 @@ pub fn spec_time() -> Vec { DataType::Time64(TimeUnit::Microsecond), ], &[true], + &VALUE_GEN_PROCESS_ALL, ) } pub fn spec_duration() -> Vec { @@ -79,6 +148,7 @@ pub fn spec_duration() -> Vec { DataType::Duration(TimeUnit::Second), ], &[true], + &VALUE_GEN_PROCESS_ALL, ) } pub fn spec_interval() -> Vec { @@ -89,20 +159,15 @@ pub fn spec_interval() -> Vec { DataType::Interval(IntervalUnit::DayTime), ], &[true], + &VALUE_GEN_PROCESS_ALL, ) } pub fn domains_to_batch_spec( data_types_domain: &[DataType], is_nullable_domain: &[bool], + value_gen_process_domain: &[ValueGenProcess], ) -> Vec { - let value_gen_process_domain = [ - ValueGenProcess::Low, - ValueGenProcess::High, - ValueGenProcess::Null, - ValueGenProcess::RandomUniform, - ]; - let mut columns = Vec::new(); for data_type in data_types_domain { for is_nullable in is_nullable_domain { @@ -127,7 +192,7 @@ pub fn domains_to_batch_spec( gen_process: if matches!(gen_process, ValueGenProcess::Null) && !is_nullable { ValueGenProcess::RandomUniform } else { - gen_process + *gen_process }, repeat: 1, }); @@ -139,13 +204,20 @@ pub fn domains_to_batch_spec( } #[derive(Clone, Copy)] -enum ValueGenProcess { +pub enum ValueGenProcess { Null, Low, High, RandomUniform, } +const VALUE_GEN_PROCESS_ALL: [ValueGenProcess; 4] = [ + ValueGenProcess::Low, + ValueGenProcess::High, + ValueGenProcess::Null, + ValueGenProcess::RandomUniform, +]; + struct ValuesSpec { gen_process: ValueGenProcess, repeat: usize, diff --git a/connector_arrow/tests/it/test_duckdb.rs b/connector_arrow/tests/it/test_duckdb.rs index cabdb71..970b067 100644 --- a/connector_arrow/tests/it/test_duckdb.rs +++ b/connector_arrow/tests/it/test_duckdb.rs @@ -10,31 +10,22 @@ fn query_01() { super::tests::query_01(&mut conn); } -#[test] -fn roundtrip_basic_small() { - let table_name = "roundtrip_basic_small"; - let file_name = "basic_small.parquet"; - - let mut conn = init(); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); -} - #[test] fn roundtrip_empty() { let table_name = "roundtrip_empty"; - let file_name = "empty.parquet"; let mut conn = init(); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_empty(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn roundtrip_simple() { - let table_name = "roundtrip_simple"; +fn roundtrip_null_bool() { + let table_name = "roundtrip_null_bool"; let mut conn = init(); - let column_spec = super::generator::spec_simple(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + let column_spec = super::generator::spec_null_bool(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -43,23 +34,23 @@ fn roundtrip_numeric() { let mut conn = init(); let column_spec = super::generator::spec_numeric(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn introspection_basic_small() { - let table_name = "introspection_basic_small"; - let file_name = "basic_small.parquet"; +fn schema_get() { + let table_name = "schema_get"; let mut conn = init(); - super::tests::introspection(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_get(&mut conn, table_name, column_spec); } #[test] -fn schema_edit_01() { - let table_name = "schema_edit_01"; - let file_name = "basic_small.parquet"; +fn schema_edit() { + let table_name = "schema_edit"; let mut conn = init(); - super::tests::schema_edit(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_edit(&mut conn, table_name, column_spec); } diff --git a/connector_arrow/tests/it/test_postgres_extended.rs b/connector_arrow/tests/it/test_postgres_extended.rs index 52b39bc..28fb26d 100644 --- a/connector_arrow/tests/it/test_postgres_extended.rs +++ b/connector_arrow/tests/it/test_postgres_extended.rs @@ -18,57 +18,42 @@ fn query_01() { super::tests::query_01(&mut conn); } -#[test] -fn roundtrip_basic_small() { - let table_name = "extended::roundtrip_basic_small"; - let file_name = "basic_small.parquet"; - - let mut client = init(); - let mut conn = wrap_conn(&mut client); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); -} - #[test] fn roundtrip_empty() { let table_name = "extended::roundtrip_empty"; - let file_name = "empty.parquet"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_empty(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn introspection_basic_small() { - let table_name = "extended::introspection_basic_small"; - let file_name = "basic_small.parquet"; +fn roundtrip_null_bool() { + let table_name = "extended::roundtrip_null_bool"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::introspection(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_null_bool(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn schema_edit_01() { - let table_name = "extended::schema_edit_01"; - let file_name = "basic_small.parquet"; +fn schema_get() { + let table_name = "extended::schema_get"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::schema_edit(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_get(&mut conn, table_name, column_spec); } #[test] -#[ignore] -fn streaming() { +fn schema_edit() { + let table_name = "extended::schema_edit"; + let mut client = init(); let mut conn = wrap_conn(&mut client); - - super::tests::streaming(&mut conn); - - // This should be quick and not load the full result. - // ... but I guess not - it takes a long time. - // ... I don't know why. Maybe my impl is wrong, but I cannot find a reason why. - // ... Maybe it is the postgres that hangs before returning the first result batch? - // ... Maybe it tries to return the full result and not in batches? + let column_spec = super::generator::spec_all_types(); + super::tests::schema_edit(&mut conn, table_name, column_spec); } diff --git a/connector_arrow/tests/it/test_postgres_simple.rs b/connector_arrow/tests/it/test_postgres_simple.rs index 528be44..cb1c183 100644 --- a/connector_arrow/tests/it/test_postgres_simple.rs +++ b/connector_arrow/tests/it/test_postgres_simple.rs @@ -19,41 +19,41 @@ fn query_01() { } #[test] -fn roundtrip_basic_small() { - let table_name = "simple::roundtrip_basic_small"; - let file_name = "basic_small.parquet"; +fn roundtrip_empty() { + let table_name = "simple::roundtrip_empty"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_empty(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn roundtrip_empty() { - let table_name = "simple::roundtrip_empty"; - let file_name = "empty.parquet"; +fn roundtrip_null_bool() { + let table_name = "simple::roundtrip_null_bool"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_null_bool(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn introspection_basic_small() { - let table_name = "simple::introspection_basic_small"; - let file_name = "basic_small.parquet"; +fn schema_get() { + let table_name = "simple::schema_get"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::introspection(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_get(&mut conn, table_name, column_spec); } #[test] -fn schema_edit_01() { - let table_name = "simple::schema_edit_01"; - let file_name = "basic_small.parquet"; +fn schema_edit() { + let table_name = "simple::schema_edit"; let mut client = init(); let mut conn = wrap_conn(&mut client); - super::tests::schema_edit(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_edit(&mut conn, table_name, column_spec); } diff --git a/connector_arrow/tests/it/test_sqlite.rs b/connector_arrow/tests/it/test_sqlite.rs index 295352a..afb0571 100644 --- a/connector_arrow/tests/it/test_sqlite.rs +++ b/connector_arrow/tests/it/test_sqlite.rs @@ -10,32 +10,23 @@ fn query_01() { super::tests::query_01(&mut conn); } -#[test] -fn roundtrip_basic_small() { - let table_name = "roundtrip_basic_small"; - let file_name = "basic_small.parquet"; - - let mut conn = init(); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); -} - #[test] #[ignore] // SQLite cannot infer schema from an empty response, as there is no rows to infer from fn roundtrip_empty() { let table_name = "roundtrip_empty"; - let file_name = "empty.parquet"; let mut conn = init(); - super::tests::roundtrip_of_parquet(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_empty(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] -fn roundtrip_simple() { - let table_name = "roundtrip_simple"; +fn roundtrip_null_bool() { + let table_name = "roundtrip_null_bool"; let mut conn = init(); - let column_spec = super::generator::spec_simple(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + let column_spec = super::generator::spec_null_bool(); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -44,7 +35,7 @@ fn roundtrip_numeric() { let mut conn = init(); let column_spec = super::generator::spec_numeric(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -54,7 +45,7 @@ fn roundtrip_timestamp() { let mut conn = init(); let column_spec = super::generator::spec_timestamp(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -64,7 +55,7 @@ fn roundtrip_date() { let mut conn = init(); let column_spec = super::generator::spec_date(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -74,7 +65,7 @@ fn roundtrip_time() { let mut conn = init(); let column_spec = super::generator::spec_time(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -84,7 +75,7 @@ fn roundtrip_duration() { let mut conn = init(); let column_spec = super::generator::spec_duration(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] @@ -94,24 +85,24 @@ fn roundtrip_interval() { let mut conn = init(); let column_spec = super::generator::spec_interval(); - super::tests::roundtrip_of_generated(&mut conn, table_name, column_spec); + super::tests::roundtrip(&mut conn, table_name, column_spec); } #[test] #[ignore] // cannot introspect the Null column -fn introspection_basic_small() { - let table_name = "introspection_basic_small"; - let file_name = "basic_small.parquet"; +fn schema_get() { + let table_name = "schema_get"; let mut conn = init(); - super::tests::introspection(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_get(&mut conn, table_name, column_spec); } #[test] -fn schema_edit_01() { - let table_name = "schema_edit_01"; - let file_name = "basic_small.parquet"; +fn schema_edit() { + let table_name = "schema_edit"; let mut conn = init(); - super::tests::schema_edit(&mut conn, file_name, table_name); + let column_spec = super::generator::spec_all_types(); + super::tests::schema_edit(&mut conn, table_name, column_spec); } diff --git a/connector_arrow/tests/it/tests.rs b/connector_arrow/tests/it/tests.rs index ee3192f..8c34a01 100644 --- a/connector_arrow/tests/it/tests.rs +++ b/connector_arrow/tests/it/tests.rs @@ -1,17 +1,13 @@ -use std::path::Path; - use arrow::util::pretty::pretty_format_batches; use connector_arrow::{ api::{Connection, ResultReader, SchemaEdit, SchemaGet, Statement}, + util::coerce, TableCreateError, TableDropError, }; use rand::SeedableRng; +use crate::generator::{generate_batch, ColumnSpec}; use crate::util::{load_into_table, query_table}; -use crate::{ - generator::{generate_batch, ColumnSpec}, - util::read_parquet, -}; #[track_caller] pub fn query_01(conn: &mut C) { @@ -28,59 +24,49 @@ pub fn query_01(conn: &mut C) { ); } -pub fn roundtrip_of_parquet(conn: &mut C, file_name: &str, table_name: &str) -where - C: Connection + SchemaEdit, -{ - let file_path = Path::new("./tests/data/a").with_file_name(file_name); - - let (schema_file, batches_file) = read_parquet(&file_path).unwrap(); - let (schema_file, batches_file) = - load_into_table(conn, schema_file, batches_file, table_name).unwrap(); - let (schema_query, batches_query) = query_table(conn, table_name).unwrap(); - similar_asserts::assert_eq!(schema_file, schema_query); - similar_asserts::assert_eq!(batches_file, batches_query); -} - -pub fn roundtrip_of_generated(conn: &mut C, table_name: &str, column_specs: Vec) +pub fn roundtrip(conn: &mut C, table_name: &str, column_specs: Vec) where C: Connection + SchemaEdit, { let mut rng = rand_chacha::ChaCha8Rng::from_seed([0; 32]); - let batch = generate_batch(column_specs, &mut rng); + let (schema, batches) = generate_batch(column_specs, &mut rng); + + load_into_table(conn, schema.clone(), &batches, table_name).unwrap(); - let (_, batches_file) = load_into_table(conn, batch.schema(), vec![batch], table_name).unwrap(); + let (schema_coerced, batches_coerced) = + coerce::coerce_batches(schema, &batches, C::coerce_type, Some(true)).unwrap(); - let (_, batches_query) = query_table(conn, table_name).unwrap(); + let (schema_query, batches_query) = query_table(conn, table_name).unwrap(); - similar_asserts::assert_eq!(batches_file, batches_query); + similar_asserts::assert_eq!(schema_coerced, schema_query); + similar_asserts::assert_eq!(batches_coerced, batches_query); } -pub fn introspection(conn: &mut C, file_name: &str, table_name: &str) +pub fn schema_get(conn: &mut C, table_name: &str, column_specs: Vec) where C: Connection + SchemaEdit + SchemaGet, { - let file_path = Path::new("./tests/data/a").with_file_name(file_name); + let mut rng = rand_chacha::ChaCha8Rng::from_seed([0; 32]); + let (schema, batches) = generate_batch(column_specs, &mut rng); - let (schema_file, batches_file) = read_parquet(&file_path).unwrap(); - let (schema_loaded, _) = load_into_table(conn, schema_file, batches_file, table_name).unwrap(); + load_into_table(conn, schema.clone(), &batches, table_name).unwrap(); + let schema = coerce::coerce_schema(schema, &C::coerce_type, Some(false)); let schema_introspection = conn.table_get(table_name).unwrap(); - similar_asserts::assert_eq!(schema_loaded, schema_introspection); + let schema_introspection = coerce::coerce_schema(schema_introspection, |_| None, Some(false)); + similar_asserts::assert_eq!(schema, schema_introspection); } -pub fn schema_edit(conn: &mut C, file_name: &str, table_name: &str) +pub fn schema_edit(conn: &mut C, table_name: &str, column_specs: Vec) where C: Connection + SchemaEdit + SchemaGet, { - let file_path = Path::new("./tests/data/a").with_file_name(file_name); - - let (schema_file, batches_file) = read_parquet(&file_path).unwrap(); - let (schema, _) = load_into_table(conn, schema_file, batches_file, table_name).unwrap(); + let mut rng = rand_chacha::ChaCha8Rng::from_seed([0; 32]); + let (schema, _) = generate_batch(column_specs, &mut rng); let table_name2 = table_name.to_string() + "2"; - let _ignore = conn.table_drop(&table_name2); + let _ = conn.table_drop(&table_name2); conn.table_create(&table_name2, schema.clone()).unwrap(); assert!(matches!( @@ -95,6 +81,7 @@ where )); } +#[allow(dead_code)] pub fn streaming(conn: &mut C) { let query = " WITH RECURSIVE t(n) AS ( @@ -123,4 +110,10 @@ pub fn streaming(conn: &mut C) { // drop the reader and don't call next // this should not load anymore batches + + // This should be quick and not load the full result. + // ... but I guess not - it takes a long time. + // ... I don't know why. Maybe my impl is wrong, but I cannot find a reason why. + // ... Maybe it is the postgres that hangs before returning the first result batch? + // ... Maybe it tries to return the full result and not in batches? } diff --git a/connector_arrow/tests/it/util.rs b/connector_arrow/tests/it/util.rs index 42074c8..a7c2003 100644 --- a/connector_arrow/tests/it/util.rs +++ b/connector_arrow/tests/it/util.rs @@ -1,44 +1,15 @@ -use std::{fs::File, path::Path}; - use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use connector_arrow::api::{Append, Connection, ResultReader, SchemaEdit, Statement}; -use connector_arrow::util::coerce; use connector_arrow::{ConnectorError, TableCreateError, TableDropError}; -pub fn read_parquet(file_path: &Path) -> Result<(SchemaRef, Vec), ArrowError> { - // read from file - let file = File::open(file_path)?; - - let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; - - let schema = builder.schema().clone(); - - let reader = builder.build()?; - let batches = reader.collect::, ArrowError>>()?; - Ok((schema, batches)) -} - -#[allow(dead_code)] -pub fn write_parquet(path: &Path, batch: RecordBatch) { - let mut file = File::create(path).unwrap(); - - let schema = batch.schema(); - let mut writer = - parquet::arrow::arrow_writer::ArrowWriter::try_new(&mut file, schema, None).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); -} - pub fn load_into_table( conn: &mut C, schema: SchemaRef, - batches: Vec, + batches: &[RecordBatch], table_name: &str, -) -> Result<(SchemaRef, Vec), ConnectorError> +) -> Result<(), ConnectorError> where C: Connection + SchemaEdit, { @@ -60,15 +31,13 @@ where // write into table { let mut appender = conn.append(&table_name).unwrap(); - for batch in batches.clone() { - appender.append(batch).unwrap(); + for batch in batches { + appender.append(batch.clone()).unwrap(); } appender.finish().unwrap(); } - let schema_coerced = coerce::coerce_schema(schema, &C::coerce_type); - let batches_coerced = coerce::coerce_batches(&batches, C::coerce_type).unwrap(); - Ok((schema_coerced, batches_coerced)) + Ok(()) } pub fn query_table(