Skip to content

Commit

Permalink
feat: cleanup mysql impl
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen committed Mar 21, 2024
1 parent e4fd3ca commit 4148cb2
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 122 deletions.
94 changes: 17 additions & 77 deletions connector_arrow/src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,92 +49,32 @@ impl<C: Queryable> Connector for MySQLConnection<C> {

/// Translates a database type name into the Arrow `DataType` used to represent it.
///
/// Returns `None` when `ty` matches none of the arms below.
///
/// NOTE(review): this listing comes from a diff rendered without +/- markers, so arms
/// from the removed and the added version of the function appear together. Read as
/// written, the first matching arm wins: `"bool"` / `"boolean"` resolve to
/// `DataType::Boolean`, and later arms that only repeat an already-matched literal
/// (e.g. `"smallint"`, `"bigint"`, `"bytea"`, `"real" | "float4"`) can never be selected.
fn type_db_into_arrow(ty: &str) -> Option<DataType> {
Some(match ty {
"boolean" | "bool" => DataType::Boolean,
"smallint" | "int2" => DataType::Int16,
"integer" | "int4" => DataType::Int32,
"bigint" | "int8" => DataType::Int64,
"real" | "float4" => DataType::Float32,
"double precision" | "float8" => DataType::Float64,
// Decimal values are mapped to Utf8 (strings) here, not to an Arrow decimal type.
"numeric" | "decimal" => DataType::Utf8,
"null" => DataType::Null,

"timestamp" | "timestamp without time zone" => {
DataType::Timestamp(TimeUnit::Microsecond, None)
}
"timestamptz" | "timestamp with time zone" => {
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into()))
}
"date" => DataType::Date32,
"time" | "time without time zone" => DataType::Time64(TimeUnit::Microsecond),
"interval" => DataType::Interval(IntervalUnit::MonthDayNano),
"tinyint" | "bool" | "boolean" => DataType::Int8,
"smallint" => DataType::Int16,
"integer" | "int" => DataType::Int32,
"bigint" => DataType::Int64,

"bytea" => DataType::Binary,
"bit" | "bit varying" | "varbit" => DataType::Binary,
"tinyint unsigned" => DataType::UInt8,
"smallint unsigned" => DataType::UInt16,
"integer unsigned" | "int unsigned" => DataType::UInt32,
"bigint unsigned" => DataType::UInt64,

"text" | "varchar" | "char" | "bpchar" => DataType::Utf8,
"real" | "float4" => DataType::Float32,
"double" | "float8" => DataType::Float64,

// Prefix-based fallbacks for names that did not match a literal arm above
// (presumably parameterised spellings such as `varchar(10)` — confirm).
_ if ty.starts_with("bit") => DataType::Binary,
_ if ty.starts_with("varchar") | ty.starts_with("char") | ty.starts_with("bpchar") => {
DataType::Utf8
}
_ if ty.starts_with("decimal") | ty.starts_with("numeric") => DataType::Utf8,
"bytea" => DataType::Binary,
"bit" | "tiny_blob" | "medium_blob" | "long_blob" | "blob" => DataType::Binary,

"varchar" | "var_string" | "string" => DataType::Utf8,

// Any other name has no mapping.
_ => return None,
})
}

/// Translates an Arrow `DataType` into the name of a database column type.
///
/// NOTE(review): two definitions with this name are visible in this span because the
/// listing is a diff rendered without +/- markers. The first one (taking `ty`) maps
/// each Arrow type to a type-name string and returns `None` for intervals and for
/// nested/encoded types; the second one (taking `_ty`) ignores its argument and always
/// returns `None`.
fn type_arrow_into_db(ty: &DataType) -> Option<String> {
Some(
match ty {
DataType::Null => "smallint",
DataType::Boolean => "bool",

DataType::Int8 => "smallint",
DataType::Int16 => "smallint",
DataType::Int32 => "integer",
DataType::Int64 => "bigint",

// Each unsigned type is mapped to a wider type name than its signed
// counterpart; UInt64 goes to a 20-digit decimal.
DataType::UInt8 => "smallint",
DataType::UInt16 => "integer",
DataType::UInt32 => "bigint",
DataType::UInt64 => "decimal(20, 0)",

DataType::Float16 => "real",
DataType::Float32 => "real",
DataType::Float64 => "double precision",

// PostgreSQL timestamps cannot store timezone in the schema.
// PostgreSQL timestamps are microseconds since 2000-01-01T00:00.
// Arrow timestamps *can be* microseconds since 1970-01-01T00:00.
// ... which means we cannot store the full range of the Arrow microsecond
// timestamp in PostgreSQL timestamp without changing its meaning.
// ... so we must use Int64 instead.
DataType::Timestamp(_, _) => "bigint",
DataType::Date32 => "integer",
DataType::Date64 => "bigint",
DataType::Time32(_) => "integer",
DataType::Time64(_) => "bigint",
DataType::Duration(_) => "bigint",
DataType::Interval(_) => return None,

DataType::Utf8 | DataType::LargeUtf8 => "text",

DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => "bytea",

// The only arm that builds its string dynamically, so it returns early
// instead of going through the `.into()` applied to the static names.
DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
return Some(format!("decimal({precision}, {scale})"))
}

DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::RunEndEncoded(_, _) => return None,
}
.into(),
)
/// Always returns `None`; the argument is ignored.
fn type_arrow_into_db(_ty: &DataType) -> Option<String> {
None
}
}

Expand Down
107 changes: 68 additions & 39 deletions connector_arrow/src/mysql/types.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,92 @@
use std::sync::Arc;

use arrow::datatypes::*;
use mysql::{consts::ColumnType, prelude::Protocol};
use mysql::consts::{ColumnFlags, ColumnType};
use mysql::prelude::Protocol;

use crate::api::Connector;
use crate::ConnectorError;

/// Builds an Arrow schema describing the columns of a MySQL result set.
///
/// Produces one `Field` per entry of `result.columns()` and returns them wrapped in an
/// `Arc<Schema>`.
///
/// NOTE(review): this listing comes from a diff rendered without +/- markers. The
/// `let ty = match …` line and the `ColumnType::… => DataType::…` arms appear to belong
/// to the earlier version, which mapped column types inline; the lines that compute
/// `is_unsigned`, `is_not_null` and `db_ty` appear to belong to the later version.
pub fn get_result_schema<'a, P: Protocol>(
result: &mysql::ResultSet<'a, 'a, 'a, 'a, P>,
) -> Result<SchemaRef, ConnectorError> {
let mut fields = Vec::new();
for column in result.columns().as_ref() {
let ty = match column.column_type() {
ColumnType::MYSQL_TYPE_NULL => DataType::Null,
// A flag counts as set when masking the column flags with it leaves a non-empty set.
let is_unsigned = !(column.flags() & ColumnFlags::UNSIGNED_FLAG).is_empty();
let is_not_null = !(column.flags() & ColumnFlags::NOT_NULL_FLAG).is_empty();

ColumnType::MYSQL_TYPE_TINY => DataType::Int8,
ColumnType::MYSQL_TYPE_SHORT => DataType::Int16,
ColumnType::MYSQL_TYPE_LONG => DataType::Int32,
ColumnType::MYSQL_TYPE_LONGLONG => DataType::Int64,
// Resolve the column to a type name, then let `create_field` turn that name into
// an Arrow field; the field is nullable unless NOT_NULL_FLAG is set.
let db_ty = get_name_of_column_type(&column.column_type(), is_unsigned);
fields.push(create_field(
column.name_str().to_string(),
db_ty,
!is_not_null,
));
}

ColumnType::MYSQL_TYPE_FLOAT => DataType::Float32,
ColumnType::MYSQL_TYPE_DOUBLE => DataType::Float64,
Ok(Arc::new(Schema::new(fields)))
}

ColumnType::MYSQL_TYPE_INT24 => todo!(),
/// Creates an Arrow `Field` called `name` for a column whose database type name is
/// `db_ty`, with the given nullability.
///
/// The Arrow type is looked up with `type_db_into_arrow`.
///
/// # Panics
///
/// Panics through `todo!()` when `type_db_into_arrow` returns `None` for `db_ty`.
///
/// NOTE(review): the `ColumnType::… => todo!()` lines inside this span appear to be
/// removed lines of a diff rendered without +/- markers, not part of this function.
fn create_field(name: String, db_ty: &str, nullable: bool) -> Field {
// Called as an associated function (no receiver); `mysql::Conn` fills in the
// connection type parameter of `MySQLConnection`.
let data_type = super::MySQLConnection::<mysql::Conn>::type_db_into_arrow(db_ty);
let data_type = data_type.unwrap_or_else(|| todo!());

ColumnType::MYSQL_TYPE_TIMESTAMP => todo!(),
ColumnType::MYSQL_TYPE_DATE => todo!(),
ColumnType::MYSQL_TYPE_TIME => todo!(),
ColumnType::MYSQL_TYPE_DATETIME => todo!(),
ColumnType::MYSQL_TYPE_YEAR => todo!(),
ColumnType::MYSQL_TYPE_NEWDATE => todo!(),
Field::new(name, data_type, nullable)
}

ColumnType::MYSQL_TYPE_TIMESTAMP2 => todo!(),
ColumnType::MYSQL_TYPE_DATETIME2 => todo!(),
ColumnType::MYSQL_TYPE_TIME2 => todo!(),
ColumnType::MYSQL_TYPE_TYPED_ARRAY => todo!(),
/// Returns a static type-name string for a MySQL `ColumnType`.
///
/// `unsigned` only affects the integer types (TINY, SHORT, INT24, LONG, LONGLONG), for
/// which it selects the `"… unsigned"` spelling; every other column type ignores it.
///
/// NOTE(review): this listing comes from a diff rendered without +/- markers. The
/// `ColumnType::… => …` arms, the `fields.push(Field::new(…))` call and the first
/// `Ok(Arc::new(Schema::new(fields)))` inside this span appear to be removed lines of
/// the earlier schema-building code, not part of this function.
///
/// NOTE(review): some names returned here (for example `"float"` and `"mediumint"`)
/// have no matching arm in the `type_db_into_arrow` shown in this file; confirm they
/// are handled before they reach `create_field`, which panics on an unmapped name.
fn get_name_of_column_type(col_ty: &ColumnType, unsigned: bool) -> &'static str {
use ColumnType::*;

ColumnType::MYSQL_TYPE_NEWDECIMAL => todo!(),
ColumnType::MYSQL_TYPE_DECIMAL => todo!(),
// Matching on the (type, signedness) pair lets each integer type list both spellings
// while all other types use `_` for the flag.
match (col_ty, unsigned) {
(MYSQL_TYPE_NULL, _) => "null",

ColumnType::MYSQL_TYPE_VARCHAR => DataType::Utf8,
ColumnType::MYSQL_TYPE_VAR_STRING => DataType::Utf8,
ColumnType::MYSQL_TYPE_STRING => DataType::Utf8,
ColumnType::MYSQL_TYPE_JSON => DataType::Utf8,
(MYSQL_TYPE_TINY, false) => "tinyint",
(MYSQL_TYPE_TINY, true) => "tinyint unsigned",

ColumnType::MYSQL_TYPE_ENUM => todo!(),
ColumnType::MYSQL_TYPE_SET => todo!(),
(MYSQL_TYPE_SHORT, false) => "smallint",
(MYSQL_TYPE_SHORT, true) => "smallint unsigned",

ColumnType::MYSQL_TYPE_BIT => todo!(),
ColumnType::MYSQL_TYPE_TINY_BLOB => DataType::Binary,
ColumnType::MYSQL_TYPE_MEDIUM_BLOB => DataType::Binary,
ColumnType::MYSQL_TYPE_LONG_BLOB => DataType::Binary,
ColumnType::MYSQL_TYPE_BLOB => DataType::Binary,
(MYSQL_TYPE_INT24, false) => "mediumint",
(MYSQL_TYPE_INT24, true) => "mediumint unsigned",

ColumnType::MYSQL_TYPE_GEOMETRY => todo!(),
ColumnType::MYSQL_TYPE_UNKNOWN => todo!(),
};
(MYSQL_TYPE_LONG, false) => "int",
(MYSQL_TYPE_LONG, true) => "int unsigned",

fields.push(Field::new(column.name_str(), ty, true));
}
(MYSQL_TYPE_LONGLONG, false) => "bigint",
(MYSQL_TYPE_LONGLONG, true) => "bigint unsigned",

Ok(Arc::new(Schema::new(fields)))
(MYSQL_TYPE_FLOAT, _) => "float",
(MYSQL_TYPE_DOUBLE, _) => "double",

(MYSQL_TYPE_TIMESTAMP, _) => "timestamp",
(MYSQL_TYPE_DATE, _) => "date",
(MYSQL_TYPE_TIME, _) => "time",
(MYSQL_TYPE_DATETIME, _) => "datetime",
(MYSQL_TYPE_YEAR, _) => "year",
(MYSQL_TYPE_NEWDATE, _) => "newdate",

(MYSQL_TYPE_TIMESTAMP2, _) => "timestamp2",
(MYSQL_TYPE_DATETIME2, _) => "datetime2",
(MYSQL_TYPE_TIME2, _) => "time2",
(MYSQL_TYPE_TYPED_ARRAY, _) => "typed_array",

(MYSQL_TYPE_NEWDECIMAL, _) => "newdecimal",
(MYSQL_TYPE_DECIMAL, _) => "decimal",

(MYSQL_TYPE_VARCHAR, _) => "varchar",
(MYSQL_TYPE_VAR_STRING, _) => "var_string",
(MYSQL_TYPE_STRING, _) => "string",
(MYSQL_TYPE_JSON, _) => "json",

(MYSQL_TYPE_ENUM, _) => "enum",
(MYSQL_TYPE_SET, _) => "set",

(MYSQL_TYPE_BIT, _) => "bit",
(MYSQL_TYPE_TINY_BLOB, _) => "tiny_blob",
(MYSQL_TYPE_MEDIUM_BLOB, _) => "medium_blob",
(MYSQL_TYPE_LONG_BLOB, _) => "long_blob",
(MYSQL_TYPE_BLOB, _) => "blob",

(MYSQL_TYPE_GEOMETRY, _) => "geometry",
(MYSQL_TYPE_UNKNOWN, _) => "unknown",
}
}
2 changes: 0 additions & 2 deletions connector_arrow/src/postgres/protocol_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ impl<'a> FromSql<'a> for Binary<'a> {
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
Ok(if matches!(ty, &Type::VARBIT | &Type::BIT) {
let varbit = postgres_protocol::types::varbit_from_sql(raw)?;
dbg!(varbit.len());
dbg!(varbit.bytes());
Binary(varbit.bytes())
} else {
Binary(postgres_protocol::types::bytea_from_sql(raw))
Expand Down
4 changes: 0 additions & 4 deletions connector_arrow/src/util/row_collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ pub fn next_batch_from_rows<'stmt, T: RowsReader<'stmt>>(
if let Some(mut cell_reader) = rows_reader.next_row()? {
writer.prepare_for_batch(1)?;

dbg!("row");

for field in &schema.fields {
dbg!(field);

let cell_ref = cell_reader.next_cell();

transport::transport(field, cell_ref.unwrap(), &mut writer)?;
Expand Down

0 comments on commit 4148cb2

Please sign in to comment.