diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index cd7fe197a76..b8d10907117 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -30,13 +30,16 @@ #include #include #include +#include #include +#include #include #include "commands/error_constants.h" #include "event_util.h" #include "fmt/format.h" #include "io_util.h" +#include "rocksdb/write_batch.h" #include "rocksdb_crc32c.h" #include "scope_exit.h" #include "server/redis_reply.h" @@ -557,7 +560,6 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) { } ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) { - char *bulk_data = nullptr; repl_state_.store(kReplConnected, std::memory_order_relaxed); auto input = bufferevent_get_input(bev); while (true) { @@ -576,31 +578,38 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent * } case Incr_batch_data: // Read bulk data (batch data) - if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data - bulk_data = reinterpret_cast(evbuffer_pullup(input, static_cast(incr_bulk_len_ + 2))); - std::string bulk_string = std::string(bulk_data, incr_bulk_len_); + if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough + return CBState::AGAIN; + } + + const char *bulk_data = + reinterpret_cast(evbuffer_pullup(input, static_cast(incr_bulk_len_ + 2))); + std::string bulk_string = std::string(bulk_data, incr_bulk_len_); + evbuffer_drain(input, incr_bulk_len_ + 2); + incr_state_ = Incr_batch_size; + + if (bulk_string == "ping") { // master would send the ping heartbeat packet to check whether the slave was alive or not, // don't write ping to db here. - if (bulk_string != "ping") { - auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, incr_bulk_len_)); - if (!s.IsOK()) { - LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x" - << util::StringToHex(bulk_string); - return CBState::RESTART; - } - - s = parseWriteBatch(bulk_string); - if (!s.IsOK()) { - LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string) - << ": " << s.Msg(); - return CBState::RESTART; - } - } - evbuffer_drain(input, incr_bulk_len_ + 2); - incr_state_ = Incr_batch_size; - } else { return CBState::AGAIN; } + + rocksdb::WriteBatch batch(std::move(bulk_string)); + + auto s = storage_->ReplicaApplyWriteBatch(&batch); + if (!s.IsOK()) { + LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x" + << util::StringToHex(batch.Data()); + return CBState::RESTART; + } + + s = parseWriteBatch(batch); + if (!s.IsOK()) { + LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data()) + << ": " << s.Msg(); + return CBState::RESTART; + } + break; } } @@ -981,8 +990,7 @@ void ReplicationThread::TimerCB(int, int16_t) { } } -Status ReplicationThread::parseWriteBatch(const std::string &batch_string) { - rocksdb::WriteBatch write_batch(batch_string); +Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch) { WriteBatchHandler write_batch_handler; auto db_status = write_batch.Iterate(&write_batch_handler); diff --git a/src/cluster/replication.h b/src/cluster/replication.h index 8325b162cc8..75c545e08c3 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -33,6 +33,7 @@ #include "event_util.h" #include "io_util.h" +#include "rocksdb/write_batch.h" #include "server/redis_connection.h" #include "status.h" #include "storage/storage.h" @@ -209,7 +210,7 @@ class ReplicationThread : private EventCallbackBase { static bool isWrongPsyncNum(std::string_view err); static bool isUnknownOption(std::string_view err); - Status parseWriteBatch(const std::string &batch_string); + Status parseWriteBatch(const rocksdb::WriteBatch &write_batch); }; /* diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 8e3140c4cd9..144c43d7ed9 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -43,6 +43,8 @@ #include "redis_db.h" #include "redis_metadata.h" #include "rocksdb/cache.h" +#include "rocksdb/options.h" +#include "rocksdb/write_batch.h" #include "rocksdb_crc32c.h" #include "server/server.h" #include "storage/batch_indexer.h" @@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write return Write(ctx, options, batch->GetWriteBatch()); } -Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) { - return ApplyWriteBatch(default_write_opts_, std::move(raw_batch)); +Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) { + return applyWriteBatch(default_write_opts_, batch); } -Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) { +Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) { if (db_size_limit_reached_) { return {Status::NotOK, "reach space limit"}; } - auto batch = rocksdb::WriteBatch(std::move(raw_batch)); - auto s = db_->Write(options, &batch); + auto s = db_->Write(options, batch); if (!s.ok()) { return {Status::NotOK, s.ToString()}; } return Status::OK(); } +Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) { + auto batch = rocksdb::WriteBatch(std::move(raw_batch)); + return applyWriteBatch(options, &batch); +} + void Storage::RecordStat(StatType type, uint64_t v) { switch (type) { case StatType::FlushCount: diff --git a/src/storage/storage.h b/src/storage/storage.h index b09d9ef1ce5..a4563eabc8f 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -41,6 +41,7 @@ #include "config/config.h" #include "lock_manager.h" #include "observer_or_unique.h" +#include "rocksdb/write_batch.h" #include "status.h" #if defined(__sparc__) || defined(__arm__) @@ -230,7 +231,7 @@ class Storage { Status RestoreFromBackup(); Status RestoreFromCheckpoint(); Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr *iter); - Status ReplicaApplyWriteBatch(std::string &&raw_batch); + Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch); Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch); rocksdb::SequenceNumber LatestSeqNumber(); @@ -380,13 +381,14 @@ class Storage { // command, so it won't have multi transactions to be executed at the same time. std::unique_ptr txn_write_batch_; - rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions(); + rocksdb::WriteOptions default_write_opts_; // rocksdb used global block cache std::shared_ptr shared_block_cache_; rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s); + Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch); }; /// Context passes fixed snapshot and batch between APIs