Skip to content

Commit

Permalink
[AMORO-3317]: Move optimizing planer and scan to iceberg/mixed-format…
Browse files Browse the repository at this point in the history
… module (#3314)

* Move optimizing-type to amoro-iceberg module

* Move rewrite stage task to amoro-iceberg module

* move plan to iceberg/mixed module. add new mixed-iceberg-rewrite-executor

* spotless

* move scan to iceberg module

* move optimizing planner && evaluator to iceberg/mixed module

* Fix ut error

* Fix ut error

* Fix ut error

* merge conflict

* spotless

* fix review comments
  • Loading branch information
baiyangtx authored Nov 13, 2024
1 parent b95329a commit 933c1ed
Show file tree
Hide file tree
Showing 90 changed files with 1,341 additions and 562 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ public class AmoroServiceConstants {
public static final long INVALID_TIME = 0;

public static final long QUOTA_LOOK_BACK_TIME = 60 * 60 * 1000;

public static final long INVALID_SNAPSHOT_ID = -1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.FileNameRules;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.ProcessTaskStatus;
import org.apache.amoro.server.dashboard.component.reverser.IcebergTableMetaExtract;
import org.apache.amoro.server.dashboard.model.TableBasicInfo;
import org.apache.amoro.server.dashboard.model.TableStatistics;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.TableStatCollector;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down Expand Up @@ -558,7 +559,7 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
Map<String, String> types = Maps.newHashMap();
for (OptimizingType type : OptimizingType.values()) {
types.put(type.name(), type.getStatus().displayValue());
types.put(type.name(), OptimizingStatus.ofOptimizingType(type).displayValue());
}
return types;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.hive.utils.UpgradeHiveTableUtil;
import org.apache.amoro.mixed.CatalogLoader;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.properties.HiveTableProperties;
Expand All @@ -49,7 +50,6 @@
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Function;
Expand Down Expand Up @@ -152,7 +152,8 @@ public void getTableDetail(Context ctx) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
if (tableRuntime != null) {
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary();
AbstractOptimizingEvaluator.PendingInput tableRuntimeSummary =
tableRuntime.getTableSummary();
if (tableRuntimeSummary != null) {
tableSummary.setHealthScore(tableRuntimeSummary.getHealthScore());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

package org.apache.amoro.server.dashboard.utils;

import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.table.descriptor.FilesStatistics;
import org.apache.iceberg.ContentFile;
Expand Down Expand Up @@ -61,7 +61,7 @@ public static TableOptimizingInfo buildTableOptimizeInfo(TableRuntime optimizing
}

private static FilesStatistics collectPendingFileInfo(
OptimizingEvaluator.PendingInput pendingInput) {
AbstractOptimizingEvaluator.PendingInput pendingInput) {
if (pendingInput == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.exception.OptimizingCommitException;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.op.OverwriteBaseFiles;
import org.apache.amoro.op.SnapshotSummary;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.MixedTableUtil;
Expand Down Expand Up @@ -75,8 +76,7 @@ public KeyedTableCommit(
super(fromSnapshotId, table, tasks);
this.table = table;
this.tasks = tasks;
this.fromSnapshotId =
fromSnapshotId == null ? AmoroServiceConstants.INVALID_SNAPSHOT_ID : fromSnapshotId;
this.fromSnapshotId = fromSnapshotId == null ? Constants.INVALID_SNAPSHOT_ID : fromSnapshotId;
this.fromSequenceOfPartitions = fromSequenceOfPartitions;
this.toSequenceOfPartitions = toSequenceOfPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;

public interface OptimizingProcess {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.plan.OptimizingPlanner;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -263,8 +267,8 @@ private TableOptimizingProcess planInternal(TableRuntime tableRuntime) {
tableRuntime.beginPlanning();
try {
AmoroTable<?> table = tableManager.loadTable(tableRuntime.getTableIdentifier());
OptimizingPlanner planner =
OptimizingPlanner.createOptimizingPlanner(
AbstractOptimizingPlanner planner =
IcebergTableUtil.createOptimizingPlanner(
tableRuntime.refresh(table),
(MixedTable) table.originalTable(),
getAvailableCore(),
Expand Down Expand Up @@ -371,7 +375,7 @@ public TaskRuntime poll() {
}
}

public TableOptimizingProcess(OptimizingPlanner planner, TableRuntime tableRuntime) {
public TableOptimizingProcess(AbstractOptimizingPlanner planner, TableRuntime tableRuntime) {
processId = planner.getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = planner.getOptimizingType();
Expand Down Expand Up @@ -585,7 +589,13 @@ public void commit() {

@Override
public MetricsSummary getSummary() {
return new MetricsSummary(taskMap.values());
List<MetricsSummary> taskSummaries =
taskMap.values().stream()
.map(TaskRuntime::getTaskDescriptor)
.map(RewriteStageTask::getSummary)
.collect(Collectors.toList());

return new MetricsSummary(taskSummaries);
}

private UnKeyedTableCommit buildCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.OptimizingType;

public enum OptimizingStatus {
FULL_OPTIMIZING("full", true, 100),
MAJOR_OPTIMIZING("major", true, 200),
Expand Down Expand Up @@ -67,4 +69,17 @@ public static OptimizingStatus ofDisplayValue(String displayValue) {
}
return null;
}

public static OptimizingStatus ofOptimizingType(OptimizingType optimizingType) {
switch (optimizingType) {
case FULL:
return FULL_OPTIMIZING;
case MAJOR:
return MAJOR_OPTIMIZING;
case MINOR:
return MINOR_OPTIMIZING;
default:
throw new IllegalStateException("unknown optimizing-type: " + optimizingType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.MetricsSummary;

import java.util.Map;

/** A simplified meta of task, not include input/output files. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.exception.TaskRuntimeException;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.apache.amoro.hive.utils.HivePartitionUtil;
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.op.SnapshotSummary;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
Expand Down Expand Up @@ -262,7 +263,7 @@ private void rewriteFiles(
}

RewriteFiles rewriteFiles = transaction.newRewrite();
if (targetSnapshotId != AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (targetSnapshotId != Constants.INVALID_SNAPSHOT_ID) {
long sequenceNumber = table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber();
rewriteFiles.validateFromSnapshot(targetSnapshotId).dataSequenceNumber(sequenceNumber);
}
Expand Down Expand Up @@ -349,11 +350,11 @@ private DataFile moveTargetFiles(DataFile targetFile, String hiveLocation) {
private static Set<String> getCommittedDataFilesFromSnapshotId(
UnkeyedTable table, Long snapshotId) {
long currentSnapshotId = IcebergTableUtil.getSnapshotId(table, true);
if (currentSnapshotId == AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (currentSnapshotId == Constants.INVALID_SNAPSHOT_ID) {
return Collections.emptySet();
}

if (snapshotId == AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (snapshotId == Constants.INVALID_SNAPSHOT_ID) {
snapshotId = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.io.PathInfo;
import org.apache.amoro.io.SupportsFileSystemOperations;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
import org.apache.amoro.server.table.TableRuntime;
Expand Down Expand Up @@ -659,7 +659,7 @@ CloseableIterable<FileEntry> fileScan(
return CloseableIterable.empty();
}
long snapshotId = snapshot.snapshotId();
if (snapshotId == AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (snapshotId == Constants.INVALID_SNAPSHOT_ID) {
tasks = tableScan.planFiles();
} else {
tasks = tableScan.useSnapshot(snapshotId).planFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.amoro.TableFormat;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;

import java.util.Map;

Expand All @@ -45,8 +45,8 @@ public class TableRuntimeMeta {
private long currentStatusStartTime;
private String optimizerGroup;
private TableConfiguration tableConfig;
private OptimizingEvaluator.PendingInput pendingInput;
private OptimizingEvaluator.PendingInput tableSummary;
private AbstractOptimizingEvaluator.PendingInput pendingInput;
private AbstractOptimizingEvaluator.PendingInput tableSummary;
private long optimizingProcessId = 0;
private ProcessStatus processStatus;
private OptimizingType optimizingType;
Expand Down Expand Up @@ -189,11 +189,11 @@ public void setLastOptimizedSnapshotId(long lastOptimizedSnapshotId) {
this.lastOptimizedSnapshotId = lastOptimizedSnapshotId;
}

public OptimizingEvaluator.PendingInput getTableSummary() {
public AbstractOptimizingEvaluator.PendingInput getTableSummary() {
return tableSummary;
}

public void setTableSummary(OptimizingEvaluator.PendingInput tableSummary) {
public void setTableSummary(AbstractOptimizingEvaluator.PendingInput tableSummary) {
this.tableSummary = tableSummary;
}

Expand Down Expand Up @@ -285,11 +285,11 @@ public void setSummary(String summary) {
this.summary = summary;
}

public OptimizingEvaluator.PendingInput getPendingInput() {
public AbstractOptimizingEvaluator.PendingInput getPendingInput() {
return pendingInput;
}

public void setPendingInput(OptimizingEvaluator.PendingInput pendingInput) {
public void setPendingInput(AbstractOptimizingEvaluator.PendingInput pendingInput) {
this.pendingInput = pendingInput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.utils.CompressUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.amoro.server.persistence.converter;

import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.StagedTaskDescriptor;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.TypeHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.StagedTaskDescriptor;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.converter.JsonObjectConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.apache.amoro.metrics.Metric;
import org.apache.amoro.metrics.MetricDefine;
import org.apache.amoro.metrics.MetricKey;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.metrics.MetricRegistry;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down
Loading

0 comments on commit 933c1ed

Please sign in to comment.