Skip to content

Commit

Permalink
Add unit tests for data expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
XBaith committed Dec 7, 2024
1 parent 5cdeeee commit fbfb6fb
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan
.toLocalDateTime(),
table.name());

Expression dataFilter = getDataExpression(table.schema(), expirationConfig, expireTimestamp);
Expression dataFilter =
getDataExpression(table.schema(), table.spec(), expirationConfig, expireTimestamp);

ExpireFiles expiredFiles = expiredFileScan(expirationConfig, dataFilter, expireTimestamp);
expireFiles(expiredFiles, expireTimestamp);
Expand Down Expand Up @@ -696,10 +697,11 @@ CloseableIterable<FileEntry> fileScan(
}

protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
DataExpirationConfig config, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {

try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, config)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
Expand All @@ -709,7 +711,7 @@ protected ExpireFiles expiredFileScan(
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness, table.spec()))
.filter(e -> willNotRetain(e, config, partitionFreshness))
.forEach(expiredFiles::addFile);
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -722,12 +724,24 @@ protected ExpireFiles expiredFileScan(
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
* filter for expired files at the scanning stage
*
* @param schema table schema
* @param spec current partition spec
* @param expirationConfig expiration configuration
* @param expireTimestamp expired timestamp
*/
protected static Expression getDataExpression(
Schema schema, DataExpirationConfig expirationConfig, long expireTimestamp) {
Schema schema,
PartitionSpec spec,
DataExpirationConfig expirationConfig,
long expireTimestamp) {
if (expirationConfig.getExpirationLevel().equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
Set<String> currentPartitionColumns =
spec.fields().stream()
.map(p -> schema.findColumnName(p.sourceId()))
.collect(Collectors.toSet());
if (!currentPartitionColumns.contains(expirationConfig.getExpirationField())) {
return Expressions.alwaysFalse();
}
return Expressions.alwaysTrue();
}

Expand Down Expand Up @@ -889,21 +903,16 @@ static boolean mayExpired(
static boolean willNotRetain(
FileEntry fileEntry,
DataExpirationConfig expirationConfig,
Map<StructLike, DataFileFreshness> partitionFreshness,
PartitionSpec currentSpec) {
Map<StructLike, DataFileFreshness> partitionFreshness) {
ContentFile<?> contentFile = fileEntry.getFile();

switch (expirationConfig.getExpirationLevel()) {
case PARTITION:
if (currentSpec.specId() != contentFile.specId()) {
return false;
} else {
// if only partial expired files in a partition, all the files in that partition should be
// preserved
return partitionFreshness.containsKey(contentFile.partition())
&& partitionFreshness.get(contentFile.partition()).expiredDataFileCount
== partitionFreshness.get(contentFile.partition()).totalDataFileCount;
}
// if only some of the files in a partition have expired, all the files in that partition
// should be preserved
return partitionFreshness.containsKey(contentFile.partition())
&& partitionFreshness.get(contentFile.partition()).expiredDataFileCount
== partitionFreshness.get(contentFile.partition()).totalDataFileCount;
case FILE:
if (!contentFile.content().equals(FileContent.DATA)) {
long seqUpperBound =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan

Expression dataFilter =
IcebergTableMaintainer.getDataExpression(
mixedTable.schema(), expirationConfig, expireTimestamp);
mixedTable.schema(), mixedTable.spec(), expirationConfig, expireTimestamp);

Pair<IcebergTableMaintainer.ExpireFiles, IcebergTableMaintainer.ExpireFiles> mixedExpiredFiles =
mixedExpiredFileScan(expirationConfig, dataFilter, expireTimestamp);
Expand Down Expand Up @@ -217,9 +217,7 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan
fileEntries
.parallelStream()
.filter(
e ->
IcebergTableMaintainer.willNotRetain(
e, expirationConfig, partitionFreshness, mixedTable.spec()))
e -> IcebergTableMaintainer.willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(
e -> {
if (e.isChange()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import static org.apache.amoro.BasicTableTestHelper.PRIMARY_KEY_SPEC;
import static org.apache.amoro.BasicTableTestHelper.SPEC;
import static org.mockito.Mockito.when;

import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.data.ChangeAction;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.PrimaryKeyedFile;
Expand All @@ -36,6 +38,7 @@
import org.apache.amoro.optimizing.scan.UnkeyedTableFileScanHelper;
import org.apache.amoro.server.optimizing.OptimizingTestHelpers;
import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.executor.ExecutorTestBase;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -63,10 +66,9 @@
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import java.time.Instant;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -245,14 +247,7 @@ private void testKeyedPartitionLevel() {

// expire partitions that are older than 2022-01-02 18:00:00.000
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(keyedTable);
tableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2022-01-03T18:00:00.000")
.atZone(
IcebergTableMaintainer.getDefaultZoneId(
keyedTable.schema().findField(config.getExpirationField())))
.toInstant());
getMaintainerAndExpire(config, "2022-01-03T18:00:00.000");

CloseableIterable<TableFileScanHelper.FileScanResult> scanAfterExpire =
buildKeyedFileScanHelper().scan();
Expand Down Expand Up @@ -292,7 +287,6 @@ private void testKeyedPartitionLevel() {
}

@Test
@DisplayName("Test expiring partition after drop the partition")
public void testKeyedPartitionLevelAfterDropping() {
Assume.assumeTrue(getMixedTable().isKeyedTable());
Assume.assumeTrue(getMixedTable().spec().isPartitioned());
Expand Down Expand Up @@ -332,12 +326,12 @@ public void testKeyedPartitionLevelAfterDropping() {
updateChangeTableSpec.commit();
updateBaseTableSpec.commit();

Assertions.assertEquals(0, keyedTable.changeTable().spec().fields().size());
Assertions.assertEquals(0, keyedTable.baseTable().spec().fields().size());
Assert.assertEquals(0, keyedTable.changeTable().spec().fields().size());
Assert.assertEquals(0, keyedTable.baseTable().spec().fields().size());

// start expiring partitions that are older than 2024-12-01 18:00:00.000
// No records should be expired since the partition column has been dropped
DataExpirationConfig config = new DataExpirationConfig(keyedTable);
DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
getMaintainerAndExpire(config, "2024-12-01T18:00:00.000");

CloseableIterable<TableFileScanHelper.FileScanResult> scanAfterExpire =
Expand Down Expand Up @@ -374,11 +368,11 @@ public void testUnKeyedPartitionLevelAfterDropping() {
}
updateBaseTableSpec.commit();

Assertions.assertEquals(0, table.spec().fields().size());
Assert.assertEquals(0, table.spec().fields().size());

// start expiring partitions that are older than 2024-12-01 18:00:00.000
// No records should be expired since the partition column has been dropped
DataExpirationConfig config = new DataExpirationConfig(table);
DataExpirationConfig config = parseDataExpirationConfig(table);
getMaintainerAndExpire(config, "2024-12-01T18:00:00.000");

CloseableIterable<TableFileScanHelper.FileScanResult> scanAfterExpire =
Expand Down Expand Up @@ -553,17 +547,60 @@ protected void getMaintainerAndExpire(DataExpirationConfig config, String dateti
}

@Test
public void testNormalFieldPartitionLevel() {
public void testNormalFieldFileLevel() {
Assume.assumeTrue(
parseDataExpirationConfig(getMixedTable()).getExpirationLevel()
== DataExpirationConfig.ExpireLevel.FILE);
getMixedTable().updateProperties().set(TableProperties.DATA_EXPIRATION_FIELD, "ts").commit();

testPartitionLevel();
testFileLevel();
}

@Test
public void testNormalFieldFileLevel() {
getMixedTable().updateProperties().set(TableProperties.DATA_EXPIRATION_FIELD, "ts").commit();
public void testIllegalPartitionField() {
MixedTable testTable = getMixedTable();
Assume.assumeTrue(testTable.spec().isPartitioned());
DataExpirationConfig config = parseDataExpirationConfig(testTable);
Assume.assumeTrue(config.getExpirationLevel() == DataExpirationConfig.ExpireLevel.PARTITION);

testFileLevel();
testTable.updateProperties().set(TableProperties.DATA_EXPIRATION_FIELD, "ts").commit();
testTable.refresh();

List<Record> records =
Lists.newArrayList(
createRecord(
1, "Oliver Bennett", parseMillis("2024-10-01T12:00:00"), "2024-10-01T12:00:00"));
OptimizingTestHelpers.appendBase(
testTable, tableTestHelper().writeBaseStore(testTable, 0, records, false));

getMaintainerAndExpire(parseDataExpirationConfig(testTable), "2024-12-01T18:00:00.000");

CloseableIterable<TableFileScanHelper.FileScanResult> scanAfterExpire =
testTable.isKeyedTable()
? buildKeyedFileScanHelper().scan()
: getTableFileScanHelper().scan();
assertScanResult(scanAfterExpire, records.size(), 0);

List<Record> result = readSortedBaseRecords(testTable);
Assert.assertEquals(records, result);
}

/**
 * Runs data expiration against a table whose expiration field property names a column
 * ("not_existed_field"). The test carries no explicit assertions: it passes as long as {@code
 * expireData} returns without throwing.
 */
@Test
public void testNotExistedField() {
  MixedTable table = getMixedTable();
  // Point the expiration field at the column name under test before parsing the config.
  table.updateProperties().set(TableProperties.DATA_EXPIRATION_FIELD, "not_existed_field").commit();

  DataExpirationConfig config = parseDataExpirationConfig(table);
  MixedTableMaintainer maintainer = new MixedTableMaintainer(table);

  // Mocked runtime -> mocked table configuration -> the expiration config parsed above.
  TableConfiguration tableConfig = Mockito.mock(TableConfiguration.class);
  TableRuntime runtime = Mockito.mock(TableRuntime.class);
  when(runtime.getTableConfiguration()).thenReturn(tableConfig);
  when(tableConfig.getExpiringDataConfig()).thenReturn(config);

  maintainer.expireData(runtime);
}

@Test
Expand All @@ -586,22 +623,23 @@ public void testGcDisabled() {
assertScanResult(scan, 1, 0);

DataExpirationConfig config = parseDataExpirationConfig(testTable);
MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getMixedTable());
mixedTableMaintainer.expireDataFrom(
config,
LocalDateTime.parse("2024-01-01T00:00:00.000")
.atZone(
IcebergTableMaintainer.getDefaultZoneId(
testTable.schema().findField(config.getExpirationField())))
.toInstant());
Assert.assertFalse(config.isEnabled());

MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(testTable);
TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
TableConfiguration tableConfiguration = Mockito.mock(TableConfiguration.class);
when(tableRuntime.getTableConfiguration()).thenReturn(tableConfiguration);
when(tableRuntime.getTableConfiguration().getExpiringDataConfig()).thenReturn(config);

mixedTableMaintainer.expireData(tableRuntime);

CloseableIterable<TableFileScanHelper.FileScanResult> scanAfterExpire;
if (isKeyedTable()) {
scanAfterExpire = buildKeyedFileScanHelper().scan();
} else {
scanAfterExpire = getTableFileScanHelper().scan();
}
assertScanResult(scanAfterExpire, 0, 0);
assertScanResult(scanAfterExpire, 1, 0);
}

protected Record createRecord(int id, String name, long ts, String opTime) {
Expand Down

0 comments on commit fbfb6fb

Please sign in to comment.