Skip to content

Commit

Permalink
Merge branch 'main' into better-cse-identifier
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Jun 23, 2024
2 parents d69033a + 08e4e6a commit 1a6f8e7
Show file tree
Hide file tree
Showing 116 changed files with 2,490 additions and 917 deletions.
170 changes: 112 additions & 58 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cargo run --example csv_sql
- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
- [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file
Expand Down
664 changes: 664 additions & 0 deletions datafusion-examples/examples/advanced_parquet_index.rs

Large diffs are not rendered by default.

44 changes: 20 additions & 24 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result, ScalarValue};
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
};
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_optimizer::optimizer::{ApplyOrder, Optimizer};
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
Expand Down Expand Up @@ -131,32 +131,28 @@ impl OptimizerRule for MyOptimizerRule {
"my_optimizer_rule"
}

fn try_optimize(
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
let optimized_plan = utils::optimize_children(self, plan, config)?;
match optimized_plan {
Some(LogicalPlan::Filter(filter)) => {
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = my_rewrite(filter.predicate.clone())?;
Ok(Some(LogicalPlan::Filter(Filter::try_new(
Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input,
filter.input.clone(),
)?)))
}
Some(optimized_plan) => Ok(Some(optimized_plan)),
None => match plan {
LogicalPlan::Filter(filter) => {
let predicate = my_rewrite(filter.predicate.clone())?;
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input.clone(),
)?)))
}
_ => Ok(None),
},
_ => Ok(Transformed::no(plan)),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ impl Column {
})
}

/// return the column's name.
///
/// Note: This ignores the relation and returns the column name only.
pub fn name(&self) -> &str {
&self.name
}

/// Serialize column into a flat name string
pub fn flat_name(&self) -> String {
match &self.relation {
Expand Down
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,13 @@ pub struct TableParquetOptions {
pub key_value_metadata: HashMap<String, Option<String>>,
}

impl TableParquetOptions {
/// Return new default TableParquetOptions
pub fn new() -> Self {
Self::default()
}
}

impl ConfigField for TableParquetOptions {
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
self.global.visit(v, key_prefix, description);
Expand Down Expand Up @@ -1559,6 +1566,7 @@ config_namespace! {
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
Expand Down Expand Up @@ -1624,6 +1632,13 @@ impl CsvOptions {
self
}

/// Set true to indicate that the CSV quotes should be doubled.
/// - default to true
pub fn with_double_quote(mut self, double_quote: bool) -> Self {
self.double_quote = Some(double_quote);
self
}

/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down Expand Up @@ -1668,6 +1683,7 @@ pub enum FormatOptions {
AVRO,
ARROW,
}

impl Display for FormatOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let out = match self {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
if let Some(v) = &value.null_value {
builder = builder.with_null(v.into())
}
if let Some(v) = &value.escape {
builder = builder.with_escape(*v)
}
if let Some(v) = &value.double_quote {
builder = builder.with_double_quote(*v)
}
Ok(CsvWriterOptions {
writer_options: builder,
compression: value.compression,
Expand Down
Loading

0 comments on commit 1a6f8e7

Please sign in to comment.