Skip to content

Commit

Permalink
feat: inital clickhouse client impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc committed Nov 14, 2024
1 parent 24575a5 commit 4ed3d6e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tracing-subscriber = { version = "0.3", default-features = false }
utoipa = { version = "4.2.3", default-features = false }
utoipa-swagger-ui = { version = "7.1.0", default-features = false }
uuid = { version = "1.10.0", default-features = false }
clickhouse = { version = "0.12.2", default-features = false}

# [patch.crates-io]
# gcp-bigquery-client = { path = "../gcp-bigquery-client" }
2 changes: 2 additions & 0 deletions pg_replicate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ bigdecimal = { workspace = true, features = ["std"] }
bytes = { workspace = true }
byteorder = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clickhouse = { workspace = true, default-features = true, optional= true}
duckdb = { workspace = true, optional = true }
futures = { workspace = true }
gcp-bigquery-client = { workspace = true, optional = true, features = [
Expand Down Expand Up @@ -59,6 +60,7 @@ tracing-subscriber = { workspace = true, default-features = true, features = [

[features]
bigquery = ["dep:gcp-bigquery-client", "dep:prost"]
clickhouse = ["dep:clickhouse"]
duckdb = ["dep:duckdb"]
stdout = []
# When enabled converts unknown types to bytes
Expand Down
89 changes: 89 additions & 0 deletions pg_replicate/src/clients/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@

use clickhouse::Client;

// use crate::{
// conversions::{table_row::TableRow, Cell},
// table::{ColumnSchema, TableId, TableName, TableSchema},
// };


pub struct ClickhouseClient {
client: Client,
}

pub struct ClickhouseConfig {
url: String,
user: String,
pasword: String,
database: String
}

impl ClickhouseClient {
pub fn create_client(config: &ClickhouseConfig) -> Result<ClickhouseClient, clickhouse::error::Error> {
let client = Client::default()
// should include both protocol and port
.with_url(config.url)
.with_user(config.user)
.with_password(config.password)
.with_database(config.database);
Ok(ClickhouseClient {
client
})
}

// fn async_insert(client: &Client) ->



pub fn create_table_if_missing(
&self,
table_name: &TableName,
column_schemas: &[ColumnSchema],
) -> Result<bool, clickhouse::error::Error> {
if self.table_exists(table_name)? {
Ok(false)
} else {
self.create_table(table_name, column_schemas)?;
Ok(true)
}
}



pub fn create_table(
&self,
table_name: &TableName,
column_schemas: &[ColumnSchema],
) -> Result<(), clickhouse::error::Error> {
let columns_spec = Self::create_columns_spec(column_schemas);
let query = format!(
"create table {}.{} {}",
table_name.schema, table_name.name, columns_spec
);
self.conn.execute(&query, [])?;
Ok(())
}

pub fn table_exists(&self, table_name: &TableName) -> Result<bool, clickhouse::error::Error> {
let query =
"select * from information_schema.tables where table_catalog = ? and table_schema = ? and table_name = ?;";
let mut stmt = self.conn.prepare(query)?;
let exists = stmt.exists([&self.current_database, &table_name.schema, &table_name.name])?;
Ok(exists)
}

pub fn insert_rows(
&self,
table_name: &TableName,
table_rows: &Vec<TableRow>,
) -> Result<(), clickhouse::error::Error> {
let table_name = format!("{}.{}", table_name.schema, table_name.name);
let column_count = table_row.values.len();
let query = Self::create_insert_row_query(&table_name, column_count);
let mut stmt = self.conn.prepare(&query)?;
stmt.execute(params_from_iter(table_row.values.iter()))?;

Ok(())
}

}
2 changes: 2 additions & 0 deletions pg_replicate/src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "clickhouse")]
pub mod clickhouse;
#[cfg(feature = "bigquery")]
pub mod bigquery;
#[cfg(feature = "duckdb")]
Expand Down

0 comments on commit 4ed3d6e

Please sign in to comment.