Refactoring the ParquetTools read/write APIs (deephaven#5358)
malhotrashivam authored Apr 22, 2024
1 parent 87217e7 commit c6543b2
Showing 17 changed files with 1,323 additions and 879 deletions.
30 changes: 18 additions & 12 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
@@ -30,7 +30,13 @@ public boolean accept(File dir, String name) {
     };
     private final static String[] EMPTY_STRING_ARRAY = new String[0];
 
-    public static final Pattern DUPLICATE_SLASH_PATTERN = Pattern.compile("//+");
+    public static final char URI_SEPARATOR_CHAR = '/';
+
+    public static final String URI_SEPARATOR = "" + URI_SEPARATOR_CHAR;
+
+    public static final String REPEATED_URI_SEPARATOR = URI_SEPARATOR + URI_SEPARATOR;
+
+    public static final Pattern REPEATED_URI_SEPARATOR_PATTERN = Pattern.compile("//+");
 
     /**
      * Cleans the specified path. All files and subdirectories in the path will be deleted. (ie you'll be left with an
@@ -258,7 +264,7 @@ public boolean accept(File pathname) {
 
     /**
      * Take the file source path or URI string and convert it to a URI object. Any unnecessary path separators will be
-     * removed.
+     * removed. The URI object will always be {@link URI#isAbsolute() absolute}, i.e., will always have a scheme.
      *
      * @param source The file source path or URI
      * @param isDirectory Whether the source is a directory
@@ -273,8 +279,8 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
             uri = new URI(source);
             // Replace two or more consecutive slashes in the path with a single slash
             final String path = uri.getPath();
-            if (path.contains("//")) {
-                final String canonicalizedPath = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/");
+            if (path.contains(REPEATED_URI_SEPARATOR)) {
+                final String canonicalizedPath = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
                 uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
                         uri.getQuery(), uri.getFragment());
             }
@@ -300,17 +306,17 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
      */
     public static URI convertToURI(final File file, final boolean isDirectory) {
         String absPath = file.getAbsolutePath();
-        if (File.separatorChar != '/') {
-            absPath = absPath.replace(File.separatorChar, '/');
+        if (File.separatorChar != URI_SEPARATOR_CHAR) {
+            absPath = absPath.replace(File.separatorChar, URI_SEPARATOR_CHAR);
         }
-        if (absPath.charAt(0) != '/') {
-            absPath = "/" + absPath;
+        if (absPath.charAt(0) != URI_SEPARATOR_CHAR) {
+            absPath = URI_SEPARATOR_CHAR + absPath;
         }
-        if (isDirectory && absPath.charAt(absPath.length() - 1) != '/') {
-            absPath = absPath + "/";
+        if (isDirectory && absPath.charAt(absPath.length() - 1) != URI_SEPARATOR_CHAR) {
+            absPath = absPath + URI_SEPARATOR_CHAR;
         }
-        if (absPath.startsWith("//")) {
-            absPath = "//" + absPath;
+        if (absPath.startsWith(REPEATED_URI_SEPARATOR)) {
+            absPath = REPEATED_URI_SEPARATOR + absPath;
         }
         try {
             return new URI("file", null, absPath, null);
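A minimal sketch of how the canonicalization above behaves; the input paths and the expected outputs in the comments are illustrative, not taken from the commit:

```java
import java.io.File;
import java.net.URI;

import io.deephaven.base.FileUtils;

public class ConvertToUriSketch {
    public static void main(final String[] args) {
        // Repeated separators in the path are collapsed to a single '/'
        final URI fromString = FileUtils.convertToURI("file:/data//deephaven///table", true);
        System.out.println(fromString); // expected: file:/data/deephaven/table/

        // A File becomes an absolute "file" URI; a directory gains a trailing separator
        final URI fromFile = FileUtils.convertToURI(new File("/data/deephaven/table"), true);
        System.out.println(fromFile); // expected: file:/data/deephaven/table/
    }
}
```

Collapsing separators at URI-construction time is what lets downstream code (see URIStreamKeyValuePartitionLayout below) assume exactly one separator per path level.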
DataIndexer.java
@@ -92,8 +92,8 @@ private int priorityOf(@NotNull final ColumnSource<?> keyColumn) {
     @NotNull
     private static Collection<ColumnSource<?>> getColumnSources(
             @NotNull final Table table,
-            @NotNull final String... keyColumnNames) {
-        return Arrays.stream(keyColumnNames)
+            @NotNull final Collection<String> keyColumnNames) {
+        return keyColumnNames.stream()
                 .map(table::getColumnSource)
                 .collect(Collectors.toList());
     }
@@ -107,16 +107,29 @@ private static Collection<ColumnSource<?>> getColumnSources(
      * @param keyColumnNames The key column names to check
      * @return Whether {@code table} has a DataIndexer with a {@link DataIndex} for the given key columns
      */
-    public static boolean hasDataIndex(@NotNull Table table, @NotNull final String... keyColumnNames) {
-        if (keyColumnNames.length == 0) {
+    public static boolean hasDataIndex(@NotNull final Table table, @NotNull final String... keyColumnNames) {
+        return hasDataIndex(table, Arrays.asList(keyColumnNames));
+    }
+
+    /**
+     * Test whether {@code table} has a DataIndexer with a usable {@link DataIndex} for the given key columns. Note that
+     * a result from this method is a snapshot of current state, and does not guarantee anything about future calls to
+     * {@link #hasDataIndex}, {@link #getDataIndex}, or {@link #getOrCreateDataIndex(Table, String...)}.
+     *
+     * @param table The {@link Table} to check
+     * @param keyColumnNames The key column names to check
+     * @return Whether {@code table} has a DataIndexer with a {@link DataIndex} for the given key columns
+     */
+    public static boolean hasDataIndex(@NotNull final Table table, @NotNull final Collection<String> keyColumnNames) {
+        if (keyColumnNames.isEmpty()) {
             return false;
         }
-        table = table.coalesce();
-        final DataIndexer indexer = DataIndexer.existingOf(table.getRowSet());
+        final Table tableToUse = table.coalesce();
+        final DataIndexer indexer = DataIndexer.existingOf(tableToUse.getRowSet());
         if (indexer == null) {
             return false;
         }
-        return indexer.hasDataIndex(getColumnSources(table, keyColumnNames));
+        return indexer.hasDataIndex(getColumnSources(tableToUse, keyColumnNames));
     }
 
     /**
@@ -152,19 +165,34 @@ public boolean hasDataIndex(@NotNull final Collection<ColumnSource<?>> keyColumn
      * index is no longer live.
      *
      * @param table The {@link Table} to check
-     * @param keyColumnNames The key column for which to retrieve a DataIndex
+     * @param keyColumnNames The key columns for which to retrieve a DataIndex
      * @return The {@link DataIndex}, or {@code null} if one does not exist
      */
-    public static DataIndex getDataIndex(@NotNull Table table, final String... keyColumnNames) {
-        if (keyColumnNames.length == 0) {
+    @Nullable
+    public static DataIndex getDataIndex(@NotNull final Table table, final String... keyColumnNames) {
+        return getDataIndex(table, Arrays.asList(keyColumnNames));
+    }
+
+    /**
+     * If {@code table} has a DataIndexer, return a {@link DataIndex} for the given key columns, or {@code null} if no
+     * such index exists, if the cached index is invalid, or if the {@link DataIndex#isRefreshing() refreshing} cached
+     * index is no longer live.
+     *
+     * @param table The {@link Table} to check
+     * @param keyColumnNames The key columns for which to retrieve a DataIndex
+     * @return The {@link DataIndex}, or {@code null} if one does not exist
+     */
+    @Nullable
+    public static DataIndex getDataIndex(@NotNull final Table table, final Collection<String> keyColumnNames) {
+        if (keyColumnNames.isEmpty()) {
             return null;
         }
-        table = table.coalesce();
-        final DataIndexer indexer = DataIndexer.existingOf(table.getRowSet());
+        final Table tableToUse = table.coalesce();
+        final DataIndexer indexer = DataIndexer.existingOf(tableToUse.getRowSet());
         if (indexer == null) {
             return null;
         }
-        return indexer.getDataIndex(getColumnSources(table, keyColumnNames));
+        return indexer.getDataIndex(getColumnSources(tableToUse, keyColumnNames));
     }
 
     /**
@@ -239,13 +267,28 @@ public static DataIndex getOptimalPartialIndex(Table table, final String... keyC
     public static DataIndex getOrCreateDataIndex(
             @NotNull final Table table,
             @NotNull final String... keyColumnNames) {
-        if (keyColumnNames.length == 0) {
+        return getOrCreateDataIndex(table, Arrays.asList(keyColumnNames));
+    }
+
+    /**
+     * Create a {@link DataIndex} for {@code table} indexing {@code keyColumns}, if no valid, live data index already
+     * exists for these inputs.
+     *
+     * @param table The {@link Table} to index
+     * @param keyColumnNames The key column names to include
+     * @return The existing or newly created {@link DataIndex}
+     * @apiNote This method causes the returned {@link DataIndex} to be managed by the enclosing liveness manager.
+     */
+    public static DataIndex getOrCreateDataIndex(
+            @NotNull final Table table,
+            @NotNull final Collection<String> keyColumnNames) {
+        if (keyColumnNames.isEmpty()) {
             throw new IllegalArgumentException("Cannot create a DataIndex without any key columns");
         }
         final QueryTable tableToUse = (QueryTable) table.coalesce();
         final DataIndexer dataIndexer = DataIndexer.of(tableToUse.getRowSet());
         return dataIndexer.rootCache.computeIfAbsent(dataIndexer.pathFor(getColumnSources(tableToUse, keyColumnNames)),
-                () -> new TableBackedDataIndex(tableToUse, keyColumnNames));
+                () -> new TableBackedDataIndex(tableToUse, keyColumnNames.toArray(String[]::new)));
     }
 
     /**
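A short usage sketch of the reshaped DataIndexer API; `table` and the key column names are placeholders, and the `io.deephaven.engine.table.impl.indexer` import path for DataIndexer is assumed from context:

```java
import java.util.List;

import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.indexer.DataIndexer;

public class DataIndexerSketch {
    static DataIndex indexFor(final Table table) {
        // The varargs overloads now delegate to the Collection overloads, so these are equivalent:
        final boolean viaVarargs = DataIndexer.hasDataIndex(table, "Sym", "Exchange");
        final boolean viaCollection = DataIndexer.hasDataIndex(table, List.of("Sym", "Exchange"));
        assert viaVarargs == viaCollection;

        // getDataIndex is @Nullable: null means no usable index exists, so create one if needed
        final DataIndex existing = DataIndexer.getDataIndex(table, List.of("Sym", "Exchange"));
        return existing != null ? existing : DataIndexer.getOrCreateDataIndex(table, "Sym", "Exchange");
    }
}
```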
URIStreamKeyValuePartitionLayout.java
@@ -25,14 +25,14 @@
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
+import static io.deephaven.base.FileUtils.URI_SEPARATOR;
+
 /**
  * Extracts a key-value partitioned table layout from a stream of URIs.
  */
 public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocationKey>
         extends KeyValuePartitionLayout<TLK, URI> {
 
-    private static final String URI_SEPARATOR = "/";
-
     protected final URI tableRootDirectory;
     private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
     private final int maxPartitioningLevels;
@@ -96,7 +96,7 @@ private void getPartitions(@NotNull final URI relativePath,
             @NotNull final TIntObjectMap<ColumnNameInfo> partitionColInfo,
             final boolean registered) {
         final String relativePathString = relativePath.getPath();
-        // The following assumes that there is exactly one URI_SEPARATOR between each subdirectory in the path
+        // The following assumes that there is exactly one separator between each subdirectory in the path
         final String[] subDirs = relativePathString.split(URI_SEPARATOR);
         final int numPartitioningCol = subDirs.length - 1;
         if (registered) {
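A tiny sketch of the assumption that comment describes: because convertToURI collapses repeated separators, splitting the relative path on the separator yields exactly one token per level. The partition path here is made up:

```java
public class SeparatorSplitSketch {
    public static void main(final String[] args) {
        // Hypothetical key-value partitioned path, already canonicalized by convertToURI
        final String relativePathString = "Region=EU/Year=2024/table.parquet";
        final String[] subDirs = relativePathString.split("/"); // {"Region=EU", "Year=2024", "table.parquet"}
        final int numPartitioningCol = subDirs.length - 1;
        System.out.println(numPartitioningCol); // 2: one column per directory level, minus the file name
    }
}
```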
(test class in io.deephaven.engine.table.impl.sources.regioned)
@@ -4,7 +4,6 @@
 package io.deephaven.engine.table.impl.sources.regioned;
 
 import io.deephaven.base.FileUtils;
-import io.deephaven.datastructures.util.CollectionUtil;
 import io.deephaven.engine.context.ExecutionContext;
 import io.deephaven.engine.table.*;
 import io.deephaven.stringset.ArrayStringSet;
@@ -226,36 +225,34 @@ public void setUp() throws Exception {
         final String tableName = "TestTable";
 
         final PartitionedTable partitionedInputData = inputData.partitionBy("PC");
-        final File[] partitionedInputDestinations;
+        final String[] partitionedInputDestinations;
         try (final Stream<String> partitionNames = partitionedInputData.table()
                 .<String>objectColumnIterator("PC").stream()) {
             partitionedInputDestinations = partitionNames.map(pcv -> new File(dataDirectory,
                     "IP" + File.separator + "P" + pcv + File.separator + tableName + File.separator
-                    + PARQUET_FILE_NAME))
-                    .toArray(File[]::new);
+                    + PARQUET_FILE_NAME)
+                    .getPath())
+                    .toArray(String[]::new);
         }
-        ParquetTools.writeParquetTables(
+        ParquetTools.writeTables(
                 partitionedInputData.constituents(),
-                partitionedDataDefinition.getWritable(),
-                parquetInstructions,
                 partitionedInputDestinations,
-                CollectionUtil.ZERO_LENGTH_STRING_ARRAY_ARRAY);
+                parquetInstructions.withTableDefinition(partitionedDataDefinition.getWritable()));
 
         final PartitionedTable partitionedInputMissingData = inputMissingData.view("PC", "II").partitionBy("PC");
-        final File[] partitionedInputMissingDestinations;
+        final String[] partitionedInputMissingDestinations;
         try (final Stream<String> partitionNames = partitionedInputMissingData.table()
                 .<String>objectColumnIterator("PC").stream()) {
             partitionedInputMissingDestinations = partitionNames.map(pcv -> new File(dataDirectory,
                     "IP" + File.separator + "P" + pcv + File.separator + tableName + File.separator
-                    + PARQUET_FILE_NAME))
-                    .toArray(File[]::new);
+                    + PARQUET_FILE_NAME)
+                    .getPath())
+                    .toArray(String[]::new);
         }
-        ParquetTools.writeParquetTables(
+        ParquetTools.writeTables(
                 partitionedInputMissingData.constituents(),
-                partitionedMissingDataDefinition.getWritable(),
-                parquetInstructions,
                 partitionedInputMissingDestinations,
-                CollectionUtil.ZERO_LENGTH_STRING_ARRAY_ARRAY);
+                parquetInstructions.withTableDefinition(partitionedMissingDataDefinition.getWritable()));
 
         expected = TableTools
                 .merge(
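In isolation, the migration pattern the test follows; `tables`, `destinations`, and `definition` are placeholder parameters, and `ParquetInstructions.EMPTY` is assumed as a convenient starting point:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class WriteTablesSketch {
    static void write(final Table[] tables, final String[] destinations, final TableDefinition definition) {
        // Before deephaven#5358 (removed API, shown for comparison only):
        // ParquetTools.writeParquetTables(tables, definition, instructions, destinationFiles, groupingColumns);

        // After: destinations are Strings (paths or URIs) and the definition travels on the instructions
        ParquetTools.writeTables(
                tables,
                destinations,
                ParquetInstructions.EMPTY.withTableDefinition(definition));
    }
}
```

Carrying the TableDefinition on the instructions object is what shrinks the call to three arguments.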
ParquetFileReader.java
@@ -3,12 +3,17 @@
 //
 package io.deephaven.parquet.base;
 
+import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.util.channel.CachedChannelProvider;
 import io.deephaven.util.channel.SeekableChannelContext;
 import io.deephaven.util.channel.SeekableChannelsProvider;
+import io.deephaven.util.channel.SeekableChannelsProviderLoader;
-import org.apache.parquet.format.*;
+import org.apache.parquet.format.ColumnOrder;
+import org.apache.parquet.format.Type;
 import org.apache.parquet.schema.*;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +42,76 @@ public class ParquetFileReader {
     private final URI rootURI;
     private final MessageType type;
 
+    /**
+     * Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
+     * {@link UncheckedDeephavenException}.
+     *
+     * @param parquetFile The parquet file or the parquet metadata file
+     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
+     *        channels
+     * @return The new {@link ParquetFileReader}
+     */
+    public static ParquetFileReader create(
+            @NotNull final File parquetFile,
+            @Nullable final Object specialInstructions) {
+        try {
+            return createChecked(parquetFile, specialInstructions);
+        } catch (IOException e) {
+            throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
+        }
+    }
+
+    /**
+     * Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
+     * {@link UncheckedDeephavenException}.
+     *
+     * @param parquetFileURI The URI for the parquet file or the parquet metadata file
+     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
+     *        channels
+     * @return The new {@link ParquetFileReader}
+     */
+    public static ParquetFileReader create(
+            @NotNull final URI parquetFileURI,
+            @Nullable final Object specialInstructions) {
+        try {
+            return createChecked(parquetFileURI, specialInstructions);
+        } catch (IOException e) {
+            throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
+        }
+    }
+
+    /**
+     * Make a {@link ParquetFileReader} for the supplied {@link File}.
+     *
+     * @param parquetFile The parquet file or the parquet metadata file
+     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
+     *        channels
+     * @return The new {@link ParquetFileReader}
+     * @throws IOException if an IO exception occurs
+     */
+    public static ParquetFileReader createChecked(
+            @NotNull final File parquetFile,
+            @Nullable final Object specialInstructions) throws IOException {
+        return createChecked(convertToURI(parquetFile, false), specialInstructions);
+    }
+
+    /**
+     * Make a {@link ParquetFileReader} for the supplied {@link URI}.
+     *
+     * @param parquetFileURI The URI for the parquet file or the parquet metadata file
+     * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating
+     *        channels
+     * @return The new {@link ParquetFileReader}
+     * @throws IOException if an IO exception occurs
+     */
+    public static ParquetFileReader createChecked(
+            @NotNull final URI parquetFileURI,
+            @Nullable final Object specialInstructions) throws IOException {
+        final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
+                parquetFileURI, specialInstructions);
+        return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7));
+    }
+
     /**
      * Create a new ParquetFileReader for the provided source.
      *
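A usage sketch for the new factory methods; the path is made up, and `null` stands in for the optional special instructions:

```java
import java.io.File;
import java.io.IOException;

import io.deephaven.parquet.base.ParquetFileReader;

public class ReaderFactorySketch {
    public static void main(final String[] args) {
        final File parquetFile = new File("/data/deephaven/table.parquet"); // hypothetical path

        // Unchecked variant: an IOException is wrapped in UncheckedDeephavenException
        final ParquetFileReader reader = ParquetFileReader.create(parquetFile, null);

        // Checked variant, for callers that want to handle IO failures themselves
        try {
            final ParquetFileReader checked = ParquetFileReader.createChecked(parquetFile, null);
        } catch (final IOException e) {
            // handle or rethrow
        }
    }
}
```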
ParquetUtils.java
@@ -9,11 +9,17 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 
+import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;
+
 public final class ParquetUtils {
 
     public static final String METADATA_FILE_NAME = "_metadata";
     public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
     public static final String PARQUET_FILE_EXTENSION = ".parquet";
+    public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
+    public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
+    public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
+    public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
     private static final String MAGIC_STR = "PAR1";
     public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
 
@@ -35,6 +41,20 @@ public static String getPerFileMetadataKey(final String filePath) {
         return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
     }
 
+    /**
+     * This method verifies if the source points to a parquet file or a metadata file. Provided source can be a local
+     * file path or a URI. Also, it can point to a parquet file, metadata file or a directory.
+     */
+    public static boolean isParquetFile(@NotNull final String source) {
+        boolean ret = source.endsWith(PARQUET_FILE_EXTENSION)
+                || source.endsWith(METADATA_FILE_URI_SUFFIX)
+                || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
+        if (File.separatorChar != URI_SEPARATOR_CHAR) {
+            ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
+        }
+        return ret;
+    }
+
     /**
      * Check if the provided path points to a non-hidden parquet file, and that none of its parents (till rootDir) are
      * hidden.
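A few illustrative inputs for the new `isParquetFile` check; the paths are made up, and the `io.deephaven.parquet.base` import path for ParquetUtils is assumed:

```java
import io.deephaven.parquet.base.ParquetUtils;

public class IsParquetFileSketch {
    public static void main(final String[] args) {
        System.out.println(ParquetUtils.isParquetFile("s3://bucket/table/part-0.parquet")); // true: .parquet extension
        System.out.println(ParquetUtils.isParquetFile("/data/table/_metadata"));            // true: '/' + "_metadata"
        System.out.println(ParquetUtils.isParquetFile("/data/table/_common_metadata"));     // true
        System.out.println(ParquetUtils.isParquetFile("/data/table"));                      // false: plain directory
        // On Windows, "C:\\data\\table\\_metadata" also matches via the File.separatorChar branch
    }
}
```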
(Diffs for the remaining 11 changed files not shown.)
