Skip to content

Commit

Permalink
Write null counts in parquet files when they are present
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 15, 2024
1 parent 69b17ad commit 6070619
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 47 deletions.
143 changes: 122 additions & 21 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,35 @@ pub fn from_thrift(
) -> Result<Option<Statistics>> {
Ok(match thrift_stats {
Some(stats) => {
// Number of nulls recorded, when it is not available, we just mark it as 0.
// TODO this should be `None` if there is no information about NULLS.
// see https://github.com/apache/arrow-rs/pull/6216/files
let null_count = stats.null_count.unwrap_or(0);

if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {}",
null_count
)));
}
// transform null count to u64
let null_count = stats
.null_count
.map(|null_count| {
if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {}",
null_count
)));
}
Ok(null_count as u64)
})
.transpose()?;

// Generic null count.
let null_count = Some(null_count as u64);
// Generic distinct count (count of distinct values occurring)
let distinct_count = stats.distinct_count.map(|value| value as u64);
let distinct_count = stats
.distinct_count
.map(|distinct_count| {
if distinct_count < 0 {
return Err(ParquetError::General(format!(
"Statistics distinct count is negative {}",
distinct_count
)));
}

Ok(distinct_count as u64)
})
.transpose()?;

// Whether or not statistics use deprecated min/max fields.
let old_format = stats.min_value.is_none() && stats.max_value.is_none();
// Generic min value as bytes.
Expand Down Expand Up @@ -244,20 +257,21 @@ pub fn from_thrift(
pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
let stats = stats?;

// record null counts if greater than zero.
//
// TODO: This should be Some(0) if there are no nulls.
// see https://github.com/apache/arrow-rs/pull/6216/files
// record null count if it can fit in i64
let null_count = stats
.null_count_opt()
.map(|value| value as i64)
.filter(|&x| x > 0);
.and_then(|value| i64::try_from(value).ok());

// record distinct count if it can fit in i64
let distinct_count = stats
.distinct_count()
.and_then(|value| i64::try_from(value).ok());

let mut thrift_stats = TStatistics {
max: None,
min: None,
null_count,
distinct_count: stats.distinct_count().map(|value| value as i64),
distinct_count,
max_value: None,
min_value: None,
is_max_value_exact: None,
Expand Down Expand Up @@ -1041,4 +1055,91 @@ mod tests {
true,
));
}

#[test]
fn test_count_encoding() {
    // Exercise every combination of present/absent counts through a
    // write-then-read round trip.
    let cases = [
        (None, None),
        (Some(0), Some(0)),
        (Some(100), Some(2000)),
        (Some(1), None),
        (None, Some(1)),
    ];
    for (distinct, nulls) in cases {
        statistics_count_test(distinct, nulls);
    }
}

#[test]
fn test_count_encoding_distinct_too_large() {
    // Thrift stores counts as i64, so a distinct count of u64::MAX cannot
    // be represented and must be dropped (encoded as None) rather than
    // written as a bogus value.
    let statistics = make_bool_stats(Some(u64::MAX), Some(100));
    let encoded = to_thrift(Some(&statistics)).unwrap();
    assert_eq!(encoded.distinct_count, None); // u64::MAX doesn't fit --> null
    assert_eq!(encoded.null_count, Some(100));
}

#[test]
fn test_count_encoding_null_too_large() {
    // Thrift stores counts as i64, so a null count of u64::MAX cannot be
    // represented and must be dropped (encoded as None) rather than
    // written as a bogus value.
    let statistics = make_bool_stats(Some(100), Some(u64::MAX));
    let encoded = to_thrift(Some(&statistics)).unwrap();
    assert_eq!(encoded.distinct_count, Some(100));
    assert_eq!(encoded.null_count, None); // u64::MAX doesn't fit --> null
}

#[test]
fn test_count_decoding_distinct_invalid() {
    // A negative distinct count in the thrift metadata is invalid and must
    // surface as a decoding error, not be silently reinterpreted.
    let thrift_stats = TStatistics {
        distinct_count: Some(-42),
        ..Default::default()
    };
    let err = from_thrift(Type::BOOLEAN, Some(thrift_stats)).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Statistics distinct count is negative -42"
    );
}

#[test]
fn test_count_decoding_null_invalid() {
    // A negative null count in the thrift metadata is invalid and must
    // surface as a decoding error, not be silently reinterpreted.
    let thrift_stats = TStatistics {
        null_count: Some(-42),
        ..Default::default()
    };
    let err = from_thrift(Type::BOOLEAN, Some(thrift_stats)).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Statistics null count is negative -42"
    );
}

/// Round-trips the given counts through thrift encoding and asserts that
/// both the encoded thrift statistics and the decoded [`Statistics`]
/// preserve the original `distinct_count` and `null_count` values.
fn statistics_count_test(distinct_count: Option<u64>, null_count: Option<u64>) {
    let original = make_bool_stats(distinct_count, null_count);

    // Encode and verify the raw thrift representation carries the counts.
    let encoded = to_thrift(Some(&original)).unwrap();
    assert_eq!(encoded.null_count.map(|c| c as u64), null_count);
    assert_eq!(encoded.distinct_count.map(|c| c as u64), distinct_count);

    // Decode and verify we get back exactly what we started with.
    let decoded = from_thrift(Type::BOOLEAN, Some(encoded)).unwrap().unwrap();
    assert_eq!(decoded, original);
}

/// Builds boolean [`Statistics`] with the given counts. The min/max values
/// are fixed placeholders — these tests exercise count handling only.
fn make_bool_stats(distinct_count: Option<u64>, null_count: Option<u64>) -> Statistics {
    Statistics::Boolean(ValueStatistics::new(
        Some(true),  // min (placeholder)
        Some(false), // max (placeholder)
        distinct_count,
        null_count,
        false, // is_min_max_deprecated
    ))
}
}
52 changes: 26 additions & 26 deletions parquet/tests/arrow_writer_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn test_primitive() {
pages: (0..8)
.map(|_| Page {
rows: 250,
page_header_size: 36,
page_header_size: 38,
compressed_size: 1000,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
Expand Down Expand Up @@ -218,22 +218,22 @@ fn test_primitive() {
pages: vec![
Page {
rows: 250,
page_header_size: 36,
page_header_size: 38,
compressed_size: 258,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 1750,
page_header_size: 36,
page_header_size: 38,
compressed_size: 7000,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 250,
page_header_size: 36,
page_header_size: 38,
compressed_size: 1000,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
Expand All @@ -260,50 +260,50 @@ fn test_primitive() {
pages: vec![
Page {
rows: 400,
page_header_size: 36,
page_header_size: 38,
compressed_size: 452,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 370,
page_header_size: 36,
page_header_size: 38,
compressed_size: 472,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
page_header_size: 38,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
page_header_size: 38,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
page_header_size: 38,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 240,
page_header_size: 36,
page_header_size: 38,
compressed_size: 332,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 2000,
page_header_size: 36,
page_header_size: 38,
compressed_size: 8000,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
Expand All @@ -329,7 +329,7 @@ fn test_primitive() {
pages: (0..20)
.map(|_| Page {
rows: 100,
page_header_size: 36,
page_header_size: 38,
compressed_size: 400,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
Expand Down Expand Up @@ -364,14 +364,14 @@ fn test_string() {
pages: (0..15)
.map(|_| Page {
rows: 130,
page_header_size: 36,
page_header_size: 38,
compressed_size: 1040,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
})
.chain(std::iter::once(Page {
rows: 50,
page_header_size: 35,
page_header_size: 37,
compressed_size: 400,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
Expand Down Expand Up @@ -400,29 +400,29 @@ fn test_string() {
pages: vec![
Page {
rows: 130,
page_header_size: 36,
page_header_size: 38,
compressed_size: 138,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 1250,
page_header_size: 38,
page_header_size: 40,
compressed_size: 10000,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 620,
page_header_size: 36,
page_header_size: 38,
compressed_size: 4960,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 130,
page_header_size: 36,
page_header_size: 38,
compressed_size: 1040,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
Expand All @@ -449,50 +449,50 @@ fn test_string() {
pages: vec![
Page {
rows: 400,
page_header_size: 36,
page_header_size: 38,
compressed_size: 452,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 370,
page_header_size: 36,
page_header_size: 38,
compressed_size: 472,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
page_header_size: 38,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
page_header_size: 38,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
page_header_size: 38,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 240,
page_header_size: 36,
page_header_size: 38,
compressed_size: 332,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 2000,
page_header_size: 36,
page_header_size: 38,
compressed_size: 16000,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
Expand Down Expand Up @@ -532,7 +532,7 @@ fn test_list() {
pages: (0..10)
.map(|_| Page {
rows: 20,
page_header_size: 36,
page_header_size: 38,
compressed_size: 672,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
Expand Down

0 comments on commit 6070619

Please sign in to comment.