From 6637a477a02b1065cfb0e7d97f45fa61bfe9babc Mon Sep 17 00:00:00 2001
From: guojn1
Date: Tue, 29 Oct 2024 17:38:08 +0800
Subject: [PATCH] [fix][dingo-store-proxy] Support DDL region split

Route meta/DDL KV requests to the region that owns the key instead of a
single fixed region, so the "meta" and "ddl" regions can split:

* MetaCache loads the META/DDL range distributions by scanning regions and
  invalidates them by schema id (1001 = meta, 1002 = ddl).
* MetaStoreKv resolves a StoreService per key via getStoreService, adds
  CommonType.DDL, and creates the meta/ddl regions with a schemaId.
* The etcd-backed DDL history-job path is removed; history jobs always go
  through the txn store.
* gc delete-range rows record a region_id instead of data_type/element_id.
* AnalyzeTask catches buildHistogram failures and renames its step logs.
---
 .../calcite/stats/task/AnalyzeTask.java       | 22 +++---
 .../main/java/io/dingodb/common/CommonId.java |  1 +
 .../main/resources/mysql-gcDeleteRange.json   |  9 +--
 .../dingodb/store/proxy/ddl/DdlHandler.java   | 11 ++-
 .../dingodb/store/proxy/meta/MetaCache.java   | 40 ++++++++++-
 .../io/dingodb/store/service/CommitBase.java  |  2 +-
 .../store/service/InfoSchemaService.java      | 54 ++-------------
 .../io/dingodb/store/service/MetaStoreKv.java | 68 ++++++++++++++-----
 8 files changed, 114 insertions(+), 93 deletions(-)

diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java
index eaa369fc71..bdb8219563 100644
--- a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java
+++ b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/task/AnalyzeTask.java
@@ -123,7 +123,7 @@ public void run() {
         if (metaService == null) {
             return;
         }
-        Table td = (Table) DdlService.root().getTable(schemaName, tableName);
+        Table td = DdlService.root().getTable(schemaName, tableName);
         if (td == null) {
             return;
         }
@@ -144,17 +144,21 @@ public void run() {
         List<CountMinSketch> cmSketchList = new ArrayList<>();
         List<StatsNormal> statsNormals = new ArrayList<>();
         long end1 = System.currentTimeMillis();
-        LogUtils.info(log, "step1 cost:{}", (end1 - start));
         // varchar -> count-min-sketch  int,float,double,date,time,timestamp -> histogram
         // ndv, nullCount -> normal
         typeMetricAdaptor(td, histogramList, cmSketchList, statsNormals, cmSketchWidth, cmSketchHeight);
         // par scan get min, max
         // histogram equ-width need max, min
-        buildHistogram(histogramList, distributions, tableId, td);
+        try {
+            buildHistogram(histogramList, distributions, tableId, td);
+        } catch (Exception e) {
+            LogUtils.error(log, e.getMessage(), e);
+            //histogramList.clear();
+        }
         long end2 = System.currentTimeMillis();
-        LogUtils.info(log, "step2 cost:{}", (end2 - end1));
+        LogUtils.info(log, "init type cost:{}", (end2 - end1));
         List<TableStats> statsList = null;
         try {
             List<CompletableFuture<TableStats>> futureList = getCompletableFutures(td, tableId, distributions,
@@ -178,22 +182,22 @@ public void run() {
             return;
         }
         long end3 = System.currentTimeMillis();
-        LogUtils.info(log, "step3 cost:{}", (end3 - end2));
+        LogUtils.info(log, "build stats cost:{}", (end3 - end2));
         TableStats.mergeStats(statsList);
         TableStats tableStats = statsList.get(0);
         long end4 = System.currentTimeMillis();
-        LogUtils.info(log, "step4 merge success cost:{}", (end4 - end3));
+        LogUtils.info(log, "stats merge success cost:{}", (end4 - end3));
         // save stats to store
         addHistogram(tableStats.getHistogramList());
         long end5 = System.currentTimeMillis();
-        LogUtils.info(log, "step5 cost:{}", (end5 - end4));
+        LogUtils.info(log, "add histogram cost:{}", (end5 - end4));
         addCountMinSketch(tableStats.getCountMinSketchList());
         long end6 = System.currentTimeMillis();
-        LogUtils.info(log, "step6 cost:{}", (end6 - end5));
+        LogUtils.info(log, "add count min sketch cost:{}", (end6 - end5));
         addStatsNormal(tableStats.getStatsNormalList());
         long end7 = System.currentTimeMillis();
-        LogUtils.info(log, "step7 cost:{}", (end7 - end6));
+        LogUtils.info(log, "add stats normal cost:{}", (end7 - end6));
         // update analyze job status
         cache(tableStats);
         rowCount = tableStats.getRowCount();
diff --git a/dingo-common/src/main/java/io/dingodb/common/CommonId.java b/dingo-common/src/main/java/io/dingodb/common/CommonId.java
index 05efabc40d..22090b474c 100644
--- a/dingo-common/src/main/java/io/dingodb/common/CommonId.java
+++ b/dingo-common/src/main/java/io/dingodb/common/CommonId.java
@@ -91,6 +91,7 @@ public enum CommonType {
     SDK(121),
     META(122),
     FILL_BACK(123),
+    DDL(124),
     ;
 
     public final int code;
diff --git a/dingo-executor/src/main/resources/mysql-gcDeleteRange.json b/dingo-executor/src/main/resources/mysql-gcDeleteRange.json
index d60595811a..ecdd4803d3 100644
--- a/dingo-executor/src/main/resources/mysql-gcDeleteRange.json
+++ b/dingo-executor/src/main/resources/mysql-gcDeleteRange.json
@@ -8,14 +8,7 @@
       "nullable": false
     },
     {
-      "name": "data_type",
-      "type": "varchar",
-      "scale": -2147483648,
-      "primary": -1,
-      "nullable": false
-    },
-    {
-      "name": "element_id",
+      "name": "region_id",
       "type": "bigint",
       "scale": -2147483648,
       "precision": -1,
diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java
index af6363d76c..207ac875b7 100644
--- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java
+++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java
@@ -47,7 +47,6 @@
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingDeque;
 
 @Slf4j
@@ -421,12 +420,10 @@ public static void doDdlJob(DdlJob job) {
                 throw new RuntimeException("wait ddl timeout");
             }
         }
-        if (DdlUtil.historyJobEtcd) {
-            try {
-                InfoSchemaService.root().delHistoryDDLJob(job.getId());
-            } catch (Exception e) {
-                LogUtils.error(log, "ddlhandler del history job:" + e.getMessage(), e);
-            }
+        try {
+            InfoSchemaService.root().delHistoryDDLJob(job.getId());
+        } catch (Exception e) {
+            LogUtils.error(log, "[ddl-error] ddl handler del history job:" + e.getMessage(), e);
         }
     }
 
diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java
index bb2c985449..0558fa7323 100644
--- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java
+++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/meta/MetaCache.java
@@ -52,6 +52,7 @@
 import io.dingodb.sdk.service.entity.meta.WatchRequest.RequestUnionNest.ProgressRequest;
 import io.dingodb.sdk.service.entity.meta.WatchResponse;
 import io.dingodb.store.proxy.service.TsoService;
+import io.dingodb.store.service.MetaStoreKv;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -68,7 +69,9 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static io.dingodb.common.CommonId.CommonType.DDL;
 import static io.dingodb.common.CommonId.CommonType.INDEX;
+import static io.dingodb.common.CommonId.CommonType.META;
 import static io.dingodb.common.CommonId.CommonType.TABLE;
 import static io.dingodb.sdk.service.entity.meta.MetaEventType.META_EVENT_REGION_CREATE;
 import static io.dingodb.sdk.service.entity.meta.MetaEventType.META_EVENT_REGION_DELETE;
@@ -216,6 +219,31 @@ private List<IndexTable> getIndexes(TableDefinitionWithId tableWithId
     @SneakyThrows
     private NavigableMap<ComparableByteArray, RangeDistribution> loadDistribution(CommonId tableId) {
         try {
+            if (tableId.type == META || tableId.type == DDL) {
+                byte[] startKey = MetaStoreKv.getInstance().getMetaRegionKey();
+                byte[] endKey = MetaStoreKv.getInstance().getMetaRegionEndKey();
+
+                if (tableId.type == DDL) {
+                    startKey = MetaStoreKv.getDdlInstance().getMetaRegionKey();
+                    endKey = MetaStoreKv.getDdlInstance().getMetaRegionEndKey();
+                }
+                List<Object> regionList = infoSchemaService
+                    .scanRegions(startKey, endKey);
+                NavigableMap<ComparableByteArray, RangeDistribution> result = new TreeMap<>();
+                regionList
+                    .forEach(object -> {
+                        ScanRegionInfo scanRegionInfo = (ScanRegionInfo) object;
+                        RangeDistribution distribution = RangeDistribution.builder()
+                            .id(new CommonId(tableId.type,
+                                0, scanRegionInfo.getRegionId()))
+                            .startKey(scanRegionInfo.getRange().getStartKey())
+                            .endKey(scanRegionInfo.getRange().getEndKey())
+                            .build();
+                        result.put(new ComparableByteArray(distribution.getStartKey(), 1), distribution);
+                    });
+                return result;
+            }
             TableDefinitionWithId tableWithId = (TableDefinitionWithId) infoSchemaService.getTable(
                 tableId
             );
@@ -270,9 +298,15 @@ private static RangeDistribution mapping(
 
     public void invalidateDistribution(MetaEventRegion metaEventRegion) {
         RegionDefinition definition = metaEventRegion.getDefinition();
         LogUtils.info(log, "Invalid table distribution {}", definition);
-        distributionCache.invalidate(new CommonId(TABLE, definition.getSchemaId(), definition.getTableId()));
-        if (definition.getIndexId() != 0) {
-            distributionCache.invalidate(new CommonId(INDEX, definition.getTableId(), definition.getIndexId()));
+        if (definition.getSchemaId() == 1001) {
+            distributionCache.invalidate(new CommonId(META, 0, 0));
+        } else if (definition.getSchemaId() == 1002) {
+            distributionCache.invalidate(new CommonId(DDL, 0, 0));
+        } else {
+            distributionCache.invalidate(new CommonId(TABLE, definition.getSchemaId(), definition.getTableId()));
+            if (definition.getIndexId() != 0) {
+                distributionCache.invalidate(new CommonId(INDEX, definition.getTableId(), definition.getIndexId()));
+            }
         }
     }
 
diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/service/CommitBase.java b/dingo-store-proxy/src/main/java/io/dingodb/store/service/CommitBase.java
index d82febc903..750e4d3c92 100644
--- a/dingo-store-proxy/src/main/java/io/dingodb/store/service/CommitBase.java
+++ b/dingo-store-proxy/src/main/java/io/dingodb/store/service/CommitBase.java
@@ -74,7 +74,7 @@ public CommitBase(StoreService storeService, CommonId partId) {
     public void commit(byte[] key, byte[] value, int opCode, long startTs) {
         CommonId txnId = getTxnId(startTs);
         boolean need2PcPreWrite = false;
-        boolean need2PcCommit = false;
+        boolean need2PcCommit;
 
         try {
             Mutation mutation = new Mutation(
diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/service/InfoSchemaService.java b/dingo-store-proxy/src/main/java/io/dingodb/store/service/InfoSchemaService.java
index cbda8f70e3..69bf32274a 100644
--- a/dingo-store-proxy/src/main/java/io/dingodb/store/service/InfoSchemaService.java
+++ b/dingo-store-proxy/src/main/java/io/dingodb/store/service/InfoSchemaService.java
@@ -24,13 +24,11 @@
 import io.dingodb.common.codec.CodecKvUtil;
 import io.dingodb.common.ddl.DdlJob;
 import io.dingodb.common.ddl.DdlUtil;
-import io.dingodb.common.ddl.JobState;
 import io.dingodb.common.ddl.SchemaDiff;
 import io.dingodb.common.log.LogUtils;
 import io.dingodb.common.meta.SchemaInfo;
 import io.dingodb.common.meta.Tenant;
 import io.dingodb.common.tenant.TenantConstant;
-import io.dingodb.meta.DdlService;
 import io.dingodb.meta.InfoSchemaServiceProvider;
 import io.dingodb.meta.ddl.InfoSchemaBuilder;
 import io.dingodb.meta.entity.IndexTable;
@@ -857,59 +855,17 @@ public void updateIndex(long tableId, Object index) {
 
     @Override
     public DdlJob getHistoryDDLJob(long jobId) {
-        if (DdlUtil.historyJobEtcd) {
-            byte[] key = historyJobIdKey(jobId);
-            RangeRequest rangeRequest = RangeRequest.builder()
-                .key(key)
-                .build();
-            RangeResponse response = this.versionService.kvRange(System.identityHashCode(rangeRequest),
-                rangeRequest);
-            if (response.getKvs() != null && !response.getKvs().isEmpty()) {
-                byte[] val = response.getKvs().get(0).getKv().getValue();
-                if (val == null) {
-                    return DdlJob.builder().id(jobId).error(null).build();
-                }
-                String valStr = new String(val);
-                if ("0".equalsIgnoreCase(valStr)) {
-                    return DdlJob.builder().id(jobId).error(null)
-                        .state(JobState.jobStateSynced).build();
-                } else {
-                    return DdlJob.builder().id(jobId).error(new String(val)).build();
-                }
-            }
+        byte[] val = this.txn.ddlHGet(mHistoryJobPrefixKeys, jobIdKey(jobId));
+        if (val == null) {
             return null;
-        } else {
-            byte[] val = this.txn.ddlHGet(mHistoryJobPrefixKeys, jobIdKey(jobId));
-            if (val == null) {
-                return null;
-            }
-            return (DdlJob) getObjFromBytes(val, DdlJob.class);
         }
+        return (DdlJob) getObjFromBytes(val, DdlJob.class);
     }
 
     @Override
     public void addHistoryDDLJob(DdlJob job, boolean updateRawArgs) {
-        if (DdlUtil.historyJobEtcd) {
-            byte[] data;
-            if (job.getError() != null) {
-                data = job.getError().getBytes();
-            } else {
-                data = "0".getBytes();
-            }
-            byte[] key = historyJobIdKey(job.getId());
-            PutRequest putRequest = PutRequest.builder()
-                .keyValue(
-                    KeyValue.builder()
-                        .key(key)
-                        .value(data)
-                        .build()
-                )
-                .build();
-            putKvToCoordinator(putRequest, 3);
-        } else {
-            byte[] data = job.encode(updateRawArgs);
-            this.txn.ddlHPut(mHistoryJobPrefixKeys, jobIdKey(job.getId()), data);
-        }
+        byte[] data = job.encode(updateRawArgs);
+        this.txn.ddlHPut(mHistoryJobPrefixKeys, jobIdKey(job.getId()), data);
     }
 
     @Override
diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/service/MetaStoreKv.java b/dingo-store-proxy/src/main/java/io/dingodb/store/service/MetaStoreKv.java
index 7d277786c6..3c7a45de58 100644
--- a/dingo-store-proxy/src/main/java/io/dingodb/store/service/MetaStoreKv.java
+++ b/dingo-store-proxy/src/main/java/io/dingodb/store/service/MetaStoreKv.java
@@ -18,9 +18,14 @@
 
 import io.dingodb.common.CommonId;
 import io.dingodb.common.log.LogUtils;
+import io.dingodb.common.partition.RangeDistribution;
 import io.dingodb.common.store.KeyValue;
+import io.dingodb.common.util.ByteArrayUtils;
 import io.dingodb.common.util.Utils;
 import io.dingodb.exec.transaction.util.TransactionUtil;
+import io.dingodb.meta.MetaService;
+import io.dingodb.partition.DingoPartitionServiceProvider;
+import io.dingodb.partition.PartitionService;
 import io.dingodb.sdk.common.serial.BufImpl;
 import io.dingodb.sdk.service.CoordinatorService;
 import io.dingodb.sdk.service.Services;
@@ -32,6 +37,7 @@
 import io.dingodb.sdk.service.entity.common.StorageEngine;
 import io.dingodb.sdk.service.entity.coordinator.CreateRegionRequest;
 import io.dingodb.sdk.service.entity.coordinator.CreateRegionResponse;
+import io.dingodb.sdk.service.entity.coordinator.ScanRegionInfo;
 import io.dingodb.sdk.service.entity.coordinator.ScanRegionsRequest;
 import io.dingodb.sdk.service.entity.coordinator.ScanRegionsResponse;
 import io.dingodb.store.api.StoreInstance;
@@ -47,6 +53,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.Set;
 
 @Slf4j
@@ -59,8 +66,9 @@ public class MetaStoreKv {
     private static MetaStoreKv instanceDdl;
     Set<Location> coordinators = Services.parse(Configuration.coordinators());
     // putAbsent
-    StoreService storeService;
-    MetaKvTxn metaKvTxn;
+    StoreService preStoreService;
+    MetaKvTxn preMetaKvTxn;
+    PartitionService ps = PartitionService.getService(DingoPartitionServiceProvider.RANGE_FUNC_NAME);
 
     long statementTimeout = 50000;
 
@@ -87,20 +95,20 @@ public static synchronized MetaStoreKv getDdlInstance() {
         return instanceDdl;
     }
 
-    private MetaStoreKv(boolean ddl) {
+    public MetaStoreKv(boolean ddl) {
         this.ddl = ddl;
         if (!ddl) {
             partId = new CommonId(CommonId.CommonType.PARTITION, 0, 0);
             long metaPartId = checkMetaRegion();
-            metaId = new CommonId(CommonId.CommonType.META, 0, metaPartId);
-            storeService = Services.storeRegionService(coordinators, metaPartId, TransactionUtil.STORE_RETRY);
-            metaKvTxn = new MetaKvTxn(storeService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
+            metaId = new CommonId(CommonId.CommonType.META, 0, 0);
+            preStoreService = Services.storeRegionService(coordinators, metaPartId, TransactionUtil.STORE_RETRY);
+            preMetaKvTxn = new MetaKvTxn(preStoreService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
         } else {
             partId = new CommonId(CommonId.CommonType.PARTITION, 0, 3);
             long metaPartId = checkMetaRegion();
-            metaId = new CommonId(CommonId.CommonType.META, 0, metaPartId);
-            storeService = Services.storeRegionService(coordinators, metaPartId, TransactionUtil.STORE_RETRY);
-            metaKvTxn = new MetaKvTxn(storeService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
+            metaId = new CommonId(CommonId.CommonType.DDL, 0, 0);
+            preStoreService = Services.storeRegionService(coordinators, metaPartId, TransactionUtil.STORE_RETRY);
+            preMetaKvTxn = new MetaKvTxn(preStoreService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
         }
     }
 
@@ -121,8 +129,10 @@ public long checkMetaRegion() {
         }
         Range range = Range.builder().startKey(startKey).endKey(endKey).build();
         String regionName = "meta";
+        long schemaId = 1001;
         if (ddl) {
             regionName = "ddl";
+            schemaId = 1002;
         }
         CreateRegionRequest createRegionRequest = CreateRegionRequest.builder()
             .regionName(regionName)
@@ -132,6 +142,7 @@ public long checkMetaRegion() {
             .storeEngine(StorageEngine.STORE_ENG_RAFT_STORE)
             .regionType(RegionType.STORE_REGION)
             .tenantId(0)
+            .schemaId(schemaId)
             .build();
         try {
             CreateRegionResponse response = coordinatorService.createRegion(startTs, createRegionRequest);
@@ -144,6 +155,14 @@ public long checkMetaRegion() {
     }
 
     public long getScanRegionId(byte[] start, byte[] end) {
+        List<ScanRegionInfo> regionInfoList = scanRegion(start, end);
+        if (regionInfoList == null || regionInfoList.isEmpty()) {
+            return 0;
+        }
+        return regionInfoList.get(0).getRegionId();
+    }
+
+    public List<ScanRegionInfo> scanRegion(byte[] start, byte[] end) {
         long startTs = io.dingodb.tso.TsoService.getDefault().tso();
         ScanRegionsRequest request = ScanRegionsRequest.builder()
             .key(start)
@@ -153,16 +172,18 @@ public long getScanRegionId(byte[] start, byte[] end) {
         CoordinatorService coordinatorService = Services.coordinatorService(coordinators);
         ScanRegionsResponse response = coordinatorService.scanRegions(startTs, request);
         if (response.getRegions() == null || response.getRegions().isEmpty()) {
-            return 0;
+            return new ArrayList<>();
         }
-        return response.getRegions().get(0).getRegionId();
+        return response.getRegions();
     }
 
     public byte[] mGet(byte[] key, long startTs) {
         key = getMetaDataKey(key);
         List<byte[]> keys = Collections.singletonList(key);
+        StoreService storeService = getStoreService(key);
         TransactionStoreInstance storeInstance = new TransactionStoreInstance(storeService, null, partId);
+
         List<KeyValue> keyValueList = storeInstance.getKeyValues(startTs, keys, statementTimeout);
         if (keyValueList.isEmpty()) {
             return null;
@@ -173,8 +194,9 @@ public byte[] mGetImmediately(byte[] key, long startTs) {
         key = getMetaDataKey(key);
-        List keys = Collections.singletonList(key);
+        List<byte[]> keys = Collections.singletonList(key);
+        StoreService storeService = getStoreService(key);
         TransactionStoreInstance storeInstance = new TransactionStoreInstance(storeService, null, partId);
         try {
             List<KeyValue> keyValueList = storeInstance.getKeyValues(startTs, keys, 1000);
@@ -191,7 +213,7 @@ public List<byte[]> mRange(byte[] start, byte[] end, long startTs) {
         start = getMetaDataKey(start);
         end = getMetaDataKey(end);
-        TransactionStoreInstance storeInstance = new TransactionStoreInstance(storeService, null, partId);
+        TransactionStoreInstance storeInstance = new TransactionStoreInstance(preStoreService, null, partId);
         StoreInstance.Range range = new StoreInstance.Range(start, end, true, false);
         Iterator<KeyValue> scanIterator = storeInstance.getScanIterator(startTs, range, statementTimeout, null);
         List<byte[]> values = new ArrayList<>();
@@ -203,17 +225,22 @@
 
     public void mDel(byte[] key, long startTs) {
         key = getMetaDataKey(key);
+        StoreService storeService = getStoreService(key);
+        MetaKvTxn metaKvTxn = new MetaKvTxn(storeService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
         metaKvTxn.commit(key, null, Op.DELETE.getCode(), startTs);
-        //commit(key, null, Op.DELETE.getCode(), startTs);
     }
 
     public void mInsert(byte[] key, byte[] value, long startTs) {
         key = getMetaDataKey(key);
+        StoreService storeService = getStoreService(key);
+        MetaKvTxn metaKvTxn = new MetaKvTxn(storeService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
         metaKvTxn.commit(key, value, Op.PUTIFABSENT.getCode(), startTs);
     }
 
     public void put(byte[] key, byte[] value, long startTs) {
         key = getMetaDataKey(key);
+        StoreService storeService = getStoreService(key);
+        MetaKvTxn metaKvTxn = new MetaKvTxn(storeService, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
         metaKvTxn.commit(key, value, Op.PUT.getCode(), startTs);
     }
 
@@ -225,7 +252,7 @@ private byte[] getMetaDataKey(byte[] key) {
         return bytes;
     }
 
-    private byte[] getMetaRegionEndKey() {
+    public byte[] getMetaRegionEndKey() {
         byte[] bytes = new byte[9];
         BufImpl buf = new BufImpl(bytes);
         // skip namespace
@@ -237,11 +264,20 @@ private byte[] getMetaRegionEndKey() {
         return bytes;
     }
 
-    private byte[] getMetaRegionKey() {
+    public byte[] getMetaRegionKey() {
         byte[] key = new byte[9];
         CodecService.INSTANCE.setId(key, partId.seq);
         key[0] = namespace;
         return key;
     }
 
+    public StoreService getStoreService(byte[] key) {
+        NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> ranges
+            = MetaService.root().getRangeDistribution(metaId);
+        CommonId commonId = ps.calcPartId(key, ranges);
+        return Services.storeRegionService(
+            coordinators, commonId.seq, TransactionUtil.STORE_RETRY
+        );
+    }
+
 }
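
Reviewer note, not part of the applied diff: the heart of this patch is per-key
region routing for the meta/ddl key spaces. A minimal sketch of that path, written
as a hypothetical extra method on MetaStoreKv -- putRouted itself is invented for
illustration, but every call it makes (getMetaDataKey, getStoreService, MetaKvTxn,
Op.PUT) is defined or kept by this patch:

    // Route one meta-KV write to the region that currently owns the key.
    public void putRouted(byte[] rawKey, byte[] value, long startTs) {
        byte[] key = getMetaDataKey(rawKey);   // prefix with namespace + part id
        // getStoreService floor-matches the encoded key against the cached
        // region start keys from MetaService.root().getRangeDistribution(metaId).
        StoreService store = getStoreService(key);
        // Bind a txn helper to that region and write through it, as put() does.
        MetaKvTxn txn = new MetaKvTxn(store, partId, r -> getMetaRegionKey(), r -> getMetaRegionEndKey());
        txn.commit(key, value, Op.PUT.getCode(), startTs);
    }

After the patch, the write paths (mDel, mInsert, put) follow exactly this shape,
and the point reads (mGet, mGetImmediately) do the same per-key lookup before
reading through TransactionStoreInstance. Only mRange still binds to
preStoreService, the service resolved for the first meta region at startup.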