Implement GroupsAccumulator for corr(x,y) aggregate function #13581
This looks amazing, thank you @2010YOUY01! I plan to review it over the next day or two. It seems like maybe we should add the data generator for the h2o benchmark to the bench.sh script 🤔
let nulls = arr
    .nulls()
    .expect("If null_count() > 0, nulls must be present");
match combined_nulls {
If you pass `combined_nulls` to `NullBuffer::union`, it will take care of handling the `Option`.
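To illustrate the suggestion, here is a minimal stdlib-only sketch of the semantics. It does not use the arrow API: `Mask`, `union_masks`, and `combine_all` are hypothetical stand-ins, with a validity mask modeled as a `Vec<bool>` (`true` means valid). The point is that a union over two `Option`s already covers the "no null buffer yet" case, so a hand-written `match combined_nulls` becomes unnecessary.

```rust
/// Hypothetical stand-in for a validity mask: `true` means the row is valid.
type Mask = Vec<bool>;

/// Combine two optional validity masks.
/// `None` means "no nulls at all", so it acts as the identity element.
fn union_masks(lhs: Option<&Mask>, rhs: Option<&Mask>) -> Option<Mask> {
    match (lhs, rhs) {
        // A row stays valid only if it is valid in both inputs.
        (Some(l), Some(r)) => Some(l.iter().zip(r).map(|(a, b)| *a && *b).collect()),
        (Some(m), None) | (None, Some(m)) => Some(m.clone()),
        (None, None) => None,
    }
}

/// Fold the masks of several columns into one combined mask,
/// starting from `None` with no special-casing of the first column.
fn combine_all(masks: &[Option<Mask>]) -> Option<Mask> {
    let init: Option<Mask> = None;
    masks
        .iter()
        .fold(init, |acc, m| union_masks(acc.as_ref(), m.as_ref()))
}
```

With a helper of this shape, the loop over value columns can call the union unconditionally for every column, whether or not it has nulls.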
    T: ArrowPrimitiveType + Send,
    F: FnMut(usize, &[T::Native]) + Send,
{
    let acc_cols: Vec<&[T::Native]> = value_columns
I think collecting into a `Vec` might not be necessary?
This is true, I have updated it.
for (idx, &group_idx) in group_indices.iter().enumerate() {
    // Get `idx`-th row from all value(accumulate) columns
    let row_values: Vec<_> =
        value_columns.iter().map(|col| col.value(idx)).collect();
It would be nice to avoid collecting here? Can we take an iterator instead in `value_fn`?
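A rough sketch of what the iterator-based variant could look like, assuming simplified types: columns are plain `f64` slices instead of Arrow arrays, and `accumulate_rows` and `sum_xy_per_group` are hypothetical names, not functions from this PR. The callback receives the row as a lazy iterator, so no `Vec` is allocated per row.

```rust
/// Hypothetical simplified accumulator: `value_fn` gets the group index and
/// an iterator over the `idx`-th value of every column.
fn accumulate_rows<F>(group_indices: &[usize], value_columns: &[&[f64]], mut value_fn: F)
where
    F: FnMut(usize, &mut dyn Iterator<Item = f64>),
{
    for (idx, &group_idx) in group_indices.iter().enumerate() {
        // Values are produced on demand; nothing is collected.
        let mut row = value_columns.iter().map(|col| col[idx]);
        value_fn(group_idx, &mut row);
    }
}

/// Example caller: per-group sum of x * y over two input columns.
fn sum_xy_per_group(
    group_indices: &[usize],
    x: &[f64],
    y: &[f64],
    num_groups: usize,
) -> Vec<f64> {
    let mut sums = vec![0.0; num_groups];
    accumulate_rows(group_indices, &[x, y], |group_idx, row| {
        // Pull the two column values for this row straight from the iterator.
        if let (Some(xv), Some(yv)) = (row.next(), row.next()) {
            sums[group_idx] += xv * yv;
        }
    });
    sums
}
```

The trade-off is dynamic dispatch through `dyn Iterator` on every row; whether that beats a reused buffer would need to be measured.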
let sum_xx_values = sum_xx.values().as_ref();
let sum_yy_values = sum_yy.values().as_ref();

let mut row = [0.0; 5];
This doesn't have to be a mutable slice if we initialize it in the loop based on the values directly (small win for readability, but might generate better code now or in future?).
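For instance, something along these lines, where `StateColumns` and `for_each_state_row` are hypothetical names and the state columns are modeled as plain `f64` slices. The array is built from the column values inside the loop, so the binding never needs `mut`.

```rust
/// Hypothetical bundle of the five per-group state columns.
struct StateColumns<'a> {
    sum_x: &'a [f64],
    sum_y: &'a [f64],
    sum_xy: &'a [f64],
    sum_xx: &'a [f64],
    sum_yy: &'a [f64],
}

/// Call `f` once per input row with the group index and the five state values.
fn for_each_state_row<F>(group_indices: &[usize], cols: &StateColumns<'_>, mut f: F)
where
    F: FnMut(usize, [f64; 5]),
{
    for (idx, &group_idx) in group_indices.iter().enumerate() {
        // Initialized directly from the columns on every iteration:
        // immutable, and there is no stale data from the previous row.
        let row = [
            cols.sum_x[idx],
            cols.sum_y[idx],
            cols.sum_xy[idx],
            cols.sum_xx[idx],
            cols.sum_yy[idx],
        ];
        f(group_idx, row);
    }
}
```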
Which issue does this PR close?
Closes #13549
Rationale for this change
Implement `GroupsAccumulator` for the `corr` aggregate function, for better performance when group cardinality is high.
I reran the h2o benchmark:
Data Generation
falsa groupby --path-prefix=/Users/yongting/data/ --size MEDIUM --data-format PARQUET
https://github.com/mrpowers-io/falsa
Run benchmark in datafusion-cli
Result
Main: 12s
This PR: 4s
(On my MacBook with an M4 Pro)
Remaining tasks
Implement `convert_to_states()`. This requires changes in the aggregate fuzzer for test coverage, which can be done later to keep this PR small.
What changes are included in this PR?
`accumulate_multiple` and `accumulate_correlation_states`, to accumulate states in the correlation function. (The existing util functions are for aggregate functions with one input expression: `avg(expr1)` vs. `corr(expr1, expr2)`.)
`GroupsAccumulator` for `corr()`
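As a rough illustration of the idea (not the code in this PR), a grouped accumulator keeps one slot per group for each running statistic and updates all groups in a single pass over the batch. The sketch below assumes plain `f64` slices with no nulls; `GroupedCorr` and its methods are hypothetical names, and the final step uses the textbook Pearson formula over raw sums, which may differ from how the PR evaluates the result.

```rust
/// Hypothetical per-group state: index `g` in every vector belongs to group `g`.
#[derive(Default)]
struct GroupedCorr {
    count: Vec<u64>,
    sum_x: Vec<f64>,
    sum_y: Vec<f64>,
    sum_xy: Vec<f64>,
    sum_xx: Vec<f64>,
    sum_yy: Vec<f64>,
}

impl GroupedCorr {
    /// Fold one batch of (x, y) rows into the per-group state.
    fn update_batch(&mut self, group_indices: &[usize], x: &[f64], y: &[f64], total_num_groups: usize) {
        // Grow every state vector so each group seen so far has a slot.
        self.count.resize(total_num_groups, 0);
        for v in [
            &mut self.sum_x,
            &mut self.sum_y,
            &mut self.sum_xy,
            &mut self.sum_xx,
            &mut self.sum_yy,
        ] {
            v.resize(total_num_groups, 0.0);
        }
        for (i, &g) in group_indices.iter().enumerate() {
            self.count[g] += 1;
            self.sum_x[g] += x[i];
            self.sum_y[g] += y[i];
            self.sum_xy[g] += x[i] * y[i];
            self.sum_xx[g] += x[i] * x[i];
            self.sum_yy[g] += y[i] * y[i];
        }
    }

    /// Pearson correlation per group; `None` when it is undefined
    /// (empty group, single row, or zero variance in x or y).
    fn evaluate(&self) -> Vec<Option<f64>> {
        (0..self.count.len())
            .map(|g| {
                let n = self.count[g] as f64;
                let cov = n * self.sum_xy[g] - self.sum_x[g] * self.sum_y[g];
                let var_x = n * self.sum_xx[g] - self.sum_x[g] * self.sum_x[g];
                let var_y = n * self.sum_yy[g] - self.sum_y[g] * self.sum_y[g];
                let denom = (var_x * var_y).sqrt();
                if denom == 0.0 || denom.is_nan() {
                    None
                } else {
                    Some(cov / denom)
                }
            })
            .collect()
    }
}
```

Compared with one boxed accumulator per group, this layout keeps each statistic contiguous in memory, which is where the benefit at high group cardinality is expected to come from.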
Are these changes tested?
Unit tests for util functions
`corr()` is covered by existing tests
Are there any user-facing changes?
No