Skip to content

Commit

Permalink
[CLN]: make materialization a function rather than a struct (#3165)
Browse files Browse the repository at this point in the history
## Description of changes

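Simplifies the usage of the log materialization logic and its lifetimes in preparation for the next PR in this stack: the `LogMaterializer` struct (constructed with owned, cloned inputs and then consumed via `.materialize()`) is replaced by a `materialize_logs` function that borrows the record segment reader and the logs. There are no actual logic changes.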

## Test plan
*How are these changes tested?*

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*

n/a
  • Loading branch information
codetheweb authored Dec 2, 2024
1 parent 337fe73 commit ccff355
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 243 deletions.
7 changes: 2 additions & 5 deletions rust/worker/src/execution/operators/brute_force_knn.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::execution::operator::Operator;
use crate::execution::operators::normalize_vectors::normalize;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::LogMaterializer;
use crate::segment::LogMaterializerError;
use crate::segment::{materialize_logs, LogMaterializerError};
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_distance::DistanceFunction;
Expand Down Expand Up @@ -134,9 +133,7 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
}
}
};
let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None);
let logs = match log_materializer
.materialize()
let logs = match materialize_logs(&record_segment_reader, &input.log, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Expand Down
6 changes: 2 additions & 4 deletions rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {

#[cfg(test)]
mod tests {
use crate::segment::materialize_logs;
use crate::segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError};
use crate::segment::types::SegmentFlusher;
use crate::segment::LogMaterializer;
use crate::{
execution::{
operator::Operator,
Expand Down Expand Up @@ -293,9 +293,7 @@ mod tests {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
.expect("Log materialization failed");
Expand Down
9 changes: 4 additions & 5 deletions rust/worker/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
metadata_segment::{MetadataSegmentError, MetadataSegmentReader},
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError, MaterializedLogRecord,
LogMaterializerError, MaterializedLogRecord,
},
};

Expand Down Expand Up @@ -416,10 +417,8 @@ impl Operator<FilterInput, FilterOutput> for FilterOperator {
}
Err(e) => Err(*e),
}?;
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()
let cloned_record_segment_reader = record_segment_reader.clone();
let materialized_logs = materialize_logs(&cloned_record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;
let metadata_log_reader = MetadataLogReader::new(&materialized_logs);
Expand Down
21 changes: 9 additions & 12 deletions rust/worker/src/execution/operators/get_vectors_operator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{self, RecordSegmentReader},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};
use async_trait::async_trait;
Expand Down Expand Up @@ -120,17 +121,13 @@ impl Operator<GetVectorsOperatorInput, GetVectorsOperatorOutput> for GetVectorsO
},
};
// Step 1: Materialize the logs.
let materializer = LogMaterializer::new(
record_segment_reader.clone(),
input.log_records.clone(),
None,
);
let mat_records = match materializer.materialize().await {
Ok(records) => records,
Err(e) => {
return Err(GetVectorsOperatorError::LogMaterialization(e));
}
};
let mat_records =
match materialize_logs(&record_segment_reader, &input.log_records, None).await {
Ok(records) => records,
Err(e) => {
return Err(GetVectorsOperatorError::LogMaterialization(e));
}
};

// Search the log records for the user ids
let mut remaining_search_user_ids: HashSet<String> =
Expand Down
11 changes: 3 additions & 8 deletions rust/worker/src/execution/operators/hnsw_knn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::{LogMaterializer, LogMaterializerError, MaterializedLogRecord};
use crate::segment::{materialize_logs, LogMaterializerError, MaterializedLogRecord};
use crate::{
execution::operator::Operator,
segment::{
Expand Down Expand Up @@ -147,13 +147,8 @@ impl Operator<HnswKnnOperatorInput, HnswKnnOperatorOutput> for HnswKnnOperator {
}
},
};
let log_materializer = LogMaterializer::new(
Some(record_segment_reader.clone()),
input.logs.clone(),
None,
);
let logs = match log_materializer
.materialize()
let some_reader = Some(record_segment_reader.clone());
let logs = match materialize_logs(&some_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Expand Down
6 changes: 3 additions & 3 deletions rust/worker/src/execution/operators/knn_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tonic::async_trait;
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -72,8 +73,7 @@ impl Operator<KnnLogInput, KnnLogOutput> for KnnOperator {
Err(e) => Err(*e),
}?;

let materializer = LogMaterializer::new(record_segment_reader, input.logs.clone(), None);
let logs = materializer.materialize().await?;
let logs = materialize_logs(&record_segment_reader, &input.logs, None).await?;

let target_vector;
let target_embedding = if let DistanceFunction::Cosine = input.distance_function {
Expand Down
8 changes: 3 additions & 5 deletions rust/worker/src/execution/operators/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -213,10 +214,7 @@ impl Operator<LimitInput, LimitOutput> for LimitOperator {
let mut materialized_log_offset_ids = match &input.log_offset_ids {
SignedRoaringBitmap::Include(rbm) => rbm.clone(),
SignedRoaringBitmap::Exclude(rbm) => {
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()
let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;

Expand Down
12 changes: 4 additions & 8 deletions rust/worker/src/execution/operators/prefetch_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tracing::{trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -84,13 +85,8 @@ impl Operator<PrefetchRecordInput, PrefetchRecordOutput> for PrefetchRecordOpera
Err(e) => return Err((*e).into()),
};

let materializer = LogMaterializer::new(
Some(record_segment_reader.clone()),
input.logs.clone(),
None,
);
let materialized_logs = materializer
.materialize()
let some_reader = Some(record_segment_reader.clone());
let materialized_logs = materialize_logs(&some_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;

Expand Down
9 changes: 4 additions & 5 deletions rust/worker/src/execution/operators/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use tracing::{error, trace, Instrument, Span};
use crate::{
execution::operator::Operator,
segment::{
materialize_logs,
record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
LogMaterializer, LogMaterializerError,
LogMaterializerError,
},
};

Expand Down Expand Up @@ -104,10 +105,8 @@ impl Operator<ProjectionInput, ProjectionOutput> for ProjectionOperator {
}
Err(e) => Err(*e),
}?;
let materializer =
LogMaterializer::new(record_segment_reader.clone(), input.logs.clone(), None);
let materialized_logs = materializer
.materialize()

let materialized_logs = materialize_logs(&record_segment_reader, &input.logs, None)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await?;

Expand Down
18 changes: 8 additions & 10 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::segment::materialize_logs;
use crate::segment::metadata_segment::MetadataSegmentError;
use crate::segment::metadata_segment::MetadataSegmentWriter;
use crate::segment::record_segment::ApplyMaterializedLogError;
use crate::segment::record_segment::RecordSegmentReader;
use crate::segment::record_segment::RecordSegmentReaderCreationError;
use crate::segment::LogMaterializer;
use crate::segment::LogMaterializerError;
use crate::segment::SegmentWriter;
use crate::{
Expand Down Expand Up @@ -160,16 +160,14 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
};
}
};
let materializer = LogMaterializer::new(
record_segment_reader,
input.chunk.clone(),
Some(input.next_offset_id.clone()),
);
// Materialize the logs.
let res = match materializer
.materialize()
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
let res = match materialize_logs(
&record_segment_reader,
&input.chunk,
Some(input.next_offset_id.clone()),
)
.instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs"))
.await
{
Ok(records) => records,
Err(e) => {
Expand Down
44 changes: 16 additions & 28 deletions rust/worker/src/segment/metadata_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,11 +1075,12 @@ mod test {
#![allow(deprecated)]

use crate::segment::{
materialize_logs,
metadata_segment::{MetadataSegmentReader, MetadataSegmentWriter},
record_segment::{
RecordSegmentReader, RecordSegmentReaderCreationError, RecordSegmentWriter,
},
LogMaterializer, SegmentFlusher, SegmentWriter,
SegmentFlusher, SegmentWriter,
};
use chroma_blockstore::{
arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
Expand Down Expand Up @@ -1193,9 +1194,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1265,9 +1264,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1347,9 +1345,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1487,9 +1484,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1566,9 +1561,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1740,9 +1734,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1801,9 +1793,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -1962,9 +1953,7 @@ mod test {
}
}
};
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
let mat_records = materialize_logs(&record_segment_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down Expand Up @@ -2021,9 +2010,8 @@ mod test {
MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Error creating segment writer");
let materializer = LogMaterializer::new(Some(record_segment_reader), data, None);
let mat_records = materializer
.materialize()
let some_reader = Some(record_segment_reader);
let mat_records = materialize_logs(&some_reader, &data, None)
.await
.expect("Log materialization failed");
metadata_writer
Expand Down
Loading

0 comments on commit ccff355

Please sign in to comment.