Skip to content

Commit

Permalink
add date support
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 6, 2024
1 parent 1743fa1 commit 08fb64d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 6 deletions.
15 changes: 13 additions & 2 deletions pg_replicate/src/clients/bigquery.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashSet, fs};

use bytes::{Buf, BufMut};
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use futures::StreamExt;
use gcp_bigquery_client::yup_oauth2::parse_service_account_key;
use gcp_bigquery_client::{
Expand Down Expand Up @@ -75,8 +75,9 @@ impl BigQueryClient {
&Type::NUMERIC => "bignumeric",
&Type::BOOL => "bool",
&Type::BYTEA => "bytes",
&Type::VARCHAR | &Type::BPCHAR | &Type::TEXT => "string",
&Type::CHAR | &Type::BPCHAR | &Type::VARCHAR | &Type::NAME | &Type::TEXT => "string",
&Type::TIMESTAMP | &Type::TIMESTAMPTZ => "timestamp",
&Type::DATE => "date",
_ => "bytes",
}
}
Expand Down Expand Up @@ -351,6 +352,7 @@ impl BigQueryClient {
Cell::F32(i) => s.push_str(&format!("{i}")),
Cell::F64(i) => s.push_str(&format!("{i}")),
Cell::Numeric(n) => s.push_str(&format!("{n}")),
Cell::Date(t) => s.push_str(&format!("'{t}'")),
Cell::TimeStamp(t) => s.push_str(&format!("'{t}'")),
Cell::TimeStampTz(t) => s.push_str(&format!("'{t}'")),
Cell::Bytes(b) => {
Expand Down Expand Up @@ -543,6 +545,10 @@ impl Message for TableRow {
let s = n.to_string();
::prost::encoding::string::encode(tag, &s, buf);
}
Cell::Date(t) => {
let s = t.format("%Y-%m-%d").to_string();
::prost::encoding::string::encode(tag, &s, buf);
}
Cell::TimeStamp(t) => {
let s = t.format("%Y-%m-%d %H:%M:%S%.f").to_string();
::prost::encoding::string::encode(tag, &s, buf);
Expand Down Expand Up @@ -592,6 +598,10 @@ impl Message for TableRow {
let s = n.to_string();
::prost::encoding::string::encoded_len(tag, &s)
}
Cell::Date(t) => {
let s = t.format("%Y-%m-%d").to_string();
::prost::encoding::string::encoded_len(tag, &s)
}
Cell::TimeStamp(t) => {
let s = t.format("%Y-%m-%d %H:%M:%S%.f").to_string();
::prost::encoding::string::encoded_len(tag, &s)
Expand Down Expand Up @@ -619,6 +629,7 @@ impl Message for TableRow {
Cell::F32(i) => *i = 0.,
Cell::F64(i) => *i = 0.,
Cell::Numeric(n) => *n = PgNumeric::default(),
Cell::Date(t) => *t = NaiveDate::default(),
Cell::TimeStamp(t) => *t = NaiveDateTime::default(),
Cell::TimeStampTz(t) => *t = DateTime::<Utc>::default(),
Cell::Bytes(b) => b.clear(),
Expand Down
11 changes: 10 additions & 1 deletion pg_replicate/src/clients/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,15 @@ impl DuckDbClient {
fn postgres_typ_to_duckdb_typ(typ: &Type) -> &'static str {
match typ {
&Type::INT2 | &Type::INT4 | &Type::INT8 => "integer",
&Type::NUMERIC => "numeric",
&Type::FLOAT4 => "float",
&Type::FLOAT8 => "double",
&Type::BOOL => "bool",
&Type::BYTEA => "bytea",
&Type::VARCHAR | &Type::BPCHAR => "text",
&Type::CHAR | &Type::BPCHAR | &Type::VARCHAR | &Type::NAME | &Type::TEXT => "text",
&Type::TIMESTAMP => "timestamp",
&Type::TIMESTAMPTZ => "timestamptz",
&Type::DATE => "date",
typ => panic!("duckdb doesn't yet support type {typ}"),
}
}
Expand Down Expand Up @@ -365,6 +370,10 @@ impl ToSql for Cell {
let s = n.to_string();
Ok(ToSqlOutput::Owned(Value::Text(s)))
}
Cell::Date(t) => {
let s = t.format("%Y-%m-%d").to_string();
Ok(ToSqlOutput::Owned(Value::Text(s)))
}
Cell::TimeStamp(t) => {
let s = t.format("%Y-%m-%d %H:%M:%S%.f").to_string();
Ok(ToSqlOutput::Owned(Value::Text(s)))
Expand Down
7 changes: 6 additions & 1 deletion pg_replicate/src/conversions/cdc_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use bigdecimal::{BigDecimal, ParseBigDecimalError};
use chrono::{DateTime, FixedOffset, NaiveDateTime};
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
use postgres_protocol::message::backend::{
BeginBody, CommitBody, DeleteBody, InsertBody, LogicalReplicationMessage, RelationBody,
ReplicationMessage, TupleData, UpdateBody,
Expand Down Expand Up @@ -140,6 +140,11 @@ impl CdcEventConverter {
let val = from_bytea_hex(val)?;
Ok(Cell::Bytes(val))
}
Type::DATE => {
let val = from_utf8(bytes)?;
let val = NaiveDate::parse_from_str(val, "%Y-%m-%d")?;
Ok(Cell::Date(val))
}
Type::TIMESTAMP => {
let val = from_utf8(bytes)?;
let val = NaiveDateTime::parse_from_str(val, "%Y-%m-%d %H:%M:%S%.f")?;
Expand Down
3 changes: 2 additions & 1 deletion pg_replicate/src/conversions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use numeric::PgNumeric;

pub mod cdc_event;
Expand All @@ -17,6 +17,7 @@ pub enum Cell {
F32(f32),
F64(f64),
Numeric(PgNumeric),
Date(NaiveDate),
TimeStamp(NaiveDateTime),
TimeStampTz(DateTime<Utc>),
Bytes(Vec<u8>),
Expand Down
5 changes: 4 additions & 1 deletion pg_replicate/src/conversions/table_row.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{DateTime, FixedOffset, NaiveDateTime};
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
#[cfg(feature = "unknown_types_to_bytes")]
use postgres_protocol::types;
use thiserror::Error;
Expand Down Expand Up @@ -120,6 +120,9 @@ impl TableRowConverter {
Type::BYTEA => Self::get_from_row(row, i, column_schema.nullable, |val: Vec<u8>| {
Cell::Bytes(val)
}),
Type::DATE => Self::get_from_row(row, i, column_schema.nullable, |val: NaiveDate| {
Cell::Date(val)
}),
Type::TIMESTAMPTZ => Self::get_from_row(
row,
i,
Expand Down

0 comments on commit 08fb64d

Please sign in to comment.