[AMORO-3301] Support OSS for iceberg in InternalCatalog (#3306)
* fix: support OSS for iceberg in InternalCatalog

* bugfix

* bugfix

* fix: add aliyun-sdk-oss

* fix: add restcatalog api test for oss support

* fix: add license header

* fix: ci

* bugfix: oss-sdk adapt to iceberg version

* bugfix: oss-sdk adapt to iceberg version

* fix: aliyun-sdk add scope provided

* fix: aliyun-oss-sdk default provided

* fix: readme

* fix: ci

* fix: ci

* fix: add readme for httpclient

---------

Co-authored-by: ConradJam <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
3 people authored Nov 21, 2024
1 parent 243d289 commit f88b7a6
Showing 13 changed files with 269 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -119,6 +119,7 @@ Amoro is built using Maven with JDK 8 and JDK 17(only for `amoro-format-mixed/am
* Build and skip tests: `mvn clean package -DskipTests`
* Build and skip dashboard: `mvn clean package -Pskip-dashboard-build`
* Build and disable disk storage, RocksDB will NOT be introduced to avoid memory overflow: `mvn clean package -DskipTests -Pno-extented-disk-storage`
* Build and enable aliyun-oss-sdk: `mvn clean package -DskipTests -Paliyun-oss-sdk`
* Build with hadoop 2.x(the default is 3.x) dependencies: `mvn clean package -DskipTests -Phadoop2`
* Specify Flink version for Flink optimizer(the default is 1.20.0): `mvn clean package -DskipTests -Dflink-optimizer.flink-version=1.20.0`
* If the version of Flink is below 1.15.0, you also need to add the `-Pflink-optimizer-pre-1.15` parameter: `mvn clean package -DskipTests -Pflink-optimizer-pre-1.15 -Dflink-optimizer.flink-version=1.14.6`
@@ -40,12 +40,14 @@
import static org.apache.amoro.properties.CatalogMetaProperties.CATALOG_TYPE_HIVE;
import static org.apache.amoro.properties.CatalogMetaProperties.KEY_WAREHOUSE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_CORE_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_HDFS_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_HIVE_SITE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_OSS_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_REGION;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_S3_ENDPOINT;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_OSS;
import static org.apache.amoro.properties.CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_S3;
import static org.apache.amoro.properties.CatalogMetaProperties.TABLE_FORMATS;

@@ -75,6 +77,7 @@
import org.apache.amoro.utils.CatalogUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aliyun.AliyunProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
@@ -114,6 +117,10 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_S3, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_OSS, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_AMS, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
@@ -145,6 +152,14 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_S3, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_OSS, PAIMON));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_GLUE, STORAGE_CONFIGS_VALUE_TYPE_OSS, MIXED_ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_OSS, ICEBERG));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(
CATALOG_TYPE_CUSTOM, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_ICEBERG));
@@ -182,11 +197,16 @@ private static Set<String> getHiddenCatalogProperties(
String.valueOf(authConfig.get(AUTH_CONFIGS_KEY_TYPE)))) {
hiddenProperties.add(S3FileIOProperties.ACCESS_KEY_ID);
hiddenProperties.add(S3FileIOProperties.SECRET_ACCESS_KEY);
hiddenProperties.add(AliyunProperties.CLIENT_ACCESS_KEY_ID);
hiddenProperties.add(AliyunProperties.CLIENT_ACCESS_KEY_SECRET);
}
if (STORAGE_CONFIGS_VALUE_TYPE_S3.equals(storageConfig.get(STORAGE_CONFIGS_KEY_TYPE))) {
hiddenProperties.add(AwsClientProperties.CLIENT_REGION);
hiddenProperties.add(S3FileIOProperties.ENDPOINT);
}
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageConfig.get(STORAGE_CONFIGS_KEY_TYPE))) {
hiddenProperties.add(AliyunProperties.OSS_ENDPOINT);
}
return hiddenProperties;
}
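For illustration, a minimal sketch of how a hidden-key set like the one built above can be applied before catalog properties are surfaced to the dashboard; the class, helper name, and wiring here are hypothetical and not part of this diff:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class HiddenPropertiesSketch {
  // Drop every property whose key was collected as hidden, e.g.
  // AliyunProperties.CLIENT_ACCESS_KEY_ID / CLIENT_ACCESS_KEY_SECRET for OSS,
  // or S3FileIOProperties.ACCESS_KEY_ID / SECRET_ACCESS_KEY for S3.
  static Map<String, String> maskHidden(
      Map<String, String> catalogProperties, Set<String> hiddenKeys) {
    Map<String, String> visible = new HashMap<>(catalogProperties);
    visible.keySet().removeAll(hiddenKeys);
    return visible;
  }
}
```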

@@ -205,7 +225,10 @@ public void getCatalogTypeList(Context ctx) {
}

private void fillAuthConfigs2CatalogMeta(
CatalogMeta catalogMeta, Map<String, String> serverAuthConfig, CatalogMeta oldCatalogMeta) {
CatalogMeta catalogMeta,
Map<String, String> serverAuthConfig,
CatalogMeta oldCatalogMeta,
String storageType) {
Map<String, String> metaAuthConfig = new HashMap<>();
String authType =
serverAuthConfig
@@ -253,19 +276,19 @@ private void fillAuthConfigs2CatalogMeta(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
AUTH_CONFIGS_KEY_ACCESS_KEY,
S3FileIOProperties.ACCESS_KEY_ID);
getStorageAccessKey(storageType));
CatalogUtil.copyProperty(
serverAuthConfig,
catalogMeta.getCatalogProperties(),
AUTH_CONFIGS_KEY_SECRET_KEY,
S3FileIOProperties.SECRET_ACCESS_KEY);
getStorageSecretKey(storageType));
break;
}
catalogMeta.setAuthConfigs(metaAuthConfig);
}

private Map<String, Object> extractAuthConfigsFromCatalogMeta(
String catalogName, CatalogMeta catalogMeta) {
String catalogName, CatalogMeta catalogMeta, String storageType) {
Map<String, Object> serverAuthConfig = new HashMap<>();
Map<String, String> metaAuthConfig = catalogMeta.getAuthConfigs();
String authType =
@@ -298,24 +321,38 @@ private Map<String, Object> extractAuthConfigsFromCatalogMeta(
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
serverAuthConfig,
S3FileIOProperties.ACCESS_KEY_ID,
getStorageAccessKey(storageType),
AUTH_CONFIGS_KEY_ACCESS_KEY);
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
serverAuthConfig,
S3FileIOProperties.SECRET_ACCESS_KEY,
getStorageSecretKey(storageType),
AUTH_CONFIGS_KEY_SECRET_KEY);
break;
}

return serverAuthConfig;
}

private String getStorageAccessKey(String storageType) {
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
return AliyunProperties.CLIENT_ACCESS_KEY_ID;
}
// default s3
return S3FileIOProperties.ACCESS_KEY_ID;
}

private String getStorageSecretKey(String storageType) {
if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
return AliyunProperties.CLIENT_ACCESS_KEY_SECRET;
}
// default s3
return S3FileIOProperties.SECRET_ACCESS_KEY;
}
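With these helpers, the same dashboard access-key/secret-key fields map to `client.access-key-id`/`client.access-key-secret` (AliyunProperties) for OSS and to `s3.access-key-id`/`s3.secret-access-key` (S3FileIOProperties) for S3; these are the Iceberg property strings behind the constants above, as defined in recent Iceberg releases.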

private Map<String, Object> extractStorageConfigsFromCatalogMeta(
String catalogName, CatalogMeta catalogMeta) {
String catalogName, CatalogMeta catalogMeta, String storageType) {
Map<String, Object> storageConfig = new HashMap<>();
Map<String, String> config = catalogMeta.getStorageConfigs();
String storageType = CatalogUtil.getCompatibleStorageType(config);
storageConfig.put(STORAGE_CONFIGS_KEY_TYPE, storageType);
if (STORAGE_CONFIGS_VALUE_TYPE_HADOOP.equals(storageType)) {
storageConfig.put(
@@ -354,7 +391,13 @@ private Map<String, Object> extractStorageConfigsFromCatalogMeta(
catalogMeta.getCatalogProperties(),
storageConfig,
S3FileIOProperties.ENDPOINT,
STORAGE_CONFIGS_KEY_ENDPOINT);
STORAGE_CONFIGS_KEY_S3_ENDPOINT);
} else if (STORAGE_CONFIGS_VALUE_TYPE_OSS.equals(storageType)) {
CatalogUtil.copyProperty(
catalogMeta.getCatalogProperties(),
storageConfig,
AliyunProperties.OSS_ENDPOINT,
STORAGE_CONFIGS_KEY_OSS_ENDPOINT);
}

return storageConfig;
@@ -387,12 +430,12 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
"Invalid table format list, " + String.join(",", info.getTableFormatList()));
}
catalogMeta.getCatalogProperties().put(CatalogMetaProperties.TABLE_FORMATS, tableFormats);
fillAuthConfigs2CatalogMeta(catalogMeta, info.getAuthConfig(), oldCatalogMeta);
// change fileId to base64Code
Map<String, String> metaStorageConfig = new HashMap<>();
String storageType =
info.getStorageConfig()
.getOrDefault(STORAGE_CONFIGS_KEY_TYPE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP);
fillAuthConfigs2CatalogMeta(catalogMeta, info.getAuthConfig(), oldCatalogMeta, storageType);
// change fileId to base64Code
Map<String, String> metaStorageConfig = new HashMap<>();
metaStorageConfig.put(STORAGE_CONFIGS_KEY_TYPE, storageType);
if (storageType.equals(STORAGE_CONFIGS_VALUE_TYPE_HADOOP)) {
List<String> metaKeyList =
@@ -429,8 +472,14 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
CatalogUtil.copyProperty(
info.getStorageConfig(),
catalogMeta.getCatalogProperties(),
STORAGE_CONFIGS_KEY_ENDPOINT,
STORAGE_CONFIGS_KEY_S3_ENDPOINT,
S3FileIOProperties.ENDPOINT);
} else if (storageType.equals(STORAGE_CONFIGS_VALUE_TYPE_OSS)) {
CatalogUtil.copyProperty(
info.getStorageConfig(),
catalogMeta.getCatalogProperties(),
STORAGE_CONFIGS_KEY_OSS_ENDPOINT,
AliyunProperties.OSS_ENDPOINT);
} else {
throw new RuntimeException("Invalid storage type " + storageType);
}
@@ -541,8 +590,10 @@ public void getCatalogDetail(Context ctx) {
} else {
info.setType(catalogMeta.getCatalogType());
}
info.setAuthConfig(extractAuthConfigsFromCatalogMeta(catalogName, catalogMeta));
info.setStorageConfig(extractStorageConfigsFromCatalogMeta(catalogName, catalogMeta));
String storageType = CatalogUtil.getCompatibleStorageType(catalogMeta.getStorageConfigs());
info.setAuthConfig(extractAuthConfigsFromCatalogMeta(catalogName, catalogMeta, storageType));
info.setStorageConfig(
extractStorageConfigsFromCatalogMeta(catalogName, catalogMeta, storageType));
// we put the table format single
String tableFormat =
catalogMeta.getCatalogProperties().get(CatalogMetaProperties.TABLE_FORMATS);
@@ -33,6 +33,8 @@ public class InternalTableConstants {
public static final String HADOOP_FILE_IO_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
public static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
public static final String S3_PROTOCOL_PREFIX = "s3://";
public static final String OSS_FILE_IO_IMPL = "org.apache.iceberg.aliyun.oss.OSSFileIO";
public static final String OSS_PROTOCOL_PREFIX = "oss://";

public static final String CHANGE_STORE_TABLE_NAME_SUFFIX =
InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR
@@ -21,6 +21,8 @@
import static org.apache.amoro.server.table.internal.InternalTableConstants.HADOOP_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.METADATA_FOLDER_NAME;
import static org.apache.amoro.server.table.internal.InternalTableConstants.MIXED_ICEBERG_BASED_REST;
import static org.apache.amoro.server.table.internal.InternalTableConstants.OSS_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.OSS_PROTOCOL_PREFIX;
import static org.apache.amoro.server.table.internal.InternalTableConstants.S3_FILE_IO_IMPL;
import static org.apache.amoro.server.table.internal.InternalTableConstants.S3_PROTOCOL_PREFIX;

@@ -88,6 +90,8 @@ public static AuthenticatedFileIO newIcebergFileIo(CatalogMeta meta) {
String defaultImpl = HADOOP_FILE_IO_IMPL;
if (warehouse.toLowerCase().startsWith(S3_PROTOCOL_PREFIX)) {
defaultImpl = S3_FILE_IO_IMPL;
} else if (warehouse.toLowerCase().startsWith(OSS_PROTOCOL_PREFIX)) {
defaultImpl = OSS_FILE_IO_IMPL;
}
String ioImpl = catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, defaultImpl);
FileIO fileIO = org.apache.iceberg.CatalogUtil.loadFileIO(ioImpl, catalogProperties, conf);
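Taken on its own, the default-FileIO selection above is just a scheme check on the warehouse URI. A standalone sketch of that logic (class name and sample path are hypothetical; the impl strings match the constants in InternalTableConstants):

```java
class FileIoSelectionSketch {
  static String defaultFileIoImpl(String warehouse) {
    String impl = "org.apache.iceberg.hadoop.HadoopFileIO";    // HADOOP_FILE_IO_IMPL
    if (warehouse.toLowerCase().startsWith("s3://")) {         // S3_PROTOCOL_PREFIX
      impl = "org.apache.iceberg.aws.s3.S3FileIO";             // S3_FILE_IO_IMPL
    } else if (warehouse.toLowerCase().startsWith("oss://")) { // OSS_PROTOCOL_PREFIX
      impl = "org.apache.iceberg.aliyun.oss.OSSFileIO";        // OSS_FILE_IO_IMPL
    }
    return impl;
  }

  public static void main(String[] args) {
    // An OSS warehouse path now resolves to Iceberg's OSSFileIO by default,
    // unless CatalogProperties.FILE_IO_IMPL overrides it.
    System.out.println(defaultFileIoImpl("oss://my-bucket/warehouse"));
  }
}
```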
@@ -26,11 +26,13 @@ public class CatalogMetaProperties {
public static final String STORAGE_CONFIGS_KEY_CORE_SITE = "hadoop.core.site";
public static final String STORAGE_CONFIGS_KEY_HIVE_SITE = "hive.site";
public static final String STORAGE_CONFIGS_KEY_REGION = "storage.s3.region";
public static final String STORAGE_CONFIGS_KEY_ENDPOINT = "storage.s3.endpoint";
public static final String STORAGE_CONFIGS_KEY_S3_ENDPOINT = "storage.s3.endpoint";
public static final String STORAGE_CONFIGS_KEY_OSS_ENDPOINT = "storage.oss.endpoint";

public static final String STORAGE_CONFIGS_VALUE_TYPE_HDFS_LEGACY = "hdfs";
public static final String STORAGE_CONFIGS_VALUE_TYPE_HADOOP = "Hadoop";
public static final String STORAGE_CONFIGS_VALUE_TYPE_S3 = "S3";
public static final String STORAGE_CONFIGS_VALUE_TYPE_OSS = "OSS";

public static final String AUTH_CONFIGS_KEY_TYPE = "auth.type";
public static final String AUTH_CONFIGS_KEY_PRINCIPAL = "auth.kerberos.principal";
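
Putting the new keys together, a sketch of the OSS storage config a catalog registration might submit; the class name is hypothetical and the endpoint value is an example only:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.amoro.properties.CatalogMetaProperties;

class OssStorageConfigSketch {
  static Map<String, String> ossStorageConfig() {
    Map<String, String> storageConfig = new HashMap<>();
    // "OSS" storage type, routed through the new STORAGE_CONFIGS_VALUE_TYPE_OSS branches
    storageConfig.put(
        CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE,
        CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_OSS);
    // Copied into catalog properties as AliyunProperties.OSS_ENDPOINT
    storageConfig.put(
        CatalogMetaProperties.STORAGE_CONFIGS_KEY_OSS_ENDPOINT,
        "https://oss-cn-hangzhou.aliyuncs.com"); // example endpoint
    return storageConfig;
  }
}
```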
18 changes: 17 additions & 1 deletion amoro-format-iceberg/pom.xml
@@ -107,6 +107,22 @@
<artifactId>iceberg-aws</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aliyun</artifactId>
</dependency>

<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<exclusions>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
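
The `httpclient` exclusion above lines up with the commit notes (`fix: add readme for httpclient`, `fix: aliyun-oss-sdk default provided`): `aliyun-sdk-oss` ships its own `org.apache.httpcomponents:httpclient`, which is presumably excluded here to avoid conflicting with the version already on the classpath, and the SDK defaults to `provided` scope so it is only bundled when building with `-Paliyun-oss-sdk` (see the README change above).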

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
@@ -163,7 +179,7 @@
<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-common</artifactId>
<version>${parent.version}</version>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
@@ -75,6 +75,7 @@
<include>org.apache.iceberg:iceberg-orc</include>
<include>org.apache.iceberg:iceberg-parquet</include>
<include>org.apache.iceberg:iceberg-aws</include>
<include>org.apache.iceberg:iceberg-aliyun</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-common</include>
@@ -277,7 +277,7 @@
<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-format-paimon</artifactId>
<version>${parent.version}</version>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>

@@ -80,6 +80,7 @@
<include>org.apache.iceberg:iceberg-orc</include>
<include>org.apache.iceberg:iceberg-parquet</include>
<include>org.apache.iceberg:iceberg-aws</include>
<include>org.apache.iceberg:iceberg-aliyun</include>
<include>org.apache.parquet:parquet-column</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-common</include>