Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Table Concurrent query Failure handling in Delta Lake #24250

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location);
}
else {
setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
// deleteRecursivelyIfNothingExists ensures current CREATE TABLE doesn't delete directory if there's a conflict
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we checking in the catch clause whether we're dealing with a TransactionConflictException instead?
By doing this, we'd likely know whether we're in a concurrency situation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findinpath the rollback happens in a different thread AFAIK, the exception context needs to be passed on as well. Also the rollback initialisation happens in beginCreateTable & createTable calls which is much prior to finishCreateTable which is later.

setRollback(() -> deleteRecursivelyIfLogNotExists(fileSystem, deltaLogDirectory));
protocolEntry = protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties());
}

Expand Down Expand Up @@ -1447,7 +1448,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(
}
else {
checkPathContainsNoFiles(session, finalLocation);
setRollback(() -> deleteRecursivelyIfExists(fileSystemFactory.create(session), finalLocation));
setRollback(() -> deleteRecursivelyIfLogNotExists(fileSystemFactory.create(session), finalLocation));
protocolEntry = protocolEntryForNewTable(containsTimestampType, tableMetadata.getProperties());
}

Expand Down Expand Up @@ -1557,13 +1558,17 @@ private void checkColumnTypes(List<ColumnMetadata> columnMetadata)
}
}

private static void deleteRecursivelyIfExists(TrinoFileSystem fileSystem, Location path)
private static void deleteRecursivelyIfLogNotExists(TrinoFileSystem fileSystem, Location tablePath)
{
try {
fileSystem.deleteDirectory(path);
Location deltaLogDirectory = Location.of(getTransactionLogDir(tablePath.path()));
boolean transactionLogFileExists = fileSystem.listFiles(deltaLogDirectory).hasNext();
if (!transactionLogFileExists) {
fileSystem.deleteDirectory(tablePath);
}
}
catch (IOException e) {
LOG.warn(e, "IOException while trying to delete '%s'", path);
LOG.warn(e, "IOException while trying to delete '%s'", tablePath);
}
}

Expand Down Expand Up @@ -1704,7 +1709,11 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
// TODO perhaps it should happen in a background thread (https://github.com/trinodb/trino/issues/12011)
cleanupFailedWrite(session, handle.location(), dataFileInfos);
}
if (handle.readVersion().isEmpty()) {
// Table already exist and created by a concurrent transaction as there's conflict while writing the transaction log entry
if (e instanceof TransactionConflictException) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry, Table already exists", e);
}
else if (handle.readVersion().isEmpty()) {
Location transactionLogDir = Location.of(getTransactionLogDir(location));
try {
fileSystemFactory.create(session).deleteDirectory(transactionLogDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;

import static java.util.Objects.requireNonNull;

Expand All @@ -48,6 +49,9 @@ public void write(ConnectorSession session, String clusterId, Location newLogEnt
outputStream.write(entryContents);
}
}
catch (FileAlreadyExistsException e) {
throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.MoreFutures;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@TestInstance(PER_CLASS)
public class TestDeltaLakeLocalConcurrentCreateTableTest
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
Path catalogDir = Files.createTempDirectory("catalog-dir");
closeAfterClass(() -> deleteRecursively(catalogDir, ALLOW_INSECURE));

return DeltaLakeQueryRunner.builder()
.addDeltaProperty("delta.unique-table-location", "false")
.addDeltaProperty("fs.hadoop.enabled", "true")
.addDeltaProperty("hive.metastore", "file")
.addDeltaProperty("hive.metastore.catalog.dir", catalogDir.toUri().toString())
.addDeltaProperty("hive.metastore.disable-location-checks", "true")
.build();
}

@Test
public void testConcurrentCreateTableAsSelect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this test to TestDeltaLakeLocalConcurrentWritesTest?

Copy link
Contributor Author

@vinay-kl vinay-kl Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ebyhr should it be considered as write?, thought as it's a table getting created first time, thought let me keep it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we treat CTAS as a write operation. Please merge the test class.

Copy link
Contributor Author

@vinay-kl vinay-kl Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ebyhr TestDeltaLakeLocalConcurrentWritesTest's queryRunner is instantiated with delta.unique-table-location=true, These test-cases which is part of CTAS requires delta.unique-table-location=true.

How to proceed here? as these aren't possible to set via session props as well.

throws Exception
{
testConcurrentCreateTableAsSelect(false);
testConcurrentCreateTableAsSelect(true);
}

private void testConcurrentCreateTableAsSelect(boolean partitioned)
throws InterruptedException
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);

try {
// Execute concurrent CTAS operations
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("CREATE TABLE test_ctas_1"
+ (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS SELECT 1 as a, 10 as part");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("CREATE TABLE test_ctas_2"
+ (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS SELECT 11 as a, 20 as part");
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
getQueryRunner().execute("CREATE TABLE test_ctas_3"
+ (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS SELECT 21 as a, 30 as part");
return null;
})
.build())
.forEach(MoreFutures::getDone);

// Verify each table was created with correct data
assertThat(query("SELECT * FROM test_ctas_1")).matches("VALUES (1, 10)");
assertThat(query("SELECT * FROM test_ctas_2")).matches("VALUES (11, 20)");
assertThat(query("SELECT * FROM test_ctas_3")).matches("VALUES (21, 30)");

// Verify table histories
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"test_ctas_1$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"test_ctas_2$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"test_ctas_3$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
}
finally {
// Clean up
assertUpdate("DROP TABLE IF EXISTS test_ctas_1");
assertUpdate("DROP TABLE IF EXISTS test_ctas_2");
assertUpdate("DROP TABLE IF EXISTS test_ctas_3");
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}

@Test
public void testConcurrentCreateTableAsSelectSameTable()
throws Exception
{
testConcurrentCreateTableAsSelectSameTable(false);
testConcurrentCreateTableAsSelectSameTable(true);
}

private void testConcurrentCreateTableAsSelectSameTable(boolean partitioned)
throws Exception
{
int threads = 3;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
String tableName = "test_concurrent_ctas_" + randomNameSuffix();

try {
getQueryRunner().execute("create table stg_test as SELECT a, b, 20220101 as d FROM UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b)");

String selectString = " as ((select stg1.a as a, stg1.b as b, stg1.d as part from stg_test stg1, stg_test stg2 where stg1.d=stg2.d) " +
"union all (select stg1.a as a, stg1.b as b, stg1.d as part from stg_test stg1, stg_test stg2 where stg1.d=stg2.d))";

// Execute concurrent CTAS operations
executor.invokeAll(ImmutableList.<Callable<Void>>builder()
.add(() -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute("CREATE TABLE " + tableName +
(partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + selectString);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
assertThat(trinoException).hasMessageContaining("Table already exists");

Check failure on line 155 in plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentCreateTableTest.java

View workflow job for this annotation

GitHub Actions / test (plugin/trino-delta-lake)

TestDeltaLakeLocalConcurrentCreateTableTest.testConcurrentCreateTableAsSelectSameTable

Expecting throwable message: "Expected response code from http://127.0.0.1:37713/v1/task/20241125_172542_00032_w5639.0.0.0/status to be 200, but was 408 Error 408 Timeout: Timed out (timeout delayed by 650 ms after scheduled time): AsyncCatchingFuture@1c91d213[status=PENDING, info=[inputFuture=[TimeoutFuture@50d39df2[status=PENDING, info=[inputFuture=[TransformFuture@1fbfa695[status=PENDING, info=[inputFuture=[SettableFuture@567a8006[status=PENDING]], function=[io.trino.execution.SqlTask$$Lambda/0x00007f2e483621f8@6b421c18]]]]]]], exceptionType=[class java.util.concurrent.TimeoutException], fallback=[io.airlift.concurrent.MoreFutures$$Lambda/0x00007f2e480ced30@79514a29]]]" to contain: "Table already exists" but did not. Throwable that failed the check: io.trino.spi.TrinoException: Expected response code from http://127.0.0.1:37713/v1/task/20241125_172542_00032_w5639.0.0.0/status to be 200, but was 408 Error 408 Timeout: Timed out (timeout delayed by 650 ms after scheduled time): AsyncCatchingFuture@1c91d213[status=PENDING, info=[inputFuture=[TimeoutFuture@50d39df2[status=PENDING, info=[inputFuture=[TransformFuture@1fbfa695[status=PENDING, info=[inputFuture=[SettableFuture@567a8006[status=PENDING]], function=[io.trino.execution.SqlTask$$Lambda/0x00007f2e483621f8@6b421c18]]]]]]], exceptionType=[class java.util.concurrent.TimeoutException], fallback=[io.airlift.concurrent.MoreFutures$$Lambda/0x00007f2e480ced30@79514a29]]] at io.trino.server.remotetask.SimpleHttpResponseHandler.onSuccess(SimpleHttpResponseHandler.java:62) at io.trino.server.remotetask.SimpleHttpResponseHandler.onSuccess(SimpleHttpResponseHandler.java:27) at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1137) at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1575)
}
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute("CREATE TABLE " + tableName +
(partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + selectString);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
assertThat(trinoException).hasMessageContaining("Table already exists");
}
return null;
})
.add(() -> {
barrier.await(10, SECONDS);
try {
getQueryRunner().execute("CREATE TABLE " + tableName +
(partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + selectString);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
assertThat(trinoException).hasMessageContaining("Table already exists");
}
return null;
})
.build())
.forEach(MoreFutures::getDone);

// Verify table exists and has one row
assertThat((long) computeScalar("SELECT count(*) FROM " + tableName)).isEqualTo(162036002L);

// Verify table history shows single creation
assertQuery(
"SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
"VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)");
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
assertUpdate("DROP TABLE IF EXISTS stg_test");
executor.shutdownNow();
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}
}
}
Loading