Skip to content

Commit

Permalink
Merge branch 'unstable' into bugfix/xread-key-range
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Nov 14, 2024
2 parents 06d1ad5 + 1974d98 commit 29b08bd
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

enum class MigrationType {
/// Use Redis commands to migrate data.
/// It will trying to extract commands from existing data and log, then replay
/// It will try to extract commands from existing data and log, then replay
/// them on the destination node.
kRedisCommand = 0,
/// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
Expand Down
4 changes: 4 additions & 0 deletions src/commands/cmd_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class CommandExec : public Commander {
auto s = storage->BeginTxn();
if (s.IsOK()) {
conn->ExecuteCommands(conn->GetMultiExecCommands());
// In Redis, errors happening after EXEC instead are not handled in a special way:
// all the other commands will be executed even if some command fails during
// the transaction.
// So, if conn->IsMultiError(), the transaction should still be committed.
s = storage->CommitTxn();
}
return s;
Expand Down
2 changes: 1 addition & 1 deletion src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ constexpr const size_t GiB = 1024L * MiB;
constexpr const uint32_t kDefaultPort = 6666;

constexpr const char *kDefaultNamespace = "__namespace";
constexpr const size_t KVROCKS_MAX_LSM_LEVEL = 7;
constexpr int KVROCKS_MAX_LSM_LEVEL = 7;

enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC };

Expand Down
2 changes: 1 addition & 1 deletion src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

// We don't execute commands, but queue them, and then execute in EXEC command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) {
multi_cmds_.emplace_back(cmd_tokens);
multi_cmds_.emplace_back(std::move(cmd_tokens));
Reply(redis::SimpleString("QUEUED"));
continue;
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/batch_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@

#include "storage.h"

// WriteBatchIndexer traverses the operations in WriteBatch and appends to the specified WriteBatchWithIndex
/// WriteBatchIndexer traverses the operations in WriteBatch and appends to the
/// specified WriteBatchWithIndex.
class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
public:
explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch,
Expand All @@ -41,6 +42,7 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
}
explicit WriteBatchIndexer(engine::Context& ctx)
: WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}

rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override {
return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value);
}
Expand Down
29 changes: 16 additions & 13 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -597,14 +597,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
std::string *value) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else if (ctx.batch && ctx.is_txn_mode) {
} else if (ctx.batch && ctx.txn_context_enabled) {
s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
Expand All @@ -622,14 +622,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
rocksdb::PinnableSlice *value) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else if (ctx.is_txn_mode && ctx.batch) {
} else if (ctx.txn_context_enabled && ctx.batch) {
s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
} else {
s = db_->Get(options, column_family, key, value);
Expand All @@ -655,14 +655,14 @@ void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_famil

rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
auto iter = db_->NewIterator(options, column_family);
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->NewIteratorWithBase(column_family, iter, &options);
} else if (ctx.is_txn_mode && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) {
} else if (ctx.txn_context_enabled && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) {
return ctx.batch->NewIteratorWithBase(column_family, iter, &options);
}
return iter;
Expand All @@ -671,14 +671,14 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea
void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle *column_family, const size_t num_keys, const rocksdb::Slice *keys,
rocksdb::PinnableSlice *values, rocksdb::Status *statuses) {
if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
false);
} else if (ctx.is_txn_mode && ctx.batch) {
} else if (ctx.txn_context_enabled && ctx.batch) {
ctx.batch->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses, false);
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
Expand All @@ -700,18 +700,21 @@ rocksdb::Status Storage::Write(engine::Context &ctx, const rocksdb::WriteOptions

rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates) {
// Put replication id logdata at the end of write batch
// Put replication id logdata at the end of `updates`.
if (replid_.length() == kReplIdLength) {
updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode());
}

if (ctx.is_txn_mode) {
if (ctx.txn_context_enabled) {
// Extract writes from the updates and append to the ctx.batch
if (ctx.batch == nullptr) {
ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>();
}
WriteBatchIndexer handle(ctx);
auto s = updates->Iterate(&handle);
if (!s.ok()) return s;
} else {
DCHECK(ctx.batch == nullptr);
}

return db_->Write(options, updates);
Expand Down Expand Up @@ -1278,19 +1281,19 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d

[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
rocksdb::ReadOptions read_options;
if (is_txn_mode) read_options.snapshot = GetSnapshot();
if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}

[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
if (is_txn_mode) read_options.snapshot = GetSnapshot();
if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}

[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
if (is_txn_mode) read_options.snapshot = GetSnapshot();
if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}

Expand Down
49 changes: 33 additions & 16 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,12 @@ class Storage {
rocksdb::ColumnFamilyHandle *column_family);
rocksdb::Iterator *NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options);

[[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates);
const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; }
const rocksdb::WriteOptions &DefaultWriteOptions() const { return default_write_opts_; }
rocksdb::ReadOptions DefaultScanOptions() const;
rocksdb::ReadOptions DefaultMultiGetOptions() const;

[[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates);
[[nodiscard]] rocksdb::Status Delete(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key);
[[nodiscard]] rocksdb::Status DeleteRange(engine::Context &ctx, const rocksdb::WriteOptions &options,
Expand Down Expand Up @@ -336,6 +337,9 @@ class Storage {
void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; }
bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; }

/// Redis PSYNC relies on a Unique Replication Sequence Id when use-rsid-psync
/// enabled.
/// ShiftReplId would generate an Id and write it to propagate cf.
Status ShiftReplId(engine::Context &ctx);
std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
std::string GetReplIdFromDbEngine();
Expand Down Expand Up @@ -363,6 +367,8 @@ class Storage {

std::atomic<bool> db_in_retryable_io_error_{false};

// is_txn_mode_ is used to determine whether the current Storage is in transactional mode,
// .i.e, in "EXEC" command(CommandExec).
std::atomic<bool> is_txn_mode_ = false;
// txn_write_batch_ is used as the global write batch for the transaction mode,
// all writes will be grouped in this write batch when entering the transaction mode,
Expand All @@ -380,39 +386,48 @@ class Storage {

/// Context passes fixed snapshot and batch between APIs
///
/// Limitations: Performing a large number of writes on the same Context may reduce performance.
/// Please choose to use the same Context or create a new Context based on the actual situation.
/// Limitations: Performing a large number of writes or apply operations like DeleteRange
/// on the same Context may reduce performance.
/// Please choose to use the same Context or create a new Context based on the actual
/// situation.
///
/// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs.
struct Context {
engine::Storage *storage = nullptr;

/// batch can be nullptr if
/// 1. The Context is not in transactional mode.
/// 2. The Context is in transactional mode, but no write operation is performed.
std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;

/// is_txn_mode is used to determine whether the current Context is in transactional mode,
/// txn_context_enabled is used to determine whether the current Context is in transactional mode,
/// if it is not transactional mode, then Context is equivalent to a Storage.
/// If the configuration of txn-context-enabled is no, it is false.
bool is_txn_mode = true;
bool txn_context_enabled = true;

/// NoTransactionContext returns a Context with a is_txn_mode of false
static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, false); }
static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, /*txn_mode=*/false); }

/// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, then its snapshot is specified by the
/// Context
/// GetReadOptions returns a default ReadOptions, and if txn_context_enabled = true,
/// then its snapshot is specified by the Context.
/// Otherwise it is the same as Storage::DefaultReadOptions().
[[nodiscard]] rocksdb::ReadOptions GetReadOptions();
/// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = true, then its snapshot is specified by the
/// Context. Otherwise it is the same as Storage::DefaultScanOptions
/// DefaultScanOptions returns a DefaultScanOptions, and if txn_context_enabled = true,
/// then its snapshot is specified by the Context.
/// Otherwise it is the same as Storage::DefaultScanOptions().
[[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
/// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if is_txn_mode = true, then its snapshot is specified
/// by the Context. Otherwise it is the same as Storage::DefaultMultiGetOptions
/// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if txn_context_enabled = true,
/// then its snapshot is specified by the Context.
/// Otherwise it is the same as Storage::DefaultMultiGetOptions
[[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();

void RefreshLatestSnapshot();

/// TODO: Change it to defer getting the context, and the snapshot is pinned after the first read operation
explicit Context(engine::Storage *storage)
: storage(storage), is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
: storage(storage), txn_context_enabled(storage->GetConfig()->txn_context_enabled) {}
~Context() {
// A moved-from object doesn't have `storage`.
if (storage) {
if (snapshot_ && storage->GetDB()) {
storage->GetDB()->ReleaseSnapshot(snapshot_);
Expand Down Expand Up @@ -441,14 +456,16 @@ struct Context {
// and it's not a thread-safe operation.
const rocksdb::Snapshot *GetSnapshot() {
if (snapshot_ == nullptr) {
// Should not acquire a snapshot_ on a moved-from object.
DCHECK(storage != nullptr);
snapshot_ = storage->GetDB()->GetSnapshot(); // NOLINT
}
return snapshot_;
}

private:
/// It is only used by NonTransactionContext
explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), is_txn_mode(txn_mode) {}
explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), txn_context_enabled(txn_mode) {}

/// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
/// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
Expand Down

0 comments on commit 29b08bd

Please sign in to comment.