Skip to content

Commit

Permalink
代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
xiyoo0812 committed Sep 15, 2023
1 parent d620034 commit b90b62e
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 102 deletions.
200 changes: 109 additions & 91 deletions core/luabus/src/lua_socket_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,115 +34,131 @@ void lua_socket_node::close() {
m_token = 0;
}
m_router = nullptr;
m_codec = nullptr;
m_mgr = nullptr;
}

int lua_socket_node::call_data(lua_State* L) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 1, &data_len);
if (data_len > SOCKET_PACKET_MAX) return 0;
m_mgr->send(m_token, data, data_len);
lua_pushinteger(L, data_len);
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 1, &data_len);
if (data_len <= SOCKET_PACKET_MAX){
m_mgr->send(m_token, data, data_len);
lua_pushinteger(L, data_len);
return 1;
}
}
lua_pushinteger(L, 0);
return 1;
}

int lua_socket_node::call_pb(lua_State* L, uint32_t session_id) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 2, &data_len);
socket_header* header = (socket_header*)data;
if (data_len <= USHRT_MAX) {
//组装数据
header->len = data_len;
header->session_id = (session_id & 0xffff);
//发送数据
m_mgr->send(m_token, data, data_len);
lua_pushinteger(L, data_len);
return 1;
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 2, &data_len);
socket_header* header = (socket_header*)data;
if (data_len <= USHRT_MAX) {
//组装数据
header->len = data_len;
header->session_id = (session_id & 0xffff);
//发送数据
m_mgr->send(m_token, data, data_len);
lua_pushinteger(L, data_len);
return 1;
}
}
lua_pushinteger(L, 0);
return 1;
}

int lua_socket_node::call(lua_State* L, uint32_t session_id, uint8_t flag) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 3, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.target_id = 0;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::remote_call << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len}};
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 3, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.target_id = 0;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::remote_call << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len}};
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
}
}
lua_pushinteger(L, 0);
return 1;
}

int lua_socket_node::forward_transfer(lua_State* L, uint32_t session_id, uint32_t target_id, uint8_t service_id) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(transfer_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
transfer_header header;
header.len = length;
header.target_id = target_id;
header.service_id = service_id;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::transfer_call << 4;
//发送数据
sendv_item items[] = { { &header, sizeof(transfer_header)}, {data, data_len}};
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
}
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(transfer_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
transfer_header header;
header.len = length;
header.target_id = target_id;
header.service_id = service_id;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::transfer_call << 4;
//发送数据
sendv_item items[] = { { &header, sizeof(transfer_header)}, {data, data_len}};
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
}
}
lua_pushinteger(L, 0);
return 1;
}

int lua_socket_node::forward_target(lua_State* L, uint32_t session_id, uint8_t flag, uint32_t target_id) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.target_id = target_id;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::forward_target << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.target_id = target_id;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::forward_target << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
}
}
lua_pushinteger(L, 0);
return 1;
}

int lua_socket_node::forward_hash(lua_State* L, uint32_t session_id, uint8_t flag, uint16_t service_id, uint16_t hash) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 5, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.session_id = session_id;
header.target_id = service_id << 16 | hash;
header.context = (uint8_t)rpc_type::forward_hash << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 5, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.session_id = session_id;
header.target_id = service_id << 16 | hash;
header.context = (uint8_t)rpc_type::forward_hash << 4 | flag;
//发送数据
sendv_item items[] = { { &header, sizeof(router_header)}, {data, data_len} };
m_mgr->sendv(m_token, items, _countof(items));
lua_pushinteger(L, length);
return 1;
}
}
lua_pushinteger(L, 0);
return 1;
Expand Down Expand Up @@ -176,19 +192,21 @@ int lua_socket_node::transfer_call(lua_State* L, uint32_t session_id, uint32_t t
}

int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t service_id, uint16_t hash) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::remote_call << 4 | 0x01;
header.target_id = service_id << 16 | hash;
if (m_router->do_forward_hash(&header, data, data_len)) {
lua_pushinteger(L, length);
return 1;
if (m_codec) {
size_t data_len = 0;
char* data = (char*)m_codec->encode(L, 4, &data_len);
size_t length = data_len + sizeof(router_header);
if (length <= SOCKET_PACKET_MAX) {
//组装数据
router_header header;
header.len = length;
header.session_id = session_id;
header.context = (uint8_t)rpc_type::remote_call << 4 | 0x01;
header.target_id = service_id << 16 | hash;
if (m_router->do_forward_hash(&header, data, data_len)) {
lua_pushinteger(L, length);
return 1;
}
}
}
lua_pushinteger(L, 0);
Expand Down
6 changes: 3 additions & 3 deletions extend/lbson/src/lbson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace lbson {

thread_local bson thread_bson;
thread_local mgocodec thread_codec;

static int encode(lua_State* L) {
return thread_bson.encode(L);
Expand Down Expand Up @@ -38,11 +37,12 @@ namespace lbson {
bson_numstr_len[i] = sprintf(tmp, "%d", i);
memcpy(bson_numstrs[i], tmp, bson_numstr_len[i]);
}
thread_codec.set_bson(&thread_bson);
}

static codec_base* mongo_codec() {
return &thread_codec;
mgocodec* codec = new mgocodec();
codec->set_bson(&thread_bson);
return codec;
}

luakit::lua_table open_lbson(lua_State* L) {
Expand Down
8 changes: 4 additions & 4 deletions extend/lcodec/src/lcodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@ namespace lcodec {
thread_local ketama thread_ketama;
thread_local luakit::luabuf thread_buff;

static rdscodec* rds_codec(codec_base* codec) {
static codec_base* rds_codec(codec_base* codec) {
rdscodec* rcodec = new rdscodec();
rcodec->set_codec(codec);
rcodec->set_buff(&thread_buff);
return rcodec;
}

static wsscodec* wss_codec(codec_base* codec) {
static codec_base* wss_codec(codec_base* codec) {
wsscodec* wcodec = new wsscodec();
wcodec->set_codec(codec);
wcodec->set_buff(&thread_buff);
return wcodec;
}

static httpcodec* http_codec(codec_base* codec) {
static codec_base* http_codec(codec_base* codec) {
httpcodec* hcodec = new httpcodec();
hcodec->set_codec(codec);
hcodec->set_buff(&thread_buff);
return hcodec;
}

static mysqlscodec* mysql_codec(size_t session_id) {
static codec_base* mysql_codec(size_t session_id) {
mysqlscodec* codec = new mysqlscodec(session_id);
codec->set_buff(&thread_buff);
return codec;
Expand Down
2 changes: 1 addition & 1 deletion extend/ljson/src/ljson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace ljson {
thread_local yyjson thread_json;

static jsoncodec* json_codec() {
static codec_base* json_codec() {
jsoncodec* codec = new jsoncodec();
codec->set_json(&thread_json);
return codec;
Expand Down
2 changes: 1 addition & 1 deletion extend/luapb/src/luapb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ namespace luapb {
string m_pbenum;
};

static pbcodec* pb_codec(const char* pkgname, const char* pbenum) {
static codec_base* pb_codec(const char* pkgname, const char* pbenum) {
pbcodec* codec = new pbcodec(pkgname, pbenum);
codec->set_buff(&thread_buff);
return codec;
Expand Down
4 changes: 4 additions & 0 deletions script/driver/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ function Socket:set_codec(codec)
self.codec = codec
self.session.set_codec(codec)
end
if self.listener then
self.codec = codec
self.listener.set_codec(codec)
end
end

function Socket:connect(ip, port, ptype)
Expand Down
6 changes: 4 additions & 2 deletions script/network/http_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ local log_debug = logger.debug
local tunpack = table.unpack
local signalquit = signal.quit
local saddr = qstring.addr
local jsoncodec = json.jsoncodec
local httpcodec = codec.httpcodec

local HttpServer = class()
local prop = property(HttpServer)
Expand All @@ -25,8 +27,8 @@ prop:reader("del_handlers", {}) --del_handlers
prop:reader("post_handlers", {}) --post_handlers

function HttpServer:__init(http_addr)
self.jcodec = json.jsoncodec()
self.hcodec = codec.httpcodec(self.jcodec)
self.jcodec = jsoncodec()
self.hcodec = httpcodec(self.jcodec)
self:setup(http_addr)
end

Expand Down

0 comments on commit b90b62e

Please sign in to comment.