Skip to content

Commit

Permalink
opentrace调研
Browse files Browse the repository at this point in the history
  • Loading branch information
xiyoo0812 committed Sep 26, 2023
1 parent b421870 commit d9df59a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 41 deletions.
22 changes: 13 additions & 9 deletions core/luabus/src/lua_socket_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ int lua_socket_node::call(lua_State* L, uint32_t session_id, uint8_t flag) {
//组装数据
router_header header;
header.len = length;
header.target_id = 0;
header.index_id = 0;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::remote_call << 4 | flag;
//发送数据
Expand Down Expand Up @@ -127,8 +127,9 @@ int lua_socket_node::forward_target(lua_State* L, uint32_t session_id, uint8_t f
//组装数据
router_header header;
header.len = length;
header.target_id = target_id;
header.session_id = session_id;
header.index_id = target_id & 0xffff;
header.service_id = (target_id >> 16) & 0xff;
header.context = (uint8_t)rpc_type::forward_target << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
Expand All @@ -141,7 +142,7 @@ int lua_socket_node::forward_target(lua_State* L, uint32_t session_id, uint8_t f
return 1;
}

int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id, uint16_t hash) {
int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint8_t service_id, uint16_t hash) {
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 5, &data_len);
Expand All @@ -150,8 +151,9 @@ int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t fla
//组装数据
router_header header;
header.len = length;
header.index_id = hash;
header.session_id = session_id;
header.target_id = service_id << 16 | hash;
header.service_id = service_id;
header.context = (uint8_t)rpc_type::forward_hash << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
Expand Down Expand Up @@ -180,8 +182,9 @@ int lua_socket_node::transfer_call(lua_State* L, uint32_t session_id, uint32_t t
router_header header;
header.len = length;
header.session_id = session_id;
header.index_id = target_id & 0xffff;
header.service_id = (target_id >> 16) & 0xff;
header.context = (uint8_t)rpc_type::remote_call << 4 | 0x01;
header.target_id = target_id;
if (m_router->do_forward_target(&header, data, data_len)) {
lua_pushinteger(L, length);
return 1;
Expand All @@ -191,7 +194,7 @@ int lua_socket_node::transfer_call(lua_State* L, uint32_t session_id, uint32_t t
return 0;
}

int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t service_id, uint16_t hash) {
int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint8_t service_id, uint16_t hash) {
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
Expand All @@ -200,9 +203,10 @@ int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t s
//组装数据
router_header header;
header.len = length;
header.index_id = hash;
header.session_id = session_id;
header.service_id = service_id;
header.context = (uint8_t)rpc_type::remote_call << 4 | 0x01;
header.target_id = service_id << 16 | hash;
if (m_router->do_forward_hash(&header, data, data_len)) {
lua_pushinteger(L, length);
return 1;
Expand Down Expand Up @@ -273,13 +277,13 @@ void lua_socket_node::on_recv(slice* slice) {
void lua_socket_node::on_forward_error(router_header* header, slice* slice) {
if (header->session_id > 0) {
m_codec->set_slice(slice);
m_lvm->object_call(this, "on_forward_error", nullptr, m_codec, std::tie(), header->session_id, header->target_id);
m_lvm->object_call(this, "on_forward_error", nullptr, m_codec, std::tie(), header->trace_id, header->session_id, header->service_id, header->index_id);
}
}

void lua_socket_node::on_forward_broadcast(router_header* header, size_t broadcast_num) {
if (header->session_id > 0) {
m_lvm->object_call(this, "on_forward_broadcast", nullptr, std::tie(), header->session_id, broadcast_num);
m_lvm->object_call(this, "on_forward_broadcast", nullptr, std::tie(), header->trace_id, header->session_id, broadcast_num);
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/luabus/src/lua_socket_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,22 @@ class lua_socket_node
int call(lua_State* L, uint32_t session_id, uint8_t flag);

int forward_target(lua_State* L, uint32_t session_id, uint8_t flag, uint32_t target_id);
int forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id, uint16_t hash);
int forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint8_t service_id, uint16_t hash);

int forward_transfer(lua_State* L, uint32_t session_id, uint32_t target_id, uint8_t service_id);

int transfer_call(lua_State* L, uint32_t session_id, uint32_t target_id);
int transfer_hash(lua_State* L, uint32_t session_id, uint16_t service_id, uint16_t hash);
int transfer_hash(lua_State* L, uint32_t session_id, uint8_t service_id, uint16_t hash);

template <rpc_type forward_method>
int forward_by_group(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id) {
int forward_by_group(lua_State* L, uint32_t session_id, uint8_t flag, uint8_t service_id) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= USHRT_MAX) {
router_header header;
header.len = length;
header.target_id = service_id;
header.service_id = service_id;
header.session_id = session_id;
header.context = (uint8_t)forward_method << 4 | flag;
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
Expand Down
33 changes: 8 additions & 25 deletions core/luabus/src/socket_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ uint32_t socket_router::choose_master(uint32_t service_id){
}

bool socket_router::do_forward_target(router_header* header, char* data, size_t data_len) {
uint32_t target_id = header->target_id;
uint32_t service_id = get_service_id(target_id);
uint16_t index_id = header->index_id;
uint16_t service_id = header->service_id;

auto& services = m_services[service_id];
auto& nodes = services.nodes;
auto it = std::lower_bound(nodes.begin(), nodes.end(), target_id, [](service_node& node, uint32_t id) { return node.id < id; });
if (it == nodes.end() || it->id != target_id){
auto it = std::lower_bound(nodes.begin(), nodes.end(), index_id, [](service_node& node, uint32_t id) { return node.id < id; });
if (it == nodes.end() || it->id != index_id){
return false;
}
uint8_t flag = header->context & 0xf;
Expand All @@ -67,7 +67,7 @@ bool socket_router::do_forward_target(router_header* header, char* data, size_t
}

bool socket_router::do_forward_master(router_header* header, char* data, size_t data_len) {
uint16_t service_id = (uint16_t)header->target_id;
uint16_t service_id = header->service_id;
auto token = m_services[service_id].master.token;
if (token == 0)
return false;
Expand All @@ -81,7 +81,7 @@ bool socket_router::do_forward_master(router_header* header, char* data, size_t
}

bool socket_router::do_forward_broadcast(router_header* header, int source, char* data, size_t data_len, size_t& broadcast_num) {
uint16_t service_id = (uint16_t)header->target_id;
uint16_t service_id = header->service_id;

uint8_t flag = header->context & 0xf;
header->context = (uint8_t)rpc_type::remote_call << 4 | flag;
Expand All @@ -101,8 +101,8 @@ bool socket_router::do_forward_broadcast(router_header* header, int source, char
}

bool socket_router::do_forward_hash(router_header* header, char* data, size_t data_len) {
uint16_t hash = header->target_id & 0xffff;
uint16_t service_id = header->target_id >> 16 & 0xffff;
uint16_t hash = header->index_id;
uint16_t service_id = header->service_id;

auto& services = m_services[service_id];
auto& nodes = services.nodes;
Expand All @@ -123,23 +123,6 @@ bool socket_router::do_forward_hash(router_header* header, char* data, size_t da
return false;
}

bool socket_router::do_transfer_call(transfer_header* header, char* data, size_t data_len) {
auto& services = m_services[header->service_id];
auto& nodes = services.nodes;
int count = (int)nodes.size();
if (count == 0) {
return false;
}
auto& target = nodes[header->target_id % count];
if (target.token != 0) {
sendv_item items[] = { {header, sizeof(router_header)}, {data, data_len} };
m_mgr->sendv(target.token, items, _countof(items));
m_route_count++;
return true;
}
return false;
}

uint32_t socket_router::get_route_count() {
uint32_t old = m_route_count;
m_route_count = 0;
Expand Down
6 changes: 3 additions & 3 deletions server/router/router_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ end
--accept事件
function RouterServer:on_client_accept(client)
log_info("[RouterServer][on_client_accept] new connection, token={}", client.token)
client.on_forward_error = function(session_id, target_id, source_id, rpc)
client.on_forward_error = function(trace_id, session_id, service_id, index_id, source_id, rpc)
thread_mgr:fork(function()
local target, source = id2name(target_id), id2name(source_id)
local target, source = id2name(service_id << 16 | index_id), id2name(source_id)
log_err("[RouterServer][on_client_accept] on_forward_error, ssid:{}, tar:{}, src:{}, rpc:{})", session_id, target, source, rpc)
self.rpc_server:callback(client, session_id, false, UNREACHABLE, "router con't find target!")
end)
end
client.on_forward_broadcast = function(session_id, broadcast_num)
client.on_forward_broadcast = function(trace_id, session_id, broadcast_num)
thread_mgr:fork(function()
self.rpc_server:callback(client, session_id, true, SUCCESS, broadcast_num)
end)
Expand Down

0 comments on commit d9df59a

Please sign in to comment.