Skip to content

Commit

Permalink
[feature][dingo-calcite] Support view
Browse files Browse the repository at this point in the history
  • Loading branch information
guojn1 committed Oct 31, 2024
1 parent 6637a47 commit be61d32
Show file tree
Hide file tree
Showing 20 changed files with 558 additions and 66 deletions.
160 changes: 159 additions & 1 deletion dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.dingodb.calcite;

import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableList;
import io.dingodb.calcite.grammar.ddl.DingoSqlCreateTable;
import io.dingodb.calcite.grammar.ddl.SqlAlterAddColumn;
import io.dingodb.calcite.grammar.ddl.SqlAlterAddIndex;
Expand Down Expand Up @@ -46,6 +47,7 @@
import io.dingodb.calcite.schema.SubCalciteSchema;
import io.dingodb.calcite.schema.SubSnapshotSchema;
import io.dingodb.calcite.utils.IndexParameterUtils;
import io.dingodb.calcite.type.DingoSqlTypeFactory;
import io.dingodb.common.ddl.ActionType;
import io.dingodb.common.ddl.SchemaDiff;
import io.dingodb.common.log.LogUtils;
Expand Down Expand Up @@ -95,6 +97,8 @@
import io.dingodb.verify.service.UserService;
import io.dingodb.verify.service.UserServiceProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.ContextSqlValidator;
Expand All @@ -103,20 +107,26 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.server.DdlExecutorImpl;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlTypeNameSpec;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.ddl.DingoSqlColumn;
import org.apache.calcite.sql.ddl.DingoSqlKeyConstraint;
import org.apache.calcite.sql.ddl.SqlCreateTable;
import org.apache.calcite.sql.ddl.SqlCreateView;
import org.apache.calcite.sql.ddl.SqlDropSchema;
import org.apache.calcite.sql.ddl.SqlDropTable;
import org.apache.calcite.sql.ddl.SqlDropView;
import org.apache.calcite.sql.ddl.SqlKeyConstraint;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
Expand All @@ -138,6 +148,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static io.dingodb.calcite.DingoParser.PARSER_CONFIG;
import static io.dingodb.calcite.runtime.DingoResource.DINGO_RESOURCE;
import static io.dingodb.common.util.Optional.mapOrNull;
import static io.dingodb.common.util.PrivilegeUtils.getRealAddress;
Expand Down Expand Up @@ -454,7 +465,8 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {

tableDefinition.setIndices(indexTableDefinitions);
DdlService ddlService = DdlService.root();
ddlService.createTableWithInfo(schema.getSchemaName(), tableName, tableDefinition, connId, create.getOriginalCreateSql());
ddlService.createTableWithInfo(schema.getSchemaName(), tableName, tableDefinition,
connId, create.getOriginalCreateSql());

RootCalciteSchema rootCalciteSchema = (RootCalciteSchema) context.getMutableRootSchema();
RootSnapshotSchema rootSnapshotSchema = (RootSnapshotSchema) rootCalciteSchema.schema;
Expand Down Expand Up @@ -535,6 +547,48 @@ public void execute(SqlDropTable drop, CalcitePrepare.Context context) throws Ex
}
}

public void execute(SqlDropView sqlDropView, CalcitePrepare.Context context) throws Exception {
final Timer.Context timeCtx = DingoMetrics.getTimeContext("dropView");
long start = System.currentTimeMillis();
LogUtils.info(log, "DDL execute: {}", sqlDropView);
String connId = (String) context.getDataContext().get("connId");
final SubSnapshotSchema schema = getSnapShotSchema(sqlDropView.name, context, sqlDropView.ifExists);
if (schema == null) {
return;
}
final String tableName = getTableName(sqlDropView.name);
SchemaInfo schemaInfo = schema.getSchemaInfo(schema.getSchemaName());
if (schemaInfo == null) {
throw DINGO_RESOURCE.unknownSchema(schema.getSchemaName()).ex();
}
Table table = schema.getTableInfo(tableName);
if (table == null) {
if (sqlDropView.ifExists) {
return;
} else {
throw DINGO_RESOURCE.unknownTable(schema.getSchemaName() + "." + tableName).ex();
}
}
DdlService ddlService = DdlService.root();
ddlService.dropTable(schemaInfo, table.tableId.seq, tableName, connId);

RootCalciteSchema rootCalciteSchema = (RootCalciteSchema) context.getMutableRootSchema();
RootSnapshotSchema rootSnapshotSchema = (RootSnapshotSchema) rootCalciteSchema.schema;
SchemaDiff diff = SchemaDiff.builder().schemaId(schema.getSchemaId())
.type(ActionType.ActionDropTable).tableId(table.tableId.seq).build();
rootSnapshotSchema.applyDiff(diff);

timeCtx.stop();
long cost = System.currentTimeMillis() - start;
if (cost > 10000) {
LogUtils.info(log, "drop view take long time, cost:{}, schemaName:{}, tableName:{}",
cost, schemaInfo.getName(), tableName);
} else {
LogUtils.info(log, "[ddl] drop view success, cost:{}, schemaName:{}, tableName:{}",
cost, schema.getSchemaName(), tableName);
}
}

public void execute(@NonNull SqlTruncate truncate, CalcitePrepare.Context context) throws Exception {
long start = System.currentTimeMillis();
final Timer.Context timeCtx = DingoMetrics.getTimeContext("truncateTable");
Expand Down Expand Up @@ -666,6 +720,92 @@ public void execute(@NonNull SqlRevoke sqlRevoke, CalcitePrepare.Context context
}
}

public void execute(SqlCreateView sqlCreateView, CalcitePrepare.Context context) {
LogUtils.info(log, "DDL execute: {}", sqlCreateView);
String connId = (String) context.getDataContext().get("connId");
SubSnapshotSchema schema = getSnapShotSchema(sqlCreateView.name, context, false);
if (schema == null) {
if (context.getDefaultSchemaPath() != null && !context.getDefaultSchemaPath().isEmpty()) {
throw DINGO_RESOURCE.unknownSchema(context.getDefaultSchemaPath().get(0)).ex();
} else {
throw DINGO_RESOURCE.unknownSchema("DINGO").ex();
}
}
String tableName = getTableName(sqlCreateView.name);

// Check table exist
if (schema.getTable(tableName) != null) {
throw DINGO_RESOURCE.tableExists(tableName).ex();
}
SqlNode query = renameColumns(sqlCreateView.columnList, sqlCreateView.query);

List<String> schemas = new ArrayList<>();
schemas.add(schema.getSchemaName());
List<List<String>> schemaPaths = new ArrayList<>();
schemaPaths.add(schemas);
schemaPaths.add(new ArrayList<>());

CalciteConnectionConfigImpl config;
config = new CalciteConnectionConfigImpl(new Properties());
config.set(CalciteConnectionProperty.CASE_SENSITIVE, String.valueOf(PARSER_CONFIG.caseSensitive()));
DingoCatalogReader catalogReader = new DingoCatalogReader(context.getRootSchema(),
schemaPaths, DingoSqlTypeFactory.INSTANCE, config);
DingoSqlValidator sqlValidator = new DingoSqlValidator(catalogReader, DingoSqlTypeFactory.INSTANCE);
SqlNode sqlNode = sqlValidator.validate(query);
RelDataType type = sqlValidator.getValidatedNodeType(sqlNode);

RelDataType jdbcType = type;
if (!type.isStruct()) {
jdbcType = sqlValidator.getTypeFactory().builder().add("$0", type).build();
}

List<ColumnDefinition> columnDefinitionList = jdbcType.getFieldList()
.stream().map(f -> {
int precision = f.getType().getPrecision();
int scale = f.getType().getScale();
String name = f.getType().getSqlTypeName().getName();
if ("BIGINT".equals(name) || "FLOAT".equals(name)) {
precision = -1;
scale = -2147483648;
}
boolean nullable = f.getType().isNullable();
if (sqlValidator.isHybridSearch() && "BIGINT".equals(name)) {
nullable = true;
}
return ColumnDefinition
.builder()
.name(f.getName())
.type(f.getType().getSqlTypeName().getName())
.scale(scale)
.precision(precision)
.nullable(nullable)
.build();
})
.collect(Collectors.toList());

String schemaName = schema.getSchemaName();

String sql = query.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
// build tableDefinition
TableDefinition tableDefinition = TableDefinition.builder()
.name(tableName)
.columns(columnDefinitionList)
.version(1)
.ttl(0)
.replica(3)
.createSql(sql)
.comment("VIEW")
.charset(null)
.collate(null)
.tableType("VIEW")
.rowFormat(null)
.createTime(System.currentTimeMillis())
.updateTime(0)
.build();
DdlService ddlService = DdlService.root();
ddlService.createViewWithInfo(schemaName, tableName, tableDefinition, connId, null);
}

public void execute(@NonNull SqlCreateUser sqlCreateUser, CalcitePrepare.Context context) {
LogUtils.info(log, "DDL execute: {}", sqlCreateUser.toLog());
UserDefinition userDefinition = UserDefinition.builder().user(sqlCreateUser.user)
Expand Down Expand Up @@ -1742,4 +1882,22 @@ private static SubSnapshotSchema getSnapShotSchema(
return Pair.of(getSnapShotSchema(id, context, false), getTableName(id));
}

static SqlNode renameColumns(@Nullable SqlNodeList columnList,
SqlNode query) {
if (columnList == null) {
return query;
}
final SqlParserPos p = query.getParserPosition();
final SqlNodeList selectList = SqlNodeList.SINGLETON_STAR;
final SqlCall from =
SqlStdOperatorTable.AS.createCall(p,
ImmutableList.<SqlNode>builder()
.add(query)
.add(new SqlIdentifier("_", p))
.addAll(columnList)
.build());
return new SqlSelect(p, null, selectList, from, null, null, null, null,
null, null, null, null);
}

}
112 changes: 96 additions & 16 deletions dingo-calcite/src/main/java/io/dingodb/calcite/DingoTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,45 @@
import io.dingodb.calcite.rel.LogicalDingoTableScan;
import io.dingodb.calcite.schema.SubSnapshotSchema;
import io.dingodb.calcite.type.converter.DefinitionMapper;
import io.dingodb.calcite.utils.HybridNodeUtils;
import io.dingodb.common.CommonId;
import io.dingodb.common.log.LogUtils;
import io.dingodb.meta.TableStatistic;
import io.dingodb.common.table.HybridSearchTable;
import io.dingodb.meta.entity.IndexTable;
import io.dingodb.meta.entity.Table;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.HintPredicate;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.dingo.DingoSqlParserImpl;
import org.apache.calcite.sql2rel.InitializerExpressionFactory;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.util.ImmutableBitSet;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Slf4j
Expand All @@ -65,7 +79,6 @@ public class DingoTable extends AbstractTable implements TranslatableTable {
public DingoTable(
@NonNull DingoParserContext context,
@NonNull List<String> names,
@NonNull TableStatistic tableStatistic,
@NonNull Table table
) {
super();
Expand Down Expand Up @@ -102,20 +115,87 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {

@Override
public RelNode toRel(RelOptTable.@NonNull ToRelContext context, RelOptTable relOptTable) {
return new LogicalDingoTableScan(
context.getCluster(),
context.getCluster().traitSet(),
context.getTableHints(),
relOptTable,
null,
null,
null,
null,
null,
((DingoParserContext) context.getCluster().getPlanner().getContext()).isPushDown(),
// && Optional.mapOrGet(table.engine, __ -> !__.contains("TXN"), () -> true),
false
);
DingoTable dingoTable = relOptTable.unwrap(DingoTable.class);
if ((!dingoTable.getTable().getTableType().equalsIgnoreCase("VIEW"))
|| dingoTable.getSchema().getSchemaName().equalsIgnoreCase("INFORMATION_SCHEMA")) {
return new LogicalDingoTableScan(
context.getCluster(),
context.getCluster().traitSet(),
context.getTableHints(),
relOptTable,
null,
null,
null,
null,
null,
((DingoParserContext) context.getCluster().getPlanner().getContext()).isPushDown(),
false
);
} else {
SqlParser.Config config = SqlParser.config().withParserFactory(DingoSqlParserImpl::new);
SqlParser parser = SqlParser.create(dingoTable.getTable().createSql, config);
try {
SqlNode sqlNode = parser.parseStmt();
DingoSqlValidator dingoSqlValidator = this.context.getSqlValidator();
sqlNode = dingoSqlValidator.validate(sqlNode);

if (dingoSqlValidator.isHybridSearch()) {
SqlNode originalSqlNode;
parser = SqlParser.create(dingoTable.getTable().createSql, config);
originalSqlNode = parser.parseQuery();
//syntacticSugar(originalSqlNode);
if (dingoSqlValidator.getHybridSearchMap().size() == 1) {
String hybridSearchSql = dingoSqlValidator.getHybridSearchSql();
LogUtils.info(log, "HybridSearchSql: {}", hybridSearchSql);
SqlNode hybridSqlNode;
parser = SqlParser.create(hybridSearchSql, DingoParser.PARSER_CONFIG);
hybridSqlNode = parser.parseQuery();
//syntacticSugar(hybridSqlNode);
HybridNodeUtils.lockUpHybridSearchNode(originalSqlNode, hybridSqlNode);
} else {
ConcurrentHashMap<SqlBasicCall, SqlNode> sqlNodeHashMap = new ConcurrentHashMap<>();
for (Map.Entry<SqlBasicCall, String> entry : dingoSqlValidator
.getHybridSearchMap().entrySet()) {
SqlBasicCall key = entry.getKey();
String value = entry.getValue();
SqlNode hybridSqlNode;
parser = SqlParser.create(value, config);
hybridSqlNode = parser.parseQuery();
//syntacticSugar(hybridSqlNode);
sqlNodeHashMap.put(key, hybridSqlNode);
}
HybridNodeUtils.lockUpHybridSearchNode(originalSqlNode, sqlNodeHashMap);
}
//LogUtils.info(log, "HybridSearch Rewrite Sql: {}", originalSqlNode.toString());
try {
sqlNode = dingoSqlValidator.validate(originalSqlNode);
} catch (CalciteContextException e) {
LogUtils.error(log, "HybridSearch parse and validate error, sql: <[{}]>.",
table.getCreateSql(), e);
throw e;
}
}
HintPredicate hintPredicate = (hint, rel) -> true;
HintStrategyTable hintStrategyTable = new HintStrategyTable.Builder()
.hintStrategy("vector_pre", hintPredicate)
.hintStrategy(HybridSearchTable.HINT_NAME, hintPredicate)
.hintStrategy("disable_index", hintPredicate)
.hintStrategy("text_search_pre", hintPredicate)
.build();
SqlToRelConverter sqlToRelConverter = new DingoSqlToRelConverter(
ViewExpanders.simpleContext(context.getCluster()),
dingoSqlValidator,
this.context.getCatalogReader(),
context.getCluster(),
sqlNode.getKind() == SqlKind.EXPLAIN,
hintStrategyTable
);

return sqlToRelConverter.convertQuery(sqlNode, false, true).rel;
} catch (SqlParseException e) {
throw new RuntimeException(e);
}
}
}

@Override
Expand Down
Loading

0 comments on commit be61d32

Please sign in to comment.