Skip to content

Commit

Permalink
update code syncing with new version
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdul Haris Djafar authored and Abdul Haris Djafar committed Dec 2, 2024
1 parent c31a5bd commit 056c791
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pg_replicate/examples/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
}
};

let delta_sink = DeltaSink::new(delta_args.delta_path);
let delta_sink = DeltaSink::new(delta_args.delta_path);

let batch_config = BatchConfig::new(1000, Duration::from_secs(10));
let mut pipeline = BatchDataPipeline::new(postgres_source, delta_sink, action, batch_config);
Expand Down
2 changes: 1 addition & 1 deletion pg_replicate/src/clients/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl DeltaClient {

fn cell_to_arrow(&self, typ: &Cell) -> Arc<dyn Array> {
match typ {
Cell::Null => Arc::new(StringArray::from(vec![String::from("")])),
Cell::Null => Arc::new(StringArray::from(vec![String::from("")])),
Cell::Uuid(value) => Arc::new(StringArray::from(vec![value.to_string()])),
Cell::Bytes(value) => {
let data = std::str::from_utf8(value)
Expand Down
24 changes: 14 additions & 10 deletions pg_replicate/src/pipeline/sinks/delta.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use std::{collections::{HashMap, HashSet}, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use tokio_postgres::types::PgLsn;
use tracing::info;

use super::{BatchSink, SinkError};
use crate::{
clients::delta::DeltaClient, conversions::{cdc_event::CdcEvent, table_row::TableRow}, pipeline::PipelineResumptionState, table::{TableId, TableSchema}
clients::delta::DeltaClient,
conversions::{cdc_event::CdcEvent, table_row::TableRow},
pipeline::PipelineResumptionState,
table::{TableId, TableSchema},
};
use deltalake::arrow::error::ArrowError;
use deltalake::{arrow::datatypes::Schema, DeltaTableError};
use super::{BatchSink, SinkError};
use thiserror::Error;
use deltalake::arrow::error::ArrowError;


#[derive(Debug, Error)]
pub enum DeltaSinkError {
Expand All @@ -31,7 +36,7 @@ pub enum DeltaSinkError {
CommitWithoutBegin,
}

pub struct DeltaSink{
pub struct DeltaSink {
client: DeltaClient,
}

Expand Down Expand Up @@ -63,7 +68,6 @@ impl BatchSink for DeltaSink {
&mut self,
table_schemas: HashMap<TableId, TableSchema>,
) -> Result<(), Self::Error> {

let mut delta_schema: HashMap<String, Arc<Schema>> = HashMap::new();

for (_, table_schema) in &table_schemas {
Expand All @@ -78,7 +82,7 @@ impl BatchSink for DeltaSink {
}
self.client.delta_schemas = Some(delta_schema);
self.client.table_schemas = Some(table_schemas);

Ok(())
}

Expand All @@ -89,8 +93,8 @@ impl BatchSink for DeltaSink {
) -> Result<(), Self::Error> {
for row in rows {
self.client
.write_to_table(row, table_id, String::from("I"))
.await?;
.write_to_table(row, table_id, String::from("I"))
.await?;
}
Ok(())
}
Expand Down

0 comments on commit 056c791

Please sign in to comment.