Skip to content

Commit

Permalink
summer-ospp: support import data as column storage only by load data …
Browse files Browse the repository at this point in the history
…and export data as orc format by select into outfile
  • Loading branch information
junyue authored and lijiu99999 committed Oct 24, 2023
1 parent cdd9768 commit ea280c4
Show file tree
Hide file tree
Showing 35 changed files with 3,192 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ public enum OSSFileType {
OTHER("", "", ""),
TABLE_FORMAT("format", "%s_%s_%s.%s", "/tmp/%s_%s_%s.%s"),
TABLE_FILE("orc", "%s_%s_%s.%s", "/tmp/%s_%s_%s.%s"),
TABLE_META("bf", "%s_%s_%s_%s_%s.%s", "/tmp/%s_%s_%s_%s_%s.%s");
TABLE_META("bf", "%s_%s_%s_%s_%s.%s", "/tmp/%s_%s_%s_%s_%s.%s"),
EXPORT_ORC_FILE("orc", "%s_%s.%s", "../spill/temp/%s");

String suffix;
String remotePathFormat;
String localPathFormat;

OSSFileType(String suffix, String remotePathFormat, String localPathFormat) {
this.suffix = suffix;
this.remotePathFormat = remotePathFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.polardbx.common.oss.OSSFileType;

import static com.alibaba.polardbx.common.oss.OSSFileType.EXPORT_ORC_FILE;
import static com.alibaba.polardbx.common.oss.OSSFileType.TABLE_FILE;
import static com.alibaba.polardbx.common.oss.OSSFileType.TABLE_FORMAT;
import static com.alibaba.polardbx.common.oss.OSSFileType.TABLE_META;
Expand All @@ -28,7 +29,6 @@
public class OSSKey {
private static final String FILE_SUFFIX_FORMAT = "%s_%s";
private final OSSFileType fileType;

private String schemaName;
private String tableName;
private String tableFileId;
Expand All @@ -37,7 +37,8 @@ public class OSSKey {
private String columnName;
private long stripeIndex;

private OSSKey(OSSFileType fileType, String schemaName, String tableName, String tableFileId, String columnName, long stripeIndex) {
private OSSKey(OSSFileType fileType, String schemaName, String tableName, String tableFileId, String columnName,
long stripeIndex) {
this.fileType = fileType;
this.schemaName = schemaName;
this.tableName = tableName;
Expand All @@ -47,26 +48,36 @@ private OSSKey(OSSFileType fileType, String schemaName, String tableName, String
this.suffix = fileType.getSuffix();
}

public static OSSKey createBloomFilterFileOSSKey(String schemaName, String tableName, String tableFileId, String columnName, long stripeIndex) {
public static OSSKey createBloomFilterFileOSSKey(String schemaName, String tableName, String tableFileId,
String columnName, long stripeIndex) {
return new OSSKey(TABLE_META, schemaName, tableName, tableFileId, columnName, stripeIndex);
}

public static String localMetaPath(String schemaName, String tableName, String tableFileId, String columnName, long stripeIndex) {
return String.format(TABLE_META.getLocalPathFormat(), schemaName, tableName, tableFileId, columnName, stripeIndex, TABLE_META.getSuffix());
public static String localMetaPath(String schemaName, String tableName, String tableFileId, String columnName,
long stripeIndex) {
return String.format(TABLE_META.getLocalPathFormat(), schemaName, tableName, tableFileId, columnName,
stripeIndex, TABLE_META.getSuffix());
}

/**
 * Creates the OSS key describing a table data (ORC) file for the given
 * schema/table, identified by {@code tableFileId}. Column name and stripe
 * index do not apply to this file type.
 */
public static OSSKey createTableFileOSSKey(String schemaName, String tableName, String tableFileId) {
    final OSSFileType type = OSSFileType.TABLE_FILE;
    return new OSSKey(type, schemaName, tableName, tableFileId, null, 0);
}

public static OSSKey createTableFileOSSKey(String schemaName, String tableName, String fileSuffix, String tableFileId) {
public static OSSKey createTableFileOSSKey(String schemaName, String tableName, String fileSuffix,
String tableFileId) {
return new OSSKey(OSSFileType.TABLE_FILE, schemaName, tableName,
String.format(FILE_SUFFIX_FORMAT, fileSuffix, tableFileId),
null, 0);
}

/**
 * Builds the OSS key for an ORC file produced by {@code SELECT ... INTO OUTFILE}.
 * <p>
 * NOTE(review): this reuses the generic constructor's slots — the export path is
 * passed in the {@code schemaName} position and the unique file id in the
 * {@code tableFileId} position; {@code tableName}/{@code columnName} are unused
 * for EXPORT_ORC_FILE (see the EXPORT_ORC_FILE cases in localPath()/toString()).
 * Confirm slot semantics against EXPORT_ORC_FILE's format strings.
 */
public static OSSKey createExportOrcFileOSSKey(String path, String uniqueId) {
return new OSSKey(OSSFileType.EXPORT_ORC_FILE, path, null, uniqueId,
null, 0);
}

public static String localFilePath(String schemaName, String tableName, String tableFileId) {
return String.format(TABLE_FILE.getLocalPathFormat(), schemaName, tableName, tableFileId, TABLE_FILE.getSuffix());
return String.format(TABLE_FILE.getLocalPathFormat(), schemaName, tableName, tableFileId,
TABLE_FILE.getSuffix());
}

public static OSSKey createFormatFileOSSKey(String schemaName, String tableName, String tableFileId) {
Expand All @@ -80,11 +91,16 @@ public static String localFormatPath(String schemaName, String tableName) {
public String localPath() {
switch (fileType) {
case TABLE_FILE:
return String.format(TABLE_FILE.getLocalPathFormat(), schemaName, tableName, tableFileId, TABLE_FILE.getSuffix());
return String.format(TABLE_FILE.getLocalPathFormat(), schemaName, tableName, tableFileId,
TABLE_FILE.getSuffix());
case TABLE_FORMAT:
return String.format(TABLE_FORMAT.getLocalPathFormat(), schemaName, tableName, tableFileId, TABLE_FORMAT.getSuffix());
return String.format(TABLE_FORMAT.getLocalPathFormat(), schemaName, tableName, tableFileId,
TABLE_FORMAT.getSuffix());
case TABLE_META:
return String.format(TABLE_META.getLocalPathFormat(), schemaName, tableName, tableFileId, columnName, stripeIndex, TABLE_META.getSuffix());
return String.format(TABLE_META.getLocalPathFormat(), schemaName, tableName, tableFileId, columnName,
stripeIndex, TABLE_META.getSuffix());
case EXPORT_ORC_FILE:
return String.format(EXPORT_ORC_FILE.getLocalPathFormat(), schemaName, EXPORT_ORC_FILE.getSuffix());
}
return null;
}
Expand Down Expand Up @@ -125,7 +141,10 @@ public String toString() {
case TABLE_FILE:
return String.format(fileType.getRemotePathFormat(), schemaName, tableName, tableFileId, suffix);
case TABLE_META:
return String.format(fileType.getRemotePathFormat(), schemaName, tableName, tableFileId, columnName, stripeIndex, suffix);
return String.format(fileType.getRemotePathFormat(), schemaName, tableName, tableFileId, columnName,
stripeIndex, suffix);
case EXPORT_ORC_FILE:
return String.format(fileType.getRemotePathFormat(), schemaName, tableFileId, suffix);
}
return super.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,18 @@ public static void addSupportedParam(ConfigParam param) {
public static final IntConfigParam PARALLELISM = new IntConfigParam(
ConnectionProperties.PARALLELISM, -1, Integer.MAX_VALUE, -1, true);

public static final IntConfigParam OSS_LOAD_DATA_PRODUCERS =
new IntConfigParam(ConnectionProperties.OSS_LOAD_DATA_PRODUCERS, -1, Integer.MAX_VALUE, 2, true);

public static final IntConfigParam OSS_LOAD_DATA_MAX_CONSUMERS =
new IntConfigParam(ConnectionProperties.OSS_LOAD_DATA_MAX_CONSUMERS, -1, Integer.MAX_VALUE, 32, true);

public static final IntConfigParam OSS_LOAD_DATA_FLUSHERS =
new IntConfigParam(ConnectionProperties.OSS_LOAD_DATA_FLUSHERS, -1, Integer.MAX_VALUE, 16, true);

public static final IntConfigParam OSS_LOAD_DATA_UPLOADERS =
new IntConfigParam(ConnectionProperties.OSS_LOAD_DATA_UPLOADERS, -1, Integer.MAX_VALUE, 2, true);

public static final IntConfigParam PREFETCH_SHARDS = new IntConfigParam(
ConnectionProperties.PREFETCH_SHARDS, -1, Integer.MAX_VALUE, -1, true);

Expand Down Expand Up @@ -2575,6 +2587,10 @@ public static class ConnectionParamValues {
ConnectionProperties.OSS_BLOOM_FILTER_FPP, .01f, .05f, .01f, true);
public static final LongConfigParam OSS_MAX_ROWS_PER_FILE = new LongConfigParam(
ConnectionProperties.OSS_MAX_ROWS_PER_FILE, 1_000L, 1000_000_000L, 5_000_000L, true);

public static final LongConfigParam OSS_EXPORT_MAX_ROWS_PER_FILE = new LongConfigParam(
ConnectionProperties.OSS_EXPORT_MAX_ROWS_PER_FILE, 1_000L, 1000_000_000L, 1000_000L, true);

public static final BooleanConfigParam OSS_REMOVE_TMP_FILES = new BooleanConfigParam(
ConnectionProperties.OSS_REMOVE_TMP_FILES, true, true);
public static final StringConfigParam OSS_ORC_COMPRESSION = new StringConfigParam(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,29 @@ public class ConnectionProperties {

public static final String PARALLELISM = "PARALLELISM";

/**
* Number of producers to run oss load data
*/
public static final String OSS_LOAD_DATA_PRODUCERS = "OSS_LOAD_DATA_PRODUCERS";

/**
* Number of max consumers to run oss load data
*/
public static final String OSS_LOAD_DATA_MAX_CONSUMERS = "OSS_LOAD_DATA_MAX_CONSUMERS";

/**
* Number of flushers to run oss load data
*/
public static final String OSS_LOAD_DATA_FLUSHERS = "OSS_LOAD_DATA_FLUSHERS";

/**
* Number of uploaders to run oss load data
*/
public static final String OSS_LOAD_DATA_UPLOADERS = "OSS_LOAD_DATA_UPLOADERS";

/**
* Number of shards to prefetch (only take effect under parallel query)
*/
public static final String PREFETCH_SHARDS = "PREFETCH_SHARDS";

public static final String MAX_CACHE_PARAMS = "MAX_CACHE_PARAMS";
Expand Down Expand Up @@ -1714,6 +1737,9 @@ protected Object clone() throws CloneNotSupportedException {
public static final String OSS_ORC_INDEX_STRIDE = "OSS_ORC_INDEX_STRIDE";
public static final String OSS_BLOOM_FILTER_FPP = "OSS_BLOOM_FILTER_FPP";
public static final String OSS_MAX_ROWS_PER_FILE = "OSS_MAX_ROWS_PER_FILE";

public static final String OSS_EXPORT_MAX_ROWS_PER_FILE = "OSS_EXPORT_MAX_ROWS_PER_FILE";

public static final String OSS_REMOVE_TMP_FILES = "OSS_REMOVE_TMP_FILES";
public static final String OSS_ORC_COMPRESSION = "OSS_ORC_COMPRESSION";
/* ================ For OSS Table File System ================ */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.alibaba.polardbx.executor.cursor.impl;

import com.alibaba.polardbx.common.exception.MemoryNotEnoughException;
import com.alibaba.polardbx.common.exception.TddlNestableRuntimeException;
import com.alibaba.polardbx.common.properties.ConnectionParams;
import com.alibaba.polardbx.common.utils.logger.Logger;
import com.alibaba.polardbx.common.utils.logger.LoggerFactory;
import com.alibaba.polardbx.executor.chunk.BlockBuilder;
import com.alibaba.polardbx.executor.chunk.BlockBuilders;
import com.alibaba.polardbx.executor.chunk.Chunk;
import com.alibaba.polardbx.executor.cursor.AbstractCursor;
import com.alibaba.polardbx.executor.cursor.Cursor;
import com.alibaba.polardbx.executor.operator.spill.OrcWriter;
import com.alibaba.polardbx.optimizer.config.table.ColumnMeta;
import com.alibaba.polardbx.optimizer.context.ExecutionContext;
import com.alibaba.polardbx.optimizer.core.CursorMeta;
import com.alibaba.polardbx.optimizer.core.datatype.DataType;
import com.alibaba.polardbx.optimizer.core.row.ArrayRow;
import com.alibaba.polardbx.optimizer.core.row.Row;
import com.alibaba.polardbx.optimizer.memory.MemoryAllocatorCtx;
import com.alibaba.polardbx.optimizer.memory.MemoryPool;
import com.alibaba.polardbx.optimizer.memory.MemoryType;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.calcite.sql.OutFileParams;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * Cursor that drains an inner cursor, writes every row into an ORC file via
 * {@link OrcWriter}, uploads the result to OSS, and finally returns a single
 * one-column row holding the number of rows written (the "affected rows" count).
 * <p>
 * Memory for buffered rows is reserved from an operator-scoped {@link MemoryPool};
 * when the reservation fails with {@link MemoryNotEnoughException}, the buffered
 * rows are flushed to the writer and the reservation is released before buffering
 * continues.
 */
public class OutOrcFileCursor extends AbstractCursor {
    protected static final Logger log = LoggerFactory.getLogger(OutOrcFileCursor.class);
    protected DataType[] dataTypes;
    private Cursor cursor;
    private BlockBuilder[] blockBuilders;
    protected CursorMeta cursorMeta;
    protected ExecutionContext context;
    // Running count of rows handed to the ORC writer; reported as the result row.
    protected int affectRow = 0;
    protected MemoryPool pool;
    protected MemoryAllocatorCtx memoryAllocator;
    protected OrcWriter orcWriter;
    protected ListenableFuture<?> orcWriterFuture;

    /**
     * @param context       execution context supplying the memory pool and params
     * @param cursor        inner cursor producing the rows to export
     * @param outFileParams OUTFILE clause parameters; its column metadata is
     *                      populated here from the inner cursor's return columns
     */
    public OutOrcFileCursor(
        ExecutionContext context, Cursor cursor, OutFileParams outFileParams) {
        super(false);
        this.cursorMeta = CursorMeta.build(cursor.getReturnColumns());
        this.dataTypes = new DataType[cursor.getReturnColumns().size()];
        for (int i = 0; i < dataTypes.length; i++) {
            dataTypes[i] = cursor.getReturnColumns().get(i).getDataType();
        }
        this.cursor = cursor;
        // NOTE(review): "Meata" is a typo in the OutFileParams API itself; kept as-is.
        outFileParams.setColumnMeata(cursor.getReturnColumns());
        this.context = context;
        createBlockBuilders();

        orcWriter = new OrcWriter(context, outFileParams, dataTypes);
        String memoryName = getClass().getSimpleName() + "@" + System.identityHashCode(this);
        this.pool = context.getMemoryPool().getOrCreatePool(
            memoryName,
            context.getParamManager().getLong(ConnectionParams.SPILL_OUTPUT_MAX_BUFFER_SIZE),
            MemoryType.OPERATOR);
        memoryAllocator = pool.getMemoryAllocatorCtx();
    }

    /** Lazily creates one block builder per output column (idempotent). */
    protected final void createBlockBuilders() {
        if (blockBuilders == null) {
            blockBuilders = new BlockBuilder[dataTypes.length];
            for (int i = 0; i < dataTypes.length; i++) {
                blockBuilders[i] = BlockBuilders.create(dataTypes[i], context);
            }
        }
    }

    /**
     * Waits for the pending writer task, if any, to complete.
     * <p>
     * Unlike the earlier swallow-and-print behavior, failures are propagated:
     * an interrupt re-asserts the thread's interrupt status, and an execution
     * failure rethrows its cause wrapped in {@link TddlNestableRuntimeException}.
     */
    protected final void getOrcWriterFuture() {
        if (orcWriterFuture != null) {
            try {
                orcWriterFuture.get();
            } catch (InterruptedException e) {
                // Never swallow an interrupt: restore the flag and surface the failure.
                Thread.currentThread().interrupt();
                throw new TddlNestableRuntimeException(e);
            } catch (ExecutionException e) {
                // Unwrap the task's real failure when available.
                throw new TddlNestableRuntimeException(e.getCause() == null ? e : e.getCause());
            }
        }
    }

    /**
     * Drains the inner cursor, writing all rows to the ORC writer and uploading
     * the file to OSS. Returns a single-column row containing the row count, or
     * {@code null} when the inner cursor is absent or empty.
     */
    @Override
    protected Row doNext() {
        Row currentRow;
        if (null == cursor || (currentRow = cursor.next()) == null) {
            return null;
        }

        List<Row> bufferRows = new ArrayList<>();
        while (currentRow != null) {
            try {
                long rowSize = currentRow.estimateSize();

                if (currentRow instanceof Chunk.ChunkRow) {
                    bufferRows.add(currentRow);
                } else {
                    // Materialize non-chunk rows so they stay valid after cursor.next().
                    bufferRows.add(new ArrayRow(cursorMeta, currentRow.getValues().toArray(), rowSize));
                }
                // May throw MemoryNotEnoughException; the row above is already buffered
                // and will be flushed by the handler below (matches original ordering).
                memoryAllocator.allocateReservedMemory(rowSize);
            } catch (MemoryNotEnoughException t) {
                // Buffer is as large as memory allows: flush it and release the reservation.
                affectRow += bufferRows.size();
                orcWriter.writeRows(bufferRows);
                bufferRows = new ArrayList<>();
                memoryAllocator.releaseReservedMemory(memoryAllocator.getReservedAllocated(), true);
            } catch (Throwable t) {
                throw new TddlNestableRuntimeException(t);
            }

            currentRow = cursor.next();
        }

        if (!bufferRows.isEmpty()) {
            affectRow += bufferRows.size();
            orcWriter.writeRows(bufferRows);
        }

        orcWriter.writeRowsFinish();

        orcWriter.uploadToOss();

        memoryAllocator.releaseReservedMemory(memoryAllocator.getReservedAllocated(), true);

        // orcWriter is assigned in the constructor and never null here.
        orcWriter.close(false);

        ArrayRow arrayRow = new ArrayRow(1, cursorMeta);
        arrayRow.setObject(0, affectRow);
        arrayRow.setCursorMeta(cursorMeta);
        return arrayRow;
    }

    /**
     * Cancels any in-flight writer task, closes the writer (clearing the produced
     * file when an error occurred), and closes the inner cursor. All failures are
     * accumulated into {@code exceptions} rather than thrown.
     */
    @Override
    protected List<Throwable> doClose(List<Throwable> exceptions) {
        boolean needClearFile = exceptions.size() > 0;
        try {
            if (orcWriterFuture != null && !orcWriterFuture.isDone()) {
                orcWriterFuture.cancel(true);
                needClearFile = true;
            }
        } catch (Throwable t) {
            exceptions.add(t);
        }
        try {
            orcWriter.close(needClearFile);
        } catch (Throwable t) {
            exceptions.add(t);
        } finally {
            if (cursor != null) {
                exceptions = cursor.close(exceptions);
            }
        }
        return exceptions;
    }

    @Override
    public List<ColumnMeta> getReturnColumns() {
        return cursorMeta.getColumns();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ public static void addOssFileMeta(Connection metaDbConn, String tableSchema, Str
tableInfoManager.addOssFile(tableSchema, tableName, filesRecord);
}

/**
 * Registers an OSS file record, carrying its TSO, for the given table in the
 * meta DB via {@link TableInfoManager} bound to the supplied connection.
 */
public static void addOssFileWithTso(Connection metaDbConn, String tableSchema, String tableName,
                                     FilesRecord filesRecord) {
    TableInfoManager manager = new TableInfoManager();
    manager.setConnection(metaDbConn);
    manager.addOssFileWithTso(tableSchema, tableName, filesRecord);
}

public static void changeOssFile(Connection metaDbConn, Long primaryKey, Long fileSize) {
changeOssFile(metaDbConn, primaryKey, new byte[] {}, fileSize, 0L);
}
Expand Down
Loading

0 comments on commit ea280c4

Please sign in to comment.