Skip to content

Commit

Permalink
revert unrelated write_table_rows change
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Nov 30, 2024
1 parent 15fecf6 commit 388e84d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
7 changes: 6 additions & 1 deletion pg_replicate/src/pipeline/batching/data_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {

while let Some(batch) = batch_timeout_stream.next().await {
info!("got {} table copy events in a batch", batch.len());
//TODO: Avoid a vec copy
let mut rows = Vec::with_capacity(batch.len());
for row in batch {
rows.push(row.map_err(CommonSourceError::TableCopyStream)?);
}
self.sink
.write_table_rows(batch, table_schema.table_id)
.write_table_rows(rows, table_schema.table_id)
.await
.map_err(PipelineError::Sink)?;
}
Expand Down
17 changes: 6 additions & 11 deletions pg_replicate/src/pipeline/sinks/bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::info;
use crate::{
clients::bigquery::BigQueryClient,
conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell},
pipeline::{sources::postgres::TableCopyStreamError, PipelineResumptionState},
pipeline::PipelineResumptionState,
table::{ColumnSchema, TableId, TableName, TableSchema},
};

Expand Down Expand Up @@ -162,24 +162,19 @@ impl BatchSink for BigQueryBatchSink {

async fn write_table_rows(
&mut self,
mut table_rows: Vec<Result<TableRow, TableCopyStreamError>>,
mut table_rows: Vec<TableRow>,
table_id: TableId,
) -> Result<(), Self::Error> {
let table_schema = self.get_table_schema(table_id)?;
let table_name = Self::table_name_in_bq(&table_schema.table_name);
let table_descriptor = table_schema.into();

let new_rows = table_rows
.drain(..)
.filter_map(|row| row.ok())
.map(|mut row| {
row.values.push(Cell::String("UPSERT".to_string()));
row
})
.collect::<Vec<TableRow>>();
for table_row in &mut table_rows {
table_row.values.push(Cell::String("UPSERT".to_string()));
}

self.client
.stream_rows(&self.dataset_id, table_name, &table_descriptor, &new_rows)
.stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows)
.await?;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions pg_replicate/src/pipeline/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
table::{TableId, TableSchema},
};

use super::{sources::postgres::TableCopyStreamError, PipelineResumptionState};
use super::PipelineResumptionState;

#[cfg(feature = "bigquery")]
pub mod bigquery;
Expand Down Expand Up @@ -53,7 +53,7 @@ pub trait BatchSink {
) -> Result<(), Self::Error>;
async fn write_table_rows(
&mut self,
rows: Vec<Result<TableRow, TableCopyStreamError>>,
rows: Vec<TableRow>,
table_id: TableId,
) -> Result<(), Self::Error>;
async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error>;
Expand Down

0 comments on commit 388e84d

Please sign in to comment.