From 7faac72a37cefd5cd1450e045bd995bf6713197e Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 21 Mar 2024 15:13:27 +0200 Subject: [PATCH] first commit --- parquet/Cargo.toml | 2 + parquet/src/arrow/arrow_reader/mod.rs | 65 +++++- parquet/src/encryption/ciphers.rs | 282 ++++++++++++++++++++++++++ parquet/src/encryption/mod.rs | 21 ++ parquet/src/file/footer.rs | 264 +++++++++++++++++++++++- parquet/src/file/mod.rs | 1 + parquet/src/lib.rs | 4 + 7 files changed, 636 insertions(+), 3 deletions(-) create mode 100644 parquet/src/encryption/ciphers.rs create mode 100644 parquet/src/encryption/mod.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 491122298e08..67474ba71932 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -69,6 +69,7 @@ paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } +ring = { version = "0.17", default-features = false, features = ["std"]} [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -120,6 +121,7 @@ zstd = ["dep:zstd", "zstd-sys"] sysinfo = ["dep:sysinfo"] # Verify 32-bit CRC checksum when decoding parquet pages crc = ["dep:crc32fast"] +#encryption = ["aes-gcm", "base64"] [[example]] name = "read_parquet" diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a109851f72b6..0a0a4f81e916 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -42,6 +42,10 @@ mod filter; mod selection; pub mod statistics; +use crate::file::footer; +use crate::file::page_index::index_reader; +use crate::encryption::ciphers::FileDecryptionProperties; + /// Builder for constructing parquet readers into arrow. /// /// Most users should use one of the following specializations: @@ -317,7 +321,7 @@ impl ArrowReaderOptions { /// /// // Create the reader and read the data using the supplied schema. /// let mut reader = builder.build().unwrap(); - /// let _batch = reader.next().unwrap().unwrap(); + /// let _batch = reader.next().unwrap().unwrap(); /// ``` pub fn with_schema(self, schema: SchemaRef) -> Self { Self { @@ -369,6 +373,35 @@ pub struct ArrowReaderMetadata { } impl ArrowReaderMetadata { + /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`] + /// + /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used + pub fn load2(reader: &T, options: ArrowReaderOptions) -> Result { + Self::load_with_decryption(reader, options, FileDecryptionProperties::builder().build()) + } + + pub fn load_with_decryption(reader: &T, options: ArrowReaderOptions, + file_decryption_properties: FileDecryptionProperties) -> Result { + let mut metadata = footer::parse_metadata_with_decryption(reader, file_decryption_properties)?; + if options.page_index { + let column_index = metadata + .row_groups() + .iter() + .map(|rg| index_reader::read_columns_indexes(reader, rg.columns())) + .collect::>>()?; + metadata.set_column_index(Some(column_index)); + + let offset_index = metadata + .row_groups() + .iter() + .map(|rg| index_reader::read_offset_indexes(reader, rg.columns())) + .collect::>>()?; + + metadata.set_offset_index(Some(offset_index)) + } + Self::try_new(Arc::new(metadata), options) + } + /// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary /// /// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an @@ -532,6 +565,11 @@ impl ParquetRecordBatchReaderBuilder { Ok(Self::new_with_metadata(reader, metadata)) } + pub fn try_new_with_decryption(reader: T, options: ArrowReaderOptions, file_decryption_properties: FileDecryptionProperties) -> Result { + let metadata = ArrowReaderMetadata::load_with_decryption(&reader, options, file_decryption_properties)?; + Ok(Self::new_with_metadata(reader, metadata)) + } + /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided [`ArrowReaderMetadata`] /// /// This interface allows: @@ -788,6 +826,13 @@ impl ParquetRecordBatchReader { .build() } + pub fn try_new_with_decryption(reader: T, batch_size: usize, + file_decryption_properties: FileDecryptionProperties) -> Result { + ParquetRecordBatchReaderBuilder::try_new_with_decryption(reader, Default::default(), file_decryption_properties)? + .with_batch_size(batch_size) + .build() + } + /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`] /// /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a @@ -955,6 +1000,7 @@ mod tests { BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; + use crate::encryption::ciphers; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use crate::file::writer::SerializedFileWriter; @@ -1663,6 +1709,23 @@ mod tests { assert!(col.value(2).is_nan()); } + #[test] + fn test_uniform_encryption() { + let path = format!( + "{}/uniform_encryption.parquet.encrypted", + arrow::util::test_util::parquet_test_data(), + ); + let file = File::open(path).unwrap(); + // todo + let key_code: &[u8] = "0123456789012345".as_bytes(); + // todo + let decryption_properties = ciphers::FileDecryptionProperties::builder() + .with_footer_key(key_code.to_vec()) + .build(); + let record_reader = ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties).unwrap(); + // todo check contents + } + #[test] fn test_read_float32_float64_byte_stream_split() { let path = format!( diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs new file mode 100644 index 000000000000..db32146c6d5f --- /dev/null +++ b/parquet/src/encryption/ciphers.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM}; +use ring::rand::{SecureRandom, SystemRandom}; +use crate::errors::{ParquetError, Result}; + +pub trait BlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec; +} + +pub trait BlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec; +} + +const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff; +const NONCE_LEN: usize = 12; +const TAG_LEN: usize = 16; +const SIZE_LEN: usize = 4; + +struct CounterNonce { + start: u128, + counter: u128, +} + +impl CounterNonce { + pub fn new(rng: &SystemRandom) -> Self { + let mut buf = [0; 16]; + rng.fill(&mut buf).unwrap(); + + // Since this is a random seed value, endianess doesn't matter at all, + // and we can use whatever is platform-native. + let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE; + let counter = start.wrapping_add(1); + + Self { start, counter } + } + + /// One accessor for the nonce bytes to avoid potentially flipping endianess + #[inline] + pub fn get_bytes(&self) -> [u8; NONCE_LEN] { + self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap() + } +} + +impl NonceSequence for CounterNonce { + fn advance(&mut self) -> Result { + // If we've wrapped around, we've exhausted this nonce sequence + if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) { + Err(ring::error::Unspecified) + } else { + // Otherwise, just advance and return the new value + let buf: [u8; NONCE_LEN] = self.get_bytes(); + self.counter = self.counter.wrapping_add(1); + Ok(ring::aead::Nonce::assume_unique_for_key(buf)) + } + } +} + +pub(crate) struct RingGcmBlockEncryptor { + key: LessSafeKey, + nonce_sequence: CounterNonce, +} + +impl RingGcmBlockEncryptor { + // todo TBD: some KMS systems produce data keys, need to be able to pass them to Encryptor. + // todo TBD: for other KMSs, we will create data keys inside arrow-rs, making sure to use SystemRandom + /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce. + /// The nonce will advance appropriately with each block encryption and + /// return an error if it wraps around. + pub(crate) fn new(key_bytes: &[u8]) -> Self { + let rng = SystemRandom::new(); + + // todo support other key sizes + let key = UnboundKey::new(&AES_128_GCM, key_bytes.as_ref()).unwrap(); + let nonce = CounterNonce::new(&rng); + + Self { + key: LessSafeKey::new(key), + nonce_sequence: nonce, + } + } +} + +impl BlockEncryptor for RingGcmBlockEncryptor { + fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec { + let nonce = self.nonce_sequence.advance().unwrap(); + let ciphertext_len = plaintext.len() + NONCE_LEN + TAG_LEN; + // todo TBD: add first 4 bytes with the length, per https://github.com/apache/parquet-format/blob/master/Encryption.md#51-encrypted-module-serialization + let mut result = Vec::with_capacity(SIZE_LEN + ciphertext_len); + result.extend_from_slice((ciphertext_len as i32).to_le_bytes().as_ref()); + result.extend_from_slice(nonce.as_ref()); + result.extend_from_slice(plaintext); + + let tag = self + .key + .seal_in_place_separate_tag(nonce, Aad::from(aad), &mut result[SIZE_LEN + NONCE_LEN..]) + .unwrap(); + result.extend_from_slice(tag.as_ref()); + + result + } +} + +pub(crate) struct RingGcmBlockDecryptor { + key: LessSafeKey, +} + +impl RingGcmBlockDecryptor { + pub(crate) fn new(key_bytes: &[u8]) -> Self { + // todo support other key sizes + let key = UnboundKey::new(&AES_128_GCM, key_bytes).unwrap(); + + Self { + key: LessSafeKey::new(key), + } + } +} + +impl BlockDecryptor for RingGcmBlockDecryptor { + fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Vec { + let mut result = Vec::with_capacity( + length_and_ciphertext.len() - SIZE_LEN - NONCE_LEN - TAG_LEN, + ); + result.extend_from_slice(&length_and_ciphertext[SIZE_LEN + NONCE_LEN..]); + + let nonce = ring::aead::Nonce::try_assume_unique_for_key( + &length_and_ciphertext[SIZE_LEN..SIZE_LEN + NONCE_LEN], + ) + .unwrap(); + + self.key + .open_in_place(nonce, Aad::from(aad), &mut result) + .unwrap(); + + result + } +} + +pub(crate) enum ModuleType { + Footer = 0, + ColumnMetaData = 1, + DataPage = 2, + DictionaryPage = 3, + DataPageHeader = 4, + DictionaryPageHeader = 5, + ColumnIndex = 6, + OffsetIndex = 7, + BloomFilterHeader = 8, + BloomFilterBitset = 9, +} + +pub fn create_footer_aad(file_aad: &[u8]) -> Result> { + create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1) +} + +pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32, + column_ordinal: i32, page_ordinal: i32) -> Result> { + + let module_buf = [module_type as u8]; + + if module_buf[0] == (ModuleType::Footer as u8) { + let mut aad = Vec::with_capacity(file_aad.len() + 1); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + return Ok(aad) + } + + if row_group_ordinal < 0 { + return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal)); + } + if row_group_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}", + u16::MAX, row_group_ordinal)); + } + + if column_ordinal < 0 { + return Err(general_err!("Wrong column ordinal: {}", column_ordinal)); + } + if column_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}", + u16::MAX, column_ordinal)); + } + + if module_buf[0] != (ModuleType::DataPageHeader as u8) && + module_buf[0] != (ModuleType::DataPage as u8) { + let mut aad = Vec::with_capacity(file_aad.len() + 5); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref()); + return Ok(aad) + } + + if page_ordinal < 0 { + return Err(general_err!("Wrong column ordinal: {}", page_ordinal)); + } + if page_ordinal > u16::MAX as i32 { + return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}", + u16::MAX, page_ordinal)); + } + + let mut aad = Vec::with_capacity(file_aad.len() + 7); + aad.extend_from_slice(file_aad); + aad.extend_from_slice(module_buf.as_ref()); + aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref()); + aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref()); + Ok(aad) +} + +pub struct FileDecryptionProperties { + footer_key: Option> +} + +impl FileDecryptionProperties { + pub fn builder() -> DecryptionPropertiesBuilder { + DecryptionPropertiesBuilder::with_defaults() + } +} + +pub struct DecryptionPropertiesBuilder { + footer_key: Option> +} + +impl DecryptionPropertiesBuilder { + pub fn with_defaults() -> Self { + Self { + footer_key: None + } + } + + pub fn build(self) -> FileDecryptionProperties { + FileDecryptionProperties { + footer_key: self.footer_key + } + } + + // todo decr: doc comment + pub fn with_footer_key(mut self, value: Vec) -> Self { + self.footer_key = Some(value); + self + } +} + +pub struct FileDecryptor { + decryption_properties: FileDecryptionProperties, + // todo decr: change to BlockDecryptor + footer_decryptor: RingGcmBlockDecryptor +} + +impl FileDecryptor { + pub(crate) fn new(decryption_properties: FileDecryptionProperties) -> Self { + Self { + // todo decr: if no key available yet (not set in properties, will be retrieved from metadata) + footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()), + decryption_properties + } + } + + // todo decr: change to BlockDecryptor + pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor { + self.footer_decryptor + } +} diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs new file mode 100644 index 000000000000..e0e7f5d81919 --- /dev/null +++ b/parquet/src/encryption/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +pub mod ciphers; diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index bd31c9142f56..c7829420b514 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -17,8 +17,20 @@ //! Module for working with Parquet file footers. -use crate::errors::Result; -use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE}; +use std::{io::Read, sync::Arc}; + +use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData, + FileCryptoMetaData as TFileCryptoMetaData, EncryptionAlgorithm}; +use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; + +use crate::basic::ColumnOrder; +use crate::encryption::ciphers; +use crate::encryption::ciphers::{BlockDecryptor, FileDecryptionProperties, FileDecryptor}; +use crate::errors::{ParquetError, Result}; +use crate::file::{metadata::*, reader::ChunkReader, + FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; + +use crate::schema::types::{self, SchemaDescriptor}; /// Reads the [ParquetMetaData] from the footer of the parquet file. /// @@ -49,6 +61,60 @@ pub fn parse_metadata(chunk_reader: &R) -> Result(chunk_reader: &R) -> Result { + parse_metadata_with_decryption(chunk_reader, FileDecryptionProperties::builder().build()) +} + +pub fn parse_metadata_with_decryption(chunk_reader: &R, decr_props: FileDecryptionProperties) -> Result { + // check file is large enough to hold footer + let file_size = chunk_reader.len(); + if file_size < (FOOTER_SIZE as u64) { + return Err(general_err!( + "Invalid Parquet file. Size is smaller than footer" + )); + } + + let mut footer = [0_u8; 8]; + chunk_reader + .get_read(file_size - 8)? + .read_exact(&mut footer)?; + + let encrypted_footer; + // check this is indeed a parquet file + if footer[4..] == PARQUET_MAGIC { + encrypted_footer = false; + } else if footer[4..] == PARQUET_MAGIC_ENCR_FOOTER { + encrypted_footer = true; + //panic!() // todo rm + } else { + return Err(general_err!("Invalid Parquet file. Corrupt footer")); + } + + // get the metadata length from the footer + let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap()) as usize; + + //let metadata_len = decode_footer(&footer)?; todo rm this function + let footer_metadata_len = FOOTER_SIZE + metadata_len; + + if footer_metadata_len > file_size as usize { + return Err(general_err!( + "Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes", + metadata_len, + FOOTER_SIZE, + file_size + )); + } + + let start = file_size - footer_metadata_len as u64; + + if encrypted_footer { + let file_decryptor = FileDecryptor::new(decr_props); + decode_encrypted_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref(), file_decryptor) + } else { + decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref()) + } +} + /// Decodes [`ParquetMetaData`] from the provided bytes. /// /// Typically this is used to decode the metadata from the end of a parquet @@ -61,6 +127,41 @@ pub fn decode_metadata(buf: &[u8]) -> Result { ParquetMetaDataReader::decode_metadata(buf) } +pub fn decode_metadata2(buf: &[u8]) -> Result { + decode_metadata_with_decryption(buf) +} + +/// Decodes [`ParquetMetaData`] from the provided bytes +// todo add file decryptor +pub fn decode_metadata_with_decryption(buf: &[u8]) -> Result { + // TODO: row group filtering + let mut prot = TCompactSliceInputProtocol::new(buf); + let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?; + let schema = types::from_thrift(&t_file_metadata.schema)?; + let schema_descr = Arc::new(SchemaDescriptor::new(schema)); + let mut row_groups = Vec::new(); + for rg in t_file_metadata.row_groups { + row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); + } + let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr); + + if t_file_metadata.encryption_algorithm.is_some() { + // todo get key_metadata etc. Set file decryptor in return value + // todo check signature + } + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_descr, + column_orders, + ); + Ok(ParquetMetaData::new(file_metadata, row_groups)) +} + /// Decodes the Parquet footer returning the metadata length in bytes /// /// A parquet footer is 8 bytes long and has the following layout: @@ -76,3 +177,162 @@ pub fn decode_metadata(buf: &[u8]) -> Result { pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result { ParquetMetaDataReader::decode_footer(slice) } + +fn decode_encrypted_metadata(buf: &[u8], file_decryptor: FileDecryptor) -> Result { + // parse FileCryptoMetaData + let mut prot = TCompactSliceInputProtocol::new(buf.as_ref()); + let t_file_crypto_metadata: TFileCryptoMetaData = TFileCryptoMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| ParquetError::General(format!("Could not parse crypto metadata: {e}")))?; + let algo = t_file_crypto_metadata.encryption_algorithm; + let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo { a } + else { unreachable!() }; // todo decr: add support for GCMCTRV1 + + // todo decr: get key_metadata + + // remaining buffer contains encrypted FileMetaData + let decryptor = file_decryptor.get_footer_decryptor(); + // todo decr: get aad_prefix + // todo decr: set both aad_prefix and aad_file_unique in file_decryptor + let fmd_aad = ciphers::create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); + let decrypted_fmd_buf = decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref()); + + // todo add file decryptor + decode_metadata_with_decryption(decrypted_fmd_buf.as_slice()) +} + +// todo decr: add encryption support +/// Decodes the footer returning the metadata length in bytes +pub fn decode_footer2(slice: &[u8; FOOTER_SIZE]) -> Result { + // check this is indeed a parquet file + if slice[4..] != PARQUET_MAGIC { + return Err(general_err!("Invalid Parquet file. Corrupt footer")); + } + + // get the metadata length from the footer + let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap()); + // u32 won't be larger than usize in most cases + Ok(metadata_len as usize) +} + +/// Parses column orders from Thrift definition. +/// If no column orders are defined, returns `None`. +fn parse_column_orders( + t_column_orders: Option>, + schema_descr: &SchemaDescriptor, +) -> Option> { + match t_column_orders { + Some(orders) => { + // Should always be the case + assert_eq!( + orders.len(), + schema_descr.num_columns(), + "Column order length mismatch" + ); + let mut res = Vec::new(); + for (i, column) in schema_descr.columns().iter().enumerate() { + match orders[i] { + TColumnOrder::TYPEORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + } + } + Some(res) + } + None => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + + use crate::basic::SortOrder; + use crate::basic::Type; + use crate::format::TypeDefinedOrder; + use crate::schema::types::Type as SchemaType; + + #[test] + fn test_parse_metadata_size_smaller_than_footer() { + let test_file = tempfile::tempfile().unwrap(); + let reader_result = parse_metadata2(&test_file); + assert_eq!( + reader_result.unwrap_err().to_string(), + "Parquet error: Invalid Parquet file. Size is smaller than footer" + ); + } + + #[test] + fn test_parse_metadata_corrupt_footer() { + let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]); + let reader_result = parse_metadata2(&data); + assert_eq!( + reader_result.unwrap_err().to_string(), + "Parquet error: Invalid Parquet file. Corrupt footer" + ); + } + + #[test] + fn test_parse_metadata_invalid_start() { + let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']); + let reader_result = parse_metadata2(&test_file); + assert_eq!( + reader_result.unwrap_err().to_string(), + "Parquet error: Invalid Parquet file. Reported metadata length of 255 + 8 byte footer, but file is only 8 bytes" + ); + } + + #[test] + fn test_metadata_column_orders_parse() { + // Define simple schema, we do not need to provide logical types. + let fields = vec![ + Arc::new( + SchemaType::primitive_type_builder("col1", Type::INT32) + .build() + .unwrap(), + ), + Arc::new( + SchemaType::primitive_type_builder("col2", Type::FLOAT) + .build() + .unwrap(), + ), + ]; + let schema = SchemaType::group_type_builder("schema") + .with_fields(fields) + .build() + .unwrap(); + let schema_descr = SchemaDescriptor::new(Arc::new(schema)); + + let t_column_orders = Some(vec![ + TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + ]); + + assert_eq!( + parse_column_orders(t_column_orders, &schema_descr), + Some(vec![ + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) + ]) + ); + + // Test when no column orders are defined. + assert_eq!(parse_column_orders(None, &schema_descr), None); + } + + #[test] + #[should_panic(expected = "Column order length mismatch")] + fn test_metadata_column_orders_len_mismatch() { + let schema = SchemaType::group_type_builder("schema").build().unwrap(); + let schema_descr = SchemaDescriptor::new(Arc::new(schema)); + + let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); + + parse_column_orders(t_column_orders, &schema_descr); + } +} diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 12ff35b51646..b36ef752ae6f 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -110,3 +110,4 @@ pub mod writer; /// The length of the parquet footer in bytes pub const FOOTER_SIZE: usize = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; +const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E']; diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs index 3b63845e709c..e32ae1aea147 100644 --- a/parquet/src/lib.rs +++ b/parquet/src/lib.rs @@ -137,6 +137,10 @@ pub mod column; experimental!(mod compression); experimental!(mod encodings); pub mod bloom_filter; + +//#[cfg(feature = "encryption")] +experimental!(mod encryption); + pub mod file; pub mod record; pub mod schema;