diff --git a/src/config/config.cc b/src/config/config.cc index 500ac717008..1aade876d75 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -885,11 +885,20 @@ Status Config::Set(Server *srv, std::string key, const std::string &value) { if (!s.IsOK()) return s.Prefixed("invalid value"); } + auto origin_value = field->ToString(); auto s = field->Set(value); if (!s.IsOK()) return s.Prefixed("failed to set new value"); if (field->callback) { - return field->callback(srv, key, value); + s = field->callback(srv, key, value); + if (!s.IsOK()) { + // rollback the value if the callback failed + auto setStatus = field->Set(origin_value); + if (!setStatus.IsOK()) { + return setStatus.Prefixed("failed to rollback the value"); + } + } + return s; } return Status::OK(); diff --git a/src/server/namespace.cc b/src/server/namespace.cc index 504ad523681..429a37d9793 100644 --- a/src/server/namespace.cc +++ b/src/server/namespace.cc @@ -51,36 +51,47 @@ bool Namespace::IsAllowModify() const { return config->HasConfigFile() || config->repl_namespace_enabled; } +Status Namespace::loadFromDB(std::map* dbTokens) const { + std::string value; + auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value); + if (!s.ok()) { + if (s.IsNotFound()) return Status::OK(); + return {Status::NotOK, s.ToString()}; + } + + jsoncons::json j = jsoncons::json::parse(value); + for (const auto& iter : j.object_range()) { + dbTokens->insert({iter.key(), iter.value().as_string()}); + } + return Status::OK(); +} + Status Namespace::LoadAndRewrite() { auto config = storage_->GetConfig(); // Namespace is NOT allowed in the cluster mode, so we don't need to rewrite here. if (config->cluster_enabled) return Status::OK(); - // Load from the configuration file first - tokens_ = config->load_tokens; + std::map dbTokens; + auto s = loadFromDB(&dbTokens); + if (!s.IsOK()) return s; - // We would like to load namespaces from db even if repl_namespace_enabled is false, - // this can avoid missing some namespaces when turn on/off repl_namespace_enabled. - std::string value; - auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value); - if (!s.ok() && !s.IsNotFound()) { - return {Status::NotOK, s.ToString()}; + if (!dbTokens.empty() && !config->repl_namespace_enabled) { + return {Status::NotOK, "cannot switch off repl_namespace_enabled when namespaces exist in db"}; } - if (s.ok()) { - // The namespace db key is existed, so it doesn't allow to switch off repl_namespace_enabled - if (!config->repl_namespace_enabled) { - return {Status::NotOK, "cannot switch off repl_namespace_enabled when namespaces exist in db"}; - } - jsoncons::json j = jsoncons::json::parse(value); - for (const auto& iter : j.object_range()) { - if (tokens_.find(iter.key()) == tokens_.end()) { - // merge the namespace from db - tokens_[iter.key()] = iter.value().as(); - } + // Load from the configuration file first + tokens_ = config->load_tokens; + // Merge the tokens from the database if the token is not in the configuration file + for (const auto& iter : dbTokens) { + if (tokens_.find(iter.first) == tokens_.end()) { + tokens_[iter.first] = iter.second; } } + // The following rewrite is to remove namespace/token pairs from the configuration if the namespace replication + // is enabled. So we don't need to do that if no tokens are loaded or the namespace replication is disabled. + if (config->load_tokens.empty() || !config->repl_namespace_enabled) return Status::OK(); + return Rewrite(); } diff --git a/src/server/namespace.h b/src/server/namespace.h index c0556eeb161..5d76c1a1b08 100644 --- a/src/server/namespace.h +++ b/src/server/namespace.h @@ -47,4 +47,6 @@ class Namespace { engine::Storage *storage_; rocksdb::ColumnFamilyHandle *cf_ = nullptr; std::map tokens_; + + Status loadFromDB(std::map *dbTokens) const; }; diff --git a/tests/gocase/unit/namespace/namespace_test.go b/tests/gocase/unit/namespace/namespace_test.go index 1b80e5b94b0..ba863becee0 100644 --- a/tests/gocase/unit/namespace/namespace_test.go +++ b/tests/gocase/unit/namespace/namespace_test.go @@ -238,7 +238,17 @@ func TestNamespaceReplicate(t *testing.T) { }) t.Run("Turn off namespace replication is not allowed", func(t *testing.T) { + r := masterRdb.Do(ctx, "NAMESPACE", "ADD", "test-ns", "ns-token") + require.NoError(t, r.Err()) + require.Equal(t, "OK", r.Val()) util.ErrorRegexp(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err(), ".*cannot switch off repl_namespace_enabled when namespaces exist in db.*") + + r = masterRdb.Do(ctx, "NAMESPACE", "GET", "*") + // it should be allowed after deleting all namespaces + r = masterRdb.Do(ctx, "NAMESPACE", "DEL", "test-ns") + require.NoError(t, r.Err()) + require.Equal(t, "OK", r.Val()) + require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err()) }) }