Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support count min sketch data structure and most commands #2524

Open
wants to merge 39 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c9626df
CountMinSketch Additions
jonathanc-n Sep 5, 2024
bb48c5b
adding redis_cms
jonathanc-n Sep 7, 2024
b4359b5
Merge branch 'unstable' into cms
jonathanc-n Sep 7, 2024
a1b904a
small changes
jonathanc-n Sep 7, 2024
8cab9c9
Merge branch 'cms' of https://github.com/jonathanc-n/kvrocks into cms
jonathanc-n Sep 7, 2024
ecbaa44
Resolved changes
jonathanc-n Sep 8, 2024
bf41c60
Merge branch 'unstable' into cms
jonathanc-n Sep 9, 2024
3fb9f1f
small tweaks
jonathanc-n Sep 10, 2024
a92dc08
parse change
jonathanc-n Sep 10, 2024
e59e2b7
Merge branch 'unstable' into cms
jonathanc-n Sep 10, 2024
b8b0bf2
format changes
jonathanc-n Sep 12, 2024
dd1c4f5
tweaks + lint
jonathanc-n Sep 14, 2024
509d5ab
max memory checks for initbyprob command
jonathanc-n Sep 14, 2024
f19e364
Merge branch 'unstable' into cms
jonathanc-n Sep 14, 2024
c1c1e7f
Merge branch 'unstable' into cms
jonathanc-n Sep 16, 2024
f21792c
Merge branch 'unstable' into cms
jonathanc-n Sep 17, 2024
1e4e747
Update src/types/redis_cms.h
jonathanc-n Sep 18, 2024
a71d7f7
all fixes + go test case
jonathanc-n Sep 19, 2024
202a2f5
Merge branch 'unstable' into cms
jonathanc-n Sep 20, 2024
7743f7b
Merge branch 'unstable' into cms
jonathanc-n Sep 21, 2024
ede5481
Merge branch 'unstable' into cms
jonathanc-n Sep 22, 2024
551f3c8
Added additional test cases
jonathanc-n Sep 23, 2024
a0222d7
lint fix
jonathanc-n Sep 23, 2024
9a2fe26
Merge branch 'unstable' into cms
jonathanc-n Sep 27, 2024
de0c2ad
Update src/types/redis_cms.h
jonathanc-n Sep 29, 2024
69d6ea4
Fixes
jonathanc-n Sep 29, 2024
db73151
Merge branch 'cms' of https://github.com/jonathanc-n/kvrocks into cms
jonathanc-n Sep 29, 2024
0cdfb07
Small Changes
jonathanc-n Sep 29, 2024
02347df
logic fix
jonathanc-n Sep 29, 2024
606f9b5
lint
jonathanc-n Sep 29, 2024
0445b3d
Quick Fixes
jonathanc-n Sep 29, 2024
48e9bc2
lint fix
jonathanc-n Sep 29, 2024
f577146
typing fixes
jonathanc-n Sep 30, 2024
ab277e9
one mb limit update per key
jonathanc-n Sep 30, 2024
f2f1288
Update src/types/redis_cms.cc
jonathanc-n Sep 30, 2024
ca60e13
Update src/types/redis_cms.cc
jonathanc-n Sep 30, 2024
df7a7b2
[WIP] Some codereview check
mapleFU Sep 30, 2024
e64056f
Merge branch 'unstable' into cms
aleksraiden Oct 5, 2024
bc81447
Update: making content pass test
mapleFU Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/commands/cmd_cms.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
namespace redis {

/// CMS.INCRBY key item increment [item increment ...]
///
/// The `key` should be an existing Count-Min Sketch key,
/// otherwise, the command will return an error.
class CommandCMSIncrBy final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand All @@ -39,25 +42,28 @@ class CommandCMSIncrBy final : public Commander {
redis::CMS cms(srv->storage, conn->GetNamespace());
engine::Context ctx(srv->storage);
rocksdb::Status s;
std::unordered_map<std::string, uint64_t> elements;
// <item, increment> pairs
std::vector<redis::CMS::IncrByPair> elements;
elements.reserve((args_.size() - 2) / 2);
for (size_t i = 2; i < args_.size(); i += 2) {
std::string key = args_[i];
auto parse_result = ParseInt<uint64_t>(args_[i + 1]);
std::string_view key = args_[i];
auto parse_result = ParseInt<int64_t>(args_[i + 1]);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
uint64_t value = *parse_result;
elements[key] = value;
int64_t value = *parse_result;
elements.emplace_back(redis::CMS::IncrByPair{key, value});
}

s = cms.IncrBy(ctx, args_[1], elements);
std::vector<uint32_t> counters;
s = cms.IncrBy(ctx, args_[1], elements, &counters);
if (s.IsNotFound()) {
return {Status::RedisExecErr, "Key not found"};
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
}
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

// TODO(mwish): adjust the output
*output = redis::SimpleString("OK");
return Status::OK();
}
Expand All @@ -78,7 +84,7 @@ class CommandCMSInfo final : public Commander {
return {Status::RedisExecErr, "Key not found"};
}

if (!s.ok() && !s.IsNotFound()) {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

Expand Down
25 changes: 15 additions & 10 deletions src/types/cms.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,41 @@

#include "xxhash.h"

uint64_t CMSketch::CountMinSketchHash(std::string_view item, uint64_t seed) {
return XXH64(item.data(), item.size(), seed);
}

CMSketch::CMSketchDimensions CMSketch::CMSDimFromProb(double error, double delta) {
CMSketchDimensions dims;
dims.width = std::ceil(2 / error);
dims.depth = std::ceil(std::log10(delta) / std::log10(0.5));
jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
return dims;
}

size_t CMSketch::IncrBy(std::string_view item, uint32_t value) {
size_t min_count = std::numeric_limits<size_t>::max();
uint32_t CMSketch::IncrBy(std::string_view item, uint32_t value) {
uint32_t min_count = std::numeric_limits<uint32_t>::max();

for (size_t i = 0; i < depth_; ++i) {
uint64_t hash = XXH32(item.data(), static_cast<int>(item.size()), i);
uint64_t hash = CountMinSketchHash(item, /*seed=*/i);
size_t loc = GetLocationForHash(hash, i);
if (array_[loc] > UINT32_MAX - value) {
array_[loc] = UINT32_MAX;
// Do overflow check
if (array_[loc] > std::numeric_limits<uint32_t>::max() - value) {
array_[loc] = std::numeric_limits<uint32_t>::max();
} else {
array_[loc] += value;
}
min_count = std::min(min_count, static_cast<size_t>(array_[loc]));
min_count = std::min(min_count, array_[loc]);
}
counter_ += value;
return min_count;
}

size_t CMSketch::Query(std::string_view item) const {
size_t min_count = std::numeric_limits<size_t>::max();
uint32_t CMSketch::Query(std::string_view item) const {
uint32_t min_count = std::numeric_limits<uint32_t>::max();

for (size_t i = 0; i < depth_; ++i) {
uint64_t hash = XXH32(item.data(), static_cast<int>(item.size()), i);
min_count = std::min(min_count, static_cast<size_t>(array_[GetLocationForHash(hash, i)]));
uint64_t hash = CountMinSketchHash(item, /*seed=*/i);
min_count = std::min(min_count, array_[GetLocationForHash(hash, i)]);
}
return min_count;
}
Expand Down
11 changes: 9 additions & 2 deletions src/types/cms.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ class CMSketch {

jonathanc-n marked this conversation as resolved.
Show resolved Hide resolved
static CMSketchDimensions CMSDimFromProb(double error, double delta);

size_t IncrBy(std::string_view item, uint32_t value);
/// Increment the counter of the given item by the specified increment.
///
/// \param item The item to increment. Returns UINT32_MAX if the
/// counter overflows.
uint32_t IncrBy(std::string_view item, uint32_t value);

size_t Query(std::string_view item) const;
uint32_t Query(std::string_view item) const;

static Status Merge(CMSketch* dest, size_t num_keys, std::vector<const CMSketch*> cms_array,
std::vector<uint32_t> weights);
Expand All @@ -67,6 +71,9 @@ class CMSketch {
static int CheckOverflow(CMSketch* dest, size_t quantity, const std::vector<const CMSketch*>& src,
const std::vector<uint32_t>& weights);

private:
static uint64_t CountMinSketchHash(std::string_view item, uint64_t seed);

private:
uint32_t width_;
uint32_t depth_;
Expand Down
68 changes: 30 additions & 38 deletions src/types/redis_cms.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ rocksdb::Status CMS::GetMetadata(engine::Context &ctx, const Slice &ns_key, Coun
return Database::GetMetadata(ctx, {kRedisCountMinSketch}, ns_key, metadata);
}

rocksdb::Status CMS::IncrBy(engine::Context &ctx, const Slice &user_key,
const std::unordered_map<std::string, uint64_t> &elements) {
rocksdb::Status CMS::IncrBy(engine::Context &ctx, const Slice &user_key, const std::vector<IncrByPair> &elements,
std::vector<uint32_t> *counters) {
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
Expand All @@ -52,12 +52,12 @@ rocksdb::Status CMS::IncrBy(engine::Context &ctx, const Slice &user_key,

CMSketch cms(metadata.width, metadata.depth, metadata.counter, metadata.array);

for (auto &element : elements) {
if (element.second > 0 && metadata.counter > std::numeric_limits<uint64_t>::max() - element.second) {
for (const auto &element : elements) {
if (element.value > 0 && metadata.counter > std::numeric_limits<int64_t>::max() - element.value) {
return rocksdb::Status::InvalidArgument("Overflow error: IncrBy would result in counter overflow");
}
cms.IncrBy(element.first, element.second);
metadata.counter += element.second;
uint32_t local_counter = cms.IncrBy(element.key, element.value);
metadata.counter += element.value;
}

metadata.array = std::move(cms.GetArray());
Expand All @@ -83,20 +83,20 @@ rocksdb::Status CMS::Info(engine::Context &ctx, const Slice &user_key, CMSketch:
ret->width = metadata.width;
ret->depth = metadata.depth;
ret->count = metadata.counter;

return rocksdb::Status::OK();
};
}

rocksdb::Status CMS::InitByDim(engine::Context &ctx, const Slice &user_key, uint32_t width, uint32_t depth) {
std::string ns_key = AppendNamespacePrefix(user_key);

size_t memory_used = width * depth * sizeof(uint32_t);
const size_t max_memory = 1 * 1024 * 1024;
// We firstly limit the memory usage to 1MB.
constexpr size_t kMaxMemory = 1 * 1024 * 1024;

if (memory_used == 0) {
return rocksdb::Status::InvalidArgument("Memory usage must be greater than 0.");
}
if (memory_used > max_memory) {
if (memory_used > kMaxMemory) {
return rocksdb::Status::InvalidArgument("Memory usage exceeds 1MB.");
}

Expand Down Expand Up @@ -137,21 +137,7 @@ rocksdb::Status CMS::InitByProb(engine::Context &ctx, const Slice &user_key, dou
return rocksdb::Status::InvalidArgument("Delta must be between 0 and 1 (exclusive).");
}
CMSketch::CMSketchDimensions dim = CMSketch::CMSDimFromProb(error, delta);
auto s = InitByDim(ctx, user_key, dim.width, dim.depth);
if (!s.ok()) {
return s;
}

size_t memory_used = dim.width * dim.depth * sizeof(uint32_t);
const size_t max_memory = 1 * 1024 * 1024;

if (memory_used == 0) {
return rocksdb::Status::InvalidArgument("Memory usage must be greater than 0.");
}
if (memory_used > max_memory) {
return rocksdb::Status::InvalidArgument("Memory usage exceeds 1MB.");
}
return rocksdb::Status::OK();
return InitByDim(ctx, user_key, dim.width, dim.depth);
};

rocksdb::Status CMS::MergeUserKeys(engine::Context &ctx, const Slice &user_key, const std::vector<Slice> &src_keys,
Expand All @@ -165,33 +151,34 @@ rocksdb::Status CMS::MergeUserKeys(engine::Context &ctx, const Slice &user_key,
}

std::string dest_ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), dest_ns_key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might cause deadlock

std::vector<std::string> ns_keys{dest_ns_key};
for (const auto &src_key : src_keys) {
ns_keys.emplace_back(AppendNamespacePrefix(src_key));
}
MultiLockGuard guard(storage_->GetLockManager(), ns_keys);

CountMinSketchMetadata dest_metadata{};
rocksdb::Status dest_status = GetMetadata(ctx, dest_ns_key, &dest_metadata);
if (dest_status.IsNotFound()) {
return rocksdb::Status::InvalidArgument("Destination CMS does not exist.");
}
if (!dest_status.ok()) {
if (dest_status.IsNotFound()) {
return rocksdb::Status::InvalidArgument("Destination CMS does not exist.");
}
return dest_status;
}

CMSketch dest_cms(dest_metadata.width, dest_metadata.depth, dest_metadata.counter, dest_metadata.array);

std::vector<CMSketch> src_cms_objects;
src_cms_objects.reserve(num_sources);
std::vector<const CMSketch *> src_cms_pointers;
src_cms_pointers.reserve(num_sources);
std::vector<uint32_t> weights_long;
weights_long.reserve(num_sources);

for (size_t i = 0; i < num_sources; ++i) {
std::string src_ns_key = AppendNamespacePrefix(src_keys[i]);
LockGuard guard(storage_->GetLockManager(), src_ns_key);

const auto &src_ns_key = ns_keys[i + 1];
CountMinSketchMetadata src_metadata{};
rocksdb::Status src_status = GetMetadata(ctx, src_ns_key, &src_metadata);
if (!src_status.ok()) {
// TODO(mwish): check the not found syntax here.
if (src_status.IsNotFound()) {
return rocksdb::Status::InvalidArgument("Source CMS key not found.");
}
Expand All @@ -204,11 +191,15 @@ rocksdb::Status CMS::MergeUserKeys(engine::Context &ctx, const Slice &user_key,

CMSketch src_cms(src_metadata.width, src_metadata.depth, src_metadata.counter, src_metadata.array);
src_cms_objects.emplace_back(std::move(src_cms));
src_cms_pointers.push_back(&src_cms_objects.back());

weights_long.push_back(static_cast<uint32_t>(src_weights[i]));
}

// Initialize the destination CMS with the source CMSes after initializations
// since vector might resize and reallocate memory.
Comment on lines +198 to +199
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously this might cause memory issue

std::vector<const CMSketch *> src_cms_pointers(num_sources);
for (size_t i = 0; i < num_sources; ++i) {
src_cms_pointers[i] = &src_cms_objects[i];
}
auto merge_result = CMSketch::Merge(&dest_cms, num_sources, src_cms_pointers, weights_long);
if (!merge_result.IsOK()) {
return rocksdb::Status::InvalidArgument("Merge operation failed due to overflow or invalid dimensions.");
Expand Down Expand Up @@ -240,7 +231,8 @@ rocksdb::Status CMS::Query(engine::Context &ctx, const Slice &user_key, const st

if (s.IsNotFound()) {
return rocksdb::Status::NotFound();
} else if (!s.ok()) {
}
if (!s.ok()) {
return s;
}

Expand All @@ -251,6 +243,6 @@ rocksdb::Status CMS::Query(engine::Context &ctx, const Slice &user_key, const st
}

return rocksdb::Status::OK();
};
}

} // namespace redis
15 changes: 13 additions & 2 deletions src/types/redis_cms.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#pragma once

#include <vector>

#include "cms.h"
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
Expand All @@ -30,8 +32,17 @@ class CMS : public Database {
public:
explicit CMS(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}

rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key,
const std::unordered_map<std::string, uint64_t> &elements);
struct IncrByPair {
std::string_view key;
int64_t value;
};

/// Increment the counter of the given item(s) by the specified increment(s).
///
/// \param[out] counters The counter values after the increment, if the value is UINT32_MAX,
/// it means the item does overflow.
rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, const std::vector<IncrByPair> &elements,
std::vector<uint32_t> *counters);
rocksdb::Status Info(engine::Context &ctx, const Slice &user_key, CMSketch::CMSInfo *ret);
rocksdb::Status InitByDim(engine::Context &ctx, const Slice &user_key, uint32_t width, uint32_t depth);
rocksdb::Status InitByProb(engine::Context &ctx, const Slice &user_key, double error, double delta);
Expand Down
Loading