Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix] Fix expire data after dropping partition #3354

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,14 @@ public void expireData(TableRuntime tableRuntime) {
try {
DataExpirationConfig expirationConfig =
tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
if (!TableConfigurations.isValidDataExpirationField(expirationConfig, field, table.name())) {
return;
}

expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig, field));
expireDataFrom(expirationConfig, expireBaseOnRule(expirationConfig));
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

protected Instant expireBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
protected Instant expireBaseOnRule(DataExpirationConfig expirationConfig) {
switch (expirationConfig.getBaseOnRule()) {
case CURRENT_TIME:
return Instant.now();
Expand All @@ -282,18 +277,20 @@ protected Instant expireBaseOnRule(
* zone
*/
@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
if (instant.equals(Instant.MIN)) {
return;
}
Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
if (TableConfigurations.isInvalidDataExpirationField(
expirationConfig, field, table.spec(), table.name())) {
return;
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
LOG.info(
"Expiring data older than {} in table {} ",
Instant.ofEpochMilli(expireTimestamp)
.atZone(
getDefaultZoneId(table.schema().findField(expirationConfig.getExpirationField())))
.toLocalDateTime(),
Instant.ofEpochMilli(expireTimestamp).atZone(getDefaultZoneId(field)).toLocalDateTime(),
table.name());

Expression dataFilter = getDataExpression(table.schema(), expirationConfig, expireTimestamp);
Expand Down Expand Up @@ -695,10 +692,11 @@ CloseableIterable<FileEntry> fileScan(
}

protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
DataExpirationConfig config, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {

try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, config)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
Expand All @@ -708,7 +706,7 @@ protected ExpireFiles expiredFileScan(
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.filter(e -> willNotRetain(e, config, partitionFreshness))
.forEach(expiredFiles::addFile);
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -721,6 +719,7 @@ protected ExpireFiles expiredFileScan(
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
* filter for expired files at the scanning stage
*
* @param schema table schema
* @param expirationConfig expiration configuration
* @param expireTimestamp expired timestamp
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,19 @@ public void expireData(TableRuntime tableRuntime) {
try {
DataExpirationConfig expirationConfig =
tableRuntime.getTableConfiguration().getExpiringDataConfig();
Types.NestedField field =
mixedTable.schema().findField(expirationConfig.getExpirationField());
if (!TableConfigurations.isValidDataExpirationField(
expirationConfig, field, mixedTable.name())) {
return;
}

expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig, field));
expireDataFrom(expirationConfig, expireMixedBaseOnRule(expirationConfig));
} catch (Throwable t) {
LOG.error("Unexpected purge error for table {} ", tableRuntime.getTableIdentifier(), t);
}
}

protected Instant expireMixedBaseOnRule(
DataExpirationConfig expirationConfig, Types.NestedField field) {
protected Instant expireMixedBaseOnRule(DataExpirationConfig expirationConfig) {
Instant changeInstant =
Optional.ofNullable(changeMaintainer).isPresent()
? changeMaintainer.expireBaseOnRule(expirationConfig, field)
? changeMaintainer.expireBaseOnRule(expirationConfig)
: Instant.MIN;
Instant baseInstant = baseMaintainer.expireBaseOnRule(expirationConfig, field);
Instant baseInstant = baseMaintainer.expireBaseOnRule(expirationConfig);
if (changeInstant.compareTo(baseInstant) >= 0) {
return changeInstant;
} else {
Expand All @@ -149,13 +142,17 @@ protected Instant expireMixedBaseOnRule(
}

@VisibleForTesting
public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
void expireDataFrom(DataExpirationConfig expirationConfig, Instant instant) {
if (instant.equals(Instant.MIN)) {
return;
}
Types.NestedField field = mixedTable.schema().findField(expirationConfig.getExpirationField());
if (TableConfigurations.isInvalidDataExpirationField(
expirationConfig, field, mixedTable.spec(), mixedTable.name())) {
return;
}

long expireTimestamp = instant.minusMillis(expirationConfig.getRetentionTime()).toEpochMilli();
Types.NestedField field = mixedTable.schema().findField(expirationConfig.getExpirationField());
LOG.info(
"Expiring data older than {} in mixed table {} ",
Instant.ofEpochMilli(expireTimestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.amoro.utils.CompatiblePropertyUtil;
import org.apache.amoro.utils.PropertyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
Expand Down Expand Up @@ -168,42 +169,52 @@ private static DataExpirationConfig.BaseOnRule parseDataExpirationBaseOnRule(Str
}

/**
* Check if the given field is valid for data expiration.
* Check if the given field is invalid for data expiration.
*
* @param config data expiration config
* @param field table nested field
* @param name table name
* @return true if field is valid
* @param spec partition spec of the table
* @param tableName table name
* @return true if field is invalid
*/
public static boolean isValidDataExpirationField(
DataExpirationConfig config, Types.NestedField field, String name) {
return config.isEnabled()
&& config.getRetentionTime() > 0
&& validateExpirationField(field, name, config.getExpirationField());
public static boolean isInvalidDataExpirationField(
DataExpirationConfig config, Types.NestedField field, PartitionSpec spec, String tableName) {
return !config.isEnabled()
|| config.getRetentionTime() <= 0
|| !validateExpirationField(config, field, spec, tableName);
}

/**
 * Type IDs that a field may have in order to be used as the data expiration field: TIMESTAMP,
 * STRING and LONG. validateExpirationField rejects a field whose type ID is not in this set and
 * lists the set's contents in its warning message.
 */
public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);

private static boolean validateExpirationField(
Types.NestedField field, String name, String expirationField) {
DataExpirationConfig config, Types.NestedField field, PartitionSpec spec, String tableName) {
String expirationField = config.getExpirationField();

if (StringUtils.isBlank(expirationField) || null == field) {
LOG.warn(
String.format(
"Field(%s) used to determine data expiration is illegal for table(%s)",
expirationField, name));
"Field({}) used to determine data expiration is illegal for table({})",
expirationField,
tableName);
return false;
}
Type.TypeID typeID = field.type().typeId();
if (!DATA_EXPIRATION_FIELD_TYPES.contains(typeID)) {
LOG.warn(
String.format(
"Table(%s) field(%s) type(%s) is not supported for data expiration, please use the "
+ "following types: %s",
name,
expirationField,
typeID.name(),
StringUtils.join(DATA_EXPIRATION_FIELD_TYPES, ", ")));
"Table({}) field({}) type({}) is not supported for data expiration, please use the "
+ "following types: {}",
tableName,
expirationField,
typeID.name(),
StringUtils.join(DATA_EXPIRATION_FIELD_TYPES, ", "));
return false;
}
DataExpirationConfig.ExpireLevel level = config.getExpirationLevel();
if (level == DataExpirationConfig.ExpireLevel.PARTITION
&& spec.getFieldsBySourceId(field.fieldId()).isEmpty()) {
LOG.warn(
"Expiration field({}) must be a partition field for the table({})",
expirationField,
tableName);
return false;
}

Expand Down
Loading
Loading