Skip to content

Commit

Permalink
Merge pull request #243 from Shnatsel/drop-flume
Browse files Browse the repository at this point in the history
Drop flume to reduce dependency footprint now that std has a good channel
  • Loading branch information
johannesvollmer authored Oct 29, 2024
2 parents ce6ec41 + e94073a commit 9cc9642
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ bit_field = "^0.10.1" # exr file version bit flags
miniz_oxide = "^0.7.1" # zip compression for pxr24
smallvec = "^1.7.0" # make cache-friendly allocations TODO profile if smallvec is really an improvement!
rayon-core = "^1.11.0" # threading for parallel compression TODO make this an optional feature?
flume = { version = "^0.11.0", default-features = false } # crossbeam, but less unsafe code TODO make this an optional feature?
zune-inflate = { version = "^0.2.3", default-features = false, features = ["zlib"] } # zip decompression, faster than miniz_oxide

[dev-dependencies]
Expand Down
7 changes: 4 additions & 3 deletions src/block/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use std::sync::mpsc;
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::sync::Arc;
Expand Down Expand Up @@ -387,8 +388,8 @@ impl<R: ChunksReader> SequentialBlockDecompressor<R> {
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
remaining_chunks: R,
sender: flume::Sender<Result<UncompressedBlock>>,
receiver: flume::Receiver<Result<UncompressedBlock>>,
sender: mpsc::Sender<Result<UncompressedBlock>>,
receiver: mpsc::Receiver<Result<UncompressedBlock>>,
currently_decompressing_count: usize,
max_threads: usize,

Expand Down Expand Up @@ -437,7 +438,7 @@ impl<R: ChunksReader> ParallelBlockDecompressor<R> {

let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times

let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?
let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic?

Ok(Self {
shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
Expand Down
9 changes: 5 additions & 4 deletions src/block/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::fmt::Debug;
use std::io::Seek;
use std::iter::Peekable;
use std::ops::Not;
use std::sync::mpsc;
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::collections::BTreeMap;

use crate::block::UncompressedBlock;
use crate::block::chunk::{Chunk};
use crate::block::chunk::Chunk;
use crate::compression::Compression;
use crate::error::{Error, Result, UnitResult, usize_to_u64};
use crate::io::{Data, Tracking, Write};
Expand Down Expand Up @@ -337,8 +338,8 @@ pub struct ParallelBlocksCompressor<'w, W> {
meta: &'w MetaData,
sorted_writer: SortedBlocksWriter<'w, W>,

sender: flume::Sender<Result<(usize, usize, Chunk)>>,
receiver: flume::Receiver<Result<(usize, usize, Chunk)>>,
sender: mpsc::Sender<Result<(usize, usize, Chunk)>>,
receiver: mpsc::Receiver<Result<(usize, usize, Chunk)>>,
pool: rayon_core::ThreadPool,

currently_compressing_count: usize,
Expand Down Expand Up @@ -379,7 +380,7 @@ impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter {
};

let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times
let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic?
let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic?

Some(Self {
sorted_writer: SortedBlocksWriter::new(meta, chunks_writer),
Expand Down

0 comments on commit 9cc9642

Please sign in to comment.