[Pick-2183][AMORO-2168] Support hive commit protocol in mixed-hive format. #2220

Merged · 5 commits · Nov 1, 2023
@@ -52,7 +52,7 @@
*/
public class FileNameGenerator {

private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*";
private static final String KEYED_FILE_NAME_PATTERN_STRING = "\\.?(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*";
private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING);

private static final String FORMAT = "%d-%s-%d-%05d-%d-%s-%05d";
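The only functional change above is the optional leading "\.?", which lets the pattern also recognize data files staged under a hidden (dot-prefixed) name by the new consistent-write protocol. A minimal sketch of the effect, using hypothetical file-name values shaped after the FORMAT string:

import java.util.regex.Pattern;

class KeyedFileNamePatternSketch {
  // Pattern string copied from the diff above.
  private static final Pattern KEYED_FILE_NAME_PATTERN =
      Pattern.compile("\\.?(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*");

  public static void main(String[] args) {
    // A visible keyed data file name (hypothetical values).
    System.out.println(KEYED_FILE_NAME_PATTERN
        .matcher("5-I-3-00001-7-abc123-00001.parquet").matches()); // true
    // The same name staged as a hidden file; matches only thanks to the new "\.?" prefix.
    System.out.println(KEYED_FILE_NAME_PATTERN
        .matcher(".5-I-3-00001-7-abc123-00001.parquet").matches()); // true
  }
}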
@@ -298,7 +298,7 @@ private TableProperties() {

public static final String LOG_STORE_DATA_VERSION = "log-store.data-version";
public static final String LOG_STORE_DATA_VERSION_DEFAULT = "v1";

public static final String LOG_STORE_PROPERTIES_PREFIX = "properties.";

public static final String OWNER = "owner";
19 changes: 19 additions & 0 deletions core/src/test/java/com/netease/arctic/io/DataTestHelpers.java
@@ -45,6 +45,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
@@ -122,6 +123,24 @@ public static List<DataFile> writeBaseStore(KeyedTable keyedTable, long txId, Li
}
}

public static List<DataFile> writeRecords(TaskWriter<Record> taskWriter, List<Record> records) {
try {
records.forEach(
d -> {
try {
taskWriter.write(d);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

WriteResult result = taskWriter.complete();
return Lists.newArrayList(Arrays.asList(result.dataFiles()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static List<DataFile> writeAndCommitChangeStore(
KeyedTable keyedTable, long txId, ChangeAction action,
List<Record> records) {
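A hypothetical call site for the new writeRecords helper; writer and records would come from test fixtures, and the expected count is illustrative only:

// Hypothetical test fragment: write a batch and collect the completed data files.
List<DataFile> dataFiles = DataTestHelpers.writeRecords(writer, records);
Assert.assertEquals(expectedFileCount, dataFiles.size());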
@@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
@@ -137,10 +138,14 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId) :
encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<RowData> appenderFactory = TableTypeUtil.isHive(table) ?
@@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
@@ -135,12 +136,17 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
schema = table.schema();
}

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
Schema selectSchema =
TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId) :
encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<RowData> appenderFactory = TableTypeUtil.isHive(table) ?
@@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
@@ -135,12 +136,17 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
schema = table.schema();
}

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
Schema selectSchema =
TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId) :
encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<RowData> appenderFactory = TableTypeUtil.isHive(table) ?
@@ -51,6 +51,11 @@ public class HiveTableProperties {
public static final String AUTO_SYNC_HIVE_DATA_WRITE = "base.hive.auto-sync-data-write";
public static final boolean AUTO_SYNC_HIVE_DATA_WRITE_DEFAULT = false;

/** Whether to enable consistent write for the Hive store. */
public static final String HIVE_CONSISTENT_WRITE_ENABLED = "base.hive.consistent-write.enabled";

public static final boolean HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT = true;

public static final String ALLOW_HIVE_TABLE_EXISTED = "allow-hive-table-existed";

public static final String WATERMARK_HIVE = "watermark.hive";
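HIVE_CONSISTENT_WRITE_ENABLED defaults to true, so mixed-hive tables pick up the hidden-file protocol without any configuration. A sketch of opting a single table out via table properties (assuming the Iceberg-style updateProperties() API on the table handle):

// Sketch: disable consistent write for one table.
table.updateProperties()
    .set(HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, "false")
    .commit();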
@@ -19,6 +19,7 @@
package com.netease.arctic.hive.io.writer;

import com.netease.arctic.data.ChangeAction;
import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.table.HiveLocationKind;
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.hive.utils.TableTypeUtil;
@@ -68,9 +69,14 @@ public class AdaptHiveGenericTaskWriterBuilder implements TaskWriterBuilder<Reco
private String customHiveSubdirectory;
private Long targetFileSize;
private boolean orderedWriter = false;
private Boolean hiveConsistentWrite;

private AdaptHiveGenericTaskWriterBuilder(ArcticTable table) {
this.table = table;
this.hiveConsistentWrite = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);
}

public AdaptHiveGenericTaskWriterBuilder withTransactionId(Long transactionId) {
@@ -97,7 +103,7 @@ public AdaptHiveGenericTaskWriterBuilder withCustomHiveSubdirectory(String custo
this.customHiveSubdirectory = customHiveSubdirectory;
return this;
}

public AdaptHiveGenericTaskWriterBuilder withTargetFileSize(long targetFileSize) {
this.targetFileSize = targetFileSize;
return this;
@@ -108,6 +114,11 @@ public AdaptHiveGenericTaskWriterBuilder withOrdered() {
return this;
}

public AdaptHiveGenericTaskWriterBuilder hiveConsistentWrite(boolean enabled) {
this.hiveConsistentWrite = enabled;
return this;
}

@Override
public TaskWriter<Record> buildWriter(WriteOperationKind writeOperationKind) {
LocationKind locationKind = AdaptHiveOperateToTableRelation.INSTANT.getLocationKindsFromOperateKind(
@@ -179,17 +190,41 @@ private GenericBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
schema = table.schema();
}

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat,
table.io(), encryptionManager, partitionId, taskId, transactionId, customHiveSubdirectory) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<Record> appenderFactory = TableTypeUtil.isHive(table) ?
new AdaptHiveGenericAppenderFactory(schema, table.spec()) :
new GenericAppenderFactory(schema, table.spec());
return new GenericBaseTaskWriter(fileFormat, appenderFactory,
OutputFileFactory outputFileFactory =
locationKind == HiveLocationKind.INSTANT ? new AdaptHiveOutputFileFactory(
((SupportHive) table).hiveLocation(),
table.spec(),
fileFormat,
table.io(),
encryptionManager,
partitionId,
taskId,
transactionId,
customHiveSubdirectory,
hiveConsistentWrite)
: new CommonOutputFileFactory(
baseLocation,
table.spec(),
fileFormat,
table.io(),
encryptionManager,
partitionId,
taskId,
transactionId);
FileAppenderFactory<Record> appenderFactory =
TableTypeUtil.isHive(table) ? new AdaptHiveGenericAppenderFactory(schema, table.spec())
: new GenericAppenderFactory(schema, table.spec());
return new GenericBaseTaskWriter(
fileFormat,
appenderFactory,
outputFileFactory,
table.io(), fileSizeBytes, mask, schema, table.spec(), primaryKeySpec, orderedWriter);
table.io(),
fileSizeBytes,
mask,
schema,
table.spec(),
primaryKeySpec,
orderedWriter);
}

private GenericChangeTaskWriter buildChangeWriter() {
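Because the constructor seeds hiveConsistentWrite from table properties, the new hiveConsistentWrite(boolean) setter acts as a per-writer override. A hypothetical usage sketch — builderFor is assumed to be the static factory behind the private constructor shown above, and APPEND an assumed WriteOperationKind value:

// Hypothetical: build a writer that skips hidden-file staging for this task only.
TaskWriter<Record> writer =
    AdaptHiveGenericTaskWriterBuilder.builderFor(arcticTable)
        .withTransactionId(1L)
        .hiveConsistentWrite(false) // override the table-level default
        .buildWriter(WriteOperationKind.APPEND);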
hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java
@@ -50,36 +50,49 @@
*/
public class AdaptHiveOutputFileFactory implements OutputFileFactory {

private final String baseLocation;
private final String hiveLocation;
private final String hiveSubDirectory;
private final PartitionSpec partitionSpec;
private final ArcticFileIO io;
private final EncryptionManager encryptionManager;
private final FileNameGenerator fileNameGenerator;
private final boolean hiveConsistentWrite;

public AdaptHiveOutputFileFactory(
String baseLocation,
String hiveLocation,
PartitionSpec partitionSpec,
FileFormat format,
ArcticFileIO io,
EncryptionManager encryptionManager,
int partitionId,
long taskId,
Long transactionId) {
this(baseLocation, partitionSpec, format, io, encryptionManager, partitionId, taskId, transactionId, null);
Long transactionId,
boolean hiveConsistentWrite) {
this(

hiveLocation,
partitionSpec,
format,
io,
encryptionManager,
partitionId,
taskId,
transactionId,
null,
hiveConsistentWrite);
}

public AdaptHiveOutputFileFactory(
String baseLocation,
String hiveLocation,
PartitionSpec partitionSpec,
FileFormat format,
ArcticFileIO io,
EncryptionManager encryptionManager,
int partitionId,
long taskId,
Long transactionId,
String hiveSubDirectory) {
this.baseLocation = baseLocation;
String hiveSubDirectory,
boolean hiveConsistentWrite) {
this.hiveLocation = hiveLocation;
this.partitionSpec = partitionSpec;
this.io = io;
this.encryptionManager = encryptionManager;
@@ -90,15 +103,23 @@
this.hiveSubDirectory = hiveSubDirectory;
}
this.fileNameGenerator = new FileNameGenerator(format, partitionId, taskId, transactionId);
this.hiveConsistentWrite = hiveConsistentWrite;
}

private String generateFilename(TaskWriterKey key) {
return fileNameGenerator.fileName(key);
String filename = fileNameGenerator.fileName(key);
if (hiveConsistentWrite) {
filename = "." + filename;
}
return filename;
}

private String fileLocation(StructLike partitionData, String fileName) {
return String.format("%s/%s",
HiveTableUtil.newHiveDataLocation(baseLocation, partitionSpec, partitionData, hiveSubDirectory), fileName);
return String.format(
"%s/%s",
HiveTableUtil.newHiveDataLocation(
hiveLocation, partitionSpec, partitionData, hiveSubDirectory),
fileName);
}

public EncryptedOutputFile newOutputFile(TaskWriterKey key) {
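With consistent write enabled, files in the Hive store are staged under a dot-prefixed name, which Hive and Hadoop input formats treat as hidden, so readers never observe half-written data. The commit path is then expected to strip the prefix; a minimal sketch of that rename step on a Hadoop FileSystem (illustrative only, not part of the diff shown here):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper: make a hidden staged file visible at commit time.
static Path makeVisible(FileSystem fs, Path hidden) throws IOException {
  String name = hidden.getName();
  if (!name.startsWith(".")) {
    return hidden; // consistent write disabled; the file was written visible
  }
  Path visible = new Path(hidden.getParent(), name.substring(1));
  if (!fs.rename(hidden, visible)) {
    throw new IOException("failed to rename " + hidden + " to " + visible);
  }
  return visible;
}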
hive/src/main/java/com/netease/arctic/hive/op/OverwriteHiveFiles.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

import java.util.List;
import java.util.function.Consumer;

import static com.netease.arctic.op.OverwriteBaseFiles.PROPERTIES_TRANSACTION_ID;
@@ -59,11 +60,10 @@

@Override
public OverwriteFiles addFile(DataFile file) {
delegate.addFile(file);
String hiveLocationRoot = table.hiveLocation();
String dataFileLocation = file.path().toString();
if (dataFileLocation.toLowerCase().contains(hiveLocationRoot.toLowerCase())) {
// only handle file in hive location
if (!isHiveDataFile(file)) {
delegate.addFile(file);

} else {
// handle files in the hive location at commit time
this.addFiles.add(file);
}
return this;
@@ -72,9 +72,7 @@
@Override
public OverwriteFiles deleteFile(DataFile file) {
delegate.deleteFile(file);
String hiveLocation = table.hiveLocation();
String dataFileLocation = file.path().toString();
if (dataFileLocation.toLowerCase().contains(hiveLocation.toLowerCase())) {
if (isHiveDataFile(file)) {
// only handle file in hive location
this.deleteFiles.add(file);
}
@@ -123,6 +121,11 @@
return this;
}

@Override
protected void postHiveDataCommitted(List<DataFile> committedDataFile) {
committedDataFile.forEach(delegate::addFile);
}

@Override
public OverwriteFiles set(String property, String value) {
if (PROPERTIES_TRANSACTION_ID.equals(property)) {
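The two inline location checks that addFile and deleteFile previously duplicated are folded into a single isHiveDataFile predicate. Reconstructed from the removed lines, it presumably reads roughly like this:

// Sketch reconstructed from the inline checks this PR replaces.
private boolean isHiveDataFile(DataFile file) {
  String hiveLocation = table.hiveLocation();
  String dataFileLocation = file.path().toString();
  return dataFileLocation.toLowerCase().contains(hiveLocation.toLowerCase());
}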