From d9df59a09d92b4b02440eff692f01a9278f0b793 Mon Sep 17 00:00:00 2001 From: xiyoo0812 Date: Tue, 26 Sep 2023 17:30:34 +0800 Subject: [PATCH] =?UTF-8?q?opentrace=E8=B0=83=E7=A0=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/luabus/src/lua_socket_node.cpp | 22 +++++++++++-------- core/luabus/src/lua_socket_node.h | 8 +++---- core/luabus/src/socket_router.cpp | 33 +++++++---------------------- server/router/router_server.lua | 6 +++--- 4 files changed, 28 insertions(+), 41 deletions(-) diff --git a/core/luabus/src/lua_socket_node.cpp b/core/luabus/src/lua_socket_node.cpp index d18f2cc6..a0267953 100644 --- a/core/luabus/src/lua_socket_node.cpp +++ b/core/luabus/src/lua_socket_node.cpp @@ -80,7 +80,7 @@ int lua_socket_node::call(lua_State* L, uint32_t session_id, uint8_t flag) { //组装数据 router_header header; header.len = length; - header.target_id = 0; + header.index_id = 0; header.session_id = session_id; header.context = (uint8_t)rpc_type::remote_call << 4 | flag; //发送数据 @@ -127,8 +127,9 @@ int lua_socket_node::forward_target(lua_State* L, uint32_t session_id, uint8_t f //组装数据 router_header header; header.len = length; - header.target_id = target_id; header.session_id = session_id; + header.index_id = target_id & 0xffff; + header.service_id = (target_id >> 16) & 0xff; header.context = (uint8_t)rpc_type::forward_target << 4 | flag; //发送数据 sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} }; @@ -141,7 +142,7 @@ int lua_socket_node::forward_target(lua_State* L, uint32_t session_id, uint8_t f return 1; } -int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id, uint16_t hash) { +int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint8_t service_id, uint16_t hash) { if (m_codec) { size_t data_len = 0; char* data = (char*)m_codec->encode(L, 5, &data_len); @@ -150,8 +151,9 @@ int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t fla //组装数据 router_header header; header.len = length; + header.index_id = hash; header.session_id = session_id; - header.target_id = service_id << 16 | hash; + header.service_id = service_id; header.context = (uint8_t)rpc_type::forward_hash << 4 | flag; //发送数据 sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} }; @@ -180,8 +182,9 @@ int lua_socket_node::transfer_call(lua_State* L, uint32_t session_id, uint32_t t router_header header; header.len = length; header.session_id = session_id; + header.index_id = target_id & 0xffff; + header.service_id = (target_id >> 16) & 0xff; header.context = (uint8_t)rpc_type::remote_call << 4 | 0x01; - header.target_id = target_id; if (m_router->do_forward_target(&header, data, data_len)) { lua_pushinteger(L, length); return 1; @@ -191,7 +194,7 @@ int lua_socket_node::transfer_call(lua_State* L, uint32_t session_id, uint32_t t return 0; } -int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t service_id, uint16_t hash) { +int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint8_t service_id, uint16_t hash) { if (m_codec) { size_t data_len = 0; char* data = (char*)m_codec->encode(L, 4, &data_len); @@ -200,9 +203,10 @@ int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t s //组装数据 router_header header; header.len = length; + header.index_id = hash; header.session_id = session_id; + header.service_id = service_id; header.context = (uint8_t)rpc_type::remote_call << 4 | 0x01; - header.target_id = service_id << 16 | hash; if (m_router->do_forward_hash(&header, data, data_len)) { lua_pushinteger(L, length); return 1; @@ -273,13 +277,13 @@ void lua_socket_node::on_recv(slice* slice) { void lua_socket_node::on_forward_error(router_header* header, slice* slice) { if (header->session_id > 0) { m_codec->set_slice(slice); - m_lvm->object_call(this, "on_forward_error", nullptr, m_codec, std::tie(), header->session_id, header->target_id); + m_lvm->object_call(this, "on_forward_error", nullptr, m_codec, std::tie(), header->trace_id, header->session_id, header->service_id, header->index_id); } } void lua_socket_node::on_forward_broadcast(router_header* header, size_t broadcast_num) { if (header->session_id > 0) { - m_lvm->object_call(this, "on_forward_broadcast", nullptr, std::tie(), header->session_id, broadcast_num); + m_lvm->object_call(this, "on_forward_broadcast", nullptr, std::tie(), header->trace_id, header->session_id, broadcast_num); } } diff --git a/core/luabus/src/lua_socket_node.h b/core/luabus/src/lua_socket_node.h index d717ca17..468127ae 100644 --- a/core/luabus/src/lua_socket_node.h +++ b/core/luabus/src/lua_socket_node.h @@ -39,22 +39,22 @@ class lua_socket_node int call(lua_State* L, uint32_t session_id, uint8_t flag); int forward_target(lua_State* L, uint32_t session_id, uint8_t flag, uint32_t target_id); - int forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id, uint16_t hash); + int forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint8_t service_id, uint16_t hash); int forward_transfer(lua_State* L, uint32_t session_id, uint32_t target_id, uint8_t service_id); int transfer_call(lua_State* L, uint32_t session_id, uint32_t target_id); - int transfer_hash(lua_State* L, uint32_t session_id, uint16_t service_id, uint16_t hash); + int transfer_hash(lua_State* L, uint32_t session_id, uint8_t service_id, uint16_t hash); template - int forward_by_group(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id) { + int forward_by_group(lua_State* L, uint32_t session_id, uint8_t flag, uint8_t service_id) { size_t data_len = 0; char* data = (char*)m_codec->encode(L, 4, &data_len); size_t length = data_len + sizeof(router_header); if (length <= USHRT_MAX) { router_header header; header.len = length; - header.target_id = service_id; + header.service_id = service_id; header.session_id = session_id; header.context = (uint8_t)forward_method << 4 | flag; sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} }; diff --git a/core/luabus/src/socket_router.cpp b/core/luabus/src/socket_router.cpp index 60a58fc4..f8f9b8a2 100644 --- a/core/luabus/src/socket_router.cpp +++ b/core/luabus/src/socket_router.cpp @@ -49,13 +49,13 @@ uint32_t socket_router::choose_master(uint32_t service_id){ } bool socket_router::do_forward_target(router_header* header, char* data, size_t data_len) { - uint32_t target_id = header->target_id; - uint32_t service_id = get_service_id(target_id); + uint16_t index_id = header->index_id; + uint16_t service_id = header->service_id; auto& services = m_services[service_id]; auto& nodes = services.nodes; - auto it = std::lower_bound(nodes.begin(), nodes.end(), target_id, [](service_node& node, uint32_t id) { return node.id < id; }); - if (it == nodes.end() || it->id != target_id){ + auto it = std::lower_bound(nodes.begin(), nodes.end(), index_id, [](service_node& node, uint32_t id) { return node.id < id; }); + if (it == nodes.end() || it->id != index_id){ return false; } uint8_t flag = header->context & 0xf; @@ -67,7 +67,7 @@ bool socket_router::do_forward_target(router_header* header, char* data, size_t } bool socket_router::do_forward_master(router_header* header, char* data, size_t data_len) { - uint16_t service_id = (uint16_t)header->target_id; + uint16_t service_id = header->service_id; auto token = m_services[service_id].master.token; if (token == 0) return false; @@ -81,7 +81,7 @@ bool socket_router::do_forward_master(router_header* header, char* data, size_t } bool socket_router::do_forward_broadcast(router_header* header, int source, char* data, size_t data_len, size_t& broadcast_num) { - uint16_t service_id = (uint16_t)header->target_id; + uint16_t service_id = header->service_id; uint8_t flag = header->context & 0xf; header->context = (uint8_t)rpc_type::remote_call << 4 | flag; @@ -101,8 +101,8 @@ bool socket_router::do_forward_broadcast(router_header* header, int source, char } bool socket_router::do_forward_hash(router_header* header, char* data, size_t data_len) { - uint16_t hash = header->target_id & 0xffff; - uint16_t service_id = header->target_id >> 16 & 0xffff; + uint16_t hash = header->index_id; + uint16_t service_id = header->service_id; auto& services = m_services[service_id]; auto& nodes = services.nodes; @@ -123,23 +123,6 @@ bool socket_router::do_forward_hash(router_header* header, char* data, size_t da return false; } -bool socket_router::do_transfer_call(transfer_header* header, char* data, size_t data_len) { - auto& services = m_services[header->service_id]; - auto& nodes = services.nodes; - int count = (int)nodes.size(); - if (count == 0) { - return false; - } - auto& target = nodes[header->target_id % count]; - if (target.token != 0) { - sendv_item items[] = { {header, sizeof(router_header)}, {data, data_len} }; - m_mgr->sendv(target.token, items, _countof(items)); - m_route_count++; - return true; - } - return false; -} - uint32_t socket_router::get_route_count() { uint32_t old = m_route_count; m_route_count = 0; diff --git a/server/router/router_server.lua b/server/router/router_server.lua index 0065fcd4..d2fc7a83 100644 --- a/server/router/router_server.lua +++ b/server/router/router_server.lua @@ -45,14 +45,14 @@ end --accept事件 function RouterServer:on_client_accept(client) log_info("[RouterServer][on_client_accept] new connection, token={}", client.token) - client.on_forward_error = function(session_id, target_id, source_id, rpc) + client.on_forward_error = function(trace_id, session_id, service_id, index_id, source_id, rpc) thread_mgr:fork(function() - local target, source = id2name(target_id), id2name(source_id) + local target, source = id2name(service_id << 16 | index_id), id2name(source_id) log_err("[RouterServer][on_client_accept] on_forward_error, ssid:{}, tar:{}, src:{}, rpc:{})", session_id, target, source, rpc) self.rpc_server:callback(client, session_id, false, UNREACHABLE, "router con't find target!") end) end - client.on_forward_broadcast = function(session_id, broadcast_num) + client.on_forward_broadcast = function(trace_id, session_id, broadcast_num) thread_mgr:fork(function() self.rpc_server:callback(client, session_id, true, SUCCESS, broadcast_num) end)