Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Dec 11, 2024
1 parent 3f9a143 commit 9d17990
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 37 deletions.
33 changes: 26 additions & 7 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl ArrowReaderMetadata {
pub fn load<T: ChunkReader>(
reader: &T,
options: ArrowReaderOptions,
file_decryption_properties: Option<FileDecryptionProperties>,
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<Self> {
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(options.page_index)
Expand Down Expand Up @@ -543,7 +543,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
pub fn try_new_with_decryption(
reader: T,
options: ArrowReaderOptions,
file_decryption_properties: Option<FileDecryptionProperties>,
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<Self> {
let metadata = ArrowReaderMetadata::load(&reader, options, file_decryption_properties)?;
Ok(Self::new_with_metadata(reader, metadata))
Expand Down Expand Up @@ -809,10 +809,11 @@ impl ParquetRecordBatchReader {
/// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader and [`FileDecryptionProperties`]
///
/// Note: this is needed when the parquet file is encrypted
// todo: add options or put file_decryption_properties into options
pub fn try_new_with_decryption<T: ChunkReader + 'static>(
reader: T,
batch_size: usize,
file_decryption_properties: Option<FileDecryptionProperties>,
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<Self> {
ParquetRecordBatchReaderBuilder::try_new_with_decryption(
reader,
Expand Down Expand Up @@ -1713,7 +1714,7 @@ mod tests {
.build(),
);

let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.clone()).unwrap();
let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()).unwrap();
let file_metadata = metadata.metadata.file_metadata();

assert_eq!(file_metadata.num_rows(), 50);
Expand All @@ -1727,9 +1728,27 @@ mod tests {
});

// todo: decrypting data
// let record_reader =
// ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties)
// .unwrap();
let record_reader =
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
.unwrap();
// todo check contents
let mut row_count = 0;
for batch in record_reader {
let batch = batch.unwrap();
row_count += batch.num_rows();
let f32_col = batch.column(0).as_primitive::<Float32Type>();
let f64_col = batch.column(1).as_primitive::<Float64Type>();

// This file contains floats from a standard normal distribution
for &x in f32_col.values() {
assert!(x > -10.0);
assert!(x < 10.0);
}
for &x in f64_col.values() {
assert!(x > -10.0);
assert!(x < 10.0);
}
}
}

#[test]
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,7 @@ mod tests {
r.rows_written as usize,
None,
Arc::new(props),
None,
)
.unwrap();

Expand Down Expand Up @@ -2079,6 +2080,7 @@ mod tests {
r.rows_written as usize,
None,
Arc::new(props),
None,
)
.unwrap();

Expand Down Expand Up @@ -2214,6 +2216,7 @@ mod tests {
r.rows_written as usize,
None,
Arc::new(props),
None,
)
.unwrap(),
);
Expand Down Expand Up @@ -3484,6 +3487,7 @@ mod tests {
result.rows_written as usize,
None,
Arc::new(props),
None,
)
.unwrap(),
);
Expand Down
28 changes: 25 additions & 3 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl BlockEncryptor for RingGcmBlockEncryptor {
}
}

#[derive(Debug, Clone)]
pub(crate) struct RingGcmBlockDecryptor {
key: LessSafeKey,
}
Expand Down Expand Up @@ -226,7 +227,7 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
Ok(aad)
}

#[derive(Clone)]
/// User-supplied properties controlling how an encrypted parquet file is
/// decrypted (built via `DecryptionPropertiesBuilder`).
#[derive(Debug, Clone, PartialEq)]
pub struct FileDecryptionProperties {
    // Key used to construct the footer block decryptor (see
    // `FileDecryptor::new`). `None` when no key has been supplied;
    // NOTE(review): `FileDecryptor::new` currently unwraps this, so
    // decryption requires the key to be set.
    footer_key: Option<Vec<u8>>
}
Expand Down Expand Up @@ -261,18 +262,25 @@ impl DecryptionPropertiesBuilder {
}
}

/// Decryptor for an encrypted parquet file: bundles the properties it was
/// created from together with the block decryptor used for the footer.
#[derive(Debug, Clone)]
pub struct FileDecryptor {
    /// The properties this decryptor was constructed from.
    decryption_properties: FileDecryptionProperties,
    // todo decr: change to BlockDecryptor
    /// GCM block decryptor for the footer, built from `footer_key`.
    footer_decryptor: RingGcmBlockDecryptor
}

// Manual impl rather than `#[derive(PartialEq)]`: equality is defined solely
// by the decryption properties. `footer_decryptor` is excluded — it is
// derived from those properties in `FileDecryptor::new`, and the underlying
// ring key type presumably does not implement `PartialEq` (TODO confirm).
impl PartialEq for FileDecryptor {
    fn eq(&self, other: &Self) -> bool {
        self.decryption_properties == other.decryption_properties
    }
}

impl FileDecryptor {
pub(crate) fn new(decryption_properties: FileDecryptionProperties) -> Self {
pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self {
Self {
// todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
decryption_properties
decryption_properties: decryption_properties.clone()
}
}

Expand All @@ -281,3 +289,17 @@ impl FileDecryptor {
self.footer_decryptor
}
}

/// Per-module decryption context, pairing the decryptors with the
/// coordinates of the module being decrypted.
pub struct CryptoContext {
    // Ordinal of the row group within the file — presumably an input to
    // module-AAD construction (see `create_module_aad`); TODO confirm.
    row_group_ordinal: i32,
    // Ordinal of the column within the row group — same presumed use.
    column_ordinal: i32,
    /// Decryptor for this module's metadata.
    metadata_decryptor: FileDecryptor,
    /// Decryptor for this module's data.
    data_decryptor: FileDecryptor,
    /// Properties the decryptors were built from.
    file_decryption_properties: FileDecryptionProperties,
    // NOTE(review): looks like the precomputed additional-authenticated-data
    // bytes for this module — confirm against `create_module_aad` callers.
    aad: Vec<u8>,
}

impl CryptoContext {
    /// Returns the decryptor for this module's data.
    ///
    /// Takes `&self` and clones (`FileDecryptor` derives `Clone`) instead of
    /// consuming the whole context as the previous signature did: call sites
    /// (`ctx.data_decryptor()`) stay source-compatible and still receive an
    /// owned `FileDecryptor`, but the context remains usable afterwards,
    /// matching the borrowing style of `file_decryption_properties`.
    pub fn data_decryptor(&self) -> FileDecryptor {
        self.data_decryptor.clone()
    }

    /// Returns the decryption properties associated with this context.
    pub fn file_decryption_properties(&self) -> &FileDecryptionProperties {
        &self.file_decryption_properties
    }
}
2 changes: 1 addition & 1 deletion parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_metadata")]
pub fn decode_metadata(
buf: &[u8],
file_decryption_properties: Option<FileDecryptionProperties>,
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<ParquetMetaData> {
ParquetMetaDataReader::decode_metadata(buf, file_decryption_properties)
}
Expand Down
14 changes: 10 additions & 4 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ use crate::schema::types::{
pub use reader::ParquetMetaDataReader;
pub use writer::ParquetMetaDataWriter;
pub(crate) use writer::ThriftMetadataWriter;
use crate::encryption::ciphers::FileDecryptor;

/// Page level statistics for each column chunk of each row group.
///
Expand Down Expand Up @@ -174,15 +175,18 @@ pub struct ParquetMetaData {
column_index: Option<ParquetColumnIndex>,
/// Offset index for each page in each column chunk
offset_index: Option<ParquetOffsetIndex>,
/// Optional file decryptor
file_decryptor: Option<FileDecryptor>,
}

impl ParquetMetaData {
/// Creates Parquet metadata from file metadata and a list of row
/// group metadata
pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>, file_decryptor: Option<FileDecryptor>) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
file_decryptor,
column_index: None,
offset_index: None,
}
Expand Down Expand Up @@ -337,7 +341,7 @@ pub struct ParquetMetaDataBuilder(ParquetMetaData);
impl ParquetMetaDataBuilder {
/// Create a new builder from a file metadata, with no row groups
pub fn new(file_meta_data: FileMetaData) -> Self {
Self(ParquetMetaData::new(file_meta_data, vec![]))
Self(ParquetMetaData::new(file_meta_data, vec![], None))
}

/// Create a new builder from an existing ParquetMetaData
Expand Down Expand Up @@ -540,6 +544,8 @@ pub struct RowGroupMetaData {
ordinal: Option<i16>,
}

// todo:rok

impl RowGroupMetaData {
/// Returns builder for row group metadata.
pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
Expand Down Expand Up @@ -1861,7 +1867,7 @@ mod tests {
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
.set_row_groups(row_group_meta_with_stats)
.build();
let base_expected_size = 2312;
let base_expected_size = 2896;

assert_eq!(parquet_meta.memory_size(), base_expected_size);

Expand All @@ -1888,7 +1894,7 @@ mod tests {
]]))
.build();

let bigger_expected_size = 2816;
let bigger_expected_size = 3400;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
Expand Down
21 changes: 11 additions & 10 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ impl ParquetMetaDataReader {
/// This is only necessary when the file is encrypted.
pub fn with_encryption_properties(
mut self,
properties: Option<FileDecryptionProperties>,
properties: Option<&FileDecryptionProperties>,
) -> Self {
self.file_decryption_properties = properties;
self.file_decryption_properties = properties.cloned();
self
}

Expand Down Expand Up @@ -364,7 +364,7 @@ impl ParquetMetaDataReader {
&mut fetch,
file_size,
self.get_prefetch_size(),
self.file_decryption_properties.clone(),
self.file_decryption_properties.as_ref(),
)
.await?;

Expand Down Expand Up @@ -532,7 +532,7 @@ impl ParquetMetaDataReader {
let start = file_size - footer_metadata_len as u64;
Self::decode_metadata(
chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
self.file_decryption_properties.clone(),
self.file_decryption_properties.as_ref(),
)
}

Expand All @@ -554,7 +554,7 @@ impl ParquetMetaDataReader {
fetch: &mut F,
file_size: usize,
prefetch: usize,
file_decryption_properties: Option<FileDecryptionProperties>,
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
if file_size < FOOTER_SIZE {
return Err(eof_err!("file size of {} is less than footer", file_size));
Expand Down Expand Up @@ -639,10 +639,11 @@ impl ParquetMetaDataReader {
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn decode_metadata(
buf: &[u8],
file_decryption_properties: Option<FileDecryptionProperties>,
file_decryption_properties: Option<&FileDecryptionProperties>,
) -> Result<ParquetMetaData> {
let mut prot = TCompactSliceInputProtocol::new(buf);

let mut file_decryptor = None;
let decrypted_fmd_buf;
if let Some(file_decryption_properties) = file_decryption_properties {
let t_file_crypto_metadata: TFileCryptoMetaData =
Expand All @@ -658,13 +659,13 @@ impl ParquetMetaDataReader {
// todo decr: get key_metadata

// remaining buffer contains encrypted FileMetaData
let file_decryptor = FileDecryptor::new(file_decryption_properties);
let decryptor = file_decryptor.get_footer_decryptor();
file_decryptor = Some(FileDecryptor::new(file_decryption_properties));
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
// todo decr: get aad_prefix
// todo decr: set both aad_prefix and aad_file_unique in file_decryptor
let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
decrypted_fmd_buf =
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref());
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
}

Expand Down Expand Up @@ -693,7 +694,7 @@ impl ParquetMetaDataReader {
schema_descr,
column_orders,
);
Ok(ParquetMetaData::new(file_metadata, row_groups))
Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor))
}

/// Parses column orders from Thrift definition.
Expand Down
Loading

0 comments on commit 9d17990

Please sign in to comment.