From 3ec4d63a814579b0656a987e447b818d77e3a8b5 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 16 Oct 2024 13:01:04 -0700
Subject: [PATCH 1/5] Revert "chore: Revert "chore: Reserve memory for native shuffle writer per partition (#988)" (#1020)"

This reverts commit 8d097d584e9e70399b635e5063ad7dea3fc87cac.
---
 .../execution/datafusion/shuffle_writer.rs   | 251 +++++++++++++-----
 1 file changed, 190 insertions(+), 61 deletions(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 9668359fc..6c3174667 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -66,6 +66,14 @@ use crate::{
 };
 use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes;

+/// The status of appending rows to a partition buffer.
+enum AppendRowStatus {
+    /// The difference in memory usage after appending rows
+    MemDiff(Result<isize>),
+    /// The index of the next row to append
+    StartIndex(usize),
+}
+
 /// The shuffle writer operator maps each input partition to M output partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the resulting partitions.
 #[derive(Debug)]
@@ -206,10 +214,21 @@ struct PartitionBuffer {
     /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
     /// the active array builders will be frozen and appended to frozen buffer `frozen`.
     batch_size: usize,
+    /// Memory reservation for this partition buffer.
+    reservation: MemoryReservation,
 }

 impl PartitionBuffer {
-    fn new(schema: SchemaRef, batch_size: usize) -> Self {
+    fn new(
+        schema: SchemaRef,
+        batch_size: usize,
+        partition_id: usize,
+        runtime: &Arc<RuntimeEnv>,
+    ) -> Self {
+        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
+            .with_can_spill(true)
+            .register(&runtime.memory_pool);
+
         Self {
             schema,
             frozen: vec![],
@@ -217,47 +236,52 @@ impl PartitionBuffer {
             active_slots_mem_size: 0,
             num_active_rows: 0,
             batch_size,
+            reservation,
         }
     }

     /// Initializes active builders if necessary.
+    /// Returns error if memory reservation fails.
     fn init_active_if_necessary(&mut self) -> Result<isize> {
         let mut mem_diff = 0;

         if self.active.is_empty() {
-            self.active = new_array_builders(&self.schema, self.batch_size);
+            // Estimate the memory size of active builders
             if self.active_slots_mem_size == 0 {
                 self.active_slots_mem_size = self
-                    .active
+                    .schema
+                    .fields()
                     .iter()
-                    .zip(self.schema.fields())
-                    .map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
+                    .map(|field| slot_size(self.batch_size, field.data_type()))
                     .sum::<usize>();
             }
+
+            self.reservation.try_grow(self.active_slots_mem_size)?;
+
+            self.active = new_array_builders(&self.schema, self.batch_size);
+
             mem_diff += self.active_slots_mem_size as isize;
         }
         Ok(mem_diff)
     }

-    /// Appends all rows of given batch into active array builders.
-    fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
-        let columns = batch.columns();
-        let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
-        self.append_rows(columns, &indices, time_metric)
-    }
-
     /// Appends rows of specified indices from columns into active array builders.
     fn append_rows(
         &mut self,
         columns: &[ArrayRef],
         indices: &[usize],
+        start_index: usize,
         time_metric: &Time,
-    ) -> Result<isize> {
+    ) -> AppendRowStatus {
         let mut mem_diff = 0;
-        let mut start = 0;
+        let mut start = start_index;

         // lazy init because some partition may be empty
-        mem_diff += self.init_active_if_necessary()?;
+        let init = self.init_active_if_necessary();
+        if init.is_err() {
+            return AppendRowStatus::StartIndex(start);
+        }
+        mem_diff += init.unwrap();

         while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
@@ -270,14 +294,22 @@ impl PartitionBuffer {
             self.num_active_rows += end - start;
             if self.num_active_rows >= self.batch_size {
                 let mut timer = time_metric.timer();
-                mem_diff += self.flush()?;
+                let flush = self.flush();
+                if let Err(e) = flush {
+                    return AppendRowStatus::MemDiff(Err(e));
+                }
+                mem_diff += flush.unwrap();
                 timer.stop();
-                mem_diff += self.init_active_if_necessary()?;
+                let init = self.init_active_if_necessary();
+                if init.is_err() {
+                    return AppendRowStatus::StartIndex(end);
+                }
+                mem_diff += init.unwrap();
             }
             start = end;
         }
-        Ok(mem_diff)
+        AppendRowStatus::MemDiff(Ok(mem_diff))
     }

     /// flush active data into frozen bytes
@@ -291,7 +323,7 @@ impl PartitionBuffer {
         let active = std::mem::take(&mut self.active);
         let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
-        mem_diff -= self.active_slots_mem_size as isize;
+        self.reservation.try_shrink(self.active_slots_mem_size)?;

         let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;

@@ -575,7 +607,7 @@ struct ShuffleRepartitioner {
     output_data_file: String,
     output_index_file: String,
     schema: SchemaRef,
-    buffered_partitions: Mutex<Vec<PartitionBuffer>>,
+    buffered_partitions: Vec<PartitionBuffer>,
     spills: Mutex<Vec<SpillInfo>>,
     /// Sort expressions
     /// Partitioning scheme to use
@@ -648,11 +680,11 @@ impl ShuffleRepartitioner {
             output_data_file,
             output_index_file,
             schema: Arc::clone(&schema),
-            buffered_partitions: Mutex::new(
-                (0..num_output_partitions)
-                    .map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size))
-                    .collect::<Vec<_>>(),
-            ),
+            buffered_partitions: (0..num_output_partitions)
+                .map(|partition_id| {
+                    PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
+                })
+                .collect::<Vec<_>>(),
             spills: Mutex::new(vec![]),
             partitioning,
             num_output_partitions,
@@ -699,8 +731,6 @@ impl ShuffleRepartitioner {
         // Update data size metric
         self.metrics.data_size.add(input.get_array_memory_size());

-        let time_metric = self.metrics.baseline.elapsed_compute();
-
         // NOTE: in shuffle writer exec, the output_rows metrics represents the
         // number of rows those are written to output data file.
         self.metrics.baseline.record_output(input.num_rows());
@@ -765,34 +795,36 @@ impl ShuffleRepartitioner {
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    let mut buffered_partitions = self.buffered_partitions.lock().await;
-                    let output = &mut buffered_partitions[partition_id];
-
-                    // If the range of indices is not big enough, just appending the rows into
-                    // active array builders instead of directly adding them as a record batch.
-                    mem_diff += output.append_rows(
-                        input.columns(),
-                        &shuffled_partition_ids[start..end],
-                        time_metric,
-                    )?;
-                }
+                    mem_diff += self
+                        .append_rows_to_partition(
+                            input.columns(),
+                            &shuffled_partition_ids[start..end],
+                            partition_id,
+                        )
+                        .await?;
+
+                    if mem_diff > 0 {
+                        let mem_increase = mem_diff as usize;
+                        if self.reservation.try_grow(mem_increase).is_err() {
+                            self.spill().await?;
+                            self.reservation.free();
+                            self.reservation.try_grow(mem_increase)?;
+
+                            mem_diff = 0;
+                        }
+                    }
-                if mem_diff > 0 {
-                    let mem_increase = mem_diff as usize;
-                    if self.reservation.try_grow(mem_increase).is_err() {
-                        self.spill().await?;
-                        self.reservation.free();
-                        self.reservation.try_grow(mem_increase)?;
+
+                    if mem_diff < 0 {
+                        let mem_used = self.reservation.size();
+                        let mem_decrease = mem_used.min(-mem_diff as usize);
+                        self.reservation.shrink(mem_decrease);
+
+                        mem_diff += mem_decrease as isize;
+                    }
                 }
-                if mem_diff < 0 {
-                    let mem_used = self.reservation.size();
-                    let mem_decrease = mem_used.min(-mem_diff as usize);
-                    self.reservation.shrink(mem_decrease);
-                }
             }
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
-                let mut buffered_partitions = self.buffered_partitions.lock().await;
+                let buffered_partitions = &mut self.buffered_partitions;

                 assert!(
                     buffered_partitions.len() == 1,
@@ -800,8 +832,10 @@ impl ShuffleRepartitioner {
                     buffered_partitions.len()
                 );

-                let output = &mut buffered_partitions[0];
-                output.append_batch(&input, time_metric)?;
+                let indices = (0..input.num_rows()).collect::<Vec<usize>>();
+
+                self.append_rows_to_partition(input.columns(), &indices, 0)
+                    .await?;
             }
             other => {
                 // this should be unreachable as long as the validation logic
@@ -818,7 +852,7 @@ impl ShuffleRepartitioner {
     /// Writes buffered shuffled record batches into Arrow IPC bytes.
     async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
         let num_output_partitions = self.num_output_partitions;
-        let mut buffered_partitions = self.buffered_partitions.lock().await;
+        let buffered_partitions = &mut self.buffered_partitions;
         let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];

         for i in 0..num_output_partitions {
@@ -916,16 +950,15 @@ impl ShuffleRepartitioner {
         self.metrics.data_size.value()
     }

-    async fn spill(&self) -> Result<usize> {
+    async fn spill(&mut self) -> Result<usize> {
         log::debug!(
            "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
            self.used(),
            self.spill_count()
        );

-        let mut buffered_partitions = self.buffered_partitions.lock().await;
         // we could always get a chance to free some memory as long as we are holding some
-        if buffered_partitions.len() == 0 {
+        if self.buffered_partitions.is_empty() {
            return Ok(0);
        }

@@ -936,7 +969,7 @@ impl ShuffleRepartitioner {
            .disk_manager
            .create_tmp_file("shuffle writer spill")?;
        let offsets = spill_into(
-            &mut buffered_partitions,
+            &mut self.buffered_partitions,
            spillfile.path(),
            self.num_output_partitions,
        )
        .await?;
@@ -954,6 +987,60 @@ impl ShuffleRepartitioner {
        });
        Ok(used)
    }
+
+    /// Appends rows of specified indices from columns into active array builders in the specified partition.
+    async fn append_rows_to_partition(
+        &mut self,
+        columns: &[ArrayRef],
+        indices: &[usize],
+        partition_id: usize,
+    ) -> Result<isize> {
+        let mut mem_diff = 0;
+
+        let output = &mut self.buffered_partitions[partition_id];
+
+        let time_metric = self.metrics.baseline.elapsed_compute();
+
+        // If the range of indices is not big enough, just append the rows into the
+        // active array builders instead of directly adding them as a record batch.
+        let mut start_index: usize = 0;
+        let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+        loop {
+            match output_ret {
+                AppendRowStatus::MemDiff(l) => {
+                    mem_diff += l?;
+                    break;
+                }
+                AppendRowStatus::StartIndex(new_start) => {
+                    // Cannot allocate enough memory for the array builders in the partition,
+                    // spill partitions and retry.
+                    self.spill().await?;
+
+                    let output = &mut self.buffered_partitions[partition_id];
+                    output.reservation.free();
+
+                    let time_metric = self.metrics.baseline.elapsed_compute();
+
+                    start_index = new_start;
+                    output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+                    if let AppendRowStatus::StartIndex(new_start) = output_ret {
+                        if new_start == start_index {
+                            // If the start index is not updated, it means that the partition
+                            // is still not able to allocate enough memory for the array builders.
+                            return Err(DataFusionError::Internal(
+                                "Partition is still not able to allocate enough memory for the array builders after spilling."
+                                    .to_string(),
+                            ));
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(mem_diff)
+    }
 }

 /// consume the `buffered_partitions` and do spill into a single temp shuffle output file
@@ -1470,6 +1557,8 @@ mod test {
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::expressions::Column;
     use tokio::runtime::Runtime;

@@ -1504,25 +1593,65 @@ mod test {
     #[test]
     #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
     fn test_insert_larger_batch() {
+        shuffle_write_test(10000, 1, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    fn test_insert_smaller_batch() {
+        shuffle_write_test(1000, 1, 16, None);
+        shuffle_write_test(1000, 10, 16, None);
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
+    fn test_large_number_of_partitions() {
+        shuffle_write_test(10000, 10, 200, Some(10 * 1024 * 1024));
+        shuffle_write_test(10000, 10, 2000, Some(10 * 1024 * 1024));
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
+    #[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
+    fn test_large_number_of_partitions_spilling() {
+        shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024));
+    }
+
+    fn shuffle_write_test(
+        batch_size: usize,
+        num_batches: usize,
+        num_partitions: usize,
+        memory_limit: Option<usize>,
+    ) {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

         let mut b = StringBuilder::new();
-        for i in 0..10000 {
+        for i in 0..batch_size {
             b.append_value(format!("{i}"));
         }
         let array = b.finish();
         let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap();

-        let batches = vec![batch.clone()];
+        let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
             Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
-            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
         )
         .unwrap();
-        let ctx = SessionContext::new();
+
+        // 10MB memory should be enough for running this test
+        let config = SessionConfig::new();
+        let mut runtime_env_builder = RuntimeEnvBuilder::new();
+        runtime_env_builder = match memory_limit {
+            Some(limit) => runtime_env_builder.with_memory_limit(limit, 1.0),
+            None => runtime_env_builder,
+        };
+        let runtime_env = Arc::new(runtime_env_builder.build().unwrap());
+        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
         let task_ctx = ctx.task_ctx();
         let stream = exec.execute(0, task_ctx).unwrap();
         let rt = Runtime::new().unwrap();

From 851427f253c522168adf403c4587763e428f6fa1 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 16 Oct 2024 13:03:01 -0700
Subject: [PATCH 2/5] fix

---
 native/core/src/execution/datafusion/shuffle_writer.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 6c3174667..32c12c804 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -55,6 +55,7 @@ use datafusion::{
     },
 };
 use datafusion_physical_expr::EquivalenceProperties;
+use futures::executor::block_on;
 use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use simd_adler32::Adler32;
@@ -1111,7 +1112,11 @@ async fn external_shuffle(
     );

     while let Some(batch) = input.next().await {
-        repartitioner.insert_batch(batch?).await?;
+        // Block on the repartitioner to insert the batch and shuffle the rows
+        // into the corresponding partition buffer.
+        // Otherwise, pulling the next batch from the input stream might overwrite the
+        // current batch in the repartitioner.
+        block_on(repartitioner.insert_batch(batch?))?;
     }
     repartitioner.shuffle_write().await
 }

From 2d5478abf30d60ebaa613b635a5a62d60404c234 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Oct 2024 12:36:56 -0700
Subject: [PATCH 3/5] fix

---
 .../execution/datafusion/shuffle_writer.rs   | 39 ++++++++-----------
 1 file changed, 17 insertions(+), 22 deletions(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 32c12c804..957b7432d 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -59,7 +59,6 @@ use datafusion::{
 use futures::executor::block_on;
 use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use simd_adler32::Adler32;
-use tokio::task;

 use crate::{
     common::bit::ceil,
@@ -973,8 +972,7 @@ impl ShuffleRepartitioner {
             &mut self.buffered_partitions,
             spillfile.path(),
             self.num_output_partitions,
-        )
-        .await?;
+        )?;

         timer.stop();

@@ -1045,7 +1043,7 @@ impl ShuffleRepartitioner {
 }

 /// consume the `buffered_partitions` and do spill into a single temp shuffle output file
-async fn spill_into(
+fn spill_into(
     buffered_partitions: &mut [PartitionBuffer],
     path: &Path,
     num_output_partitions: usize,
@@ -1058,25 +1056,22 @@ async fn spill_into(
     }

     let path = path.to_owned();
-    task::spawn_blocking(move || {
-        let mut offsets = vec![0; num_output_partitions + 1];
-        let mut spill_data = OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .open(path)?;
-
-        for i in 0..num_output_partitions {
-            offsets[i] = spill_data.stream_position()?;
-            spill_data.write_all(&output_batches[i])?;
-            output_batches[i].clear();
-        }
-        // add one extra offset at last to ease partition length computation
-        offsets[num_output_partitions] = spill_data.stream_position()?;
-        Ok(offsets)
-    })
-    .await
-    .map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?
+    let mut offsets = vec![0; num_output_partitions + 1];
+    let mut spill_data = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(path)
+        .map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?;
+
+    for i in 0..num_output_partitions {
+        offsets[i] = spill_data.stream_position()?;
+        spill_data.write_all(&output_batches[i])?;
+        output_batches[i].clear();
+    }
+    // add one extra offset at last to ease partition length computation
+    offsets[num_output_partitions] = spill_data.stream_position()?;
+    Ok(offsets)
 }

 impl Debug for ShuffleRepartitioner {

From e8bf53cdebd46bd119a4879c8efaa6fa7bb129ca Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 18 Oct 2024 14:15:35 -0700
Subject: [PATCH 4/5] fix

---
 native/core/src/execution/datafusion/shuffle_writer.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 957b7432d..55aaa5bd1 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -1015,6 +1015,7 @@ impl ShuffleRepartitioner {
                     // Cannot allocate enough memory for the array builders in the partition,
                     // spill partitions and retry.
                     self.spill().await?;
+                    self.reservation.free();

                     let output = &mut self.buffered_partitions[partition_id];
                     output.reservation.free();

From fd78a749877a2cdcfd37d3dc0c34a81c489f34d7 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 18 Oct 2024 23:42:39 -0700
Subject: [PATCH 5/5] fix

---
 native/core/src/execution/datafusion/shuffle_writer.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs
index 55aaa5bd1..7587ff06d 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -786,7 +786,6 @@ impl ShuffleRepartitioner {
                 let mut partition_starts = partition_ends;
                 partition_starts.push(input.num_rows());

-                let mut mem_diff = 0;
                 // For each interval of row indices of partition, taking rows from input batch and
                 // appending into output buffer.
                 for (partition_id, (&start, &end)) in partition_starts
@@ -795,7 +794,7 @@ impl ShuffleRepartitioner {
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    mem_diff += self
+                    let mut mem_diff = self
                         .append_rows_to_partition(
                             input.columns(),
                             &shuffled_partition_ids[start..end],
@@ -818,8 +817,6 @@ impl ShuffleRepartitioner {
                         let mem_used = self.reservation.size();
                         let mem_decrease = mem_used.min(-mem_diff as usize);
                         self.reservation.shrink(mem_decrease);
-
-                        mem_diff += mem_decrease as isize;
                     }
                 }
             }
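
The per-partition accounting introduced in PATCH 1 rests on DataFusion's memory-pool API (MemoryConsumer / MemoryReservation), the same calls PartitionBuffer::new, init_active_if_necessary, and flush use above. The following is a minimal, standalone sketch of that lifecycle, not part of the patches; the pool size, consumer name, and byte counts are arbitrary examples.

    use std::sync::Arc;

    use datafusion::common::Result;
    use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn main() -> Result<()> {
        // Small pool so that an oversized reservation fails quickly.
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

        // Register one spillable consumer per partition, as PartitionBuffer::new does.
        let mut reservation = MemoryConsumer::new("PartitionBuffer[0]")
            .with_can_spill(true)
            .register(&pool);

        // Grow before allocating the active array builders; on Err the caller
        // is expected to spill and retry (see append_rows_to_partition).
        reservation.try_grow(512 * 1024)?;

        // ... fill builders, freeze them into compressed batches ...

        // Give the memory back when the builders are flushed or spilled.
        reservation.shrink(512 * 1024);
        reservation.free();
        Ok(())
    }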
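
PATCH 1's append_rows_to_partition, tightened by PATCH 4's extra reservation.free(), implements a "spill, then retry from the row where the append stopped" loop that fails only when a spill makes no progress. The sketch below models just that control flow; FakePartition, try_append, and spill are illustrative stand-ins, not the Comet types.

    enum AppendStatus {
        Done(isize),    // all requested rows were appended; builder memory grew by this much
        Stopped(usize), // out of memory; index of the next row that still needs appending
    }

    struct FakePartition {
        free_bytes: usize, // what the pretend memory pool will still grant
        reserved: usize,   // what this partition currently holds
        spill_count: usize,
    }

    impl FakePartition {
        // Pretend every appended row costs 8 bytes of builder memory.
        fn try_append(&mut self, rows: &[u64], start: usize) -> AppendStatus {
            for i in start..rows.len() {
                if self.free_bytes < 8 {
                    return AppendStatus::Stopped(i);
                }
                self.free_bytes -= 8;
                self.reserved += 8;
            }
            AppendStatus::Done(((rows.len() - start) * 8) as isize)
        }

        // Pretend the buffered rows were written to disk and their memory reclaimed.
        fn spill(&mut self) {
            self.spill_count += 1;
            self.free_bytes += self.reserved;
            self.reserved = 0;
        }
    }

    // Mirrors the control flow of append_rows_to_partition: on failure spill and
    // retry from the reported row index, and give up if a spill changed nothing.
    fn append_with_retry(p: &mut FakePartition, rows: &[u64]) -> Result<isize, String> {
        let mut status = p.try_append(rows, 0);
        loop {
            match status {
                AppendStatus::Done(mem_diff) => return Ok(mem_diff),
                AppendStatus::Stopped(next) => {
                    p.spill();
                    status = p.try_append(rows, next);
                    if let AppendStatus::Stopped(after_retry) = status {
                        if after_retry == next {
                            return Err("still out of memory after spilling".to_string());
                        }
                    }
                }
            }
        }
    }

    fn main() {
        let rows: Vec<u64> = (0..32).collect();
        let mut partition = FakePartition { free_bytes: 64, reserved: 0, spill_count: 0 };
        let appended = append_with_retry(&mut partition, &rows);
        println!("result: {:?}, spills: {}", appended, partition.spill_count);
    }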
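
The new shuffle_write_test exercises the operator under a bounded memory pool by building the SessionContext from an explicit RuntimeEnv. A condensed sketch of only that setup, using the same RuntimeEnvBuilder and SessionContext calls as the test; the helper name memory_limited_ctx and the 10 MiB figure are examples, not part of the patch.

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;

    fn memory_limited_ctx(memory_limit: Option<usize>) -> SessionContext {
        let config = SessionConfig::new();
        let mut builder = RuntimeEnvBuilder::new();
        if let Some(limit) = memory_limit {
            // The second argument is the fraction of `limit` the memory pool may actually use.
            builder = builder.with_memory_limit(limit, 1.0);
        }
        let runtime_env = Arc::new(builder.build().expect("runtime env"));
        SessionContext::new_with_config_rt(config, runtime_env)
    }

    fn main() {
        // Roughly what shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024)) sets up.
        let ctx = memory_limited_ctx(Some(10 * 1024 * 1024));
        let _task_ctx = ctx.task_ctx();
    }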