From 358cbf3d12e532be1a1166d5a8ab220c4b97bb8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alja=C5=BE=20Mur=20Er=C5=BEen?= Date: Mon, 12 Feb 2024 16:57:01 +0100 Subject: [PATCH] feat: expand supported transport types --- README.md | 24 +- connector_arrow/src/lib.rs | 1 + .../src/postgres/protocol_cursor.rs | 80 ++++-- .../src/postgres/protocol_simple.rs | 189 +++++++------ connector_arrow/src/sqlite/query.rs | 65 +++-- connector_arrow/src/types.rs | 78 ++++++ connector_arrow/src/util/row_writer.rs | 125 ++++++--- connector_arrow/src/util/transport.rs | 257 ++++++++++++------ 8 files changed, 559 insertions(+), 260 deletions(-) create mode 100644 connector_arrow/src/types.rs diff --git a/README.md b/README.md index 84f5717..2bdb357 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ None of the sources are enabled by default, use features to enable them. ## Types -When converting non-arrow data sources (everything except DuckDB), only a subset of all possible arrows types is produced. Here is a list of what it is currently possible to produce: +When converting from non-arrow data sources (everything except DuckDB), only a subset of all arrows types is produced. Here is a list of supported types: - [x] Null - [x] Boolean @@ -56,20 +56,20 @@ When converting non-arrow data sources (everything except DuckDB), only a subset - [x] UInt16 - [x] UInt32 - [x] UInt64 -- [ ] Float16 +- [x] Float16 - [x] Float32 - [x] Float64 -- [ ] Timestamp -- [ ] Date32 -- [ ] Date64 -- [ ] Time32 -- [ ] Time64 -- [ ] Duration -- [ ] Interval -- [ ] Binary -- [ ] FixedSizeBinary +- [x] Timestamp +- [x] Date32 +- [x] Date64 +- [x] Time32 +- [x] Time64 +- [x] Duration +- [x] Interval +- [x] Binary +- [x] FixedSizeBinary - [x] LargeBinary -- [ ] Utf8 +- [x] Utf8 - [x] LargeUtf8 - [ ] List - [ ] FixedSizeList diff --git a/connector_arrow/src/lib.rs b/connector_arrow/src/lib.rs index 0d94c6f..371fec8 100644 --- a/connector_arrow/src/lib.rs +++ b/connector_arrow/src/lib.rs @@ -37,6 +37,7 @@ pub mod api; mod errors; +pub mod types; pub mod util; #[cfg(feature = "src_duckdb")] diff --git a/connector_arrow/src/postgres/protocol_cursor.rs b/connector_arrow/src/postgres/protocol_cursor.rs index fcc51ad..9bb43a4 100644 --- a/connector_arrow/src/postgres/protocol_cursor.rs +++ b/connector_arrow/src/postgres/protocol_cursor.rs @@ -1,11 +1,12 @@ -use arrow::datatypes::SchemaRef; +use arrow::datatypes::*; use arrow::record_batch::RecordBatch; use itertools::Itertools; use postgres::fallible_iterator::FallibleIterator; use postgres::{Row, RowIter}; use crate::api::{ResultReader, Statement}; -use crate::util::transport::{self, Produce, ProduceTy}; +use crate::types::{ArrowType, FixedSizeBinaryType}; +use crate::util::transport; use crate::util::{ArrowRowWriter, CellReader}; use crate::{errors::ConnectorError, util::RowsReader}; @@ -128,39 +129,64 @@ impl<'row> CellReader<'row> for PostgresCellReader { type CellRef<'a> = (&'a Row, usize); -impl<'c> Produce<'c> for CellRef<'c> {} +impl<'c> transport::Produce<'c> for CellRef<'c> {} macro_rules! impl_produce { ($($t: ty,)+) => { $( - impl<'c> ProduceTy<'c, $t> for CellRef<'c> { - fn produce(self) -> Result<$t, ConnectorError> { - Ok(self.0.get::(self.1)) + impl<'c> transport::ProduceTy<'c, $t> for CellRef<'c> { + fn produce(self) -> Result<<$t as ArrowType>::Native, ConnectorError> { + Ok(self.0.get(self.1)) } - fn produce_opt(self) -> Result, ConnectorError> { - Ok(self.0.get::>(self.1)) + fn produce_opt(self) -> Result::Native>, ConnectorError> { + Ok(self.0.get(self.1)) } } )+ }; } - -macro_rules! impl_produce_unimplemented { - ($($t: ty,)+) => { - $( - impl<'r> ProduceTy<'r, $t> for CellRef<'r> { - fn produce(self) -> Result<$t, ConnectorError> { - unimplemented!(); - } - - fn produce_opt(self) -> Result, ConnectorError> { - unimplemented!(); - } - } - )+ - }; -} - -impl_produce!(bool, i8, i16, i32, i64, f32, f64, Vec, String,); -impl_produce_unimplemented!(u8, u16, u32, u64,); +impl_produce!( + BooleanType, + Int8Type, + Int16Type, + Int32Type, + Int64Type, + Float32Type, + Float64Type, + LargeBinaryType, + LargeUtf8Type, +); + +crate::impl_produce_unused!( + CellRef<'r>, + ( + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float16Type, + TimestampSecondType, + TimestampMillisecondType, + TimestampMicrosecondType, + TimestampNanosecondType, + Date32Type, + Date64Type, + Time32SecondType, + Time32MillisecondType, + Time64MicrosecondType, + Time64NanosecondType, + IntervalYearMonthType, + IntervalDayTimeType, + IntervalMonthDayNanoType, + DurationSecondType, + DurationMillisecondType, + DurationMicrosecondType, + DurationNanosecondType, + BinaryType, + FixedSizeBinaryType, + Utf8Type, + Decimal128Type, + Decimal256Type, + ) +); diff --git a/connector_arrow/src/postgres/protocol_simple.rs b/connector_arrow/src/postgres/protocol_simple.rs index a6e4c2a..ef043dd 100644 --- a/connector_arrow/src/postgres/protocol_simple.rs +++ b/connector_arrow/src/postgres/protocol_simple.rs @@ -1,13 +1,10 @@ +use arrow::datatypes::*; use hex::decode; use postgres::{SimpleQueryMessage, SimpleQueryRow}; -use rust_decimal::Decimal; -use serde_json::Value; -use std::collections::HashMap; -use uuid::Uuid; use crate::api::Statement; -use crate::util::transport::{Produce, ProduceTy}; -use crate::util::{collect_rows_to_arrow, ArrowReader, CellReader}; +use crate::types::{ArrowType, FixedSizeBinaryType}; +use crate::util::{collect_rows_to_arrow, ArrowReader, CellReader, transport}; use crate::{errors::ConnectorError, util::RowsReader}; use super::{types, PostgresError, PostgresStatement, ProtocolSimple}; @@ -72,42 +69,27 @@ impl<'row> CellReader<'row> for PostgresCellReader { type CellRef<'a> = (&'a SimpleQueryRow, usize); -impl<'c> Produce<'c> for CellRef<'c> {} +impl<'c> transport::Produce<'c> for CellRef<'c> {} fn err_null() -> ConnectorError { ConnectorError::DataSchemaMismatch("NULL in non-nullable column".into()) } -macro_rules! impl_simple_produce_unimplemented { - ($($t: ty,)+) => { - $( - impl<'r> ProduceTy<'r, $t> for CellRef<'r> { - fn produce(self) -> Result<$t, ConnectorError> { - unimplemented!(); - } - - fn produce_opt(self) -> Result, ConnectorError> { - unimplemented!(); - } - } - )+ - }; -} - macro_rules! impl_simple_produce { ($($t: ty,)+) => { $( - impl<'r> ProduceTy<'r, $t> for CellRef<'r> { - fn produce(self) -> Result<$t, ConnectorError> { - self.produce_opt()?.ok_or_else(err_null) + impl<'r> transport::ProduceTy<'r, $t> for CellRef<'r> { + fn produce(self) -> Result<<$t as ArrowType>::Native, ConnectorError> { + transport::ProduceTy::<$t>::produce_opt(self)?.ok_or_else(err_null) } - fn produce_opt(self) -> Result, ConnectorError> { + fn produce_opt(self) -> Result::Native>, ConnectorError> { let s = self.0.get(self.1); Ok(match s { Some(s) => Some( - s.parse::<$t>().map_err(|e| ConnectorError::DataSchemaMismatch(e.to_string()))?, + s.parse::<<$t as ArrowType>::Native>() + .map_err(|e| ConnectorError::DataSchemaMismatch(e.to_string()))?, ), None => None, }) @@ -117,12 +99,49 @@ macro_rules! impl_simple_produce { }; } -impl_simple_produce!(i8, i16, i32, i64, f32, f64, Decimal, Uuid,); -impl_simple_produce_unimplemented!( - u64, u32, u16, u8, Value, HashMap>, +impl_simple_produce!( + Int8Type, + Int16Type, + Int32Type, + Int64Type, + Float32Type, + Float64Type, + Decimal256Type, +); + +crate::impl_produce_unused!( + CellRef<'r>, + ( + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float16Type, + TimestampSecondType, + TimestampMillisecondType, + TimestampMicrosecondType, + TimestampNanosecondType, + Date32Type, + Date64Type, + Time32SecondType, + Time32MillisecondType, + Time64MicrosecondType, + Time64NanosecondType, + IntervalYearMonthType, + IntervalDayTimeType, + IntervalMonthDayNanoType, + DurationSecondType, + DurationMillisecondType, + DurationMicrosecondType, + DurationNanosecondType, + BinaryType, + FixedSizeBinaryType, + Utf8Type, + Decimal128Type, + ) ); -impl<'r> ProduceTy<'r, String> for CellRef<'r> { +impl<'r> transport::ProduceTy<'r, LargeUtf8Type> for CellRef<'r> { fn produce(self) -> Result { let val = self.0.get(self.1).unwrap().to_string(); Ok(val) @@ -143,9 +162,9 @@ fn parse_bool(token: &str) -> Result { } } -impl<'r> ProduceTy<'r, bool> for CellRef<'r> { +impl<'r> transport::ProduceTy<'r, BooleanType> for CellRef<'r> { fn produce(self) -> Result { - self.produce_opt()?.ok_or_else(err_null) + transport::ProduceTy::::produce_opt(self)?.ok_or_else(err_null) } fn produce_opt(self) -> Result, ConnectorError> { @@ -154,9 +173,9 @@ impl<'r> ProduceTy<'r, bool> for CellRef<'r> { } } -impl<'r> ProduceTy<'r, Vec> for CellRef<'r> { +impl<'r> transport::ProduceTy<'r, LargeBinaryType> for CellRef<'r> { fn produce(self) -> Result, ConnectorError> { - self.produce_opt()?.ok_or_else(err_null) + transport::ProduceTy::::produce_opt(self)?.ok_or_else(err_null) } fn produce_opt(self) -> Result>, ConnectorError> { @@ -180,62 +199,62 @@ impl<'r> ProduceTy<'r, Vec> for CellRef<'r> { } } -fn rem_first_and_last(value: &str) -> &str { - let mut chars = value.chars(); - chars.next(); - chars.next_back(); - chars.as_str() -} +// fn rem_first_and_last(value: &str) -> &str { +// let mut chars = value.chars(); +// chars.next(); +// chars.next_back(); +// chars.as_str() +// } -fn parse_array(val: Option<&str>, item_parser: F) -> Result>, ConnectorError> -where - F: Fn(&str) -> Result, -{ - Ok(match val { - None | Some("") => None, - Some("{}") => Some(vec![]), - Some(s) => Some( - rem_first_and_last(s) - .split(',') - .map(item_parser) - .collect::, ConnectorError>>()?, - ), - }) -} +// fn parse_array(val: Option<&str>, item_parser: F) -> Result>, ConnectorError> +// where +// F: Fn(&str) -> Result, +// { +// Ok(match val { +// None | Some("") => None, +// Some("{}") => Some(vec![]), +// Some(s) => Some( +// rem_first_and_last(s) +// .split(',') +// .map(item_parser) +// .collect::, ConnectorError>>()?, +// ), +// }) +// } -macro_rules! impl_simple_vec_produce { - ($($t: ty,)+) => { - $( - impl<'r> ProduceTy<'r, Vec<$t>> for CellRef<'r> { - fn produce(self) -> Result, ConnectorError> { - self.produce_opt()?.ok_or_else(err_null) - } +// macro_rules! impl_simple_vec_produce { +// ($($t: ty,)+) => { +// $( +// impl<'r> ProduceTy<'r, Vec<$t>> for CellRef<'r> { +// fn produce(self) -> Result, ConnectorError> { +// self.produce_opt()?.ok_or_else(err_null) +// } - fn produce_opt(self) -> Result>, ConnectorError> { - let s = self.0.get(self.1); +// fn produce_opt(self) -> Result>, ConnectorError> { +// let s = self.0.get(self.1); - parse_array( - s, - |token| token.parse::<$t>().map_err(|e| ConnectorError::DataSchemaMismatch(e.to_string())) - ) - } - } - )+ - }; -} -impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,); +// parse_array( +// s, +// |token| token.parse::<$t>().map_err(|e| ConnectorError::DataSchemaMismatch(e.to_string())) +// ) +// } +// } +// )+ +// }; +// } +// impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,); -impl<'r> ProduceTy<'r, Vec> for CellRef<'r> { - fn produce(self) -> Result, ConnectorError> { - self.produce_opt()?.ok_or_else(err_null) - } +// impl<'r> ProduceTy<'r, Vec> for CellRef<'r> { +// fn produce(self) -> Result, ConnectorError> { +// self.produce_opt()?.ok_or_else(err_null) +// } - fn produce_opt(self) -> Result>, ConnectorError> { - let s = self.0.get(self.1); +// fn produce_opt(self) -> Result>, ConnectorError> { +// let s = self.0.get(self.1); - parse_array(s, parse_bool) - } -} +// parse_array(s, parse_bool) +// } +// } // impl<'r> ProduceTy<'r, NaiveDate> for CellRef<'r> { // fn produce(self) -> NaiveDate { diff --git a/connector_arrow/src/sqlite/query.rs b/connector_arrow/src/sqlite/query.rs index a9c21e7..da9adab 100644 --- a/connector_arrow/src/sqlite/query.rs +++ b/connector_arrow/src/sqlite/query.rs @@ -1,10 +1,11 @@ use std::sync::Arc; -use arrow::datatypes::DataType; +use arrow::datatypes::*; use itertools::zip_eq; use rusqlite::types::Value; use crate::api::Statement; +use crate::types::FixedSizeBinaryType; use crate::util::transport::{Produce, ProduceTy}; use crate::util::ArrowReader; use crate::util::{collect_rows_to_arrow, CellReader, RowsReader}; @@ -121,7 +122,7 @@ impl<'rows> CellReader<'rows> for SQLiteCellReader { impl<'r> Produce<'r> for Value {} -impl<'r> ProduceTy<'r, i64> for Value { +impl<'r> ProduceTy<'r, Int64Type> for Value { fn produce(self) -> Result { unimplemented!() } @@ -134,7 +135,7 @@ impl<'r> ProduceTy<'r, i64> for Value { } } -impl<'r> ProduceTy<'r, f64> for Value { +impl<'r> ProduceTy<'r, Float64Type> for Value { fn produce(self) -> Result { unimplemented!() } @@ -147,7 +148,7 @@ impl<'r> ProduceTy<'r, f64> for Value { } } -impl<'r> ProduceTy<'r, String> for Value { +impl<'r> ProduceTy<'r, LargeUtf8Type> for Value { fn produce(self) -> Result { unimplemented!() } @@ -160,7 +161,7 @@ impl<'r> ProduceTy<'r, String> for Value { } } -impl<'r> ProduceTy<'r, Vec> for Value { +impl<'r> ProduceTy<'r, LargeBinaryType> for Value { fn produce(self) -> Result, ConnectorError> { unimplemented!() } @@ -173,20 +174,40 @@ impl<'r> ProduceTy<'r, Vec> for Value { } } -macro_rules! impl_produce_unimplemented { - ($($t: ty,)+) => { - $( - impl<'r> ProduceTy<'r, $t> for Value { - fn produce(self) -> Result<$t, ConnectorError> { - unimplemented!(); - } - - fn produce_opt(self) -> Result, ConnectorError> { - unimplemented!(); - } - } - )+ - }; -} - -impl_produce_unimplemented!(bool, i8, i16, i32, u8, u16, u32, u64, f32,); +crate::impl_produce_unused!( + Value, + ( + BooleanType, + Int8Type, + Int16Type, + Int32Type, + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float16Type, + Float32Type, + TimestampSecondType, + TimestampMillisecondType, + TimestampMicrosecondType, + TimestampNanosecondType, + Date32Type, + Date64Type, + Time32SecondType, + Time32MillisecondType, + Time64MicrosecondType, + Time64NanosecondType, + IntervalYearMonthType, + IntervalDayTimeType, + IntervalMonthDayNanoType, + DurationSecondType, + DurationMillisecondType, + DurationMicrosecondType, + DurationNanosecondType, + BinaryType, + FixedSizeBinaryType, + Utf8Type, + Decimal128Type, + Decimal256Type, + ) +); diff --git a/connector_arrow/src/types.rs b/connector_arrow/src/types.rs new file mode 100644 index 0000000..c5d7674 --- /dev/null +++ b/connector_arrow/src/types.rs @@ -0,0 +1,78 @@ +use arrow::datatypes::*; + +/// For a given arrow type, this trait associates: +/// - the Rust type that represents it (i.e. [Int8Type]), +/// - its Rust-native representation (i.e. [i8]), which might not be unique. +pub trait ArrowType { + type Native: Sized; +} + +// arrow crate does not define null type (as it contains no data), +// but we need it for [Consume] +pub struct NullType; + +// arrow crate does not define fixed-sized binary array type +pub struct FixedSizeBinaryType; + +impl ArrowType for NullType { + type Native = (); +} +impl ArrowType for BooleanType { + type Native = bool; +} +macro_rules! impl_arrow_primitive_type { + ($($t: ty,)+) => { + $( + impl ArrowType for $t { + type Native = ::Native; + } + )+ + }; +} +impl_arrow_primitive_type!( + Int8Type, + Int16Type, + Int32Type, + Int64Type, + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float16Type, + Float32Type, + Float64Type, + TimestampSecondType, + TimestampMillisecondType, + TimestampMicrosecondType, + TimestampNanosecondType, + Date32Type, + Date64Type, + Time32SecondType, + Time32MillisecondType, + Time64MicrosecondType, + Time64NanosecondType, + IntervalYearMonthType, + IntervalDayTimeType, + IntervalMonthDayNanoType, + DurationSecondType, + DurationMillisecondType, + DurationMicrosecondType, + DurationNanosecondType, + Decimal128Type, + Decimal256Type, +); +impl ArrowType for BinaryType { + type Native = Vec; +} +impl ArrowType for LargeBinaryType { + type Native = Vec; +} +impl ArrowType for FixedSizeBinaryType { + type Native = Vec; +} +impl ArrowType for Utf8Type { + type Native = String; +} +impl ArrowType for LargeUtf8Type { + type Native = String; +} diff --git a/connector_arrow/src/util/row_writer.rs b/connector_arrow/src/util/row_writer.rs index a06be65..e70eb78 100644 --- a/connector_arrow/src/util/row_writer.rs +++ b/connector_arrow/src/util/row_writer.rs @@ -1,11 +1,12 @@ use std::any::Any; use arrow::array::{ArrayBuilder, ArrayRef}; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::*; use arrow::record_batch::RecordBatch; -use super::transport::Consume; use crate::errors::ConnectorError; +use crate::types::{ArrowType, FixedSizeBinaryType, NullType}; +use crate::util::transport::{Consume, ConsumeTy}; /// Receives values row-by-row and passes them to [ArrayBuilder]s, /// which construct [RecordBatch]es. @@ -145,20 +146,16 @@ impl Organizer { } macro_rules! impl_consume_ty { - ( + ($({ $ArrTy:ty => $Builder:tt } )*) => { $( - { $Native:ty => $Builder:tt } - )* - ) => { - $( - impl super::transport::ConsumeTy<$Native> for ArrowRowWriter { - fn consume(&mut self, value: $Native) { + impl ConsumeTy<$ArrTy> for ArrowRowWriter { + fn consume(&mut self, value: <$ArrTy as ArrowType>::Native) { self.next_builder() .downcast_mut::() .expect(concat!("bad cast to ", stringify!($Builder))) .append_value(value); } - fn consume_opt(&mut self, value: Option<$Native>) { + fn consume_opt(&mut self, value: Option<<$ArrTy as ArrowType>::Native>) { self.next_builder() .downcast_mut::() .expect(concat!("bad cast to ", stringify!($Builder))) @@ -172,31 +169,44 @@ macro_rules! impl_consume_ty { // List of ConsumeTy implementations to generate. // Must match with arrow::array::make_builder impl_consume_ty! { - // { () => NullBuilder } // Null - custom implementation - { bool => BooleanBuilder } // Boolean - { i8 => Int8Builder } // Int8 - { i16 => Int16Builder } // Int16 - { i32 => Int32Builder } // Int32 - { i64 => Int64Builder } // Int64 - { u8 => UInt8Builder } // UInt8 - { u16 => UInt16Builder } // UInt16 - { u32 => UInt32Builder } // UInt32 - { u64 => UInt64Builder } // UInt64 - // { => Float16Builder } // Float16 - no Rust native type - { f32 => Float32Builder } // Float32 - { f64 => Float64Builder } // Float64 - // { => BinaryBuilder } // Binary - no Rust native type - { Vec => LargeBinaryBuilder } // LargeBinary - // { => FixedSizeBinaryBuilder } // FixedSizeBinary - no Rust native type - // { => Decimal128Builder } // Decimal128 - no Rust native type - // { => Decimal256Builder } // Decimal256 - no Rust native type - // { => StringBuilder } // Utf8 - no Rust native type - { String => LargeStringBuilder } // LargeUtf8 - // { => Date32Builder } // Date32 - no Rust native type - // { => Date64Builder } // Date64 - no Rust native type +// { Null => NullBuilder } custom implementation + { BooleanType => BooleanBuilder } + { Int8Type => Int8Builder } + { Int16Type => Int16Builder } + { Int32Type => Int32Builder } + { Int64Type => Int64Builder } + { UInt8Type => UInt8Builder } + { UInt16Type => UInt16Builder } + { UInt32Type => UInt32Builder } + { UInt64Type => UInt64Builder } + { Float16Type => Float16Builder } + { Float32Type => Float32Builder } + { Float64Type => Float64Builder } + + { TimestampSecondType => TimestampSecondBuilder } + { TimestampMillisecondType => TimestampMillisecondBuilder } + { TimestampMicrosecondType => TimestampMicrosecondBuilder } + { TimestampNanosecondType => TimestampNanosecondBuilder } + { Date32Type => Date32Builder } + { Date64Type => Date64Builder } + { Time32SecondType => Time32SecondBuilder } + { Time32MillisecondType => Time32MillisecondBuilder } + { Time64MicrosecondType => Time64MicrosecondBuilder } + { Time64NanosecondType => Time64NanosecondBuilder } + { IntervalYearMonthType => IntervalYearMonthBuilder } + { IntervalDayTimeType => IntervalDayTimeBuilder } + { IntervalMonthDayNanoType => IntervalMonthDayNanoBuilder } + { DurationSecondType => DurationSecondBuilder } + { DurationMillisecondType => DurationMillisecondBuilder } + { DurationMicrosecondType => DurationMicrosecondBuilder } + { DurationNanosecondType => DurationNanosecondBuilder } + + { Decimal128Type => Decimal128Builder } + { Decimal256Type => Decimal256Builder } + } -impl super::transport::ConsumeTy<()> for ArrowRowWriter { +impl ConsumeTy for ArrowRowWriter { fn consume(&mut self, _: ()) { self.next_builder(); } @@ -204,3 +214,52 @@ impl super::transport::ConsumeTy<()> for ArrowRowWriter { self.next_builder(); } } + +macro_rules! impl_consume_ref_ty { + ($({ $ArrTy:ty => $Builder:tt })*) => { + $( + impl ConsumeTy<$ArrTy> for ArrowRowWriter { + fn consume(&mut self, value: <$ArrTy as ArrowType>::Native) { + self.next_builder() + .downcast_mut::() + .expect(concat!("bad cast to ", stringify!($Builder))) + .append_value(&value); + } + fn consume_opt(&mut self, value: Option<<$ArrTy as ArrowType>::Native>) { + self.next_builder() + .downcast_mut::() + .expect(concat!("bad cast to ", stringify!($Builder))) + .append_option(value); + } + } + )+ + }; +} +impl_consume_ref_ty! { + { BinaryType => BinaryBuilder } + { LargeBinaryType => LargeBinaryBuilder } +// { FixedSizeBinaryType => FixedSizeBinaryBuilder } custom impl + { Utf8Type => StringBuilder } + { LargeUtf8Type => LargeStringBuilder } +} + +impl ConsumeTy for ArrowRowWriter { + fn consume(&mut self, value: ::Native) { + self.next_builder() + .downcast_mut::() + .expect(concat!("bad cast to ", stringify!(FixedSizeBinaryBuilder))) + .append_value(&value) + .unwrap(); + } + fn consume_opt(&mut self, value: Option<::Native>) { + let builder = self + .next_builder() + .downcast_mut::() + .expect(concat!("bad cast to ", stringify!(FixedSizeBinaryBuilder))); + if let Some(value) = value { + builder.append_value(value).unwrap(); + } else { + builder.append_null(); + } + } +} diff --git a/connector_arrow/src/util/transport.rs b/connector_arrow/src/util/transport.rs index 7b8cbf1..895b6b0 100644 --- a/connector_arrow/src/util/transport.rs +++ b/connector_arrow/src/util/transport.rs @@ -1,120 +1,215 @@ -/// Moving of typed values from a producer into a consumer -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::*; +/// Moving of typed values from a producer into a consumer use crate::errors::ConnectorError; +use crate::types::{ArrowType, FixedSizeBinaryType, NullType}; + +macro_rules! impl_transport_match { + ($f: expr, $c: expr, $p: expr, $({ $Pat: pat => $ArrTy: ty })*) => { + if !$f.is_nullable() { + match $f.data_type() { + Null => ConsumeTy::::consume($c, ()), + $( + $Pat => ConsumeTy::<$ArrTy>::consume($c, ProduceTy::<$ArrTy>::produce($p)?), + )* + _ => todo!("unimplemented transport of {:?}", $f.data_type()), + } + } else { + match $f.data_type() { + Null => ConsumeTy::::consume_opt($c, Some(())), + $( + $Pat => ConsumeTy::<$ArrTy>::consume_opt($c, ProduceTy::<$ArrTy>::produce_opt($p)?), + )* + _ => todo!("unimplemented transport of {:?}", $f.data_type()), + } + } + }; +} /// Take a value of type `ty` from [Produce] and insert it into [Consume]. pub fn transport<'r, P: Produce<'r>, C: Consume>( - f: &Field, - p: P, - c: &mut C, + field: &Field, + producer: P, + consumer: &mut C, ) -> Result<(), ConnectorError> { - log::debug!("transporting value of type {f:?}"); + log::debug!("transporting value of type {field:?}"); // TODO: connector-x goes a step further here: instead of having this match in the hot path, // this function returns a transporter function that is stored in a vec, for faster lookup. use DataType::*; - - if !f.is_nullable() { - match f.data_type() { - Null => c.consume(()), - Boolean => c.consume(ProduceTy::::produce(p)?), - Int8 => c.consume(ProduceTy::::produce(p)?), - Int16 => c.consume(ProduceTy::::produce(p)?), - Int32 => c.consume(ProduceTy::::produce(p)?), - Int64 => c.consume(ProduceTy::::produce(p)?), - UInt8 => c.consume(ProduceTy::::produce(p)?), - UInt16 => c.consume(ProduceTy::::produce(p)?), - UInt32 => c.consume(ProduceTy::::produce(p)?), - UInt64 => c.consume(ProduceTy::::produce(p)?), - Float32 => c.consume(ProduceTy::::produce(p)?), - Float64 => c.consume(ProduceTy::::produce(p)?), - Binary | LargeBinary => c.consume(ProduceTy::>::produce(p)?), - Utf8 | LargeUtf8 => c.consume(ProduceTy::::produce(p)?), - _ => todo!("unimplemented transport of {:?}", f.data_type()), - } - } else { - match f.data_type() { - Null => c.consume_opt(Some(())), - Boolean => c.consume_opt(ProduceTy::::produce_opt(p)?), - Int8 => c.consume_opt(ProduceTy::::produce_opt(p)?), - Int16 => c.consume_opt(ProduceTy::::produce_opt(p)?), - Int32 => c.consume_opt(ProduceTy::::produce_opt(p)?), - Int64 => c.consume_opt(ProduceTy::::produce_opt(p)?), - UInt8 => c.consume_opt(ProduceTy::::produce_opt(p)?), - UInt16 => c.consume_opt(ProduceTy::::produce_opt(p)?), - UInt32 => c.consume_opt(ProduceTy::::produce_opt(p)?), - UInt64 => c.consume_opt(ProduceTy::::produce_opt(p)?), - Float32 => c.consume_opt(ProduceTy::::produce_opt(p)?), - Float64 => c.consume_opt(ProduceTy::::produce_opt(p)?), - Binary | LargeBinary => c.consume_opt(ProduceTy::>::produce_opt(p)?), - Utf8 | LargeUtf8 => c.consume_opt(ProduceTy::::produce_opt(p)?), - _ => todo!("unimplemented transport of {:?}", f.data_type()), - } - } + impl_transport_match!( + field, + consumer, + producer, + { Boolean => BooleanType } + { Int8 => Int8Type } + { Int16 => Int16Type } + { Int32 => Int32Type } + { Int64 => Int64Type } + { UInt8 => UInt8Type } + { UInt16 => UInt16Type } + { UInt32 => UInt32Type } + { UInt64 => UInt64Type } + { Float16 => Float16Type } + { Float32 => Float32Type } + { Float64 => Float64Type } + { Timestamp(TimeUnit::Second, _) => TimestampSecondType } + { Timestamp(TimeUnit::Millisecond, _) => TimestampMillisecondType } + { Timestamp(TimeUnit::Microsecond, _) => TimestampMicrosecondType } + { Timestamp(TimeUnit::Nanosecond, _) => TimestampNanosecondType } + { Date32 => Date32Type } + { Date64 => Date64Type } + { Time32(TimeUnit::Second) => Time32SecondType } + { Time32(TimeUnit::Millisecond) => Time32MillisecondType } + { Time64(TimeUnit::Microsecond) => Time64MicrosecondType } + { Time64(TimeUnit::Nanosecond) => Time64NanosecondType } + { Interval(IntervalUnit::YearMonth) => IntervalYearMonthType } + { Interval(IntervalUnit::DayTime) => IntervalDayTimeType } + { Interval(IntervalUnit::MonthDayNano) => IntervalMonthDayNanoType } + { Duration(TimeUnit::Second) => DurationSecondType } + { Duration(TimeUnit::Millisecond) => DurationMillisecondType } + { Duration(TimeUnit::Microsecond) => DurationMicrosecondType } + { Duration(TimeUnit::Nanosecond) => DurationNanosecondType } + { Binary => BinaryType } + { LargeBinary => LargeBinaryType } + { FixedSizeBinary(_) => FixedSizeBinaryType } + { Utf8 => Utf8Type } + { LargeUtf8 => LargeUtf8Type } + { Decimal128(_, _) => Decimal128Type } + { Decimal256(_, _) => Decimal256Type } + ); Ok(()) } +/// Ability to produce values of all arrow types. pub trait Produce<'r>: - ProduceTy<'r, bool> - + ProduceTy<'r, i8> - + ProduceTy<'r, i16> - + ProduceTy<'r, i32> - + ProduceTy<'r, i64> - + ProduceTy<'r, u8> - + ProduceTy<'r, u16> - + ProduceTy<'r, u32> - + ProduceTy<'r, u64> - + ProduceTy<'r, f32> - + ProduceTy<'r, f64> - + ProduceTy<'r, Vec> - + ProduceTy<'r, String> + ProduceTy<'r, BooleanType> + + ProduceTy<'r, Int8Type> + + ProduceTy<'r, Int16Type> + + ProduceTy<'r, Int32Type> + + ProduceTy<'r, Int64Type> + + ProduceTy<'r, UInt8Type> + + ProduceTy<'r, UInt16Type> + + ProduceTy<'r, UInt32Type> + + ProduceTy<'r, UInt64Type> + + ProduceTy<'r, Float16Type> + + ProduceTy<'r, Float32Type> + + ProduceTy<'r, Float64Type> + + ProduceTy<'r, TimestampSecondType> + + ProduceTy<'r, TimestampMillisecondType> + + ProduceTy<'r, TimestampMicrosecondType> + + ProduceTy<'r, TimestampNanosecondType> + + ProduceTy<'r, Date32Type> + + ProduceTy<'r, Date64Type> + + ProduceTy<'r, Time32SecondType> + + ProduceTy<'r, Time32MillisecondType> + + ProduceTy<'r, Time64MicrosecondType> + + ProduceTy<'r, Time64NanosecondType> + + ProduceTy<'r, IntervalYearMonthType> + + ProduceTy<'r, IntervalDayTimeType> + + ProduceTy<'r, IntervalMonthDayNanoType> + + ProduceTy<'r, DurationSecondType> + + ProduceTy<'r, DurationMillisecondType> + + ProduceTy<'r, DurationMicrosecondType> + + ProduceTy<'r, DurationNanosecondType> + + ProduceTy<'r, BinaryType> + + ProduceTy<'r, LargeBinaryType> + + ProduceTy<'r, FixedSizeBinaryType> + + ProduceTy<'r, Utf8Type> + + ProduceTy<'r, LargeUtf8Type> + + ProduceTy<'r, Decimal128Type> + + ProduceTy<'r, Decimal256Type> { } -pub trait ProduceTy<'r, T> { - fn produce(self) -> Result; +/// Ability to produce a value of an arrow type +pub trait ProduceTy<'r, T: ArrowType> { + fn produce(self) -> Result; - fn produce_opt(self) -> Result, ConnectorError>; + fn produce_opt(self) -> Result, ConnectorError>; } +/// Ability to consume values of all arrow types. pub trait Consume: - ConsumeTy<()> - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy - + ConsumeTy> - + ConsumeTy + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy + + ConsumeTy { } -pub trait ConsumeTy { - fn consume(&mut self, value: T); - fn consume_opt(&mut self, value: Option); +/// Ability to consume a value of an an arrow type +pub trait ConsumeTy { + fn consume(&mut self, value: T::Native); + fn consume_opt(&mut self, value: Option); } pub mod print { - use super::{Consume, ConsumeTy}; + use super::{ArrowType, Consume, ConsumeTy}; pub struct PrintConsumer(); impl Consume for PrintConsumer {} - impl ConsumeTy for PrintConsumer { - fn consume(&mut self, value: T) { + impl ConsumeTy for PrintConsumer + where + T::Native: std::fmt::Debug, + { + fn consume(&mut self, value: T::Native) { println!("{}: {value:?}", std::any::type_name::()); } - fn consume_opt(&mut self, value: Option) { + fn consume_opt(&mut self, value: Option) { println!("{}: {value:?}", std::any::type_name::()); } } } + +#[macro_export] +macro_rules! impl_produce_unused { + ($p: ty, ($($t: ty,)+)) => { + $( + impl<'r> $crate::util::transport::ProduceTy<'r, $t> for $p { + fn produce(self) -> Result<<$t as $crate::types::ArrowType>::Native, ConnectorError> { + unimplemented!(); + } + fn produce_opt(self) -> Result::Native>, ConnectorError> { + unimplemented!(); + } + } + )+ + }; +}