From bbd7c8b6c3a192e503ae860e77dc18090161a128 Mon Sep 17 00:00:00 2001 From: nglime Date: Mon, 18 Nov 2024 14:11:41 -0600 Subject: [PATCH 01/10] Added set up for the example of flattening from pyarrow. --- arrow-array/src/record_batch.rs | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 372ca63f30a..a9e410a872c 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1206,6 +1206,41 @@ mod tests { assert_ne!(batch1, batch2); } + #[test] + fn flattening() { + let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); + let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)])); + let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)])); + + let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); + let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); + let year_field = Arc::new(Field::new("year", DataType::Int64, true)); + + let a = Arc::new(StructArray::from(vec![ + (animals_field.clone(), Arc::new(animals) as ArrayRef), + (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef), + (year_field.clone(), Arc::new(year) as ArrayRef), + ])); + let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)])); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Struct(Fields::from(vec![ + animals_field, + n_legs_field, + year_field, + ])), false), + Field::new("month", DataType::Int64, true) + ]); + + let record_batch = + RecordBatch::try_new(Arc::new(schema), vec![ + a, + month, + ]).expect("valid conversion"); + + println!("{:?}", record_batch); + } + #[test] fn project() { let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); From 8abcd2542caa32fb2ca57142f1cfe4febea6cce5 Mon Sep 17 00:00:00 2001 From: nglime Date: Tue, 19 Nov 2024 21:10:04 -0600 Subject: [PATCH 02/10] Logic for recursive normalizer with a base normalize function, based on pola-rs. --- arrow-array/src/record_batch.rs | 32 ++++++------ arrow-schema/src/schema.rs | 87 ++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 15 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index a9e410a872c..b644a0d6231 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -19,8 +19,8 @@ //! [schema](arrow_schema::Schema). use crate::{new_empty_array, Array, ArrayRef, StructArray}; -use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef}; -use std::ops::Index; +use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, SchemaBuilder, SchemaRef}; +use std::ops::{Deref, Index}; use std::sync::Arc; /// Trait for types that can read `RecordBatch`'s. @@ -1207,7 +1207,7 @@ mod tests { } #[test] - fn flattening() { + fn normalize() { let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)])); let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)])); @@ -1224,21 +1224,23 @@ mod tests { let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)])); let schema = Schema::new(vec![ - Field::new("a", DataType::Struct(Fields::from(vec![ - animals_field, - n_legs_field, - year_field, - ])), false), - Field::new("month", DataType::Int64, true) + Field::new( + "a", + DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])), + false, + ), + Field::new("month", DataType::Int64, true), ]); + let normalized = schema.clone().normalize(".", 0).unwrap(); + println!("{:?}", normalized); let record_batch = - RecordBatch::try_new(Arc::new(schema), vec![ - a, - month, - ]).expect("valid conversion"); + RecordBatch::try_new(Arc::new(schema), vec![a, month]).expect("valid conversion"); + + println!("Fields: {:?}", record_batch.schema().fields()); + println!("Metadata{:?}", record_batch.columns()); - println!("{:?}", record_batch); + //println!("{:?}", record_batch); } #[test] @@ -1353,7 +1355,9 @@ mod tests { let metadata = vec![("foo".to_string(), "bar".to_string())] .into_iter() .collect(); + println!("Metadata: {:?}", metadata); let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata); + println!("Metadata schema: {:?}", metadata_schema); let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap(); // Cannot remove metadata diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index cc3a8a308a8..9924f2b5be1 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::error::ArrowError; use crate::field::Field; -use crate::{FieldRef, Fields}; +use crate::{DataType, FieldRef, Fields}; /// A builder to facilitate building a [`Schema`] from iteratively from [`FieldRef`] #[derive(Debug, Default)] @@ -413,6 +413,91 @@ impl Schema { &self.metadata } + pub fn normalize(self, separator: &str, mut max_level: usize) -> Result { + if max_level == 0 { + max_level = usize::MAX; + } + let mut new_fields: Vec = vec![]; + for field in self.fields() { + match field.data_type() { + //DataType::List(f) => field, + //DataType::ListView(_) => field, + //DataType::FixedSizeList(_, _) => field, + //DataType::LargeList(_) => field, + //DataType::LargeListView(_) => field, + DataType::Struct(nested_fields) => { + let field_name = field.name().as_str(); + new_fields = [ + new_fields, + Self::normalizer( + nested_fields.to_vec(), + field_name, + separator, + max_level - 1, + ), + ] + .concat(); + } + //DataType::Union(_, _) => field, + //DataType::Dictionary(_, _) => field, + //DataType::Map(_, _) => field, + //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field + _ => new_fields.push(Field::new( + field.name(), + field.data_type().clone(), + field.is_nullable(), + )), + }; + } + Ok(Self::new_with_metadata(new_fields, self.metadata.clone())) + } + + fn normalizer( + fields: Vec, + key_string: &str, + separator: &str, + max_level: usize, + ) -> Vec { + if max_level > 0 { + let mut new_fields: Vec = vec![]; + for field in fields { + match field.data_type() { + //DataType::List(f) => field, + //DataType::ListView(_) => field, + //DataType::FixedSizeList(_, _) => field, + //DataType::LargeList(_) => field, + //DataType::LargeListView(_) => field, + DataType::Struct(nested_fields) => { + let field_name = field.name().as_str(); + let new_key = format!("{key_string}{separator}{field_name}"); + new_fields = [ + new_fields, + Self::normalizer( + nested_fields.to_vec(), + new_key.as_str(), + separator, + max_level - 1, + ), + ] + .concat(); + } + //DataType::Union(_, _) => field, + //DataType::Dictionary(_, _) => field, + //DataType::Map(_, _) => field, + //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field + _ => new_fields.push(Field::new( + format!("{key_string}{separator}{}", field.name()), + field.data_type().clone(), + field.is_nullable(), + )), + }; + } + new_fields + } else { + todo!() + } + } + /// Look up a column by name and return a immutable reference to the column along with /// its index. pub fn column_with_name(&self, name: &str) -> Option<(usize, &Field)> { From 6bba7d39925f0f2f1e25f96dc3772e86005a8870 Mon Sep 17 00:00:00 2001 From: nglime Date: Fri, 22 Nov 2024 22:37:01 -0600 Subject: [PATCH 03/10] Added recursive normalize function for `Schema`, and started building iterative function for `RecordBatch`. Not sure which one is better currently. --- arrow-array/src/record_batch.rs | 58 +++++++++++++++++++++++++++++++-- arrow-schema/src/schema.rs | 14 ++++---- 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index b644a0d6231..2be5ca10903 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -18,8 +18,9 @@ //! A two-dimensional batch of column-oriented data with a defined //! [schema](arrow_schema::Schema). +use std::collections::{BinaryHeap, VecDeque}; use crate::{new_empty_array, Array, ArrayRef, StructArray}; -use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, SchemaBuilder, SchemaRef}; +use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef}; use std::ops::{Deref, Index}; use std::sync::Arc; @@ -403,7 +404,59 @@ impl RecordBatch { ) } - /// Returns the number of columns in the record batch. + /// Normalize a semi-structured RecordBatch into a flat table + /// If max_level is 0, normalizes all levels. + pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { + if max_level == 0 { + max_level = usize::MAX; + } + if self.num_rows() == 0 { + // No data, only need to normalize the schema + return Ok(Self::new_empty(Arc::new(self.schema.normalize(separator, max_level)?))); + } + let mut queue: VecDeque<(usize, &Arc, &FieldRef)> = VecDeque::new(); + + // push fields + for (c, f) in self.columns.iter().zip(self.schema().fields()) { + queue.push_front((0, c, f)); + } + + while !queue.is_empty() { + match queue.pop_front() { + Some((depth, c, f)) => { + match f.data_type() { + //DataType::List(f) => field, + //DataType::ListView(_) => field, + //DataType::FixedSizeList(_, _) => field, + //DataType::LargeList(_) => field, + //DataType::LargeListView(_) => field, + DataType::Struct(nested_fields) => { + let field_name = f.name().as_str(); + /*new_fields = [ + new_fields, + Self::normalizer( + nested_fields.to_vec(), + field_name, + separator, + max_level - 1, + ), + ] + .concat();*/ + } + //DataType::Union(_, _) => field, + //DataType::Dictionary(_, _) => field, + //DataType::Map(_, _) => field, + //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field + _ => queue.push_front((0, c, f)), + } + }, + None => break, + }; + } + todo!() + } + + /// Returns the number of columns in the record batch. /// /// # Example /// @@ -1216,6 +1269,7 @@ mod tests { let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); let year_field = Arc::new(Field::new("year", DataType::Int64, true)); + let a = Arc::new(StructArray::from(vec![ (animals_field.clone(), Arc::new(animals) as ArrayRef), (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef), diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 9924f2b5be1..62f74c8b435 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -413,7 +413,9 @@ impl Schema { &self.metadata } - pub fn normalize(self, separator: &str, mut max_level: usize) -> Result { + /// Returns a new schema, normalized based on the max_level + /// This carries metadata from the parent schema over as well + pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { if max_level == 0 { max_level = usize::MAX; } @@ -462,11 +464,11 @@ impl Schema { let mut new_fields: Vec = vec![]; for field in fields { match field.data_type() { - //DataType::List(f) => field, - //DataType::ListView(_) => field, - //DataType::FixedSizeList(_, _) => field, - //DataType::LargeList(_) => field, - //DataType::LargeListView(_) => field, + //DataType::List(f) => , + //DataType::ListView(_) => , + //DataType::FixedSizeList(_, _) => , + //DataType::LargeList(_) => , + //DataType::LargeListView(_) => , DataType::Struct(nested_fields) => { let field_name = field.name().as_str(); let new_key = format!("{key_string}{separator}{field_name}"); From 55eb9533f7fd0da4b6b36e9cc7d059191e90f250 Mon Sep 17 00:00:00 2001 From: nglime Date: Sat, 23 Nov 2024 11:16:35 -0600 Subject: [PATCH 04/10] Built out a bit more of the iterative normalize. --- arrow-array/src/record_batch.rs | 72 +++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 2be5ca10903..92cdb700b8d 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -18,9 +18,11 @@ //! A two-dimensional batch of column-oriented data with a defined //! [schema](arrow_schema::Schema). -use std::collections::{BinaryHeap, VecDeque}; use crate::{new_empty_array, Array, ArrayRef, StructArray}; -use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef}; +use arrow_schema::{ + ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef, +}; +use std::collections::VecDeque; use std::ops::{Deref, Index}; use std::sync::Arc; @@ -412,51 +414,61 @@ impl RecordBatch { } if self.num_rows() == 0 { // No data, only need to normalize the schema - return Ok(Self::new_empty(Arc::new(self.schema.normalize(separator, max_level)?))); + return Ok(Self::new_empty(Arc::new( + self.schema.normalize(separator, max_level)?, + ))); } let mut queue: VecDeque<(usize, &Arc, &FieldRef)> = VecDeque::new(); // push fields - for (c, f) in self.columns.iter().zip(self.schema().fields()) { + for (c, f) in self.columns.iter().zip(self.schema.fields()) { queue.push_front((0, c, f)); } while !queue.is_empty() { match queue.pop_front() { Some((depth, c, f)) => { - match f.data_type() { - //DataType::List(f) => field, - //DataType::ListView(_) => field, - //DataType::FixedSizeList(_, _) => field, - //DataType::LargeList(_) => field, - //DataType::LargeListView(_) => field, - DataType::Struct(nested_fields) => { - let field_name = f.name().as_str(); - /*new_fields = [ - new_fields, - Self::normalizer( - nested_fields.to_vec(), - field_name, - separator, - max_level - 1, - ), - ] - .concat();*/ + + if depth < max_level { + match (c.data_type(), f.data_type()) { + //DataType::List(f) => field, + //DataType::ListView(_) => field, + //DataType::FixedSizeList(_, _) => field, + //DataType::LargeList(_) => field, + //DataType::LargeListView(_) => field, + (DataType::Struct(cf), DataType::Struct(ff)) => { + let field_name = f.name().as_str(); + let new_key = format!("{key_string}{separator}{field_name}"); + ff.iter().rev().zip(cf.iter().rev()).map(|(field, ())| { + let updated_field = Field::new( + format!("{key_string}{separator}{}", field.name()), + field.data_type().clone(), + field.is_nullable(), + ); + queue.push_front(( + depth + 1, + c, // TODO: need to modify c -- if it's a StructArray, it needs to have the fields modified. + &Arc::new(updated_field), + )) + }); + } + //DataType::Union(_, _) => field, + //DataType::Dictionary(_, _) => field, + //DataType::Map(_, _) => field, + //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field + _ => queue.push_front((depth, c, f)), } - //DataType::Union(_, _) => field, - //DataType::Dictionary(_, _) => field, - //DataType::Map(_, _) => field, - //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field - _ => queue.push_front((0, c, f)), + } else { + queue.push_front((depth, c, f)); } - }, + } None => break, }; } todo!() } - /// Returns the number of columns in the record batch. + /// Returns the number of columns in the record batch. /// /// # Example /// @@ -1269,12 +1281,12 @@ mod tests { let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); let year_field = Arc::new(Field::new("year", DataType::Int64, true)); - let a = Arc::new(StructArray::from(vec![ (animals_field.clone(), Arc::new(animals) as ArrayRef), (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef), (year_field.clone(), Arc::new(year) as ArrayRef), ])); + let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)])); let schema = Schema::new(vec![ From 30d6294d1346dbbdf6ac9bfc7df847a8e05ccaec Mon Sep 17 00:00:00 2001 From: nglime Date: Sat, 23 Nov 2024 21:03:20 -0600 Subject: [PATCH 05/10] Fixed normalize function for `RecordBatch`. Adjusted test case to match the example from PyArrow. --- arrow-array/src/record_batch.rs | 108 ++++++++++++++++---------------- arrow-schema/src/schema.rs | 42 +++++-------- 2 files changed, 68 insertions(+), 82 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 92cdb700b8d..1ba8402b1bd 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -18,12 +18,11 @@ //! A two-dimensional batch of column-oriented data with a defined //! [schema](arrow_schema::Schema). +use crate::cast::AsArray; use crate::{new_empty_array, Array, ArrayRef, StructArray}; -use arrow_schema::{ - ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef, -}; +use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef}; use std::collections::VecDeque; -use std::ops::{Deref, Index}; +use std::ops::Index; use std::sync::Arc; /// Trait for types that can read `RecordBatch`'s. @@ -406,7 +405,8 @@ impl RecordBatch { ) } - /// Normalize a semi-structured RecordBatch into a flat table + /// Normalize a semi-structured [`RecordBatch`] into a flat table. + /// /// If max_level is 0, normalizes all levels. pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { if max_level == 0 { @@ -418,54 +418,47 @@ impl RecordBatch { self.schema.normalize(separator, max_level)?, ))); } - let mut queue: VecDeque<(usize, &Arc, &FieldRef)> = VecDeque::new(); + let mut queue: VecDeque<(usize, (ArrayRef, FieldRef))> = VecDeque::new(); - // push fields for (c, f) in self.columns.iter().zip(self.schema.fields()) { - queue.push_front((0, c, f)); + queue.push_back((0, ((*c).clone(), (*f).clone()))); } - while !queue.is_empty() { - match queue.pop_front() { - Some((depth, c, f)) => { - - if depth < max_level { - match (c.data_type(), f.data_type()) { - //DataType::List(f) => field, - //DataType::ListView(_) => field, - //DataType::FixedSizeList(_, _) => field, - //DataType::LargeList(_) => field, - //DataType::LargeListView(_) => field, - (DataType::Struct(cf), DataType::Struct(ff)) => { - let field_name = f.name().as_str(); - let new_key = format!("{key_string}{separator}{field_name}"); - ff.iter().rev().zip(cf.iter().rev()).map(|(field, ())| { - let updated_field = Field::new( - format!("{key_string}{separator}{}", field.name()), - field.data_type().clone(), - field.is_nullable(), - ); - queue.push_front(( - depth + 1, - c, // TODO: need to modify c -- if it's a StructArray, it needs to have the fields modified. - &Arc::new(updated_field), - )) - }); - } - //DataType::Union(_, _) => field, - //DataType::Dictionary(_, _) => field, - //DataType::Map(_, _) => field, - //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field - _ => queue.push_front((depth, c, f)), + let mut columns: Vec = Vec::new(); + let mut fields: Vec = Vec::new(); + + while let Some((depth, (c, f))) = queue.pop_front() { + if depth < max_level { + match f.data_type() { + DataType::Struct(ff) => { + // Need to zip these in reverse to maintain original order + for (cff, fff) in c + .as_struct() + .columns() + .iter() + .rev() + .zip(ff.into_iter().rev()) + { + let new_key = format!("{}{separator}{}", f.name(), fff.name()); + let updated_field = Field::new( + new_key.as_str(), + fff.data_type().clone(), + fff.is_nullable(), + ); + queue.push_front((depth + 1, (cff.clone(), Arc::new(updated_field)))) } - } else { - queue.push_front((depth, c, f)); + } + _ => { + columns.push(c); + fields.push(f); } } - None => break, - }; + } else { + columns.push(c); + fields.push(f); + } } - todo!() + RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) } /// Returns the number of columns in the record batch. @@ -1282,9 +1275,9 @@ mod tests { let year_field = Arc::new(Field::new("year", DataType::Int64, true)); let a = Arc::new(StructArray::from(vec![ - (animals_field.clone(), Arc::new(animals) as ArrayRef), - (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef), - (year_field.clone(), Arc::new(year) as ArrayRef), + (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef), + (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef), + (year_field.clone(), Arc::new(year.clone()) as ArrayRef), ])); let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)])); @@ -1297,16 +1290,21 @@ mod tests { ), Field::new("month", DataType::Int64, true), ]); - let normalized = schema.clone().normalize(".", 0).unwrap(); - println!("{:?}", normalized); - let record_batch = - RecordBatch::try_new(Arc::new(schema), vec![a, month]).expect("valid conversion"); + let record_batch = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()]) + .expect("valid conversion"); + + let normalized = record_batch.normalize(".", 0).expect("valid normalization"); - println!("Fields: {:?}", record_batch.schema().fields()); - println!("Metadata{:?}", record_batch.columns()); + let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ("a.animals", animals.clone(), true), + ("a.n_legs", n_legs.clone(), true), + ("a.year", year.clone(), true), + ("month", month.clone(), true), + ]) + .expect("valid conversion"); - //println!("{:?}", record_batch); + assert_eq!(expected, normalized); } #[test] diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 62f74c8b435..e8aff42db10 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -419,14 +419,9 @@ impl Schema { if max_level == 0 { max_level = usize::MAX; } - let mut new_fields: Vec = vec![]; + let mut new_fields: Vec = vec![]; for field in self.fields() { match field.data_type() { - //DataType::List(f) => field, - //DataType::ListView(_) => field, - //DataType::FixedSizeList(_, _) => field, - //DataType::LargeList(_) => field, - //DataType::LargeListView(_) => field, DataType::Struct(nested_fields) => { let field_name = field.name().as_str(); new_fields = [ @@ -440,15 +435,11 @@ impl Schema { ] .concat(); } - //DataType::Union(_, _) => field, - //DataType::Dictionary(_, _) => field, - //DataType::Map(_, _) => field, - //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field - _ => new_fields.push(Field::new( + _ => new_fields.push(Arc::new(Field::new( field.name(), field.data_type().clone(), field.is_nullable(), - )), + ))), }; } Ok(Self::new_with_metadata(new_fields, self.metadata.clone())) @@ -459,16 +450,11 @@ impl Schema { key_string: &str, separator: &str, max_level: usize, - ) -> Vec { + ) -> Vec { + let mut new_fields: Vec = vec![]; if max_level > 0 { - let mut new_fields: Vec = vec![]; for field in fields { match field.data_type() { - //DataType::List(f) => , - //DataType::ListView(_) => , - //DataType::FixedSizeList(_, _) => , - //DataType::LargeList(_) => , - //DataType::LargeListView(_) => , DataType::Struct(nested_fields) => { let field_name = field.name().as_str(); let new_key = format!("{key_string}{separator}{field_name}"); @@ -483,21 +469,23 @@ impl Schema { ] .concat(); } - //DataType::Union(_, _) => field, - //DataType::Dictionary(_, _) => field, - //DataType::Map(_, _) => field, - //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field - _ => new_fields.push(Field::new( + _ => new_fields.push(Arc::new(Field::new( format!("{key_string}{separator}{}", field.name()), field.data_type().clone(), field.is_nullable(), - )), + ))), }; } - new_fields } else { - todo!() + for field in fields { + new_fields.push(Arc::new(Field::new( + format!("{key_string}{separator}{}", field.name()), + field.data_type().clone(), + field.is_nullable(), + ))); + } } + new_fields } /// Look up a column by name and return a immutable reference to the column along with From 0ed979d0244d511282b3945e38704650be84cd63 Mon Sep 17 00:00:00 2001 From: nglime Date: Sun, 24 Nov 2024 21:51:18 -0600 Subject: [PATCH 06/10] Added tests for `Schema` normalization. Partial tests for `RecordBatch`. --- arrow-array/src/record_batch.rs | 114 ++++++++++++++++++++++++++++---- arrow-schema/src/schema.rs | 81 +++++++++++++++++++++++ 2 files changed, 183 insertions(+), 12 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 1ba8402b1bd..b56e2138fa6 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -432,14 +432,8 @@ impl RecordBatch { match f.data_type() { DataType::Struct(ff) => { // Need to zip these in reverse to maintain original order - for (cff, fff) in c - .as_struct() - .columns() - .iter() - .rev() - .zip(ff.into_iter().rev()) - { - let new_key = format!("{}{separator}{}", f.name(), fff.name()); + for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() { + let new_key = format!("{}{}{}", f.name(), separator, fff.name()); let updated_field = Field::new( new_key.as_str(), fff.data_type().clone(), @@ -1291,10 +1285,10 @@ mod tests { Field::new("month", DataType::Int64, true), ]); - let record_batch = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()]) - .expect("valid conversion"); - - let normalized = record_batch.normalize(".", 0).expect("valid normalization"); + let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()]) + .expect("valid conversion") + .normalize(".", 0) + .expect("valid normalization"); let expected = RecordBatch::try_from_iter_with_nullable(vec![ ("a.animals", animals.clone(), true), @@ -1307,6 +1301,102 @@ mod tests { assert_eq!(expected, normalized); } + #[test] + fn normalize_nested() { + // Initialize schema + let a = Arc::new(Field::new("a", DataType::Utf8, true)); + let b = Arc::new(Field::new("b", DataType::Int64, false)); + let c = Arc::new(Field::new("c", DataType::Int64, true)); + + let d = Arc::new(Field::new("d", DataType::Utf8, true)); + let e = Arc::new(Field::new("e", DataType::Int64, false)); + let f = Arc::new(Field::new("f", DataType::Int64, true)); + + let one = Arc::new(Field::new( + "1", + DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), + false, + )); + let two = Arc::new(Field::new( + "2", + DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])), + true, + )); + + let exclamation = Arc::new(Field::new( + "!", + DataType::Struct(Fields::from(vec![one, two])), + false, + )); + + // Initialize fields + let a_field: ArrayRef = Arc::new(StringArray::from(vec!["a1_field_data", "a1_field_data"])); + let b_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(0), Some(1)])); + let c_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2)])); + + let d_field: ArrayRef = Arc::new(StringArray::from(vec!["d1_field_data", "d2_field_data"])); + let e_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), Some(4)])); + let f_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(5)])); + + let one_field = Arc::new(StructArray::from(vec![ + (a.clone(), Arc::new(a_field.clone()) as ArrayRef), + (b.clone(), Arc::new(b_field.clone()) as ArrayRef), + (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ])); + let two_field = Arc::new(StructArray::from(vec![ + (a.clone(), Arc::new(a_field.clone()) as ArrayRef), + (b.clone(), Arc::new(b_field.clone()) as ArrayRef), + (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ])); + + /*let exclamation_field = Arc::new(StructArray::from(vec![ + (one.clone(), Arc::new(one_field.clone()) as ArrayRef), + (two.clone(), Arc::new(two_field.clone()) as ArrayRef), + ]));*/ + + let schema = Schema::new(vec![exclamation.clone()]); + /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) + .expect("valid conversion");*/ + //.normalize(".", 0) + //.expect("valid normalization"); + + /*let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ("a.animals", animals.clone(), true), + ("a.n_legs", n_legs.clone(), true), + ("a.year", year.clone(), true), + ("month", month.clone(), true), + ]) + .expect("valid conversion");*/ + + //assert_eq!(expected, normalized); + } + + #[test] + fn normalize_empty() { + let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); + let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); + let year_field = Arc::new(Field::new("year", DataType::Int64, true)); + + let schema = Schema::new(vec![ + Field::new( + "a", + DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])), + false, + ), + Field::new("month", DataType::Int64, true), + ]); + + let normalized = RecordBatch::new_empty(Arc::new(schema.clone())) + .normalize(".", 0) + .expect("valid normalization"); + + let expected = RecordBatch::new_empty(Arc::new( + schema.normalize(".", 0).expect("valid normalization"), + )); + + assert_eq!(expected, normalized); + } + #[test] fn project() { let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index e8aff42db10..1832f97cf17 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -772,6 +772,87 @@ mod tests { schema.index_of("nickname").unwrap(); } + #[test] + fn normalize() { + let schema = Schema::new(vec![ + Field::new( + "a", + DataType::Struct(Fields::from(vec![ + Arc::new(Field::new("animals", DataType::Utf8, true)), + Arc::new(Field::new("n_legs", DataType::Int64, true)), + Arc::new(Field::new("year", DataType::Int64, true)), + ])), + false, + ), + Field::new("month", DataType::Int64, true), + ]) + .normalize(".", 0) + .expect("valid normalization"); + + let expected = Schema::new(vec![ + Field::new("a.animals", DataType::Utf8, true), + Field::new("a.n_legs", DataType::Int64, true), + Field::new("a.year", DataType::Int64, true), + Field::new("month", DataType::Int64, true), + ]); + + assert_eq!(schema, expected); + } + + #[test] + fn normalize_nested() { + let a = Arc::new(Field::new("a", DataType::Utf8, true)); + let b = Arc::new(Field::new("b", DataType::Int64, false)); + let c = Arc::new(Field::new("c", DataType::Int64, true)); + + let d = Arc::new(Field::new("d", DataType::Utf8, true)); + let e = Arc::new(Field::new("e", DataType::Int64, false)); + let f = Arc::new(Field::new("f", DataType::Int64, true)); + + let one = Arc::new(Field::new( + "1", + DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), + false, + )); + let two = Arc::new(Field::new( + "2", + DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])), + true, + )); + + let exclamation = Arc::new(Field::new( + "!", + DataType::Struct(Fields::from(vec![one, two])), + false, + )); + + let normalize_all = Schema::new(vec![exclamation.clone()]) + .normalize(".", 0) + .expect("valid normalization"); + + let expected = Schema::new(vec![ + Field::new("!.1.a", DataType::Utf8, true), + Field::new("!.1.b", DataType::Int64, false), + Field::new("!.1.c", DataType::Int64, true), + Field::new("!.2.d", DataType::Utf8, true), + Field::new("!.2.e", DataType::Int64, false), + Field::new("!.2.f", DataType::Int64, true), + ]); + + assert_eq!(normalize_all, expected); + + let normalize_depth_one = Schema::new(vec![exclamation]) + .normalize(".", 1) + .expect("valid normalization"); + + let expected = Schema::new(vec![ + Field::new("!.1", DataType::Struct(Fields::from(vec![a, b, c])), false), + Field::new("!.2", DataType::Struct(Fields::from(vec![d, e, f])), true), + ]); + + assert_eq!(normalize_depth_one, expected); + } + #[test] #[should_panic( expected = "Unable to get field named \\\"nickname\\\". Valid fields: [\\\"first_name\\\", \\\"last_name\\\", \\\"address\\\", \\\"interests\\\"]" From d9d08cd63f4aae7d1208fd773fc8209f0493c00c Mon Sep 17 00:00:00 2001 From: nglime Date: Sun, 24 Nov 2024 21:54:24 -0600 Subject: [PATCH 07/10] Removed stray comments. --- arrow-array/src/record_batch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index b56e2138fa6..6b5b5af22a8 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1349,10 +1349,10 @@ mod tests { (c.clone(), Arc::new(c_field.clone()) as ArrayRef), ])); - /*let exclamation_field = Arc::new(StructArray::from(vec![ + let exclamation_field = Arc::new(StructArray::from(vec![ (one.clone(), Arc::new(one_field.clone()) as ArrayRef), (two.clone(), Arc::new(two_field.clone()) as ArrayRef), - ]));*/ + ])); let schema = Schema::new(vec![exclamation.clone()]); /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) @@ -1509,9 +1509,7 @@ mod tests { let metadata = vec![("foo".to_string(), "bar".to_string())] .into_iter() .collect(); - println!("Metadata: {:?}", metadata); let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata); - println!("Metadata schema: {:?}", metadata_schema); let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap(); // Cannot remove metadata From d1b3260441271c848c2096c6843716bdbd8da5a9 Mon Sep 17 00:00:00 2001 From: nglime Date: Sun, 24 Nov 2024 21:58:53 -0600 Subject: [PATCH 08/10] Commenting out exclamation field. --- arrow-array/src/record_batch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 6b5b5af22a8..beb4c0a8690 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1349,10 +1349,10 @@ mod tests { (c.clone(), Arc::new(c_field.clone()) as ArrayRef), ])); - let exclamation_field = Arc::new(StructArray::from(vec![ + /*let exclamation_field = Arc::new(StructArray::from(vec![ (one.clone(), Arc::new(one_field.clone()) as ArrayRef), (two.clone(), Arc::new(two_field.clone()) as ArrayRef), - ])); + ]));*/ let schema = Schema::new(vec![exclamation.clone()]); /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) From 7adda580776abf257378ebdd09b83d0961b9b7d4 Mon Sep 17 00:00:00 2001 From: nglime Date: Wed, 4 Dec 2024 22:06:29 -0600 Subject: [PATCH 09/10] Fixed test for `RecordBatch`. --- arrow-array/src/record_batch.rs | 92 +++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 33 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index b0557173ea5..88916aaf448 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1295,14 +1295,10 @@ mod tests { #[test] fn normalize_nested() { // Initialize schema - let a = Arc::new(Field::new("a", DataType::Utf8, true)); + let a = Arc::new(Field::new("a", DataType::Int64, true)); let b = Arc::new(Field::new("b", DataType::Int64, false)); let c = Arc::new(Field::new("c", DataType::Int64, true)); - let d = Arc::new(Field::new("d", DataType::Utf8, true)); - let e = Arc::new(Field::new("e", DataType::Int64, false)); - let f = Arc::new(Field::new("f", DataType::Int64, true)); - let one = Arc::new(Field::new( "1", DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), @@ -1310,56 +1306,86 @@ mod tests { )); let two = Arc::new(Field::new( "2", - DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])), + DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), true, )); let exclamation = Arc::new(Field::new( "!", - DataType::Struct(Fields::from(vec![one, two])), + DataType::Struct(Fields::from(vec![one.clone(), two.clone()])), false, )); - // Initialize fields - let a_field: ArrayRef = Arc::new(StringArray::from(vec!["a1_field_data", "a1_field_data"])); - let b_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(0), Some(1)])); - let c_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2)])); + let schema = Schema::new(vec![exclamation.clone()]); - let d_field: ArrayRef = Arc::new(StringArray::from(vec!["d1_field_data", "d2_field_data"])); - let e_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), Some(4)])); - let f_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(5)])); + // Initialize fields + let a_field = Int64Array::from(vec![Some(0), Some(1)]); + let b_field = Int64Array::from(vec![Some(2), Some(3)]); + let c_field = Int64Array::from(vec![None, Some(4)]); - let one_field = Arc::new(StructArray::from(vec![ + let one_field = StructArray::from(vec![ (a.clone(), Arc::new(a_field.clone()) as ArrayRef), (b.clone(), Arc::new(b_field.clone()) as ArrayRef), (c.clone(), Arc::new(c_field.clone()) as ArrayRef), - ])); - let two_field = Arc::new(StructArray::from(vec![ + ]); + let two_field = StructArray::from(vec![ (a.clone(), Arc::new(a_field.clone()) as ArrayRef), (b.clone(), Arc::new(b_field.clone()) as ArrayRef), (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ]); + + let exclamation_field = Arc::new(StructArray::from(vec![ + (one.clone(), Arc::new(one_field) as ArrayRef), + (two.clone(), Arc::new(two_field) as ArrayRef), ])); - /*let exclamation_field = Arc::new(StructArray::from(vec![ - (one.clone(), Arc::new(one_field.clone()) as ArrayRef), - (two.clone(), Arc::new(two_field.clone()) as ArrayRef), - ]));*/ + // Normalize top level + let normalized = RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) + .expect("valid conversion") + .normalize(".", 1) + .expect("valid normalization"); - let schema = Schema::new(vec![exclamation.clone()]); - /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) - .expect("valid conversion");*/ - //.normalize(".", 0) - //.expect("valid normalization"); + let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ( + "!.1", + Arc::new(StructArray::from(vec![ + (a.clone(), Arc::new(a_field.clone()) as ArrayRef), + (b.clone(), Arc::new(b_field.clone()) as ArrayRef), + (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ])) as ArrayRef, + false, + ), + ( + "!.2", + Arc::new(StructArray::from(vec![ + (a.clone(), Arc::new(a_field.clone()) as ArrayRef), + (b.clone(), Arc::new(b_field.clone()) as ArrayRef), + (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ])) as ArrayRef, + true, + ), + ]) + .expect("valid conversion"); - /*let expected = RecordBatch::try_from_iter_with_nullable(vec![ - ("a.animals", animals.clone(), true), - ("a.n_legs", n_legs.clone(), true), - ("a.year", year.clone(), true), - ("month", month.clone(), true), + assert_eq!(expected, normalized); + + // Normalize all levels + let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) + .expect("valid conversion") + .normalize(".", 0) + .expect("valid normalization"); + + let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ("!.1.a", Arc::new(a_field.clone()) as ArrayRef, true), + ("!.1.b", Arc::new(b_field.clone()) as ArrayRef, false), + ("!.1.c", Arc::new(c_field.clone()) as ArrayRef, true), + ("!.2.a", Arc::new(a_field.clone()) as ArrayRef, true), + ("!.2.b", Arc::new(b_field.clone()) as ArrayRef, false), + ("!.2.c", Arc::new(c_field.clone()) as ArrayRef, true), ]) - .expect("valid conversion");*/ + .expect("valid conversion"); - //assert_eq!(expected, normalized); + assert_eq!(expected, normalized); } #[test] From 9c9c69952d53bc236c5f8ee078b6b607a515f595 Mon Sep 17 00:00:00 2001 From: nglime Date: Wed, 4 Dec 2024 22:07:59 -0600 Subject: [PATCH 10/10] Formatting. --- arrow-array/src/record_batch.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 88916aaf448..ec2b974fdf9 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1340,10 +1340,11 @@ mod tests { ])); // Normalize top level - let normalized = RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) - .expect("valid conversion") - .normalize(".", 1) - .expect("valid normalization"); + let normalized = + RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) + .expect("valid conversion") + .normalize(".", 1) + .expect("valid normalization"); let expected = RecordBatch::try_from_iter_with_nullable(vec![ (