Skip to content

Commit

Permalink
Merge branch 'unstable' into ci-arm-linux
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksraiden authored Nov 18, 2024
2 parents 0bf42e7 + 7611d73 commit 41e382f
Show file tree
Hide file tree
Showing 108 changed files with 2,149 additions and 1,325 deletions.
9 changes: 9 additions & 0 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM ubuntu:noble
RUN apt update \
&& apt install -y \
git build-essential cmake libtool python3 libssl-dev python3-pip \
wget curl clang-format-14 clang-tidy-14 golang-go ninja-build \
redis-tools vim python3-redis redis-server clang lld mold gdb fish
RUN BUILD_DIR=$(pwd) && git clone https://github.com/jsha/minica /opt/minica \
&& cd /opt/minica && git checkout 96a5c93723cf3d34b50b3e723a9f05cd3765bc67 && go build && cd $BUILD_DIR \
&& echo 'export PATH=/opt/minica:$PATH' >> $HOME/.bashrc
5 changes: 5 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"build": {
"dockerfile": "Dockerfile"
}
}
5 changes: 5 additions & 0 deletions .github/config/typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ extend-exclude = [
".git/",
"src/vendor/",
"tests/gocase/util/slot.go",

# Uses short strings for testing glob matching
"tests/cppunit/string_util_test.cc",
"tests/gocase/unit/keyspace/keyspace_test.go",
"tests/gocase/unit/scan/scan_test.go",
]
ignore-hidden = false

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check typos
uses: crate-ci/typos@v1.25.0
uses: crate-ci/typos@v1.27.0
with:
config: .github/config/typos.toml

Expand Down Expand Up @@ -267,7 +267,7 @@ jobs:
if: ${{ matrix.sonarcloud }}

- name: Install sonar-scanner and build-wrapper
uses: SonarSource/sonarcloud-github-c-cpp@v2
uses: SonarSource/sonarcloud-github-c-cpp@v3
if: ${{ matrix.sonarcloud }}

- name: Build Kvrocks
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ testdb

build
cmake-build-*
build-*
4 changes: 2 additions & 2 deletions cmake/cpptrace.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(cpptrace
jeremy-rifkin/cpptrace v0.7.2
MD5=4d992a22ddb80300fa2ddac097a5ce51
jeremy-rifkin/cpptrace v0.7.3
MD5=032eb39d17eb138871a760b1c2f52a74
)

if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace")
Expand Down
4 changes: 2 additions & 2 deletions cmake/jsoncons.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(jsoncons
danielaparker/jsoncons v0.177.0
MD5=393062889321f4f715a9302d8f49acf8
danielaparker/jsoncons v0.178.0
MD5=397410843b7c540e9dcee9b8b0c797a6
)

FetchContent_MakeAvailableWithArgs(jsoncons
Expand Down
4 changes: 2 additions & 2 deletions cmake/rocksdb.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ endif()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(rocksdb
facebook/rocksdb v9.6.1
MD5=ce31144a7e65d8f4f3f9d98986509eb1
facebook/rocksdb v9.7.4
MD5=9a08feb50e017006146bcff37059096f
)

FetchContent_GetProperties(jemalloc)
Expand Down
4 changes: 2 additions & 2 deletions cmake/tbb.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(tbb
oneapi-src/oneTBB v2021.13.0
MD5=2dd9b7cfa5de5bb3add2f7392e0c9bab
oneapi-src/oneTBB v2022.0.0
MD5=7eaeff0ddec85182afb60f2232fae2af
)

FetchContent_MakeAvailableWithArgs(tbb
Expand Down
49 changes: 49 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ bind 127.0.0.1
# unixsocket /tmp/kvrocks.sock
# unixsocketperm 777

# Allows a parent process to open a socket and pass its FD down to kvrocks as a child
# process. Useful to reserve a port and prevent race conditions.
#
# PLEASE NOTE:
# If this is overridden to a value other than -1, the bind and tls* directives will be
# ignored.
#
# Default: -1 (not overridden, defer to creating a connection to the specified port)
socket-fd -1

# Accept connections on the specified port, default is 6666.
port 6666

Expand Down Expand Up @@ -164,6 +174,20 @@ slave-read-only yes
# By default the priority is 100.
slave-priority 100

# Change the default timeout in milliseconds for socket connect during replication.
# The default value is 3100, and 0 means no timeout.
#
# If the master is unreachable before connecting, not having a timeout may block future
# 'clusterx setnodes' commands because the replication thread is blocked on connect.
replication-connect-timeout-ms 3100

# Change the default timeout in milliseconds for socket recv during fullsync.
# The default value is 3200, and 0 means no timeout.
#
# If the master is unreachable when fetching SST files, not having a timeout may block
# future 'clusterx setnodes' commands because the replication thread is blocked on recv.
replication-recv-timeout-ms 3200

# TCP listen() backlog.
#
# In high requests-per-second environments you need an high backlog in order
Expand Down Expand Up @@ -708,6 +732,16 @@ rocksdb.max_background_flushes -1
# Default: 2
rocksdb.max_subcompactions 2

# If enabled WAL records will be compressed before they are written. Only
# ZSTD (= kZSTD) is supported (until streaming support is adapted for other
# compression types). Compressed WAL records will be read in supported
# versions (>= RocksDB 7.4.0 for ZSTD) regardless of this setting when
# the WAL is read.
#
# Accept value: "no", "zstd"
# Default is no
rocksdb.wal_compression no

# In order to limit the size of WALs, RocksDB uses DBOptions::max_total_wal_size
# as the trigger of column family flush. Once WALs exceed this size, RocksDB
# will start forcing the flush of column families to allow deletion of some
Expand All @@ -729,6 +763,12 @@ rocksdb.max_subcompactions 2
# default is 512MB
rocksdb.max_total_wal_size 512

# Whether to print malloc stats together with rocksdb.stats when printing to LOG.
#
# Accepted values: "yes", "no"
# Default: yes
rocksdb.dump_malloc_stats yes

# We implement the replication with rocksdb WAL, it would trigger full sync when the seq was out of range.
# wal_ttl_seconds and wal_size_limit_mb would affect how archived logs will be deleted.
# If WAL_ttl_seconds is not 0, then WAL files will be checked every WAL_ttl_seconds / 2 and those that
Expand Down Expand Up @@ -794,6 +834,15 @@ rocksdb.compression_level 32767
# Default: 2 MB
rocksdb.compaction_readahead_size 2097152

# Enable compression from n levels of LSM-tree.
# By default compression is disabled for the first two levels (L0 and L1),
# because it may contain the frequently accessed data, so it'd be better
# to use uncompressed data to save the CPU.
# Value: [0, 7) (upper boundary is kvrocks maximum levels number)
#
# Default: 2
rocksdb.compression_start_level 2

# he limited write rate to DB if soft_pending_compaction_bytes_limit or
# level0_slowdown_writes_trigger is triggered.

Expand Down
15 changes: 8 additions & 7 deletions src/cli/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <iomanip>
#include <ostream>

#include "config.h"
#include "daemon_util.h"
#include "io_util.h"
#include "pid_util.h"
Expand All @@ -40,9 +39,7 @@
#include "storage/storage.h"
#include "string_util.h"
#include "time_util.h"
#include "unique_fd.h"
#include "vendor/crc64.h"
#include "version.h"
#include "version_util.h"

Server *srv = nullptr;
Expand Down Expand Up @@ -119,9 +116,6 @@ static void InitGoogleLog(const Config *config) {
int main(int argc, char *argv[]) {
srand(static_cast<unsigned>(util::GetTimeStamp()));

google::InitGoogleLogging("kvrocks");
auto glog_exit = MakeScopeExit(google::ShutdownGoogleLogging);

evthread_use_pthreads();
auto event_exit = MakeScopeExit(libevent_global_shutdown);

Expand All @@ -136,14 +130,21 @@ int main(int argc, char *argv[]) {
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
return 1;
}
const auto socket_fd_exit = MakeScopeExit([&config] {
if (config.socket_fd != -1) {
close(config.socket_fd);
}
});

crc64_init();
InitGoogleLog(&config);
google::InitGoogleLogging("kvrocks");
auto glog_exit = MakeScopeExit(google::ShutdownGoogleLogging);
LOG(INFO) << "kvrocks " << PrintVersion;
// Tricky: We don't expect that different instances running on the same port,
// but the server use REUSE_PORT to support the multi listeners. So we connect
// the listen port to check if the port has already listened or not.
if (!config.binds.empty()) {
if (config.socket_fd == -1 && !config.binds.empty()) {
uint32_t ports[] = {config.port, config.tls_port, 0};
for (uint32_t *port = ports; *port; ++port) {
if (util::IsPortInUse(*port)) {
Expand Down
39 changes: 22 additions & 17 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}

// Set replication relationship
if (myself_) return SetMasterSlaveRepl();

return Status::OK();
return SetMasterSlaveRepl();
}

// The reason why the new version MUST be +1 of current version is that,
Expand Down Expand Up @@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

// Set replication relationship
if (myself_) {
s = SetMasterSlaveRepl();
if (!s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}
if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}

// Clear data of migrated slots
Expand All @@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
Status Cluster::SetMasterSlaveRepl() {
if (!srv_) return Status::OK();

if (!myself_) return Status::OK();
// If the node is not in the cluster topology, remove the master replication if it's a replica.
if (!myself_) {
if (auto s = srv_->RemoveMaster(); !s.IsOK()) {
return s.Prefixed("failed to remove master");
}
return Status::OK();
}

if (myself_->role == kClusterMaster) {
// Master mode
Expand Down Expand Up @@ -832,16 +833,18 @@ bool Cluster::IsWriteForbiddenSlot(int slot) const {

Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn, lua::ScriptRunCtx *script_run_ctx) {
std::vector<int> keys_indexes;
std::vector<int> key_indexes;

// No keys
if (auto s = redis::CommandTable::GetKeysFromCommand(attributes, cmd_tokens, &keys_indexes); !s.IsOK())
return Status::OK();
attributes->ForEachKeyRange(
[&](const std::vector<std::string> &, redis::CommandKeyRange key_range) {
key_range.ForEachKeyIndex([&](int i) { key_indexes.push_back(i); }, cmd_tokens.size());
},
cmd_tokens);

if (keys_indexes.empty()) return Status::OK();
if (key_indexes.empty()) return Status::OK();

int slot = -1;
for (auto i : keys_indexes) {
for (auto i : key_indexes) {
if (i >= static_cast<int>(cmd_tokens.size())) break;

int cur_slot = GetSlotIdFromKey(cmd_tokens[i]);
Expand Down Expand Up @@ -871,6 +874,8 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
cross_slot_ok = true;
}

uint64_t flags = attributes->GenerateFlags(cmd_tokens);

if (myself_ && myself_ == slots_nodes_[slot]) {
// We use central controller to manage the topology of the cluster.
// Server can't change the topology directly, so we record the migrated slots
Expand All @@ -880,7 +885,7 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
}
// To keep data consistency, slot will be forbidden write while sending the last incremental data.
// During this phase, the requests of the migrating slot has to be rejected.
if ((attributes->flags & redis::kCmdWrite) && IsWriteForbiddenSlot(slot)) {
if ((flags & redis::kCmdWrite) && IsWriteForbiddenSlot(slot)) {
return {Status::RedisTryAgain, "Can't write to slot being migrated which is in write forbidden phase"};
}

Expand All @@ -903,7 +908,7 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving the imported slot
}

if (myself_ && myself_->role == kClusterSlave && !(attributes->flags & redis::kCmdWrite) &&
if (myself_ && myself_->role == kClusterSlave && !(flags & redis::kCmdWrite) &&
nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot] &&
conn->IsFlagEnabled(redis::Connection::kReadOnly)) {
return Status::OK(); // My master is serving this slot
Expand Down
22 changes: 18 additions & 4 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status FeedSlaveThread::Start() {
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = util::SockSend(conn_->GetFD(), redis::SimpleString("OK"), conn_->GetBufferEvent());
auto s = util::SockSend(conn_->GetFD(), redis::RESP_OK, conn_->GetBufferEvent());
if (!s.IsOK()) {
LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
return;
Expand Down Expand Up @@ -252,15 +252,14 @@ void ReplicationThread::CallbacksStateMachine::Start() {
}

uint64_t last_connect_timestamp = 0;
int connect_timeout_ms = 3100;

while (!repl_->stop_flag_ && bev == nullptr) {
if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
// prevent frequent re-connect when the master is down with the connection refused error
sleep(1);
}
last_connect_timestamp = util::GetTimeStampMS();
auto cfd = util::SockConnect(repl_->host_, repl_->port_, connect_timeout_ms);
auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg();
continue;
Expand Down Expand Up @@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
}
auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err"));
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
this->srv_->GetConfig()->replication_connect_timeout_ms,
this->srv_->GetConfig()->replication_recv_timeout_ms)
.Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
exit.Disable();
#endif
Expand Down Expand Up @@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read size");
}
continue;
Expand Down Expand Up @@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
remain -= data_len;
} else {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read sst file");
}
}
Expand Down
Loading

0 comments on commit 41e382f

Please sign in to comment.