From 1974d9868f1230763fe760a590105c6500fca4cc Mon Sep 17 00:00:00 2001
From: mwish <maplewish117@gmail.com>
Date: Thu, 14 Nov 2024 12:33:57 +0800
Subject: [PATCH] chore(enhancement): rename is_txn_mode to txn_context_enabled
 (#2644)

---
 src/cluster/slot_migrate.h     |  2 +-
 src/commands/cmd_txn.cc        |  4 +++
 src/config/config.h            |  2 +-
 src/server/redis_connection.cc |  2 +-
 src/storage/batch_indexer.h    |  4 ++-
 src/storage/storage.cc         | 29 +++++++++++---------
 src/storage/storage.h          | 49 +++++++++++++++++++++++-----------
 7 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 1114f2a1b55..6d323b6bdf2 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -45,7 +45,7 @@
 
 enum class MigrationType {
   /// Use Redis commands to migrate data.
-  /// It will trying to extract commands from existing data and log, then replay
+  /// It will try to extract commands from existing data and log, then replay
   /// them on the destination node.
   kRedisCommand = 0,
   /// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc
index 4f3cb3a4d4f..3d88d9a2ec4 100644
--- a/src/commands/cmd_txn.cc
+++ b/src/commands/cmd_txn.cc
@@ -85,6 +85,10 @@ class CommandExec : public Commander {
     auto s = storage->BeginTxn();
     if (s.IsOK()) {
       conn->ExecuteCommands(conn->GetMultiExecCommands());
+      // In Redis, errors happening after EXEC instead are not handled in a special way:
+      // all the other commands will be executed even if some command fails during
+      // the transaction.
+      // So, if conn->IsMultiError(), the transaction should still be committed.
       s = storage->CommitTxn();
     }
     return s;
diff --git a/src/config/config.h b/src/config/config.h
index 61ac0cf88cd..3b9d99de2e9 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -54,7 +54,7 @@ constexpr const size_t GiB = 1024L * MiB;
 constexpr const uint32_t kDefaultPort = 6666;
 
 constexpr const char *kDefaultNamespace = "__namespace";
-constexpr const size_t KVROCKS_MAX_LSM_LEVEL = 7;
+constexpr int KVROCKS_MAX_LSM_LEVEL = 7;
 
 enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC };
 
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 54899afe153..e6bef06d59f 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -464,7 +464,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
 
     // We don't execute commands, but queue them, and then execute in EXEC command
     if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) {
-      multi_cmds_.emplace_back(cmd_tokens);
+      multi_cmds_.emplace_back(std::move(cmd_tokens));
       Reply(redis::SimpleString("QUEUED"));
       continue;
     }
diff --git a/src/storage/batch_indexer.h b/src/storage/batch_indexer.h
index 99fc7f879f0..7d0fe1702a1 100644
--- a/src/storage/batch_indexer.h
+++ b/src/storage/batch_indexer.h
@@ -29,7 +29,8 @@
 
 #include "storage.h"
 
-// WriteBatchIndexer traverses the operations in WriteBatch and appends to the specified WriteBatchWithIndex
+/// WriteBatchIndexer traverses the operations in WriteBatch and appends to the
+/// specified WriteBatchWithIndex.
 class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
  public:
   explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch,
@@ -41,6 +42,7 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
   }
   explicit WriteBatchIndexer(engine::Context& ctx)
       : WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}
+
   rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override {
     return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value);
   }
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index a446e3f0b2d..36b027a6694 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -597,14 +597,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
 rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
                              rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
                              std::string *value) {
-  if (ctx.is_txn_mode) {
+  if (ctx.txn_context_enabled) {
     DCHECK_NOTNULL(options.snapshot);
     DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   rocksdb::Status s;
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
     s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
-  } else if (ctx.batch && ctx.is_txn_mode) {
+  } else if (ctx.batch && ctx.txn_context_enabled) {
     s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
   } else {
     s = db_->Get(options, column_family, key, value);
@@ -622,14 +622,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
 rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &options,
                              rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Slice &key,
                              rocksdb::PinnableSlice *value) {
-  if (ctx.is_txn_mode) {
+  if (ctx.txn_context_enabled) {
     DCHECK_NOTNULL(options.snapshot);
     DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   rocksdb::Status s;
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
     s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
-  } else if (ctx.is_txn_mode && ctx.batch) {
+  } else if (ctx.txn_context_enabled && ctx.batch) {
     s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
   } else {
     s = db_->Get(options, column_family, key, value);
@@ -655,14 +655,14 @@ void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_famil
 
 rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options,
                                         rocksdb::ColumnFamilyHandle *column_family) {
-  if (ctx.is_txn_mode) {
+  if (ctx.txn_context_enabled) {
     DCHECK_NOTNULL(options.snapshot);
     DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   auto iter = db_->NewIterator(options, column_family);
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
     return txn_write_batch_->NewIteratorWithBase(column_family, iter, &options);
-  } else if (ctx.is_txn_mode && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) {
+  } else if (ctx.txn_context_enabled && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) {
     return ctx.batch->NewIteratorWithBase(column_family, iter, &options);
   }
   return iter;
@@ -671,14 +671,14 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea
 void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options,
                        rocksdb::ColumnFamilyHandle *column_family, const size_t num_keys, const rocksdb::Slice *keys,
                        rocksdb::PinnableSlice *values, rocksdb::Status *statuses) {
-  if (ctx.is_txn_mode) {
+  if (ctx.txn_context_enabled) {
     DCHECK_NOTNULL(options.snapshot);
     DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
     txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
                                              false);
-  } else if (ctx.is_txn_mode && ctx.batch) {
+  } else if (ctx.txn_context_enabled && ctx.batch) {
     ctx.batch->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses, false);
   } else {
     db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
@@ -700,18 +700,21 @@ rocksdb::Status Storage::Write(engine::Context &ctx, const rocksdb::WriteOptions
 
 rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options,
                                    rocksdb::WriteBatch *updates) {
-  // Put replication id logdata at the end of write batch
+  // Put replication id logdata at the end of `updates`.
   if (replid_.length() == kReplIdLength) {
     updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode());
   }
 
-  if (ctx.is_txn_mode) {
+  if (ctx.txn_context_enabled) {
+    // Extract the writes from `updates` and append them to ctx.batch.
     if (ctx.batch == nullptr) {
       ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>();
     }
     WriteBatchIndexer handle(ctx);
     auto s = updates->Iterate(&handle);
     if (!s.ok()) return s;
+  } else {
+    DCHECK(ctx.batch == nullptr);
   }
 
   return db_->Write(options, updates);
@@ -1278,19 +1281,19 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d
 
 [[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
   rocksdb::ReadOptions read_options;
-  if (is_txn_mode) read_options.snapshot = GetSnapshot();
+  if (txn_context_enabled) read_options.snapshot = GetSnapshot();
   return read_options;
 }
 
 [[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
   rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
-  if (is_txn_mode) read_options.snapshot = GetSnapshot();
+  if (txn_context_enabled) read_options.snapshot = GetSnapshot();
   return read_options;
 }
 
 [[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
   rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
-  if (is_txn_mode) read_options.snapshot = GetSnapshot();
+  if (txn_context_enabled) read_options.snapshot = GetSnapshot();
   return read_options;
 }
 
diff --git a/src/storage/storage.h b/src/storage/storage.h
index c29b4689de5..3a45ccd43c1 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -248,11 +248,12 @@ class Storage {
                                  rocksdb::ColumnFamilyHandle *column_family);
   rocksdb::Iterator *NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options);
 
-  [[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options,
-                                      rocksdb::WriteBatch *updates);
-  const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; }
+  const rocksdb::WriteOptions &DefaultWriteOptions() const { return default_write_opts_; }
   rocksdb::ReadOptions DefaultScanOptions() const;
   rocksdb::ReadOptions DefaultMultiGetOptions() const;
+
+  [[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const rocksdb::WriteOptions &options,
+                                      rocksdb::WriteBatch *updates);
   [[nodiscard]] rocksdb::Status Delete(engine::Context &ctx, const rocksdb::WriteOptions &options,
                                        rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key);
   [[nodiscard]] rocksdb::Status DeleteRange(engine::Context &ctx, const rocksdb::WriteOptions &options,
@@ -336,6 +337,9 @@ class Storage {
   void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ = yes_or_no; }
   bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; }
 
+  /// Redis PSYNC relies on a unique replication sequence id when use-rsid-psync
+  /// is enabled.
+  /// ShiftReplId generates a new id and writes it to the propagate column family.
   Status ShiftReplId(engine::Context &ctx);
   std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
   std::string GetReplIdFromDbEngine();
@@ -363,6 +367,8 @@ class Storage {
 
   std::atomic<bool> db_in_retryable_io_error_{false};
 
+  // is_txn_mode_ is used to determine whether the current Storage is in transactional mode,
+  // i.e., while executing the "EXEC" command (CommandExec).
   std::atomic<bool> is_txn_mode_ = false;
   // txn_write_batch_ is used as the global write batch for the transaction mode,
   // all writes will be grouped in this write batch when entering the transaction mode,
@@ -380,39 +386,48 @@ class Storage {
 
 /// Context passes fixed snapshot and batch between APIs
 ///
-/// Limitations: Performing a large number of writes on the same Context may reduce performance.
-/// Please choose to use the same Context or create a new Context based on the actual situation.
+/// Limitations: Performing a large number of writes or applying operations like DeleteRange
+/// on the same Context may reduce performance.
+/// Please choose to use the same Context or create a new Context based on the actual
+/// situation.
 ///
 /// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs.
 struct Context {
   engine::Storage *storage = nullptr;
 
+  /// batch can be nullptr if
+  /// 1. The Context is not in transactional mode.
+  /// 2. The Context is in transactional mode, but no write operation is performed.
   std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;
 
-  /// is_txn_mode is used to determine whether the current Context is in transactional mode,
+  /// txn_context_enabled is used to determine whether the current Context is in transactional mode,
   /// if it is not transactional mode, then Context is equivalent to a Storage.
   /// If the configuration of txn-context-enabled is no, it is false.
-  bool is_txn_mode = true;
+  bool txn_context_enabled = true;
 
-  /// NoTransactionContext returns a Context with a is_txn_mode of false
+  /// NoTransactionContext returns a Context with txn_context_enabled set to false.
-  static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, false); }
+  static Context NoTransactionContext(engine::Storage *storage) { return Context(storage, /*txn_mode=*/false); }
 
-  /// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, then its snapshot is specified by the
-  /// Context
+  /// GetReadOptions returns a default ReadOptions, and if txn_context_enabled = true,
+  /// then its snapshot is specified by the Context.
+  /// Otherwise it is a default-constructed rocksdb::ReadOptions.
   [[nodiscard]] rocksdb::ReadOptions GetReadOptions();
-  /// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = true, then its snapshot is specified by the
-  /// Context. Otherwise it is the same as Storage::DefaultScanOptions
+  /// DefaultScanOptions returns a DefaultScanOptions, and if txn_context_enabled = true,
+  /// then its snapshot is specified by the Context.
+  /// Otherwise it is the same as Storage::DefaultScanOptions().
   [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
-  /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if is_txn_mode = true, then its snapshot is specified
-  /// by the Context. Otherwise it is the same as Storage::DefaultMultiGetOptions
+  /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if txn_context_enabled = true,
+  /// then its snapshot is specified by the Context.
+  /// Otherwise it is the same as Storage::DefaultMultiGetOptions().
   [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();
 
   void RefreshLatestSnapshot();
 
   /// TODO: Change it to defer getting the context, and the snapshot is pinned after the first read operation
   explicit Context(engine::Storage *storage)
-      : storage(storage), is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
+      : storage(storage), txn_context_enabled(storage->GetConfig()->txn_context_enabled) {}
   ~Context() {
+    // A moved-from object doesn't have `storage`.
     if (storage) {
       if (snapshot_ && storage->GetDB()) {
         storage->GetDB()->ReleaseSnapshot(snapshot_);
@@ -441,6 +456,8 @@ struct Context {
   // and it's not a thread-safe operation.
   const rocksdb::Snapshot *GetSnapshot() {
     if (snapshot_ == nullptr) {
+      // Should not acquire a snapshot_ on a moved-from object.
+      DCHECK(storage != nullptr);
       snapshot_ = storage->GetDB()->GetSnapshot();  // NOLINT
     }
     return snapshot_;
@@ -448,7 +465,7 @@ struct Context {
 
  private:
-  /// It is only used by NonTransactionContext
+  /// It is only used by NoTransactionContext.
-  explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), is_txn_mode(txn_mode) {}
+  explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), txn_context_enabled(txn_mode) {}
 
-  /// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
+  /// If txn_context_enabled is true, snapshot should be specified instead of nullptr when used,
   /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.