Skip to content

Commit

Permalink
Merge branch 'master' into feature/data_expire_by_partition_info
Browse files Browse the repository at this point in the history
  • Loading branch information
XBaith authored Dec 6, 2024
2 parents aba77d8 + 530b700 commit 49ecbdc
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ public static Set<DeleteFile> getDanglingDeleteFiles(Table internalTable) {
if (internalTable.currentSnapshot() == null) {
return Collections.emptySet();
}
long snapshotId = internalTable.currentSnapshot().snapshotId();
Set<String> deleteFilesPath = new HashSet<>();
TableScan tableScan = internalTable.newScan();
TableScan tableScan = internalTable.newScan().useSnapshot(snapshotId);
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask fileScanTask : fileScanTasks) {
for (DeleteFile delete : fileScanTask.deletes()) {
Expand All @@ -165,7 +166,7 @@ public static Set<DeleteFile> getDanglingDeleteFiles(Table internalTable) {
Set<DeleteFile> danglingDeleteFiles = new HashSet<>();
TableEntriesScan entriesScan =
TableEntriesScan.builder(internalTable)
.useSnapshot(internalTable.currentSnapshot().snapshotId())
.useSnapshot(snapshotId)
.includeFileContent(FileContent.EQUALITY_DELETES, FileContent.POSITION_DELETES)
.build();
try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.amoro.table.KeyedTableSnapshot;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableSnapshot;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
Expand Down Expand Up @@ -121,9 +120,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
tableFileScanHelper.scan()) {
for (TableFileScanHelper.FileScanResult fileScanResult : results) {
PartitionSpec partitionSpec =
MixedTableUtil.getMixedTablePartitionSpecById(
mixedTable, fileScanResult.file().specId());
PartitionSpec partitionSpec = tableFileScanHelper.getSpec(fileScanResult.file().specId());
StructLike partition = fileScanResult.file().partition();
String partitionPath = partitionSpec.partitionToPath(partition);
PartitionEvaluator evaluator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

import java.util.Map;

public class IcebergTableFileScanHelper implements TableFileScanHelper {
private final Table table;
private Expression partitionFilter = Expressions.alwaysTrue();
private final long snapshotId;
private final Map<Integer, PartitionSpec> specs;

public IcebergTableFileScanHelper(Table table, long snapshotId) {
this.table = table;
this.snapshotId = snapshotId;
this.specs = table.specs();
}

@Override
Expand All @@ -61,4 +66,9 @@ public TableFileScanHelper withPartitionFilter(Expression partitionFilter) {
this.partitionFilter = partitionFilter;
return this;
}

@Override
public PartitionSpec getSpec(int specId) {
return specs.get(specId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public class KeyedTableFileScanHelper implements TableFileScanHelper {
private final long changeSnapshotId;
private final long baseSnapshotId;
private Expression partitionFilter = Expressions.alwaysTrue();
private final PartitionSpec spec;

public KeyedTableFileScanHelper(KeyedTable keyedTable, KeyedTableSnapshot snapshot) {
this.keyedTable = keyedTable;
this.baseSnapshotId = snapshot.baseSnapshotId();
this.changeSnapshotId = snapshot.changeSnapshotId();
this.spec = keyedTable.spec();
}

/**
Expand Down Expand Up @@ -441,4 +443,13 @@ public void setMinTransactionIdAfter(long minTransactionIdAfter) {
this.minTransactionIdAfter = minTransactionIdAfter;
}
}

@Override
public PartitionSpec getSpec(int specId) {
if (specId != spec.specId()) {
throw new IllegalArgumentException(
"Partition spec id " + specId + " not found in table " + keyedTable.name());
}
return spec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;

Expand Down Expand Up @@ -47,4 +48,6 @@ public List<ContentFile<?>> deleteFiles() {
CloseableIterable<FileScanResult> scan();

TableFileScanHelper withPartitionFilter(Expression partitionFilter);

PartitionSpec getSpec(int specId);
}
1 change: 1 addition & 0 deletions charts/amoro/templates/amoro-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ data:
bind-port: {{ .Values.server.optimizing.port }}
http-server:
rest-auth-type: {{ .Values.server.rest.restAuthType }}
bind-port: {{ .Values.server.rest.port }}
refresh-external-catalogs:
Expand Down
1 change: 1 addition & 0 deletions charts/amoro/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ server:
rest:
enabled: true
port: 1630
restAuthType: token
service:
## @param type Can set as "ClusterIP" or "NodePort". If set to "NodePort", @param nodePort below should set a value
##
Expand Down

0 comments on commit 49ecbdc

Please sign in to comment.