Skip to content

Commit

Permalink
expire since latest_snapshot or current_timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
XBaith committed Nov 24, 2023
1 parent 32cf15e commit 7d383be
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.netease.arctic.ams.api.CommitMetaProducer;
import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.PathInfo;
import com.netease.arctic.io.SupportsFileSystemOperations;
Expand Down Expand Up @@ -156,8 +158,17 @@ public void expireData(TableRuntime tableRuntime) {
if (!expirationConfig.isValid(field, table.name())) {
return;

Check warning on line 159 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L159

Added line #L159 was not covered by tests
}
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(getDefaultZoneId(field)).toInstant();

Check warning on line 163 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L163

Added line #L163 was not covered by tests
} else {
startInstant =
Instant.ofEpochMilli(fetchLatestNonOptimizedSnapshotTime(table))
.atZone(getDefaultZoneId(field))
.toInstant();

Check warning on line 168 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java#L165-L168

Added lines #L165 - L168 were not covered by tests
}

expireDataFrom(expirationConfig, Instant.now().atZone(getDefaultZoneId(field)).toInstant());
expireDataFrom(expirationConfig, startInstant);
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
Expand Down Expand Up @@ -265,8 +276,10 @@ protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {
// Latest checkpoint of flink need retain. If Flink does not continuously commit new snapshots,
// it can lead to issues with table partitions not expiring.
long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
// Retain the latest non-optimized snapshot so that the time of the real latest update is kept
long latestNonOptimizedTime = fetchLatestNonOptimizedSnapshotTime(table);
long olderThan = System.currentTimeMillis() - baseSnapshotsKeepTime;
return min(latestFlinkCommitTime, mustOlderThan, olderThan);
return min(latestFlinkCommitTime, latestNonOptimizedTime, mustOlderThan, olderThan);
}

protected Set<String> expireSnapshotNeedToExcludeFiles() {
Expand Down Expand Up @@ -388,6 +401,20 @@ public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRu
return Long.MAX_VALUE;
}

/**
 * Returns the commit time of the latest snapshot that was not produced by optimizing.
 *
 * <p>When expiring historic data and `data-expire.since` is `LATEST_SNAPSHOT`, the snapshot used
 * as the start of the expiration should not be one produced by Amoro, so snapshots whose summary
 * contains the {@code OPTIMIZE} producer name are skipped.
 *
 * @param table iceberg table
 * @return the timestamp in milliseconds of the latest non-optimized snapshot, or {@link
 *     Long#MAX_VALUE} if the table has no such snapshot
 */
public static long fetchLatestNonOptimizedSnapshotTime(Table table) {
  String optimizeProducer = CommitMetaProducer.OPTIMIZE.name();
  boolean found = false;
  long latestTime = Long.MIN_VALUE;
  // Scan every snapshot and keep the maximum timestamp, so the result does not depend on the
  // iteration order of table.snapshots().
  for (Snapshot snapshot : table.snapshots()) {
    // A snapshot without a summary carries no producer marker; treat it as non-optimized.
    boolean optimized =
        snapshot.summary() != null && snapshot.summary().containsValue(optimizeProducer);
    if (!optimized) {
      found = true;
      latestTime = Math.max(latestTime, snapshot.timestampMillis());
    }
  }
  // Long.MAX_VALUE keeps this value neutral when callers combine it with min(...).
  return found ? latestTime : Long.MAX_VALUE;
}

private static int deleteInvalidFilesInFs(
SupportsFileSystemOperations fio, String location, long lastTime, Set<String> excludes) {
if (!fio.exists(location)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,22 @@ public void expireData(TableRuntime tableRuntime) {
return;

Check warning on line 145 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L145

Added line #L145 was not covered by tests
}
ZoneId defaultZone = IcebergTableMaintainer.getDefaultZoneId(field);

Check warning on line 147 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L147

Added line #L147 was not covered by tests
Instant instant = Instant.now().atZone(defaultZone).toInstant();
expireDataFrom(expirationConfig, instant);
Instant startInstant;
if (expirationConfig.getSince() == DataExpirationConfig.Since.CURRENT_TIMESTAMP) {
startInstant = Instant.now().atZone(defaultZone).toInstant();

Check warning on line 150 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L150

Added line #L150 was not covered by tests
} else {
long latestBaseTs =
IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(baseMaintainer.getTable());

Check warning on line 153 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L152-L153

Added lines #L152 - L153 were not covered by tests
long latestChangeTs =
changeMaintainer == null
? Long.MAX_VALUE
: IcebergTableMaintainer.fetchLatestNonOptimizedSnapshotTime(
changeMaintainer.getTable());
long latestNonOptimizedTs = Longs.min(latestChangeTs, latestBaseTs);

Check warning on line 159 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L157-L159

Added lines #L157 - L159 were not covered by tests

startInstant = Instant.ofEpochMilli(latestNonOptimizedTs).atZone(defaultZone).toInstant();

Check warning on line 161 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java#L161

Added line #L161 was not covered by tests
}
expireDataFrom(expirationConfig, startInstant);
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class DataExpirationConfig {
private String dateTimePattern;
// data-expire.datetime-number-format
private String numberDateFormat;
// data-expire.since
private Since since;

@VisibleForTesting
public enum ExpireLevel {
Expand All @@ -49,6 +51,22 @@ public static ExpireLevel fromString(String level) {
}
}

@VisibleForTesting
public enum Since {
LATEST_SNAPSHOT,
CURRENT_TIMESTAMP;

public static Since fromString(String since) {
Preconditions.checkArgument(null != since, "data-expire.since is invalid: null");
try {
return Since.valueOf(since.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format("Unable to expire data since: %s", since), e);

Check warning on line 65 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L63-L65

Added lines #L63 - L65 were not covered by tests
}
}
}

public static final Set<Type.TypeID> FIELD_TYPES =
Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);

Expand All @@ -62,13 +80,15 @@ public DataExpirationConfig(
ExpireLevel expirationLevel,
long retentionTime,
String dateTimePattern,
String numberDateFormat) {
String numberDateFormat,
Since since) {

Check warning on line 84 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L84

Added line #L84 was not covered by tests
this.enabled = enabled;
this.expirationField = expirationField;
this.expirationLevel = expirationLevel;
this.retentionTime = retentionTime;
this.dateTimePattern = dateTimePattern;
this.numberDateFormat = numberDateFormat;
this.since = since;

Check warning on line 91 in ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/DataExpirationConfig.java#L91

Added line #L91 was not covered by tests
}

public DataExpirationConfig(ArcticTable table) {
Expand Down Expand Up @@ -113,6 +133,12 @@ public DataExpirationConfig(ArcticTable table) {
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT);
since =
Since.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT));
}

public static DataExpirationConfig parse(Map<String, String> properties) {
Expand Down Expand Up @@ -141,7 +167,13 @@ public static DataExpirationConfig parse(Map<String, String> properties) {
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT,
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT));
TableProperties.DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT))
.setSince(
Since.fromString(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.DATA_EXPIRATION_SINCE,
TableProperties.DATA_EXPIRATION_SINCE_DEFAULT)));
String retention =
CompatiblePropertyUtil.propertyAsString(
properties, TableProperties.DATA_EXPIRATION_RETENTION_TIME, null);
Expand Down Expand Up @@ -206,6 +238,15 @@ public DataExpirationConfig setNumberDateFormat(String numberDateFormat) {
return this;
}

/**
 * Returns the configured start point of data expiration (`data-expire.since`).
 *
 * @return the {@link Since} value held by this config
 */
public Since getSince() {
  return this.since;
}

/**
 * Sets the start point of data expiration (`data-expire.since`).
 *
 * @param since the {@link Since} value to store
 * @return this config, to allow chained setter calls
 */
public DataExpirationConfig setSince(Since since) {
  DataExpirationConfig self = this;
  self.since = since;
  return self;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -220,7 +261,8 @@ public boolean equals(Object o) {
&& Objects.equal(expirationField, config.expirationField)
&& expirationLevel == config.expirationLevel
&& Objects.equal(dateTimePattern, config.dateTimePattern)
&& Objects.equal(numberDateFormat, config.numberDateFormat);
&& Objects.equal(numberDateFormat, config.numberDateFormat)
&& since == config.since;
}

@Override
Expand All @@ -231,7 +273,8 @@ public int hashCode() {
expirationLevel,
retentionTime,
dateTimePattern,
numberDateFormat);
numberDateFormat,
since);
}

public boolean isValid(Types.NestedField field, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package com.netease.arctic.server.utils;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.netease.arctic.IcebergFileEntry;
import com.netease.arctic.scan.TableEntriesScan;
import com.netease.arctic.server.ArcticServiceConstants;
Expand Down Expand Up @@ -75,6 +78,11 @@ public static Snapshot getSnapshot(Table table, boolean refresh) {
return table.currentSnapshot();
}

/**
 * Finds the first snapshot of the table that satisfies the given predicate.
 *
 * <p>Snapshots are examined in the iteration order of {@code table.snapshots()}.
 *
 * @param table iceberg table whose snapshots are searched
 * @param predicate condition a snapshot has to satisfy
 * @return the first matching snapshot, or an absent {@code Optional} if no snapshot matches
 */
public static Optional<Snapshot> findSnapshot(Table table, Predicate<Snapshot> predicate) {
  return Iterables.tryFind(table.snapshots(), predicate);
}

public static Set<String> getAllContentFilePath(Table internalTable) {
Set<String> validFilesPath = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ private TableProperties() {}
"data-expire.datetime-number-format";
public static final String DATA_EXPIRATION_DATE_NUMBER_FORMAT_DEFAULT = "TIMESTAMP_MS";
public static final String DATA_EXPIRATION_RETENTION_TIME = "data-expire.retention-time";
public static final String DATA_EXPIRATION_SINCE = "data-expire.since";
public static final String DATA_EXPIRATION_SINCE_DEFAULT = "LATEST_SNAPSHOT";

public static final String ENABLE_DANGLING_DELETE_FILES_CLEAN =
"clean-dangling-delete-files.enabled";
Expand Down
29 changes: 15 additions & 14 deletions docs/user-guides/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ Self-optimizing configurations are applicable to both Iceberg Format and Mixed s

Data-cleaning configurations are applicable to both Iceberg Format and Mixed streaming Format.

| Key | Default | Description |
|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------|
| table-expire.enabled | true | Enables periodically expire table |
| change.data.ttl.minutes | 10080(7 days) | Time to live in minutes for data of ChangeStore |
| snapshot.base.keep.minutes | 720(12 hours) | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes |
| clean-orphan-file.enabled | false | Enables periodically clean orphan files |
| clean-orphan-file.min-existing-time-minutes | 2880(2 days) | Cleaning orphan files keeps the files modified within a specified time in minutes |
| clean-dangling-delete-files.enabled | true | Whether to enable cleaning of dangling delete files |
| data-expire.enabled | false | Whether to enable data expiration |
| data-expire.level | partition | Level of data expiration. Including partition and file |
| data-expire.field | NULL | Field used to determine data expiration, supporting timestamp/timestampz/long type and string type field in date format |
| data-expire.datetime-string-pattern | yyyy-MM-dd | Pattern used for matching string datetime |
| data-expire.datetime-number-format | TIMESTAMP_MS | Timestamp unit for long field. Including TIMESTAMP_MS and TIMESTAMP_S |
| data-expire.retention-time | NULL | Retention period for data expiration. For example, 1d means retaining data for 1 day. Other supported units include h (hour), min (minute), s (second), ms (millisecond), etc. |
| Key | Default | Description |
|---------------------------------------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| table-expire.enabled | true | Enables periodically expire table |
| change.data.ttl.minutes | 10080(7 days) | Time to live in minutes for data of ChangeStore |
| snapshot.base.keep.minutes | 720(12 hours) | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes |
| clean-orphan-file.enabled | false | Enables periodically clean orphan files |
| clean-orphan-file.min-existing-time-minutes | 2880(2 days) | Cleaning orphan files keeps the files modified within a specified time in minutes |
| clean-dangling-delete-files.enabled | true | Whether to enable cleaning of dangling delete files |
| data-expire.enabled | false | Whether to enable data expiration |
| data-expire.level | partition | Level of data expiration. Including partition and file |
| data-expire.field | NULL | Field used to determine data expiration, supporting timestamp/timestamptz/long types and string fields in date format |
| data-expire.datetime-string-pattern | yyyy-MM-dd | Pattern used for matching string datetime |
| data-expire.datetime-number-format | TIMESTAMP_MS | Timestamp unit for long field. Including TIMESTAMP_MS and TIMESTAMP_S |
| data-expire.retention-time | NULL | Retention period for data expiration. For example, 1d means retaining data for 1 day. Other supported units include h (hour), min (minute), s (second), ms (millisecond), etc. |
| data-expire.since | LATEST_SNAPSHOT | The point in time from which data expiration is calculated. Supported values are LATEST_SNAPSHOT and CURRENT_TIMESTAMP. LATEST_SNAPSHOT uses the timestamp of the latest **non-optimized** snapshot as the start of the expiration, which ensures that the table always retains retention-time worth of data |

## Mixed Format configurations

Expand Down
15 changes: 12 additions & 3 deletions docs/user-guides/using-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
'data-expire.enabled' = 'true');
```

### Set the data retention period
### Set retention period

The configuration for data retention duration consists of a number and a unit. For example, '90d' represents retaining data for 90 days, and '12h' indicates 12 hours.

Expand All @@ -164,7 +164,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
'data-expire.retention-time' = '90d');
```

### Select the event-time field
### Select expiration field

Data expiration requires users to specify a field for determining expiration.
In addition to timestamptz/timestamp fields, string and long fields are also supported for this purpose.
Expand All @@ -186,7 +186,7 @@ ALTER TABLE test_db.test_log_store set tblproperties (
'data-expire.datetime-number-format' = 'TIMESTAMP_MS');
```

### Adjust the data expiration level
### Adjust expiration level

Data expiration supports two levels, including `PARTITION` and `FILE`. The default level is `PARTITION`, which means that AMS deletes files only when all the files within a partition have expired.

Expand All @@ -195,6 +195,15 @@ ALTER TABLE test_db.test_log_store set tblproperties (
'data-expire.level' = 'partition');
```

### Specify start time

Amoro expires data starting from either `LATEST_SNAPSHOT` or `CURRENT_TIMESTAMP`. `LATEST_SNAPSHOT` uses the timestamp of the table's latest non-optimized snapshot as the start time of the expiration, which ensures that the table always retains `data-expire.retention-time` worth of data, while `CURRENT_TIMESTAMP` uses the current time of the service.

```sql
ALTER TABLE test_db.test_log_store set tblproperties (
'data-expire.since' = 'current_timestamp');
```

## Delete table

After logging into the AMS Dashboard. To modify a table, enter the modification statement in the `terminal` and execute it.
Expand Down

0 comments on commit 7d383be

Please sign in to comment.