Skip to content

Commit

Permalink
refactor: consume_opt
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen committed Feb 19, 2024
1 parent 25b3bc9 commit f7865a4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
31 changes: 15 additions & 16 deletions connector_arrow/src/util/row_writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::any::Any;

use arrow::array::{ArrayBuilder, ArrayRef};
use arrow::array::{ArrayBuilder, ArrayRef, FixedSizeBinaryBuilder};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;

Expand Down Expand Up @@ -155,11 +155,12 @@ macro_rules! impl_consume_ty {
.expect(concat!("bad cast to ", stringify!($Builder)))
.append_value(value);
}
fn consume_opt(&mut self, value: Option<<$ArrTy as ArrowType>::Native>) {

fn consume_null(&mut self) {
self.next_builder()
.downcast_mut::<arrow::array::builder::$Builder>()
.expect(concat!("bad cast to ", stringify!($Builder)))
.append_option(value);
.append_null();
}
}
)+
Expand Down Expand Up @@ -210,7 +211,8 @@ impl ConsumeTy<NullType> for ArrowRowWriter {
fn consume(&mut self, _: ()) {
self.next_builder();
}
fn consume_opt(&mut self, _: Option<()>) {

fn consume_null(&mut self) {
self.next_builder();
}
}
Expand All @@ -225,11 +227,12 @@ macro_rules! impl_consume_ref_ty {
.expect(concat!("bad cast to ", stringify!($Builder)))
.append_value(&value);
}
fn consume_opt(&mut self, value: Option<<$ArrTy as ArrowType>::Native>) {

fn consume_null(&mut self) {
self.next_builder()
.downcast_mut::<arrow::array::builder::$Builder>()
.expect(concat!("bad cast to ", stringify!($Builder)))
.append_option(value);
.append_null();
}
}
)+
Expand All @@ -251,15 +254,11 @@ impl ConsumeTy<FixedSizeBinaryType> for ArrowRowWriter {
.append_value(&value)
.unwrap();
}
fn consume_opt(&mut self, value: Option<<FixedSizeBinaryType as ArrowType>::Native>) {
let builder = self
.next_builder()
.downcast_mut::<arrow::array::builder::FixedSizeBinaryBuilder>()
.expect(concat!("bad cast to ", stringify!(FixedSizeBinaryBuilder)));
if let Some(value) = value {
builder.append_value(value).unwrap();
} else {
builder.append_null();
}

fn consume_null(&mut self) {
self.next_builder()
.downcast_mut::<FixedSizeBinaryBuilder>()
.expect(concat!("bad cast to ", stringify!($Builder)))
.append_null();
}
}
18 changes: 13 additions & 5 deletions connector_arrow/src/util/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ macro_rules! impl_transport_match {
}
} else {
match $f.data_type() {
Null => ConsumeTy::<NullType>::consume_opt($c, Some(())),
Null => ConsumeTy::<NullType>::consume($c, ()),
$(
$Pat => ConsumeTy::<$ArrTy>::consume_opt($c, ProduceTy::<$ArrTy>::produce_opt($p)?),
$Pat => {
if let Some(v) = ProduceTy::<$ArrTy>::produce_opt($p)? {
ConsumeTy::<$ArrTy>::consume($c, v)
} else {
ConsumeTy::<$ArrTy>::consume_null($c)
}
},
)*
_ => todo!("unimplemented transport of {:?}", $f.data_type()),
}
Expand Down Expand Up @@ -175,7 +181,8 @@ pub trait Consume:
/// Ability to consume a value of an an arrow type
pub trait ConsumeTy<T: ArrowType> {
fn consume(&mut self, value: T::Native);
fn consume_opt(&mut self, value: Option<T::Native>);

fn consume_null(&mut self);
}

pub mod print {
Expand All @@ -192,8 +199,9 @@ pub mod print {
fn consume(&mut self, value: T::Native) {
println!("{}: {value:?}", std::any::type_name::<T>());
}
fn consume_opt(&mut self, value: Option<T::Native>) {
println!("{}: {value:?}", std::any::type_name::<T>());

fn consume_null(&mut self) {
println!("{}: null", std::any::type_name::<T>());
}
}
}
Expand Down

0 comments on commit f7865a4

Please sign in to comment.