Skip to content

Commit

Permalink
Move the consistent-write property from TableProperties to HiveTableProperties, renaming the key from `hive.consistent-write.enabled` to `base.hive.consistent-write.enabled` and its default from String to boolean
Browse files Browse the repository at this point in the history
  • Loading branch information
baiyangtx committed Nov 1, 2023
1 parent 9e75274 commit 07cf5f2
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ private TableProperties() {

public static final String OWNER = "owner";

/** enable consistent write for hive store */
public static final String HIVE_CONSISTENT_WRITE_ENABLED = "hive.consistent-write.enabled";

public static final String HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT = "true";

/**
* Protected properties which should not be read by user.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,4 @@ public static long getTableWatermark(Map<String, String> properties) {
return Long.parseLong(watermarkValue);
}
}

public static boolean hiveConsistentWriteEnabled(Map<String, String> properties) {
return Boolean.parseBoolean(
properties.getOrDefault(
TableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
TableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
Expand All @@ -37,7 +38,6 @@
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.table.WriteOperationKind;
import com.netease.arctic.utils.SchemaUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -138,8 +138,10 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled =
TablePropertyUtil.hiveConsistentWriteEnabled(table.properties());
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
Expand All @@ -37,7 +38,6 @@
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.table.WriteOperationKind;
import com.netease.arctic.utils.SchemaUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -139,8 +139,10 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
Schema selectSchema =
TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled =
TablePropertyUtil.hiveConsistentWriteEnabled(table.properties());
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
Expand All @@ -37,7 +38,6 @@
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.table.WriteOperationKind;
import com.netease.arctic.utils.SchemaUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -139,8 +139,10 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
Schema selectSchema =
TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled =
TablePropertyUtil.hiveConsistentWriteEnabled(table.properties());
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class HiveTableProperties {
public static final String AUTO_SYNC_HIVE_DATA_WRITE = "base.hive.auto-sync-data-write";
public static final boolean AUTO_SYNC_HIVE_DATA_WRITE_DEFAULT = false;

/** Enables consistent write for the Hive store: data files under the hive location are first written as hidden files and renamed to visible files during commit. */
public static final String HIVE_CONSISTENT_WRITE_ENABLED = "base.hive.consistent-write.enabled";

public static final boolean HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT = true;

public static final String ALLOW_HIVE_TABLE_EXISTED = "allow-hive-table-existed";

public static final String WATERMARK_HIVE = "watermark.hive";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.netease.arctic.hive.io.writer;

import com.netease.arctic.data.ChangeAction;
import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.table.HiveLocationKind;
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.hive.utils.TableTypeUtil;
Expand All @@ -38,7 +39,6 @@
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.table.WriteOperationKind;
import com.netease.arctic.utils.SchemaUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetricsModes;
Expand Down Expand Up @@ -73,7 +73,10 @@ public class AdaptHiveGenericTaskWriterBuilder implements TaskWriterBuilder<Reco

/**
 * Creates a builder for the given Arctic table.
 *
 * <p>The consistent-write flag is resolved once, eagerly, from the table's
 * properties using {@code HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED}
 * (key {@code base.hive.consistent-write.enabled}, default {@code true}),
 * so subsequent writer builds do not re-read the property map.
 *
 * @param table the Arctic table this builder produces task writers for
 */
private AdaptHiveGenericTaskWriterBuilder(ArcticTable table) {
this.table = table;
// Read the boolean table property, falling back to the declared default (true).
this.hiveConsistentWrite = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);
}

public AdaptHiveGenericTaskWriterBuilder withTransactionId(Long transactionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.netease.arctic.hive.io;

import com.netease.arctic.data.ChangeAction;
import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder;
import com.netease.arctic.hive.table.HiveLocationKind;
import com.netease.arctic.hive.table.SupportHive;
Expand All @@ -29,7 +30,6 @@
import com.netease.arctic.table.LocationKind;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.TableFileUtils;
import com.netease.arctic.utils.TablePropertyUtil;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
Expand All @@ -38,6 +38,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.junit.Assert;

public class HiveDataTestHelpers {
Expand Down Expand Up @@ -140,8 +141,10 @@ public static List<DataFile> lastedAddedFiles(Table tableStore) {
* file.
*/
public static void assertWriteConsistentFilesName(SupportHive table, List<DataFile> files) {
boolean consistentWriteEnabled =
TablePropertyUtil.hiveConsistentWriteEnabled(table.properties());
boolean consistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);
String hiveLocation = table.hiveLocation();
for (DataFile f : files) {
String filename = TableFileUtils.getFileName(f.path().toString());
Expand Down
10 changes: 5 additions & 5 deletions site/docs/ch/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ Self-optimizing 配置对 Iceberg format, Mixed streaming format 都会生效。

### Mixed Hive format 相关配置

| 配置名称 | 默认值 | 描述 |
|-----------------------------------|-------|---------------------------------------------|
| base.hive.auto-sync-schema-change | true | 是否从 HMS 自动同步 Hive 的 schema 变更 |
| base.hive.auto-sync-data-write | false | 是否自动同步 Hive 的原生的数据写入,有 Hive 原生数据写入时需要打开 |
| hive.consistent-write.enabled | true | 写入 hive 路径的文件会先写隐藏文件,commit 期间 rename 为可见文件 |
| 配置名称 | 默认值 | 描述 |
|------------------------------------|-------|---------------------------------------------|
| base.hive.auto-sync-schema-change | true | 是否从 HMS 自动同步 Hive 的 schema 变更 |
| base.hive.auto-sync-data-write | false | 是否自动同步 Hive 的原生的数据写入,有 Hive 原生数据写入时需要打开 |
| base.hive.consistent-write.enabled | true | 写入 hive 路径的文件会先写隐藏文件,commit 期间 rename 为可见文件 |

### Trash 相关配置

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.spark.io;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.io.writer.ChangeTaskWriter;
Expand Down Expand Up @@ -134,7 +135,10 @@ public TaskWriter<InternalRow> newBaseWriter(boolean isOverwrite) {
.builderFor(icebergTable, schema, dsSchema)
.writeHive(isHiveTable)
.build();
boolean consistentWriteEnabled = TablePropertyUtil.hiveConsistentWriteEnabled(table.properties());
boolean consistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory;
if (isHiveTable && isOverwrite) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package com.netease.arctic.spark.io;

import com.netease.arctic.catalog.ArcticCatalog;
import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.HiveDataTestHelpers;
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.spark.SparkTestBase;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.PrimaryKeySpec;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.TableProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void after() {
public void testConsistentWrite() {
ArcticTable table = catalog.newTableBuilder(
TableIdentifier.of(catalogName, database, tableName), schema)
.withProperty(TableProperties.HIVE_CONSISTENT_WRITE_ENABLED, consistentWriteEnabled + "")
.withProperty(HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, consistentWriteEnabled + "")
.withPrimaryKeySpec(this.keySpec)
.withPartitionSpec(this.partitionSpec)
.create();
Expand Down

0 comments on commit 07cf5f2

Please sign in to comment.