[fix][dingo-store-proxy] Support ddl region split
guojn1 authored and githubgxll committed Oct 29, 2024
1 parent a4eb5f6 commit 6637a47
Showing 8 changed files with 114 additions and 93 deletions.
@@ -123,7 +123,7 @@ public void run() {
if (metaService == null) {
return;
}
Table td = (Table) DdlService.root().getTable(schemaName, tableName);
Table td = DdlService.root().getTable(schemaName, tableName);
if (td == null) {
return;
}
@@ -144,17 +144,21 @@ public void run() {
List<CountMinSketch> cmSketchList = new ArrayList<>();
List<StatsNormal> statsNormals = new ArrayList<>();
long end1 = System.currentTimeMillis();
LogUtils.info(log, "step1 cost:{}", (end1 - start));

// varchar -> count-min-sketch; int, float, double, date, time, timestamp -> histogram
// ndv, nullCount -> normal
typeMetricAdaptor(td, histogramList, cmSketchList, statsNormals, cmSketchWidth, cmSketchHeight);
// par scan get min, max
// histogram equ-width need max, min
buildHistogram(histogramList, distributions, tableId, td);
try {
buildHistogram(histogramList, distributions, tableId, td);
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
//histogramList.clear();
}

long end2 = System.currentTimeMillis();
LogUtils.info(log, "step2 cost:{}", (end2 - end1));
LogUtils.info(log, "init type cost:{}", (end2 - end1));
List<TableStats> statsList = null;
try {
List<CompletableFuture<TableStats>> futureList = getCompletableFutures(td, tableId, distributions,
@@ -178,22 +182,22 @@ public void run() {
return;
}
long end3 = System.currentTimeMillis();
LogUtils.info(log, "step3 cost:{}", (end3 - end2));
LogUtils.info(log, "build stats cost:{}", (end3 - end2));
TableStats.mergeStats(statsList);
TableStats tableStats = statsList.get(0);
long end4 = System.currentTimeMillis();
LogUtils.info(log, "step4 merge success cost:{}", (end4 - end3));
LogUtils.info(log, "stats merge success cost:{}", (end4 - end3));

// save stats to store
addHistogram(tableStats.getHistogramList());
long end5 = System.currentTimeMillis();
LogUtils.info(log, "step5 cost:{}", (end5 - end4));
LogUtils.info(log, "add histogram cost:{}", (end5 - end4));
addCountMinSketch(tableStats.getCountMinSketchList());
long end6 = System.currentTimeMillis();
LogUtils.info(log, "step6 cost:{}", (end6 - end5));
LogUtils.info(log, "add count min sketch cost:{}", (end6 - end5));
addStatsNormal(tableStats.getStatsNormalList());
long end7 = System.currentTimeMillis();
LogUtils.info(log, "step7 cost:{}", (end7 - end6));
LogUtils.info(log, "add stats normal cost:{}", (end7 - end6));
// update analyze job status
cache(tableStats);
rowCount = tableStats.getRowCount();
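For readers following the hunk above: typeMetricAdaptor routes each column to one of three statistics, as the comment notes (varchar to a count-min sketch, numeric/temporal types to a histogram, ndv/null counts to a stats normal), and the new try/catch around buildHistogram keeps a failed min/max scan from aborting the whole analyze run. A minimal stand-alone sketch of that kind of dispatch, with plain strings in place of dingo's Table/Column types (all names here are illustrative, not the project's API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: stand-in types, not dingo's Table/Column/Histogram classes.
public class TypeDispatchSketch {
    static final List<String> HISTOGRAM_TYPES = Arrays.asList(
        "INT", "BIGINT", "FLOAT", "DOUBLE", "DATE", "TIME", "TIMESTAMP");

    public static void main(String[] args) {
        // column name -> sql type, standing in for td.getColumns()
        String[][] columns = {{"name", "VARCHAR"}, {"amount", "DOUBLE"}};
        List<String> histogramCols = new ArrayList<>();
        List<String> cmSketchCols = new ArrayList<>();
        List<String> statsNormalCols = new ArrayList<>();
        for (String[] col : columns) {
            if ("VARCHAR".equals(col[1])) {
                cmSketchCols.add(col[0]);        // frequency estimates
            } else if (HISTOGRAM_TYPES.contains(col[1])) {
                histogramCols.add(col[0]);       // equi-width histogram, needs min/max
            }
            statsNormalCols.add(col[0]);         // ndv and null count for every column
        }
        System.out.println(histogramCols + " | " + cmSketchCols + " | " + statsNormalCols);
    }
}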
1 change: 1 addition & 0 deletions dingo-common/src/main/java/io/dingodb/common/CommonId.java
@@ -91,6 +91,7 @@ public enum CommonType {
SDK(121),
META(122),
FILL_BACK(123),
DDL(124),
;

public final int code;
9 changes: 1 addition & 8 deletions dingo-executor/src/main/resources/mysql-gcDeleteRange.json
@@ -8,14 +8,7 @@
"nullable": false
},
{
"name": "data_type",
"type": "varchar",
"scale": -2147483648,
"primary": -1,
"nullable": false
},
{
"name": "element_id",
"name": "region_id",
"type": "bigint",
"scale": -2147483648,
"precision": -1,
@@ -47,7 +47,6 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;

@Slf4j
@@ -421,12 +420,10 @@ public static void doDdlJob(DdlJob job) {
throw new RuntimeException("wait ddl timeout");
}
}
if (DdlUtil.historyJobEtcd) {
try {
InfoSchemaService.root().delHistoryDDLJob(job.getId());
} catch (Exception e) {
LogUtils.error(log, "ddlhandler del history job:" + e.getMessage(), e);
}
try {
InfoSchemaService.root().delHistoryDDLJob(job.getId());
} catch (Exception e) {
LogUtils.error(log, "[ddl-error] ddl handler del history job:" + e.getMessage(), e);
}
}

@@ -52,6 +52,7 @@
import io.dingodb.sdk.service.entity.meta.WatchRequest.RequestUnionNest.ProgressRequest;
import io.dingodb.sdk.service.entity.meta.WatchResponse;
import io.dingodb.store.proxy.service.TsoService;
import io.dingodb.store.service.MetaStoreKv;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -68,7 +69,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.dingodb.common.CommonId.CommonType.DDL;
import static io.dingodb.common.CommonId.CommonType.INDEX;
import static io.dingodb.common.CommonId.CommonType.META;
import static io.dingodb.common.CommonId.CommonType.TABLE;
import static io.dingodb.sdk.service.entity.meta.MetaEventType.META_EVENT_REGION_CREATE;
import static io.dingodb.sdk.service.entity.meta.MetaEventType.META_EVENT_REGION_DELETE;
@@ -216,6 +219,31 @@ private List<TableDefinitionWithId> getIndexes(TableDefinitionWithId tableWithId
@SneakyThrows
private NavigableMap<ComparableByteArray, RangeDistribution> loadDistribution(CommonId tableId) {
try {
if (tableId.type == META || tableId.type == DDL) {
byte[] startKey = MetaStoreKv.getInstance().getMetaRegionKey();
byte[] endKey = MetaStoreKv.getInstance().getMetaRegionEndKey();

if (tableId.type == DDL) {
startKey = MetaStoreKv.getDdlInstance().getMetaRegionKey();
endKey = MetaStoreKv.getDdlInstance().getMetaRegionEndKey();
}
List<Object> regionList = infoSchemaService
.scanRegions(startKey, endKey);
NavigableMap<ComparableByteArray, RangeDistribution> result = new TreeMap<>();
regionList
.forEach(object -> {
ScanRegionInfo scanRegionInfo = (ScanRegionInfo) object;
RangeDistribution distribution = RangeDistribution.builder()
.id(new CommonId(tableId.type,
0, scanRegionInfo.getRegionId()))
.startKey(scanRegionInfo.getRange().getStartKey())
.endKey(scanRegionInfo.getRange().getEndKey())
.build();
result.put(new ComparableByteArray(distribution.getStartKey(), 1), distribution);
});
return result;
}

TableDefinitionWithId tableWithId = (TableDefinitionWithId) infoSchemaService.getTable(
tableId
);
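The new branch above answers "which regions back the meta/DDL key space?" by scanning coordinator regions between the MetaStoreKv key bounds and keying each one by its start key in a TreeMap. The point of a NavigableMap keyed by ComparableByteArray is that finding the region owning a given key then becomes a floor lookup. A self-contained sketch of that lookup, with a simplified unsigned-lexicographic key standing in for ComparableByteArray (the stand-in class and its ordering are assumptions, not dingo's implementation):

import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: regions keyed by start key; the owner of a key is the entry with
// the greatest start key <= key, i.e. a floorEntry lookup.
public class RegionLookupSketch {
    static final class ByteKey implements Comparable<ByteKey> {
        final byte[] bytes;
        ByteKey(byte[] bytes) { this.bytes = bytes; }
        public int compareTo(ByteKey o) {
            int n = Math.min(bytes.length, o.bytes.length);
            for (int i = 0; i < n; i++) {
                int c = Integer.compare(bytes[i] & 0xFF, o.bytes[i] & 0xFF);
                if (c != 0) return c;
            }
            return Integer.compare(bytes.length, o.bytes.length);
        }
    }

    public static void main(String[] args) {
        NavigableMap<ByteKey, String> regions = new TreeMap<>();
        regions.put(new ByteKey(new byte[]{0x00}), "region-1"); // [0x00, 0x40)
        regions.put(new ByteKey(new byte[]{0x40}), "region-2"); // [0x40, ...)
        // after a split adds region-2, keys >= 0x40 resolve to it
        System.out.println(regions.floorEntry(new ByteKey(new byte[]{0x41})).getValue());
    }
}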
@@ -270,9 +298,15 @@ private static RangeDistribution mapping(
public void invalidateDistribution(MetaEventRegion metaEventRegion) {
RegionDefinition definition = metaEventRegion.getDefinition();
LogUtils.info(log, "Invalid table distribution {}", definition);
distributionCache.invalidate(new CommonId(TABLE, definition.getSchemaId(), definition.getTableId()));
if (definition.getIndexId() != 0) {
distributionCache.invalidate(new CommonId(INDEX, definition.getTableId(), definition.getIndexId()));
if (definition.getSchemaId() == 1001) {
distributionCache.invalidate(new CommonId(META, 0, 0));
} else if (definition.getSchemaId() == 1002) {
distributionCache.invalidate(new CommonId(DDL, 0, 0));
} else {
distributionCache.invalidate(new CommonId(TABLE, definition.getSchemaId(), definition.getTableId()));
if (definition.getIndexId() != 0) {
distributionCache.invalidate(new CommonId(INDEX, definition.getTableId(), definition.getIndexId()));
}
}
}

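In the rewritten invalidateDistribution, region events carrying the reserved schema ids are routed to the synthetic META/DDL cache entries (built with zeroed ids) instead of a per-table entry, so a split anywhere in the DDL key space drops exactly one cached distribution map. A condensed stand-in of that routing, with a plain map as the cache (the ids 1001/1002 come from the diff; everything else here is illustrative):

import java.util.HashMap;
import java.util.Map;

// Stand-in for the distributionCache routing: meta (1001) and ddl (1002)
// schema ids collapse to fixed synthetic keys; other tables stay per-table.
public class InvalidationRoutingSketch {
    enum Type { META, DDL, TABLE }

    static String cacheKey(long schemaId, long tableId) {
        if (schemaId == 1001) return Type.META + ":0:0";
        if (schemaId == 1002) return Type.DDL + ":0:0";
        return Type.TABLE + ":" + schemaId + ":" + tableId;
    }

    public static void main(String[] args) {
        Map<String, Object> cache = new HashMap<>();
        cache.put(cacheKey(1002, 0), new Object());
        cache.remove(cacheKey(1002, 77)); // any ddl-space event hits the one entry
        System.out.println(cache.isEmpty()); // true
    }
}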
@@ -74,7 +74,7 @@ public CommitBase(StoreService storeService, CommonId partId) {
public void commit(byte[] key, byte[] value, int opCode, long startTs) {
CommonId txnId = getTxnId(startTs);
boolean need2PcPreWrite = false;
boolean need2PcCommit = false;
boolean need2PcCommit;

try {
Mutation mutation = new Mutation(
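A side note on the CommitBase hunk: dropping the "= false" initializer is not cosmetic. With no initializer, javac's definite-assignment analysis rejects any path that reads need2PcCommit before assigning it, whereas a dummy default would silently mask such a path. A minimal illustration (not dingo code):

// With no initializer, every branch must assign the flag before it is read,
// and the compiler enforces it; `= false` would hide a missed branch.
public class DefiniteAssignmentSketch {
    static boolean commitMode(boolean prewriteDone) {
        boolean need2PcCommit;              // deliberately unassigned
        if (prewriteDone) {
            need2PcCommit = true;
        } else {
            need2PcCommit = false;
        }
        return need2PcCommit;               // compiles: both branches assign
    }

    public static void main(String[] args) {
        System.out.println(commitMode(true));
    }
}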
@@ -24,13 +24,11 @@
import io.dingodb.common.codec.CodecKvUtil;
import io.dingodb.common.ddl.DdlJob;
import io.dingodb.common.ddl.DdlUtil;
import io.dingodb.common.ddl.JobState;
import io.dingodb.common.ddl.SchemaDiff;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.meta.SchemaInfo;
import io.dingodb.common.meta.Tenant;
import io.dingodb.common.tenant.TenantConstant;
import io.dingodb.meta.DdlService;
import io.dingodb.meta.InfoSchemaServiceProvider;
import io.dingodb.meta.ddl.InfoSchemaBuilder;
import io.dingodb.meta.entity.IndexTable;
@@ -857,59 +855,17 @@ public void updateIndex(long tableId, Object index) {

@Override
public DdlJob getHistoryDDLJob(long jobId) {
if (DdlUtil.historyJobEtcd) {
byte[] key = historyJobIdKey(jobId);
RangeRequest rangeRequest = RangeRequest.builder()
.key(key)
.build();
RangeResponse response = this.versionService.kvRange(System.identityHashCode(rangeRequest),
rangeRequest);
if (response.getKvs() != null && !response.getKvs().isEmpty()) {
byte[] val = response.getKvs().get(0).getKv().getValue();
if (val == null) {
return DdlJob.builder().id(jobId).error(null).build();
}
String valStr = new String(val);
if ("0".equalsIgnoreCase(valStr)) {
return DdlJob.builder().id(jobId).error(null)
.state(JobState.jobStateSynced).build();
} else {
return DdlJob.builder().id(jobId).error(new String(val)).build();
}
}
byte[] val = this.txn.ddlHGet(mHistoryJobPrefixKeys, jobIdKey(jobId));
if (val == null) {
return null;
} else {
byte[] val = this.txn.ddlHGet(mHistoryJobPrefixKeys, jobIdKey(jobId));
if (val == null) {
return null;
}
return (DdlJob) getObjFromBytes(val, DdlJob.class);
}
return (DdlJob) getObjFromBytes(val, DdlJob.class);
}

@Override
public void addHistoryDDLJob(DdlJob job, boolean updateRawArgs) {
if (DdlUtil.historyJobEtcd) {
byte[] data;
if (job.getError() != null) {
data = job.getError().getBytes();
} else {
data = "0".getBytes();
}
byte[] key = historyJobIdKey(job.getId());
PutRequest putRequest = PutRequest.builder()
.keyValue(
KeyValue.builder()
.key(key)
.value(data)
.build()
)
.build();
putKvToCoordinator(putRequest, 3);
} else {
byte[] data = job.encode(updateRawArgs);
this.txn.ddlHPut(mHistoryJobPrefixKeys, jobIdKey(job.getId()), data);
}
byte[] data = job.encode(updateRawArgs);
this.txn.ddlHPut(mHistoryJobPrefixKeys, jobIdKey(job.getId()), data);
}

@Override
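After this change, getHistoryDDLJob and addHistoryDDLJob are a single symmetric pair over the transactional hash: encode on write (job.encode), decode on read (getObjFromBytes), null when absent, with no etcd fallback left to reconcile. A sketch of that contract, with an in-memory map standing in for the ddlHPut/ddlHGet store (the codec here is a placeholder, not dingo's encoding):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch of the single-path history-job store: one encode on write, one
// decode on read, null for missing jobs. The map stands in for the txn hash.
public class HistoryJobStoreSketch {
    private final Map<Long, byte[]> hash = new HashMap<>();

    void add(long jobId, String job) {                 // addHistoryDDLJob
        hash.put(jobId, job.getBytes(StandardCharsets.UTF_8));
    }

    String get(long jobId) {                           // getHistoryDDLJob
        byte[] val = hash.get(jobId);
        return val == null ? null : new String(val, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        HistoryJobStoreSketch store = new HistoryJobStoreSketch();
        store.add(42L, "synced-job");
        System.out.println(store.get(42L)); // synced-job
        System.out.println(store.get(43L)); // null
    }
}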