Skip to content

Commit

Permalink
Fix IOError on WAL write doesn't propagate to write group follower
Browse files Browse the repository at this point in the history
Summary:
This is a simpler version of #3097 by removing all unrelated changes.

Fixing the bug where concurrent writes may get Status::OK while it actually gets IOError on WAL write. This happens when multiple writes form a write batch group, and the leader get an IOError while writing to WAL. The leader failed to pass the error to followers in the group, and the followers end up returning Status::OK() while actually writing nothing. The bug only affect writes in a batch group. Future writes after the batch group will correctly return immediately with the IOError.
Closes #3201

Differential Revision: D6421644

Pulled By: yiwu-arbug

fbshipit-source-id: 1c2a455c5b73f6842423785eb8a9dbfbb191dc0e
  • Loading branch information
Yi Wu committed Nov 28, 2017
1 parent 9e47084 commit 7513f63
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 3 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Rocksdb Change Log
* Fix IOError on WAL write doesn't propagate to write group follower

## 5.8.6 (11/20/2017)
### Bug Fixes
* Fixed aligned_alloc issues with Windows.
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
write_thread_.ExitAsBatchGroupLeader(write_group, status);
}

if (status.ok()) {
Expand Down Expand Up @@ -543,7 +543,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(status);
}
nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
if (status.ok()) {
status = w.FinalStatus();
}
Expand Down
50 changes: 49 additions & 1 deletion db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/string_util.h"
#include "util/sync_point.h"

namespace rocksdb {
Expand All @@ -18,7 +23,9 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
public:
DBWriteTest() : DBTestBase("/db_write_test") {}

void Open() { DBTestBase::Reopen(GetOptions(GetParam())); }
Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

void Open() { DBTestBase::Reopen(GetOptions()); }
};

// Sequence number should be return through input write batch.
Expand Down Expand Up @@ -67,6 +74,47 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
}
}

TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
constexpr int kNumThreads = 5;
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(Env::Default()));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
std::atomic<int> ready_count{0};
std::atomic<int> leader_count{0};
std::vector<port::Thread> threads;
mock_env->SetFilesystemActive(false);
// Wait until all threads linked to write threads, to make sure
// all threads join the same batch group.
SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
ready_count++;
auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
if (w->state == WriteThread::STATE_GROUP_LEADER) {
leader_count++;
while (ready_count < kNumThreads) {
// busy waiting
}
}
});
SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < kNumThreads; i++) {
threads.push_back(port::Thread(
[&](int index) {
// All threads should fail.
ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
},
i));
}
for (int i = 0; i < kNumThreads; i++) {
threads[i].join();
}
ASSERT_EQ(1, leader_count);
// Close before mock_env destruct.
Close();
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
Expand Down
5 changes: 5 additions & 0 deletions db/write_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);

// Propagate memtable write error to the whole group.
if (status.ok() && !write_group.status.ok()) {
status = write_group.status;
}

if (enable_pipelined_write_) {
// Notify writers don't write to memtable to exit.
for (Writer* w = last_writer; w != leader;) {
Expand Down

0 comments on commit 7513f63

Please sign in to comment.