From 49eeb88232ed498aa658066f28e1ac1b7d5d4ef9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 2 Oct 2024 16:23:25 -0400 Subject: [PATCH] Use upstream is_null and is_not_null kernels --- .../src/expressions/is_not_null.rs | 2 +- .../physical-expr/src/expressions/is_null.rs | 77 ++----------------- 2 files changed, 7 insertions(+), 72 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 58559352d44c..cbab7d0c9d1f 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -73,7 +73,7 @@ impl PhysicalExpr for IsNotNullExpr { let arg = self.arg.evaluate(batch)?; match arg { ColumnarValue::Array(array) => { - let is_not_null = super::is_null::compute_is_not_null(array)?; + let is_not_null = arrow::compute::is_not_null(&array)?; Ok(ColumnarValue::Array(Arc::new(is_not_null))) } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 3cdb49bcab42..1c8597d3fdea 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -20,14 +20,10 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use arrow::compute; use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray}; -use arrow_buffer::{BooleanBuffer, ScalarBuffer}; -use arrow_ord::cmp; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; @@ -77,9 +73,9 @@ impl PhysicalExpr for IsNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) => { - Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) - } + ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( + arrow::compute::is_null(&array)?, + ))), ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(scalar.is_null())), )), @@ -103,65 +99,6 @@ impl PhysicalExpr for IsNullExpr { } } -/// workaround , -/// this can be replaced with a direct call to `arrow::compute::is_null` once it's fixed. -pub(crate) fn compute_is_null(array: ArrayRef) -> Result { - if let Some(union_array) = array.as_any().downcast_ref::() { - if let Some(offsets) = union_array.offsets() { - dense_union_is_null(union_array, offsets) - } else { - sparse_union_is_null(union_array) - } - } else { - compute::is_null(array.as_ref()).map_err(Into::into) - } -} - -/// workaround , -/// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed. -pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result { - if array.as_any().is::() { - compute::not(&compute_is_null(array)?).map_err(Into::into) - } else { - compute::is_not_null(array.as_ref()).map_err(Into::into) - } -} - -fn dense_union_is_null( - union_array: &UnionArray, - offsets: &ScalarBuffer, -) -> Result { - let child_arrays = (0..union_array.type_names().len()) - .map(|type_id| { - compute::is_null(&union_array.child(type_id as i8)).map_err(Into::into) - }) - .collect::>>()?; - - let buffer: BooleanBuffer = offsets - .iter() - .zip(union_array.type_ids()) - .map(|(offset, type_id)| child_arrays[*type_id as usize].value(*offset as usize)) - .collect(); - - Ok(BooleanArray::new(buffer, None)) -} - -fn sparse_union_is_null(union_array: &UnionArray) -> Result { - let type_ids = Int8Array::new(union_array.type_ids().clone(), None); - - let mut union_is_null = - BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None); - for type_id in 0..union_array.type_names().len() { - let type_id = type_id as i8; - let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?; - let child = union_array.child(type_id); - let child_array_is_null = compute::is_null(&child)?; - let child_is_null = compute::and(&union_is_child, &child_array_is_null)?; - union_is_null = compute::or(&union_is_null, &child_is_null)?; - } - Ok(union_is_null) -} - impl PartialEq for IsNullExpr { fn eq(&self, other: &dyn Any) -> bool { down_cast_any_ref(other) @@ -184,7 +121,7 @@ mod tests { array::{BooleanArray, StringArray}, datatypes::*, }; - use arrow_array::{Float64Array, Int32Array}; + use arrow_array::{Array, Float64Array, Int32Array, UnionArray}; use arrow_buffer::ScalarBuffer; use datafusion_common::cast::as_boolean_array; @@ -243,8 +180,7 @@ mod tests { let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); - let array_ref = Arc::new(array) as ArrayRef; - let result = compute_is_null(array_ref).unwrap(); + let result = arrow::compute::is_null(&array).unwrap(); let expected = &BooleanArray::from(vec![false, true, false, false, true, true, false]); @@ -272,8 +208,7 @@ mod tests { UnionArray::try_new(union_fields(), type_ids, Some(offsets), children) .unwrap(); - let array_ref = Arc::new(array) as ArrayRef; - let result = compute_is_null(array_ref).unwrap(); + let result = arrow::compute::is_null(&array).unwrap(); let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); assert_eq!(expected, &result);