From d31d027cba60760462d638a3d0dc0e6dbe712e27 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 1 Jul 2024 11:13:33 +0100 Subject: [PATCH 1/5] Escape loguru tags in request body (#6319) --- tests/infra/clients.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/infra/clients.py b/tests/infra/clients.py index 2e89480575e2..629cd81f3998 100644 --- a/tests/infra/clients.py +++ b/tests/infra/clients.py @@ -161,7 +161,7 @@ def __str__(self): if self.headers: string += f" {truncate(str(self.headers), max_len=25)}" if self.body is not None: - string += f' {truncate(f"{self.body}")}' + string += escape_loguru_tags(f' {truncate(f"{self.body}")}') return string From ec469a4a6b47a12f538da8bcba1790699652b795 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 1 Jul 2024 16:28:58 +0100 Subject: [PATCH 2/5] Remove `serdes` namespace and automatic detection/conversion for msgpack (#6297) --- CHANGELOG.md | 1 + include/ccf/http_consts.h | 1 - include/ccf/json_handler.h | 40 +--- include/ccf/serdes.h | 66 ------- samples/apps/logging/logging.cpp | 12 +- src/clients/perf/perf_client.h | 36 ---- src/clients/rpc_tls_client.h | 28 +-- src/endpoints/common_endpoint_registry.cpp | 5 +- src/endpoints/json_handler.cpp | 168 ++++------------ src/node/jwt_key_auto_refresh.h | 10 +- src/node/node_state.h | 17 +- src/node/rpc/test/frontend_test.cpp | 185 +++++++++--------- src/node/rpc/test/frontend_test_infra.h | 11 +- src/node/rpc/test/node_frontend_test.cpp | 8 +- .../custom_authorization.py | 83 ++++---- 15 files changed, 202 insertions(+), 469 deletions(-) delete mode 100644 include/ccf/serdes.h diff --git a/CHANGELOG.md b/CHANGELOG.md index fd2157218cde..77cff1fd9b2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Removed - Removed the existing metrics endpoint and API (`GET /api/metrics`, `get_metrics_v1`). 
Stats for request execution can instead be gathered by overriding the `EndpointRegistry::handle_event_request_completed()` method. +- Removed automatic msgpack support from JSON endpoint adapters, and related `include/ccf/serdes.h` file. ## [5.0.0-dev18] diff --git a/include/ccf/http_consts.h b/include/ccf/http_consts.h index 71a9d6dcabe3..7f93b217b647 100644 --- a/include/ccf/http_consts.h +++ b/include/ccf/http_consts.h @@ -31,7 +31,6 @@ namespace ccf namespace contenttype { static constexpr auto JSON = "application/json"; - static constexpr auto MSGPACK = "application/msgpack"; static constexpr auto TEXT = "text/plain"; static constexpr auto OCTET_STREAM = "application/octet-stream"; static constexpr auto GRPC = "application/grpc"; diff --git a/include/ccf/json_handler.h b/include/ccf/json_handler.h index e29ee50f4f2d..09576200516c 100644 --- a/include/ccf/json_handler.h +++ b/include/ccf/json_handler.h @@ -3,7 +3,6 @@ #pragma once #include "ccf/endpoint_registry.h" -#include "ccf/serdes.h" #include #include @@ -11,18 +10,16 @@ namespace ccf { /* - * For simple app methods which expect a JSON request, potentially msgpack'd, - * these functions do the common decoding of the input and setting of response - * fields, to reduce handler complexity and repetition. + * For simple app methods which expect a JSON request these functions do the + * common decoding of the input and setting of response fields, to reduce + * handler complexity and repetition. 
* * Rather than: * auto foo = [](auto& ctx) { * nlohmann::json params; - * serdes::Pack pack_type; * if () * { - * params = unpack(ctx.rpc_ctx->get_request_body()); - * pack_type = Text; + * params = nlohmann::json::parse(ctx.rpc_ctx->get_request_body()); * } * else * { @@ -35,15 +32,8 @@ namespace ccf * ctx.rpc_ctx->set_response_header(content_type, Text); * ctx.rpc_ctx->set_response_body(error_msg(result)); * } - * if (pack_type == Text) - * { - * ctx.rpc_ctx->set_response_header(content_type, JSON); - * ctx.rpc_ctx->set_response_body(pack(result, Text)); - * } - * else - * { - * ... - * } + * ctx.rpc_ctx->set_response_header(content_type, JSON); + * ctx.rpc_ctx->set_response_body(result.dump()); * }; * * it is possible to write the shorter, clearer, return-based lambda: @@ -65,24 +55,10 @@ namespace ccf using JsonAdapterResponse = std::variant; - char const* pack_to_content_type(serdes::Pack p); - - serdes::Pack detect_json_pack(const std::shared_ptr& ctx); - - serdes::Pack get_response_pack( - const std::shared_ptr& ctx, - serdes::Pack request_pack = serdes::Pack::Text); - - nlohmann::json get_params_from_body( - const std::shared_ptr& ctx, serdes::Pack pack); - - std::pair get_json_params( - const std::shared_ptr& ctx); + nlohmann::json get_json_params(const std::shared_ptr& ctx); void set_response( - JsonAdapterResponse&& res, - std::shared_ptr& ctx, - serdes::Pack request_packing); + JsonAdapterResponse&& res, std::shared_ptr& ctx); } jsonhandler::JsonAdapterResponse make_success(); diff --git a/include/ccf/serdes.h b/include/ccf/serdes.h deleted file mode 100644 index 1fa9d34bd180..000000000000 --- a/include/ccf/serdes.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the Apache 2.0 License. 
-#pragma once - -#include "ccf/ds/json.h" - -#include -#include - -namespace serdes -{ - enum class Pack - { - Text, - MsgPack - }; - - inline std::vector pack(const nlohmann::json& j, Pack pack) - { - switch (pack) - { - case Pack::Text: - { - auto s = j.dump(); - return std::vector{s.begin(), s.end()}; - } - - case Pack::MsgPack: - return nlohmann::json::to_msgpack(j); - } - - throw std::logic_error("Invalid serdes::Pack"); - } - - inline nlohmann::json unpack(const std::vector& data, Pack pack) - { - switch (pack) - { - case Pack::Text: - return nlohmann::json::parse(data); - - case Pack::MsgPack: - return nlohmann::json::from_msgpack(data); - } - - throw std::logic_error("Invalid serdes::Pack"); - } - - inline std::optional detect_pack( - const std::vector& input) - { - if (input.size() == 0) - { - return std::nullopt; - } - - if (input[0] == '{') - { - return serdes::Pack::Text; - } - else - { - return serdes::Pack::MsgPack; - } - } -} diff --git a/samples/apps/logging/logging.cpp b/samples/apps/logging/logging.cpp index f1071cdbc502..9ab2807b79b0 100644 --- a/samples/apps/logging/logging.cpp +++ b/samples/apps/logging/logging.cpp @@ -1238,8 +1238,6 @@ namespace loggingapp auto get_historical = [this]( ccf::endpoints::ReadOnlyEndpointContext& ctx, ccf::historical::StatePtr historical_state) { - const auto pack = ccf::jsonhandler::detect_json_pack(ctx.rpc_ctx); - // Parse id from query const auto parsed_query = ccf::http::parse_query(ctx.rpc_ctx->get_request_query()); @@ -1265,7 +1263,7 @@ namespace loggingapp LoggingGetHistorical::Out out; out.msg = v.value(); nlohmann::json j = out; - ccf::jsonhandler::set_response(std::move(j), ctx.rpc_ctx, pack); + ccf::jsonhandler::set_response(std::move(j), ctx.rpc_ctx); } else { @@ -1295,8 +1293,6 @@ namespace loggingapp [this]( ccf::endpoints::ReadOnlyEndpointContext& ctx, ccf::historical::StatePtr historical_state) { - const auto pack = ccf::jsonhandler::detect_json_pack(ctx.rpc_ctx); - // Parse id from query const 
auto parsed_query = ccf::http::parse_query(ctx.rpc_ctx->get_request_query()); @@ -1323,7 +1319,7 @@ namespace loggingapp out.msg = v.value(); assert(historical_state->receipt); out.receipt = ccf::describe_receipt_v1(*historical_state->receipt); - ccf::jsonhandler::set_response(std::move(out), ctx.rpc_ctx, pack); + ccf::jsonhandler::set_response(std::move(out), ctx.rpc_ctx); } else { @@ -1346,8 +1342,6 @@ namespace loggingapp [this]( ccf::endpoints::ReadOnlyEndpointContext& ctx, ccf::historical::StatePtr historical_state) { - const auto pack = ccf::jsonhandler::detect_json_pack(ctx.rpc_ctx); - // Parse id from query const auto parsed_query = ccf::http::parse_query(ctx.rpc_ctx->get_request_query()); @@ -1381,7 +1375,7 @@ namespace loggingapp out.receipt = full_receipt; out.receipt["leaf_components"].erase("claims_digest"); // SNIPPET_END: claims_digest_in_receipt - ccf::jsonhandler::set_response(std::move(out), ctx.rpc_ctx, pack); + ccf::jsonhandler::set_response(std::move(out), ctx.rpc_ctx); } else { diff --git a/src/clients/perf/perf_client.h b/src/clients/perf/perf_client.h index c9d28d1b3cc8..e7cafce92e7a 100644 --- a/src/clients/perf/perf_client.h +++ b/src/clients/perf/perf_client.h @@ -353,42 +353,6 @@ namespace client append_prepared_tx(tx, index); } - void add_prepared_tx( - const std::string& method, - const nlohmann::json& params, - bool expects_commit, - const std::optional& index, - const serdes::Pack& serdes) - { - auto body = serdes::pack(params, serdes); - - const PreparedTx tx{ - rpc_connection->gen_request( - method, - body, - serdes == serdes::Pack::Text ? - ccf::http::headervalues::contenttype::JSON : - ccf::http::headervalues::contenttype::MSGPACK, - HTTP_POST, - options.bearer_token.size() == 0 ? 
nullptr : - options.bearer_token.c_str()), - method, - expects_commit}; - - append_prepared_tx(tx, index); - } - - void add_prepared_tx( - const std::string& method, - const nlohmann::json& params, - bool expects_commit, - const std::optional& index) - { - const PreparedTx tx{ - rpc_connection->gen_request(method, params), method, expects_commit}; - append_prepared_tx(tx, index); - } - static size_t total_byte_size(const PreparedTxs& txs) { return std::accumulate( diff --git a/src/clients/rpc_tls_client.h b/src/clients/rpc_tls_client.h index 107565b658a4..3ca1924fc865 100644 --- a/src/clients/rpc_tls_client.h +++ b/src/clients/rpc_tls_client.h @@ -4,7 +4,6 @@ #include "ccf/crypto/key_pair.h" #include "ccf/http_consts.h" -#include "ccf/serdes.h" #include "http/http_builder.h" #include "http/http_parser.h" #include "tls_client.h" @@ -131,31 +130,24 @@ namespace client next_send_id++}; } - PreparedRpc gen_request( + Response call( const std::string& method, const nlohmann::json& params = nullptr, - llhttp_method verb = HTTP_POST, - const char* auth_token = nullptr) + llhttp_method verb = HTTP_POST) { std::vector body; if (!params.is_null()) { - body = serdes::pack(params, serdes::Pack::MsgPack); + auto body_s = params.dump(); + body = {body_s.begin(), body_s.end()}; } - return gen_request( + + return call_raw(gen_request( method, - {body.data(), body.size()}, - ccf::http::headervalues::contenttype::MSGPACK, + body, + ccf::http::headervalues::contenttype::JSON, verb, - auth_token); - } - - Response call( - const std::string& method, - const nlohmann::json& params = nullptr, - llhttp_method verb = HTTP_POST) - { - return call_raw(gen_request(method, params, verb, nullptr)); + nullptr)); } Response call( @@ -202,7 +194,7 @@ namespace client { const auto& content_type = resp.headers.find(ccf::http::headers::CONTENT_TYPE); - return serdes::unpack(resp.body, serdes::Pack::MsgPack); + return nlohmann::json::parse(resp.body); } else { diff --git 
a/src/endpoints/common_endpoint_registry.cpp b/src/endpoints/common_endpoint_registry.cpp index a449a9bc64b2..cc87a9e610fc 100644 --- a/src/endpoints/common_endpoint_registry.cpp +++ b/src/endpoints/common_endpoint_registry.cpp @@ -285,13 +285,12 @@ namespace ccf auto get_receipt = [](auto& ctx, ccf::historical::StatePtr historical_state) { - const auto [pack, params] = - ccf::jsonhandler::get_json_params(ctx.rpc_ctx); + const auto params = ccf::jsonhandler::get_json_params(ctx.rpc_ctx); assert(historical_state->receipt); auto out = ccf::describe_receipt_v1(*historical_state->receipt); ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); - ccf::jsonhandler::set_response(out, ctx.rpc_ctx, pack); + ccf::jsonhandler::set_response(out, ctx.rpc_ctx); }; make_read_only_endpoint( diff --git a/src/endpoints/json_handler.cpp b/src/endpoints/json_handler.cpp index 31f82ba6089c..6e9a7e76387f 100644 --- a/src/endpoints/json_handler.cpp +++ b/src/endpoints/json_handler.cpp @@ -15,128 +15,26 @@ namespace ccf { namespace jsonhandler { - char const* pack_to_content_type(serdes::Pack p) + nlohmann::json get_json_params(const std::shared_ptr& ctx) { - switch (p) - { - case serdes::Pack::Text: - { - return http::headervalues::contenttype::JSON; - } - case serdes::Pack::MsgPack: - { - return http::headervalues::contenttype::MSGPACK; - } - default: - { - return nullptr; - } - } - } - - serdes::Pack detect_json_pack(const std::shared_ptr& ctx) - { - std::optional packing = std::nullopt; - - const auto content_type_it = - ctx->get_request_header(http::headers::CONTENT_TYPE); - if (content_type_it.has_value()) - { - const auto& content_type = content_type_it.value(); - if (content_type == http::headervalues::contenttype::JSON) - { - packing = serdes::Pack::Text; - } - else if (content_type == http::headervalues::contenttype::MSGPACK) - { - packing = serdes::Pack::MsgPack; - } - else - { - throw RpcException( - HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, - ccf::errors::UnsupportedContentType, - 
fmt::format( - "Unsupported content type {}. Only {} and {} are currently " - "supported", - content_type, - http::headervalues::contenttype::JSON, - http::headervalues::contenttype::MSGPACK)); - } - } - else - { - packing = serdes::detect_pack(ctx->get_request_body()); - } - - return packing.value_or(serdes::Pack::Text); - } - - serdes::Pack get_response_pack( - const std::shared_ptr& ctx, serdes::Pack request_pack) - { - const auto accept_it = ctx->get_request_header(http::headers::ACCEPT); - if (accept_it.has_value()) - { - const auto accept_options = - ::http::parse_accept_header(accept_it.value()); - for (const auto& option : accept_options) - { - if (option.matches(http::headervalues::contenttype::JSON)) - { - return serdes::Pack::Text; - } - if (option.matches(http::headervalues::contenttype::MSGPACK)) - { - return serdes::Pack::MsgPack; - } - } - - throw RpcException( - HTTP_STATUS_NOT_ACCEPTABLE, - ccf::errors::UnsupportedContentType, - fmt::format( - "No supported content type in accept header: {}\nOnly {} and {} " - "are currently supported", - accept_it.value(), - http::headervalues::contenttype::JSON, - http::headervalues::contenttype::MSGPACK)); - } - - return request_pack; - } - - nlohmann::json get_params_from_body( - const std::shared_ptr& ctx, serdes::Pack pack) - { - return serdes::unpack(ctx->get_request_body(), pack); - } - - std::pair get_json_params( - const std::shared_ptr& ctx) - { - const auto pack = detect_json_pack(ctx); - nlohmann::json params = nullptr; if ( !ctx->get_request_body().empty() // Body of GET is ignored && ctx->get_request_verb() != HTTP_GET) { - params = get_params_from_body(ctx, pack); + params = nlohmann::json::parse(ctx->get_request_body()); } else { params = nlohmann::json::object(); } - return std::make_pair(pack, params); + return params; } void set_response( - JsonAdapterResponse&& res, - std::shared_ptr& ctx, - serdes::Pack request_packing) + JsonAdapterResponse&& res, std::shared_ptr& ctx) { auto error = 
std::get_if(&res); if (error != nullptr) @@ -160,28 +58,41 @@ namespace ccf else { ctx->set_response_status(HTTP_STATUS_OK); - const auto packing = get_response_pack(ctx, request_packing); - switch (packing) + const auto accept_it = + ctx->get_request_header(http::headers::ACCEPT); + if (accept_it.has_value()) { - case serdes::Pack::Text: - { - const auto s = body->dump(); - ctx->set_response_body( - std::vector(s.begin(), s.end())); - break; - } - case serdes::Pack::MsgPack: + const auto accept_options = + ::http::parse_accept_header(accept_it.value()); + bool matched = false; + for (const auto& option : accept_options) { - ctx->set_response_body(nlohmann::json::to_msgpack(*body)); - break; + if (option.matches(http::headervalues::contenttype::JSON)) + { + matched = true; + break; + } } - default: + + if (!matched) { - throw std::logic_error("Unhandled serdes::Pack"); + throw RpcException( + HTTP_STATUS_NOT_ACCEPTABLE, + ccf::errors::UnsupportedContentType, + fmt::format( + "No supported content type in accept header: {}\nOnly {} " + "is currently supported", + accept_it.value(), + http::headervalues::contenttype::JSON)); } } + + const auto s = body->dump(); + ctx->set_response_body(std::vector(s.begin(), s.end())); + ctx->set_response_header( - http::headers::CONTENT_TYPE, pack_to_content_type(packing)); + http::headers::CONTENT_TYPE, + http::headervalues::contenttype::JSON); } } } @@ -220,9 +131,8 @@ namespace ccf endpoints::EndpointFunction json_adapter(const HandlerJsonParamsAndForward& f) { return [f](endpoints::EndpointContext& ctx) { - auto [packing, params] = jsonhandler::get_json_params(ctx.rpc_ctx); - jsonhandler::set_response( - f(ctx, std::move(params)), ctx.rpc_ctx, packing); + auto params = jsonhandler::get_json_params(ctx.rpc_ctx); + jsonhandler::set_response(f(ctx, std::move(params)), ctx.rpc_ctx); }; } @@ -230,9 +140,8 @@ namespace ccf const ReadOnlyHandlerWithJson& f) { return [f](endpoints::ReadOnlyEndpointContext& ctx) { - auto [packing, 
params] = jsonhandler::get_json_params(ctx.rpc_ctx); - jsonhandler::set_response( - f(ctx, std::move(params)), ctx.rpc_ctx, packing); + auto params = jsonhandler::get_json_params(ctx.rpc_ctx); + jsonhandler::set_response(f(ctx, std::move(params)), ctx.rpc_ctx); }; } @@ -240,9 +149,8 @@ namespace ccf const CommandHandlerWithJson& f) { return [f](endpoints::CommandEndpointContext& ctx) { - auto [packing, params] = jsonhandler::get_json_params(ctx.rpc_ctx); - jsonhandler::set_response( - f(ctx, std::move(params)), ctx.rpc_ctx, packing); + auto params = jsonhandler::get_json_params(ctx.rpc_ctx); + jsonhandler::set_response(f(ctx, std::move(params)), ctx.rpc_ctx); }; } } diff --git a/src/node/jwt_key_auto_refresh.h b/src/node/jwt_key_auto_refresh.h index 5b223b6a0df3..b81129f79b6d 100644 --- a/src/node/jwt_key_auto_refresh.h +++ b/src/node/jwt_key_auto_refresh.h @@ -2,7 +2,6 @@ // Licensed under the Apache 2.0 License. #pragma once -#include "ccf/serdes.h" #include "ccf/service/tables/jwt.h" #include "http/http_builder.h" #include "http/http_rpc_context.h" @@ -105,16 +104,15 @@ namespace ccf template void send_refresh_jwt_keys(T msg) { - auto body = serdes::pack(msg, serdes::Pack::Text); - ::http::Request request(fmt::format( "/{}/{}", ccf::get_actor_prefix(ccf::ActorsType::nodes), "jwt_keys/refresh")); request.set_header( - ccf::http::headers::CONTENT_TYPE, - ccf::http::headervalues::contenttype::JSON); - request.set_body(&body); + http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON); + + auto body = nlohmann::json(msg).dump(); + request.set_body(body); auto packed = request.build_request(); diff --git a/src/node/node_state.h b/src/node/node_state.h index ff1898473f78..485eb21d086e 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -11,7 +11,6 @@ #include "ccf/pal/attestation.h" #include "ccf/pal/locking.h" #include "ccf/pal/platform.h" -#include "ccf/serdes.h" #include "ccf/service/node_info_network.h" #include 
"ccf/service/tables/acme_certificates.h" #include "ccf/service/tables/service.h" @@ -619,7 +618,7 @@ namespace ccf return; } - auto j = serdes::unpack(data, serdes::Pack::Text); + auto j = nlohmann::json::parse(data); JoinNetworkNodeToNode::Out resp; try @@ -773,16 +772,15 @@ namespace ccf LOG_DEBUG_FMT( "Sending join request to {}", config.join.target_rpc_address); - const auto body = serdes::pack(join_params, serdes::Pack::Text); + const auto body = nlohmann::json(join_params).dump(); - LOG_DEBUG_FMT( - "Sending join request body: {}", std::string(body.begin(), body.end())); + LOG_DEBUG_FMT("Sending join request body: {}", body); ::http::Request r( fmt::format("/{}/{}", get_actor_prefix(ActorsType::nodes), "join")); r.set_header( http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON); - r.set_body(&body); + r.set_body(body); join_client->send_request(std::move(r)); } @@ -1905,7 +1903,7 @@ namespace ccf create_params.service_data = config.service_data; create_params.create_txid = {create_view, last_recovered_signed_idx + 1}; - const auto body = serdes::pack(create_params, serdes::Pack::Text); + const auto body = nlohmann::json(create_params).dump(); ::http::Request request( fmt::format("/{}/{}", get_actor_prefix(ActorsType::nodes), "create")); @@ -1913,7 +1911,7 @@ namespace ccf ccf::http::headers::CONTENT_TYPE, ccf::http::headervalues::contenttype::JSON); - request.set_body(&body); + request.set_body(body); return request.build_request(); } @@ -1936,8 +1934,7 @@ namespace ccf return false; } - const auto body = - serdes::unpack(ctx->get_response_body(), serdes::Pack::Text); + const auto body = nlohmann::json::parse(ctx->get_response_body()); if (!body.is_boolean()) { LOG_FAIL_FMT("Expected boolean body in create response"); diff --git a/src/node/rpc/test/frontend_test.cpp b/src/node/rpc/test/frontend_test.cpp index 21609252df48..21d5fe93b600 100644 --- a/src/node/rpc/test/frontend_test.cpp +++ b/src/node/rpc/test/frontend_test.cpp @@ -8,7 +8,6 @@ 
#include "ccf/ds/logger.h" #include "ccf/json_handler.h" #include "ccf/kv/map.h" -#include "ccf/serdes.h" #include "crypto/openssl/hash.h" #include "ds/files.h" #include "enclave/enclave_time.h" @@ -187,7 +186,7 @@ class TestExplicitCommitability : public BaseTestFrontend auto maybe_commit = [this](ccf::endpoints::EndpointContext& ctx) { const auto parsed = - serdes::unpack(ctx.rpc_ctx->get_request_body(), default_pack); + nlohmann::json::parse(ctx.rpc_ctx->get_request_body()); const auto new_value = parsed["value"].get(); auto vs = ctx.tx.rw(values); @@ -414,14 +413,12 @@ class TestForwardingMemberFrontEnd : public MemberRpcFrontend, } }; -auto create_simple_request( - const std::string& method = "/empty_function", - serdes::Pack pack = default_pack) +auto create_simple_request(const std::string& method = "/empty_function") { ::http::Request request(method); request.set_header( ccf::http::headers::CONTENT_TYPE, - ccf::jsonhandler::pack_to_content_type(pack)); + ccf::http::headervalues::contenttype::JSON); return request; } @@ -437,10 +434,9 @@ ::http::SimpleResponseProcessor::Response parse_response( return processor.received.front(); } -nlohmann::json parse_response_body( - const vector& body, serdes::Pack pack = default_pack) +nlohmann::json parse_response_body(const vector& body) { - return serdes::unpack(body, pack); + return nlohmann::json::parse(body); } // callers used throughout @@ -662,106 +658,103 @@ TEST_CASE("JsonWrappedEndpointFunction") NetworkState network; prepare_callers(network); TestJsonWrappedEndpointFunction frontend(*network.tables); - for (const auto pack_type : {serdes::Pack::Text, serdes::Pack::MsgPack}) + {{INFO("Calling echo, with params in body"); + auto echo_call = create_simple_request("/echo"); + const nlohmann::json j_body = { + {"data", {"nested", "Some string"}}, {"other", "Another string"}}; + const auto serialized_body = j_body.dump(); + echo_call.set_body(serialized_body); + const auto serialized_call = 
echo_call.build_request(); + + auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); + CHECK(response.status == HTTP_STATUS_OK); + + const auto response_body = parse_response_body(response.body); + CHECK(response_body == j_body); +} + +{ + INFO("Calling echo_query, with params in query"); + auto echo_call = create_simple_request("/echo_parsed_query"); + const std::map query_params = { + {"foo", "helloworld"}, + {"bar", "1"}, + {"fooz", "\"2\""}, + {"baz", "\"awkward\"\"escapes"}}; + for (const auto& [k, v] : query_params) { - { - INFO("Calling echo, with params in body"); - auto echo_call = create_simple_request("/echo", pack_type); - const nlohmann::json j_body = { - {"data", {"nested", "Some string"}}, {"other", "Another string"}}; - const auto serialized_body = serdes::pack(j_body, pack_type); - echo_call.set_body(serialized_body.data(), serialized_body.size()); - const auto serialized_call = echo_call.build_request(); - - auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - frontend.process(rpc_ctx); - auto response = parse_response(rpc_ctx->serialise_response()); - CHECK(response.status == HTTP_STATUS_OK); + echo_call.set_query_param(k, v); + } - const auto response_body = parse_response_body(response.body, pack_type); - CHECK(response_body == j_body); - } + const auto serialized_call = echo_call.build_request(); - { - INFO("Calling echo_query, with params in query"); - auto echo_call = create_simple_request("/echo_parsed_query", pack_type); - const std::map query_params = { - {"foo", "helloworld"}, - {"bar", "1"}, - {"fooz", "\"2\""}, - {"baz", "\"awkward\"\"escapes"}}; - for (const auto& [k, v] : query_params) - { - echo_call.set_query_param(k, v); - } + auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); + 
CHECK(response.status == HTTP_STATUS_OK); - const auto serialized_call = echo_call.build_request(); + const auto response_body = parse_response_body(response.body); + const auto response_map = response_body.get(); + CHECK(response_map == query_params); +} - auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - frontend.process(rpc_ctx); - auto response = parse_response(rpc_ctx->serialise_response()); - CHECK(response.status == HTTP_STATUS_OK); +{ + INFO("Calling get_caller"); + const auto get_caller = create_simple_request("/get_caller"); + const auto serialized_call = get_caller.build_request(); - const auto response_body = parse_response_body(response.body, pack_type); - const auto response_map = response_body.get(); - CHECK(response_map == query_params); - } + auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); + CHECK(response.status == HTTP_STATUS_OK); - { - INFO("Calling get_caller"); - const auto get_caller = create_simple_request("/get_caller", pack_type); - const auto serialized_call = get_caller.build_request(); + const auto response_body = parse_response_body(response.body); + CHECK(response_body == user_id); +} +} - auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - frontend.process(rpc_ctx); - auto response = parse_response(rpc_ctx->serialise_response()); - CHECK(response.status == HTTP_STATUS_OK); +{ + INFO("Calling failable, without failing"); + auto dont_fail = create_simple_request("/failable"); + const auto serialized_call = dont_fail.build_request(); - const auto response_body = parse_response_body(response.body, pack_type); - CHECK(response_body == user_id); - } - } + auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); + frontend.process(rpc_ctx); + auto response = parse_response(rpc_ctx->serialise_response()); + CHECK(response.status == HTTP_STATUS_OK); +} +{ + for (const auto err : 
{ + HTTP_STATUS_INTERNAL_SERVER_ERROR, + HTTP_STATUS_BAD_REQUEST, + (http_status)418 // Teapot + }) { - INFO("Calling failable, without failing"); - auto dont_fail = create_simple_request("/failable"); - const auto serialized_call = dont_fail.build_request(); + INFO("Calling failable, with error"); + const auto msg = fmt::format("An error message about {}", err); + auto fail = create_simple_request("/failable"); + const nlohmann::json j_body = { + {"error", {{"code", err}, {"message", msg}}}}; + const auto serialized_body = j_body.dump(); + fail.set_body(serialized_body); + const auto serialized_call = fail.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); frontend.process(rpc_ctx); auto response = parse_response(rpc_ctx->serialise_response()); - CHECK(response.status == HTTP_STATUS_OK); - } - - { - for (const auto err : { - HTTP_STATUS_INTERNAL_SERVER_ERROR, - HTTP_STATUS_BAD_REQUEST, - (http_status)418 // Teapot - }) - { - INFO("Calling failable, with error"); - const auto msg = fmt::format("An error message about {}", err); - auto fail = create_simple_request("/failable"); - const nlohmann::json j_body = { - {"error", {{"code", err}, {"message", msg}}}}; - const auto serialized_body = serdes::pack(j_body, default_pack); - fail.set_body(serialized_body.data(), serialized_body.size()); - const auto serialized_call = fail.build_request(); - - auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_call); - frontend.process(rpc_ctx); - auto response = parse_response(rpc_ctx->serialise_response()); - CHECK(response.status == err); - CHECK( - response.headers[ccf::http::headers::CONTENT_TYPE] == - ccf::http::headervalues::contenttype::JSON); - const std::string body_s(response.body.begin(), response.body.end()); - auto body_j = nlohmann::json::parse(body_s); - CHECK(body_j["error"]["message"] == msg); - } + CHECK(response.status == err); + CHECK( + response.headers[ccf::http::headers::CONTENT_TYPE] == + 
ccf::http::headervalues::contenttype::JSON); + const std::string body_s(response.body.begin(), response.body.end()); + auto body_j = nlohmann::json::parse(body_s); + CHECK(body_j["error"]["message"] == msg); } } +} TEST_CASE("Restricted verbs") { @@ -887,8 +880,8 @@ TEST_CASE("Explicit commitability") const nlohmann::json request_body = { {"value", new_value}, {"status", status}}; - const auto serialized_body = serdes::pack(request_body, default_pack); - request.set_body(&serialized_body); + const auto serialized_body = request_body.dump(); + request.set_body(serialized_body); const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); @@ -923,8 +916,8 @@ TEST_CASE("Explicit commitability") const nlohmann::json request_body = { {"value", new_value}, {"apply", apply}, {"status", status}}; - const auto serialized_body = serdes::pack(request_body, default_pack); - request.set_body(&serialized_body); + const auto serialized_body = request_body.dump(); + request.set_body(serialized_body); const auto serialized_request = request.build_request(); auto rpc_ctx = ccf::make_rpc_context(user_session, serialized_request); diff --git a/src/node/rpc/test/frontend_test_infra.h b/src/node/rpc/test/frontend_test_infra.h index 0d54fbb1f597..815dbb7d4264 100644 --- a/src/node/rpc/test/frontend_test_infra.h +++ b/src/node/rpc/test/frontend_test_infra.h @@ -5,7 +5,6 @@ #include "ccf/app_interface.h" #include "ccf/crypto/rsa_key_pair.h" #include "ccf/ds/logger.h" -#include "ccf/serdes.h" #include "ccf/service/signed_req.h" #include "ds/files.h" #include "kv/test/null_encryptor.h" @@ -22,7 +21,6 @@ using namespace ccf; using namespace ccf; using namespace std; -using namespace serdes; using namespace nlohmann; using TResponse = ::http::SimpleResponseProcessor::Response; @@ -43,15 +41,13 @@ auto dummy_enc_pubk = ccf::crypto::make_rsa_key_pair() -> public_key_pem(); auto encryptor = std::make_shared(); -constexpr auto 
default_pack = serdes::Pack::Text; - template T parse_response_body(const TResponse& r) { nlohmann::json body_j; try { - body_j = serdes::unpack(r.body, serdes::Pack::Text); + body_j = nlohmann::json::parse(r.body); } catch (const nlohmann::json::parse_error& e) { @@ -82,9 +78,8 @@ std::vector create_request( const json& params, const string& method_name, llhttp_method verb = HTTP_POST) { ::http::Request r(fmt::format("/gov/{}", method_name), verb); - const auto body = params.is_null() ? std::vector() : - serdes::pack(params, default_pack); - r.set_body(&body); + const auto body = params.is_null() ? std::string() : params.dump(); + r.set_body(body); return r.build_request(); } diff --git a/src/node/rpc/test/node_frontend_test.cpp b/src/node/rpc/test/node_frontend_test.cpp index c99ea27ffd1e..81324870b315 100644 --- a/src/node/rpc/test/node_frontend_test.cpp +++ b/src/node/rpc/test/node_frontend_test.cpp @@ -4,7 +4,6 @@ #include "ccf/crypto/pem.h" #include "ccf/crypto/verifier.h" #include "ccf/ds/logger.h" -#include "ccf/serdes.h" #include "crypto/openssl/hash.h" #include "frontend_test_infra.h" #include "kv/test/null_encryptor.h" @@ -15,7 +14,6 @@ using namespace ccf; using namespace nlohmann; -using namespace serdes; using TResponse = ::http::SimpleResponseProcessor::Response; @@ -28,10 +26,8 @@ TResponse frontend_process( const ccf::crypto::Pem& caller) { ::http::Request r(method); - const auto body = json_params.is_null() ? - std::vector() : - serdes::pack(json_params, Pack::Text); - r.set_body(&body); + const auto body = json_params.is_null() ? 
std::string() : json_params.dump(); + r.set_body(body); auto serialise_request = r.build_request(); auto session = diff --git a/tests/js-custom-authorization/custom_authorization.py b/tests/js-custom-authorization/custom_authorization.py index 0448237ba21c..36b5bf894e23 100644 --- a/tests/js-custom-authorization/custom_authorization.py +++ b/tests/js-custom-authorization/custom_authorization.py @@ -770,10 +770,6 @@ def test_accept_header(network, args): assert r.status_code == http.HTTPStatus.OK.value assert r.headers["content-type"] == "application/json" - r = c.get("/node/commit", headers={"accept": "application/msgpack"}) - assert r.status_code == http.HTTPStatus.OK.value - assert r.headers["content-type"] == "application/msgpack" - r = c.get( "/node/commit", headers={"accept": "text/html;q=0.9,image/jpeg;video/mpeg;q=0.8,*/*;q=0.1"}, @@ -789,15 +785,6 @@ def test_accept_header(network, args): assert r.status_code == http.HTTPStatus.OK.value assert r.headers["content-type"] == "application/json" - r = c.get( - "/node/commit", - headers={ - "accept": "text/html;q=0.9,image/jpeg;video/mpeg;q=0.8,application/msgpack;q=0.1" - }, - ) - assert r.status_code == http.HTTPStatus.OK.value - assert r.headers["content-type"] == "application/msgpack" - return network @@ -1310,28 +1297,28 @@ def run_interpreter_reuse(args): if __name__ == "__main__": cr = ConcurrentRunner() - cr.add( - "authz", - run, - nodes=infra.e2e_args.nodes(cr.args, 1), - js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-custom-authorization"), - ) - - cr.add( - "limits", - run_limits, - nodes=infra.e2e_args.nodes(cr.args, 1), - js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-limits"), - ) - - cr.add( - "authn", - run_authn, - nodes=infra.e2e_args.nodes(cr.args, 1), - js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-authentication"), - initial_user_count=4, - initial_member_count=2, - ) + # cr.add( + # "authz", + # run, + # nodes=infra.e2e_args.nodes(cr.args, 1), + # 
js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-custom-authorization"), + # ) + + # cr.add( + # "limits", + # run_limits, + # nodes=infra.e2e_args.nodes(cr.args, 1), + # js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-limits"), + # ) + + # cr.add( + # "authn", + # run_authn, + # nodes=infra.e2e_args.nodes(cr.args, 1), + # js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-authentication"), + # initial_user_count=4, + # initial_member_count=2, + # ) cr.add( "content_types", @@ -1340,18 +1327,18 @@ def run_interpreter_reuse(args): js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-content-types"), ) - cr.add( - "api", - run_api, - nodes=infra.e2e_args.nodes(cr.args, 1), - js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-api"), - ) - - cr.add( - "interpreter_reuse", - run_interpreter_reuse, - nodes=infra.e2e_args.min_nodes(cr.args, f=1), - js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-interpreter-reuse"), - ) + # cr.add( + # "api", + # run_api, + # nodes=infra.e2e_args.nodes(cr.args, 1), + # js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-api"), + # ) + + # cr.add( + # "interpreter_reuse", + # run_interpreter_reuse, + # nodes=infra.e2e_args.min_nodes(cr.args, f=1), + # js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-interpreter-reuse"), + # ) cr.run() From 4a4252cab005c05b8047ccc6a5214e2113a61d5b Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 1 Jul 2024 18:29:04 +0100 Subject: [PATCH 3/5] Historical queries cache size soft limit (#6282) Co-authored-by: Amaury Chamayou Co-authored-by: Amaury Chamayou --- .daily_canary | 2 +- doc/build_apps/api.rst | 2 +- include/ccf/historical_queries_interface.h | 9 + samples/apps/logging/logging.cpp | 6 + src/node/historical_queries.h | 187 +++++++++++++++++++-- src/node/rpc/test/node_stub.h | 2 + src/node/test/historical_queries.cpp | 141 ++++++++++++++++ 7 files changed, 334 insertions(+), 15 deletions(-) diff --git a/.daily_canary b/.daily_canary index 899c44f0baf3..b0c8ecafff33 100644 --- 
a/.daily_canary +++ b/.daily_canary @@ -3,4 +3,4 @@ ( V ) / . \ | +---=---' /--x-m- /--n-n---xXx--/--yY------>>>----<<<>>]]{{}}---||-/\---.. 2024__ -!..!! \ No newline at end of file +!..! diff --git a/doc/build_apps/api.rst b/doc/build_apps/api.rst index ea18460c7044..7b944c5ede28 100644 --- a/doc/build_apps/api.rst +++ b/doc/build_apps/api.rst @@ -125,7 +125,7 @@ Historical Queries .. doxygenclass:: ccf::historical::AbstractStateCache :project: CCF - :members: set_default_expiry_duration, get_state_at, get_store_at, get_store_range, drop_cached_states + :members: set_default_expiry_duration, set_soft_cache_limit, get_state_at, get_store_at, get_store_range, drop_cached_states .. doxygenstruct:: ccf::historical::State :project: CCF diff --git a/include/ccf/historical_queries_interface.h b/include/ccf/historical_queries_interface.h index f94516d81aa8..f4bc6ba9f4d4 100644 --- a/include/ccf/historical_queries_interface.h +++ b/include/ccf/historical_queries_interface.h @@ -49,6 +49,8 @@ namespace ccf::historical using ExpiryDuration = std::chrono::seconds; + using CacheSize = size_t; + /** Stores the progress of historical query requests. * * A request will generally need to be made multiple times (with the same @@ -79,6 +81,13 @@ namespace ccf::historical virtual void set_default_expiry_duration( ExpiryDuration seconds_until_expiry) = 0; + /** Set the cache limit (in bytes) to evict least recently used requests + * from the cache after its size grows beyond this limit. The limit is not + * strict. It is estimated based on serialized states' sizes approximation + * and is checked once per tick, and so it can overflow for a short time. + */ + virtual void set_soft_cache_limit(CacheSize cache_limit) = 0; + /** EXPERIMENTAL: Set the tracking of deletes on missing keys for historical * queries. 
* diff --git a/samples/apps/logging/logging.cpp b/samples/apps/logging/logging.cpp index 9ab2807b79b0..59081fdd90d6 100644 --- a/samples/apps/logging/logging.cpp +++ b/samples/apps/logging/logging.cpp @@ -460,6 +460,12 @@ namespace loggingapp PUBLIC_RECORDS, context, 10000, 20); context.get_indexing_strategies().install_strategy(index_per_public_key); + // According to manual observation it's enough to start evicting old + // requests on historical perf test, but not too small to get stuck + // because of a single request being larger than the cache. + constexpr size_t cache_limit = 1024 * 1024 * 10; // 10 MB + context.get_historical_state().set_soft_cache_limit(cache_limit); + const ccf::AuthnPolicies auth_policies = { ccf::jwt_auth_policy, ccf::user_cert_auth_policy, diff --git a/src/node/historical_queries.h b/src/node/historical_queries.h index 0eec326355f0..46599b5123fe 100644 --- a/src/node/historical_queries.h +++ b/src/node/historical_queries.h @@ -64,6 +64,7 @@ FMT_END_NAMESPACE namespace ccf::historical { static constexpr auto slow_fetch_threshold = std::chrono::milliseconds(1000); + static constexpr size_t soft_to_raw_ratio{5}; static std::optional get_signature( const ccf::kv::StorePtr& sig_store) @@ -95,10 +96,9 @@ namespace ccf::historical // whether to keep all the writes so that we can build a diff later bool track_deletes_on_missing_keys_v = false; - enum class RequestStage + enum class StoreStage { Fetching, - Untrusted, Trusted, }; @@ -132,7 +132,7 @@ namespace ccf::historical struct StoreDetails { std::chrono::milliseconds time_until_fetch = {}; - RequestStage current_stage = RequestStage::Fetching; + StoreStage current_stage = StoreStage::Fetching; ccf::crypto::Sha256Hash entry_digest = {}; ccf::ClaimsDigest claims_digest = {}; ccf::kv::StorePtr store = nullptr; @@ -234,11 +234,13 @@ namespace ccf::historical return {}; } - void adjust_ranges( + std::pair, std::vector> adjust_ranges( const SeqNoCollection& new_seqnos, bool should_include_receipts,
SeqNo earliest_ledger_secret_seqno) { + std::vector removed{}, added{}; + bool any_diff = false; // If a seqno is earlier than the earliest known ledger secret, we will @@ -266,6 +268,7 @@ namespace ccf::historical { // No longer looking for a seqno which was previously requested. // Remove it from my_stores + removed.push_back(prev_it->first); prev_it = my_stores.erase(prev_it); any_diff |= true; } @@ -279,6 +282,7 @@ namespace ccf::historical { // If this is too early for known secrets, just record that it // was requested but don't add it to all_stores yet + added.push_back(*new_it); prev_it = my_stores.insert_or_assign(prev_it, *new_it, nullptr); any_too_early = true; } @@ -293,6 +297,7 @@ namespace ccf::historical details = std::make_shared(); all_stores.insert_or_assign(all_it, *new_it, details); } + added.push_back(*new_it); prev_it = my_stores.insert_or_assign(prev_it, *new_it, details); } any_diff |= true; @@ -311,7 +316,7 @@ namespace ccf::historical if (!any_diff && (should_include_receipts == include_receipts)) { HISTORICAL_LOG("Identical to previous request"); - return; + return {removed, added}; } include_receipts = should_include_receipts; @@ -331,6 +336,7 @@ namespace ccf::historical populate_receipts(seqno); } } + return {removed, added}; } void populate_receipts(ccf::SeqNo new_seqno) @@ -493,6 +499,136 @@ namespace ccf::historical ExpiryDuration default_expiry_duration = std::chrono::seconds(1800); + // These two combine into an effective O(log(N)) lookup/add/remove by + // handle. + std::list lru_requests; + std::map::iterator> lru_lookup; + + // To maintain the estimated size consumed by all requests. Gets updated + // when ledger entries are fetched, and when requests are dropped. 
+ std::unordered_map> store_to_requests; + std::unordered_map raw_store_sizes{}; + + CacheSize soft_store_cache_limit{1ll * 1024 * 1024 * 512 /*512 MB*/}; + CacheSize soft_store_cache_limit_raw = + soft_store_cache_limit / soft_to_raw_ratio; + CacheSize estimated_store_cache_size{0}; + + void add_request_ref(SeqNo seq, CompoundHandle handle) + { + auto it = store_to_requests.find(seq); + + if (it == store_to_requests.end()) + { + store_to_requests.insert({seq, {handle}}); + auto size = raw_store_sizes.find(seq); + if (size != raw_store_sizes.end()) + { + estimated_store_cache_size += size->second; + } + } + else + { + it->second.insert(handle); + } + } + + void add_request_refs(CompoundHandle handle) + { + for (const auto& [seq, _] : requests.at(handle).my_stores) + { + add_request_ref(seq, handle); + } + } + + void remove_request_ref(SeqNo seq, CompoundHandle handle) + { + auto it = store_to_requests.find(seq); + assert(it != store_to_requests.end()); + + it->second.erase(handle); + if (it->second.empty()) + { + store_to_requests.erase(it); + auto size = raw_store_sizes.find(seq); + if (size != raw_store_sizes.end()) + { + estimated_store_cache_size -= size->second; + raw_store_sizes.erase(size); + } + } + } + + void remove_request_refs(CompoundHandle handle) + { + for (const auto& [seq, _] : requests.at(handle).my_stores) + { + remove_request_ref(seq, handle); + } + } + + void lru_promote(CompoundHandle handle) + { + auto it = lru_lookup.find(handle); + if (it != lru_lookup.end()) + { + lru_requests.erase(it->second); + it->second = lru_requests.insert(lru_requests.begin(), handle); + } + else + { + lru_lookup[handle] = lru_requests.insert(lru_requests.begin(), handle); + add_request_refs(handle); + } + } + + void lru_shrink_to_fit(size_t threshold) + { + while (estimated_store_cache_size > threshold) + { + if (lru_requests.empty()) + { + LOG_FAIL_FMT( + "LRU shrink to {} requested but cache is already empty", threshold); + return; + } + + const auto handle = 
lru_requests.back(); + LOG_DEBUG_FMT( + "Cache size shrinking (reached {} / {}). Dropping {}", + estimated_store_cache_size, + threshold, + handle); + + remove_request_refs(handle); + lru_lookup.erase(handle); + + requests.erase(handle); + lru_requests.pop_back(); + } + } + + void lru_evict(CompoundHandle handle) + { + auto it = lru_lookup.find(handle); + if (it != lru_lookup.end()) + { + remove_request_refs(handle); + lru_requests.erase(it->second); + lru_lookup.erase(it); + } + } + + void update_store_raw_size(SeqNo seq, size_t new_size) + { + auto& stored_size = raw_store_sizes[seq]; + assert(!stored_size || stored_size == new_size); + + estimated_store_cache_size -= stored_size; + estimated_store_cache_size += new_size; + stored_size = new_size; + } + void fetch_entry_at(ccf::SeqNo seqno) { fetch_entries_range(seqno, seqno); @@ -577,7 +713,7 @@ namespace ccf::historical { // Deserialisation includes a GCM integrity check, so all entries // have been verified by the time we get here. - details->current_stage = RequestStage::Trusted; + details->current_stage = StoreStage::Trusted; details->has_commit_evidence = has_commit_evidence; details->entry_digest = entry_digest; @@ -760,6 +896,8 @@ namespace ccf::historical HISTORICAL_LOG("First time I've seen handle {}", handle); } + lru_promote(handle); + Request& request = it->second; update_earliest_known_ledger_secret(); @@ -772,9 +910,18 @@ namespace ccf::historical seqnos.size(), *seqnos.begin(), include_receipts); - request.adjust_ranges( + auto [removed, added] = request.adjust_ranges( seqnos, include_receipts, earliest_secret_.valid_from); + for (auto seq : removed) + { + remove_request_ref(seq, handle); + } + for (auto seq : added) + { + add_request_ref(seq, handle); + } + // If the earliest target entry cannot be deserialised with the earliest // known ledger secret, record the target seqno and begin fetching the // previous historical ledger secret. 
@@ -789,10 +936,9 @@ namespace ccf::historical for (auto seqno : seqnos) { auto target_details = request.get_store_details(seqno); - if ( target_details != nullptr && - target_details->current_stage == RequestStage::Trusted && + target_details->current_stage == StoreStage::Trusted && (!request.include_receipts || target_details->receipt != nullptr)) { // Have this store, associated txid and receipt and trust it - add @@ -823,6 +969,7 @@ namespace ccf::historical { if (request_it->second.get_store_details(seqno) != nullptr) { + lru_evict(request_it->first); request_it = requests.erase(request_it); } else @@ -977,6 +1124,12 @@ namespace ccf::historical default_expiry_duration = duration; } + void set_soft_cache_limit(CacheSize cache_limit) + { + soft_store_cache_limit = cache_limit; + soft_store_cache_limit_raw = soft_store_cache_limit / soft_to_raw_ratio; + } + void track_deletes_on_missing_keys(bool track) { track_deletes_on_missing_keys_v = track; @@ -985,8 +1138,8 @@ namespace ccf::historical bool drop_cached_states(const CompoundHandle& handle) { std::lock_guard guard(requests_lock); + lru_evict(handle); const auto erased_count = requests.erase(handle); - HISTORICAL_LOG("Dropping historical request {}", handle); return erased_count > 0; } @@ -1000,8 +1153,7 @@ namespace ccf::historical std::lock_guard guard(requests_lock); const auto it = all_stores.find(seqno); auto details = it == all_stores.end() ? 
nullptr : it->second.lock(); - if ( - details == nullptr || details->current_stage != RequestStage::Fetching) + if (details == nullptr || details->current_stage != StoreStage::Fetching) { // Unexpected entry, we already have it or weren't asking for it - // ignore this resubmission @@ -1094,6 +1246,7 @@ namespace ccf::historical std::move(claims_digest), has_commit_evidence); + update_store_raw_size(seqno, size); return true; } @@ -1246,6 +1399,7 @@ namespace ccf::historical { LOG_DEBUG_FMT( "Dropping expired historical query with handle {}", it->first); + lru_evict(it->first); it = requests.erase(it); } else @@ -1256,6 +1410,8 @@ namespace ccf::historical } } + lru_shrink_to_fit(soft_store_cache_limit_raw); + { auto it = all_stores.begin(); std::optional> range_to_request = @@ -1269,7 +1425,7 @@ namespace ccf::historical } else { - if (details->current_stage == RequestStage::Fetching) + if (details->current_stage == StoreStage::Fetching) { details->time_until_fetch -= elapsed_ms; if (details->time_until_fetch.count() <= 0) @@ -1435,6 +1591,11 @@ namespace ccf::historical StateCacheImpl::set_default_expiry_duration(duration); } + void set_soft_cache_limit(CacheSize cache_limit) override + { + StateCacheImpl::set_soft_cache_limit(cache_limit); + } + void track_deletes_on_missing_keys(bool track) override { StateCacheImpl::track_deletes_on_missing_keys(track); diff --git a/src/node/rpc/test/node_stub.h b/src/node/rpc/test/node_stub.h index 8c62e15ca057..fd539f19e47a 100644 --- a/src/node/rpc/test/node_stub.h +++ b/src/node/rpc/test/node_stub.h @@ -161,6 +161,8 @@ namespace ccf historical::ExpiryDuration seconds_until_expiry) {} + void set_soft_cache_limit(historical::CacheSize cache_limit){}; + void track_deletes_on_missing_keys(bool track) {} ccf::kv::ReadOnlyStorePtr get_store_at( diff --git a/src/node/test/historical_queries.cpp b/src/node/test/historical_queries.cpp index e7fa424d1b64..8ae0c8203d4d 100644 --- a/src/node/test/historical_queries.cpp +++ 
b/src/node/test/historical_queries.cpp @@ -237,6 +237,19 @@ std::map> construct_host_ledger( return ledger; } +size_t get_cache_limit_for_entries( + const std::vector>& entries) +{ + return ccf::historical::soft_to_raw_ratio * + std::accumulate( + entries.begin(), + entries.end(), + 0ll, + [&](size_t prev, const std::vector& entry) { + return prev + entry.size(); + }); +} + TEST_CASE("StateCache point queries") { auto state = create_and_init_state(); @@ -949,6 +962,134 @@ TEST_CASE("Incremental progress") } } +TEST_CASE("StateCache soft zero limit with increasing") +{ + // Try to get two states. Shouldn't be able to retrieve anything with 0 cache + // limit. After increasing to the size of the first state, only that one is + // available, but later overwritten by the second one, and finally all are + // evicted after setting again to 0. + // + // ! DISCLAIMER ! If you change this, bear in mind that each attempt to get the + // store promotes the handle, and so requests eviction order changes, + // therefore this test in particular is quite fragile. + + auto state = create_and_init_state(); + auto& kv_store = *state.kv_store; + + auto seq_low = write_transactions_and_signature(kv_store, 1); + auto seq_high = write_transactions_and_signature(kv_store, 1); + REQUIRE(kv_store.current_version() == seq_high); + + auto ledger = construct_host_ledger(state.kv_store->get_consensus()); + REQUIRE(ledger.size() == seq_high); + + auto stub_writer = std::make_shared(); + ccf::historical::StateCache cache( + kv_store, state.ledger_secrets, stub_writer); + + cache.set_soft_cache_limit(0); + + REQUIRE(!cache.get_state_at(0, seq_low)); + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_low, ledger.at(seq_low)); + cache.tick(std::chrono::milliseconds(100)); + + // Ledger entry fetched and instantly removed because of the parent request + // eviction due to zero cache limit.
+ REQUIRE(!cache.get_state_at(0, seq_low)); + + cache.set_soft_cache_limit( + get_cache_limit_for_entries({ledger.at(seq_low), ledger.at(seq_high)}) - 1); + + cache.handle_ledger_entry(seq_low, ledger.at(seq_low)); + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_high)); + + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_high, ledger.at(seq_high)); + + // Both available because tick hasn't been called yet. + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_high)); + + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(!cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_high)); + + cache.set_soft_cache_limit(0); + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(!cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_high)); +} + +TEST_CASE("StateCache dropping state explicitly") +{ + // Adding two states to the cache (limit is hit). Drop state2 and add state3. + // State1 should remain available, as well as state3, state2 was force-evicted + // from LRU because it's been dropped explicitly. + // + // ! DISCLAIMER ! If you change this bear in mind that each attempt to get the + // store promotes the handle, and so requests eviction order changes, + // therefore this test in particular is quite fragile. 
+ + auto state = create_and_init_state(); + auto& kv_store = *state.kv_store; + + auto seq_low = write_transactions_and_signature(kv_store, 1); + auto seq_mid = write_transactions_and_signature(kv_store, 1); + auto seq_high = write_transactions_and_signature(kv_store, 1); + REQUIRE(kv_store.current_version() == seq_high); + + auto ledger = construct_host_ledger(state.kv_store->get_consensus()); + REQUIRE(ledger.size() == seq_high); + + auto stub_writer = std::make_shared(); + ccf::historical::StateCache cache( + kv_store, state.ledger_secrets, stub_writer); + + REQUIRE(!cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_mid)); + + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_low, ledger.at(seq_low)); + cache.handle_ledger_entry(seq_mid, ledger.at(seq_mid)); + + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_mid)); + + cache.set_soft_cache_limit( + get_cache_limit_for_entries( + {ledger.at(seq_low), ledger.at(seq_mid), ledger.at(seq_high)}) - + 1); + + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(!cache.get_state_at(2, seq_high)); + cache.tick(std::chrono::milliseconds(100)); + + // Ledger entries not provided yet, so previous are not evicted. 
+ REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(cache.get_state_at(1, seq_mid)); + REQUIRE(!cache.get_state_at(2, seq_high)); + + cache.drop_cached_states(1); + cache.tick(std::chrono::milliseconds(100)); + + cache.handle_ledger_entry(seq_high, ledger.at(seq_high)); + cache.tick(std::chrono::milliseconds(100)); + + REQUIRE(cache.get_state_at(0, seq_low)); + REQUIRE(!cache.get_state_at(1, seq_mid)); + REQUIRE(cache.get_state_at(2, seq_high)); +} + TEST_CASE("StateCache sparse queries") { auto state = create_and_init_state(); From 9586f73222b9d01de2f879a8e7cf57978176dc6c Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Mon, 1 Jul 2024 20:04:16 +0100 Subject: [PATCH 4/5] Streamline CodeQL pipeline (#6320) --- .github/workflows/codeql-analysis.yml | 40 ++++----------------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index e048734cbdcb..53b2a077c54c 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -31,11 +31,7 @@ jobs: strategy: fail-fast: false matrix: - # Override automatic language detection by changing the below list - # Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python'] language: ["cpp"] - # Learn more... - # https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection steps: - name: Checkout repository @@ -43,50 +39,26 @@ jobs: with: fetch-depth: 0 - # Initializes the CodeQL tools for scanning. + # Done before CodeQL init to let it find the commit successfully + - name: Work around git warning + run: git config --global --add safe.directory /__w/CCF/CCF + - name: Initialize CodeQL uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} queries: security-extended - # If you wish to specify custom queries, you can do so here or in a config file. 
- # By default, queries listed here will override any specified in a config file. - # Prefix the list here with "+" to use these queries and those in the config file. - # queries: ./path/to/local/query, your-org/your-repo/queries@main - - # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). - # If this step fails, then you should remove it and run the build manually (see below) - #- name: Autobuild - # uses: github/codeql-action/autobuild@v2 - - # ℹī¸ Command-line programs to run using the OS shell. - # 📚 https://git.io/JvXDl - - # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines - # and modify them (or add more) to build your code if your project - # uses a compiled language - - #- run: | - # make bootstrap - # make release - - run: | - cd getting_started/setup_vm - sudo apt update - sudo apt install -y ansible software-properties-common bsdmainutils dnsutils - sudo ansible-playbook ccf-dev.yml --extra-vars "platform=virtual" --extra-vars "require_open_enclave=false" - name: Install dependencies - run: | set -ex - git config --global --add safe.directory /__w/CCF/CCF mkdir build cd build - cmake -DCOMPILE_TARGET=virtual -DREQUIRE_OPENENCLAVE=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTS=OFF -DLVI_MITIGATIONS=OFF -DCMAKE_C_COMPILER=`which clang-11` -DCMAKE_CXX_COMPILER=`which clang++-11` .. + cmake -DCOMPILE_TARGET=virtual -DREQUIRE_OPENENCLAVE=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTS=OFF -DLVI_MITIGATIONS=OFF .. 
name: Run CMake - run: | cd build - make + make -j16 name: Run Make - name: Perform CodeQL Analysis From 329288c2219ac6e5fadbd1c14b6675d09d6e33a5 Mon Sep 17 00:00:00 2001 From: Max Date: Mon, 1 Jul 2024 20:04:51 +0100 Subject: [PATCH 5/5] Fixup historical perf test shutdown (#6307) --- tests/historical_query_perf.py | 6 ++++++ tests/infra/remote.py | 14 +++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/tests/historical_query_perf.py b/tests/historical_query_perf.py index 1f6d133cce27..acd3542c6112 100644 --- a/tests/historical_query_perf.py +++ b/tests/historical_query_perf.py @@ -167,6 +167,12 @@ def test_historical_query_range(network, args): entries = {} node = network.find_node_by_role(role=infra.network.NodeRole.BACKUP, log_capture=[]) + + # During this test enclave may get very busy due to increasing amount of stores fetched in + # memory, so tick messages may stack up, resulting in a delayed 'stop' msg processing, therefore + # leading to a delayed shutdown. 
+ node.remote.remote.shutdown_timeout *= 10 + with node.client(common_headers={"authorization": f"Bearer {jwt}"}) as c: # Index is currently built lazily to avoid impacting other perf tests using the same app # So pre-fetch to ensure index is fully constructed diff --git a/tests/infra/remote.py b/tests/infra/remote.py index d8b320edabab..4a8f9c698136 100644 --- a/tests/infra/remote.py +++ b/tests/infra/remote.py @@ -379,6 +379,15 @@ def __init__( self.name = name self.out = os.path.join(self.root, "out") self.err = os.path.join(self.root, "err") + self._shutdown_timeout = 10 + + @property + def shutdown_timeout(self): + return self._shutdown_timeout + + @shutdown_timeout.setter + def shutdown_timeout(self, value): + self._shutdown_timeout = value @staticmethod def make_host(host): @@ -515,12 +524,11 @@ def stop(self): LOG.info("[{}] closing".format(self.hostname)) if self.proc: self.proc.terminate() - timeout = 10 try: - self.proc.wait(timeout) + self.proc.wait(self._shutdown_timeout) except subprocess.TimeoutExpired: LOG.exception( - f"Process didn't finish within {timeout} seconds. Trying to get stack trace..." + f"Process didn't finish within {self._shutdown_timeout} seconds. Trying to get stack trace..." ) self._print_stack_trace() raise