Skip to content

Commit

Permalink
Merge branch 'unstable' into fishyoung/fix_expired_key_incr
Browse files Browse the repository at this point in the history
  • Loading branch information
FishYoung authored Nov 18, 2024
2 parents c0d5398 + 1dbd7d3 commit f80db1f
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ testdb

build
cmake-build-*
build-*
4 changes: 2 additions & 2 deletions cmake/cpptrace.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(cpptrace
jeremy-rifkin/cpptrace v0.7.2
MD5=4d992a22ddb80300fa2ddac097a5ce51
jeremy-rifkin/cpptrace v0.7.3
MD5=032eb39d17eb138871a760b1c2f52a74
)

if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace")
Expand Down
14 changes: 14 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ slave-read-only yes
# By default the priority is 100.
slave-priority 100

# Change the default timeout in milliseconds for socket connect during replication.
# The default value is 3100, and 0 means no timeout.
#
# If the master is unreachable before connecting, not having a timeout may block future
# 'clusterx setnodes' commands because the replication thread is blocked on connect.
replication-connect-timeout-ms 3100

# Change the default timeout in milliseconds for socket recv during fullsync.
# The default value is 3200, and 0 means no timeout.
#
# If the master is unreachable when fetching SST files, not having a timeout may block
# future 'clusterx setnodes' commands because the replication thread is blocked on recv.
replication-recv-timeout-ms 3200

# TCP listen() backlog.
#
# In high requests-per-second environments you need an high backlog in order
Expand Down
20 changes: 17 additions & 3 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,14 @@ void ReplicationThread::CallbacksStateMachine::Start() {
}

uint64_t last_connect_timestamp = 0;
int connect_timeout_ms = 3100;

while (!repl_->stop_flag_ && bev == nullptr) {
if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
// prevent frequent re-connect when the master is down with the connection refused error
sleep(1);
}
last_connect_timestamp = util::GetTimeStampMS();
auto cfd = util::SockConnect(repl_->host_, repl_->port_, connect_timeout_ms);
auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg();
continue;
Expand Down Expand Up @@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
}
auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err"));
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
this->srv_->GetConfig()->replication_connect_timeout_ms,
this->srv_->GetConfig()->replication_recv_timeout_ms)
.Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
exit.Disable();
#endif
Expand Down Expand Up @@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read size");
}
continue;
Expand Down Expand Up @@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
remain -= data_len;
} else {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read sst file");
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -894,10 +894,9 @@ class CommandLPos : public Commander {
PosSpec spec_;
};

REDIS_REGISTER_COMMANDS(List, MakeCmdAttr<CommandBLPop>("blpop", -3, "write no-script blocking", 1, -2, 1),
MakeCmdAttr<CommandBRPop>("brpop", -3, "write no-script blocking", 1, -2, 1),
MakeCmdAttr<CommandBLMPop>("blmpop", -5, "write no-script blocking",
CommandBLMPop::keyRangeGen),
REDIS_REGISTER_COMMANDS(List, MakeCmdAttr<CommandBLPop>("blpop", -3, "write blocking", 1, -2, 1),
MakeCmdAttr<CommandBRPop>("brpop", -3, "write blocking", 1, -2, 1),
MakeCmdAttr<CommandBLMPop>("blmpop", -5, "write blocking", CommandBLMPop::keyRangeGen),
MakeCmdAttr<CommandLIndex>("lindex", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandLInsert>("linsert", 5, "write slow", 1, 1, 1),
MakeCmdAttr<CommandLLen>("llen", 2, "read-only", 1, 1, 1),
Expand Down
8 changes: 4 additions & 4 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ class CommandPollUpdates : public Commander {
Format format_ = Format::Raw;
};

REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading", NO_KEY),
REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth", NO_KEY),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY),
MakeCmdAttr<CommandSelect>("select", 2, "read-only", NO_KEY),
MakeCmdAttr<CommandInfo>("info", -1, "read-only ok-loading", NO_KEY),
Expand All @@ -1343,7 +1343,7 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
MakeCmdAttr<CommandSlowlog>("slowlog", -2, "read-only", NO_KEY),
MakeCmdAttr<CommandPerfLog>("perflog", -2, "read-only", NO_KEY),
MakeCmdAttr<CommandClient>("client", -2, "read-only", NO_KEY),
MakeCmdAttr<CommandMonitor>("monitor", 1, "read-only no-multi", NO_KEY),
MakeCmdAttr<CommandMonitor>("monitor", 1, "read-only no-multi no-script", NO_KEY),
MakeCmdAttr<CommandShutdown>("shutdown", 1, "read-only no-multi no-script", NO_KEY),
MakeCmdAttr<CommandQuit>("quit", 1, "read-only", NO_KEY),
MakeCmdAttr<CommandScan>("scan", -2, "read-only", NO_KEY),
Expand All @@ -1354,7 +1354,7 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
MakeCmdAttr<CommandTime>("time", 1, "read-only ok-loading", NO_KEY),
MakeCmdAttr<CommandDisk>("disk", 3, "read-only", 2, 2, 1),
MakeCmdAttr<CommandMemory>("memory", 3, "read-only", 2, 2, 1),
MakeCmdAttr<CommandHello>("hello", -1, "read-only ok-loading", NO_KEY),
MakeCmdAttr<CommandHello>("hello", -1, "read-only ok-loading auth", NO_KEY),
MakeCmdAttr<CommandRestore>("restore", -4, "write", 1, 1, 1),

MakeCmdAttr<CommandCompact>("compact", 1, "read-only no-script", NO_KEY),
Expand All @@ -1364,7 +1364,7 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
MakeCmdAttr<CommandSlaveOf>("slaveof", 3, "read-only exclusive no-script", NO_KEY),
MakeCmdAttr<CommandStats>("stats", 1, "read-only", NO_KEY),
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", NO_KEY),
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading multi no-script", NO_KEY),
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading bypass-multi no-script", NO_KEY),
MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", NO_KEY),
MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only", NO_KEY), )
Expand Down
14 changes: 5 additions & 9 deletions src/commands/cmd_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ class CommandExec : public Commander {
class CommandWatch : public Commander {
public:
Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
if (conn->IsFlagEnabled(Connection::kMultiExec)) {
return {Status::RedisExecErr, "WATCH inside MULTI is not allowed"};
}

// If a conn is already marked as watched_keys_modified, we can skip the watch.
if (srv->IsWatchedKeysModified(conn)) {
*output = redis::RESP_OK;
Expand All @@ -123,10 +119,10 @@ class CommandUnwatch : public Commander {
}
};

REDIS_REGISTER_COMMANDS(Txn, MakeCmdAttr<CommandMulti>("multi", 1, "multi", NO_KEY),
MakeCmdAttr<CommandDiscard>("discard", 1, "multi", NO_KEY),
MakeCmdAttr<CommandExec>("exec", 1, "exclusive multi slow", NO_KEY),
MakeCmdAttr<CommandWatch>("watch", -2, "multi", 1, -1, 1),
MakeCmdAttr<CommandUnwatch>("unwatch", 1, "multi", NO_KEY), )
REDIS_REGISTER_COMMANDS(Txn, MakeCmdAttr<CommandMulti>("multi", 1, "bypass-multi", NO_KEY),
MakeCmdAttr<CommandDiscard>("discard", 1, "bypass-multi", NO_KEY),
MakeCmdAttr<CommandExec>("exec", 1, "exclusive bypass-multi slow", NO_KEY),
MakeCmdAttr<CommandWatch>("watch", -2, "no-multi", 1, -1, 1),
MakeCmdAttr<CommandUnwatch>("unwatch", 1, "no-multi", NO_KEY), )

} // namespace redis
21 changes: 15 additions & 6 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ enum CommandFlags : uint64_t {
// "ok-loading" flag, for any command that can be executed while
// the db is in loading phase
kCmdLoading = 1ULL << 5,
// "multi" flag, for commands that can end a MULTI scope
kCmdEndMulti = 1ULL << 6,
// "bypass-multi" flag, for commands that can be executed in a MULTI scope,
// but these commands will NOT be queued and will be executed immediately
kCmdBypassMulti = 1ULL << 6,
// "exclusive" flag, for commands that should be executed execlusive globally
kCmdExclusive = 1ULL << 7,
// "no-multi" flag, for commands that cannot be executed in MULTI scope
Expand All @@ -80,6 +81,8 @@ enum CommandFlags : uint64_t {
// "blocking" flag, for commands that don't perform db ops immediately,
// but block and wait for some event to happen before performing db ops
kCmdBlocking = 1ULL << 14,
// "auth" flag, for commands used for authentication
kCmdAuth = 1ULL << 15,
};

enum class CommandCategory : uint8_t {
Expand Down Expand Up @@ -318,8 +321,8 @@ inline uint64_t ParseCommandFlags(const std::string &description, const std::str
flags |= kCmdLoading;
else if (flag == "exclusive")
flags |= kCmdExclusive;
else if (flag == "multi")
flags |= kCmdEndMulti;
else if (flag == "bypass-multi")
flags |= kCmdBypassMulti;
else if (flag == "no-multi")
flags |= kCmdNoMulti;
else if (flag == "no-script")
Expand All @@ -328,9 +331,15 @@ inline uint64_t ParseCommandFlags(const std::string &description, const std::str
flags |= kCmdNoDBSizeCheck;
else if (flag == "slow")
flags |= kCmdSlow;
else if (flag == "blocking")
else if (flag == "auth")
flags |= kCmdAuth;
else if (flag == "blocking") {
flags |= kCmdBlocking;
else {

// blocking commands should always be no-script
// TODO: we can relax this restriction if scripting becomes non-exclusive
flags |= kCmdNoScript;
} else {
std::cout << fmt::format("Encountered non-existent flag '{}' in command {} in command attribute parsing", flag,
cmd_name)
<< std::endl;
Expand Down
12 changes: 10 additions & 2 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,12 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may
howmuch = BUFFER_SIZE;
}
if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) {
return {Status::NotOK, fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))};
int err = SSL_get_error(ssl, howmuch);
if (err == SSL_ERROR_ZERO_RETURN) {
return {Status::EndOfFile, "EOF encountered while reading from SSL connection"};
}
return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK,
fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))};
}

if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) {
Expand All @@ -514,8 +519,11 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may
#endif
if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) {
return ret;
} else if (ret == 0) {
return {Status::EndOfFile, "EOF encountered while reading from socket"};
} else {
return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))};
return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain : Status::NotOK,
fmt::format("failed to read from socket: {}", strerror(errno))};
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class [[nodiscard]] Status {
// Search
NoPrefixMatched,
TypeMismatched,

// IO
TryAgain,
EndOfFile,
};

Status() : impl_{nullptr} {}
Expand Down
5 changes: 5 additions & 0 deletions src/common/string_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ std::string ToLower(std::string in) {
return in;
}

std::string ToUpper(std::string in) {
std::transform(in.begin(), in.end(), in.begin(), [](char c) -> char { return static_cast<char>(std::toupper(c)); });
return in;
}

bool EqualICase(std::string_view lhs, std::string_view rhs) {
return lhs.size() == rhs.size() && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
[](char l, char r) { return std::tolower(l) == std::tolower(r); });
Expand Down
1 change: 1 addition & 0 deletions src/common/string_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace util {

std::string Float2String(double d);
std::string ToLower(std::string in);
std::string ToUpper(std::string in);
bool EqualICase(std::string_view lhs, std::string_view rhs);
std::string BytesToHuman(uint64_t n);
std::string Trim(std::string in, std::string_view chars);
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ Config::Config() {
{"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)},
{"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)},
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
{"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct Config {
bool slave_serve_stale_data = true;
bool slave_empty_db_before_fullsync = false;
int slave_priority = 100;
int replication_connect_timeout_ms = 3100;
int replication_recv_timeout_ms = 3200;
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;
Expand Down
8 changes: 4 additions & 4 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

if (GetNamespace().empty()) {
if (!password.empty()) {
if (cmd_name != "auth" && cmd_name != "hello") {
if (!(cmd_flags & kCmdAuth)) {
Reply(redis::Error({Status::RedisNoAuth, "Authentication required."}));
continue;
}
Expand All @@ -413,7 +413,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
// that can guarantee other threads can't come into critical zone, such as DEBUG,
// CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future).
// Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute commands at the same time.
if (is_multi_exec && cmd_name != "exec") {
if (is_multi_exec && !(cmd_flags & kCmdBypassMulti)) {
// No lock guard, because 'exec' command has acquired 'WorkExclusivityGuard'
} else if (cmd_flags & kCmdExclusive) {
exclusivity = srv_->WorkExclusivityGuard();
Expand Down Expand Up @@ -443,7 +443,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}

if (is_multi_exec && (cmd_flags & kCmdNoMulti)) {
Reply(redis::Error({Status::NotOK, "Can't execute " + cmd_name + " in MULTI"}));
Reply(redis::Error({Status::NotOK, fmt::format("{} inside MULTI is not allowed", util::ToUpper(cmd_name))}));
multi_error_ = true;
continue;
}
Expand All @@ -463,7 +463,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}

// We don't execute commands, but queue them, and then execute in EXEC command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) {
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdBypassMulti)) {
multi_cmds_.emplace_back(std::move(cmd_tokens));
Reply(redis::SimpleString("QUEUED"));
continue;
Expand Down
2 changes: 1 addition & 1 deletion tests/gocase/unit/multi/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestMulti(t *testing.T) {
t.Run("WATCH inside MULTI is not allowed", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "MULTI").Err())
require.EqualError(t, rdb.Do(ctx, "WATCH", "x").Err(), "ERR WATCH inside MULTI is not allowed")
require.NoError(t, rdb.Do(ctx, "EXEC").Err())
require.NoError(t, rdb.Do(ctx, "DISCARD").Err())
})

t.Run("EXEC without MULTI is not allowed", func(t *testing.T) {
Expand Down

0 comments on commit f80db1f

Please sign in to comment.