Skip to content

Commit

Permalink
Merge branch 'master' into alpha
Browse files Browse the repository at this point in the history
  • Loading branch information
xiyoo0812 committed Sep 4, 2023
2 parents 74b9448 + 96c52c4 commit 295c096
Show file tree
Hide file tree
Showing 65 changed files with 1,508 additions and 1,769 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ luaext:
cd extend/lcrypt; make SOLUTION_DIR=$(CUR_DIR) -f lcrypt.mak;
cd extend/lcurl; make SOLUTION_DIR=$(CUR_DIR) -f lcurl.mak;
cd extend/ldetour; make SOLUTION_DIR=$(CUR_DIR) -f ldetour.mak;
cd extend/lhttp; make SOLUTION_DIR=$(CUR_DIR) -f lhttp.mak;
cd extend/ljson; make SOLUTION_DIR=$(CUR_DIR) -f ljson.mak;
cd extend/lstdfs; make SOLUTION_DIR=$(CUR_DIR) -f lstdfs.mak;
cd extend/ltimer; make SOLUTION_DIR=$(CUR_DIR) -f ltimer.mak;
Expand Down
98 changes: 49 additions & 49 deletions core/luabus/src/lua_socket_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ lua_socket_node::lua_socket_node(uint32_t token, lua_State* L, std::shared_ptr<s
});

m_mgr->set_package_callback(token, [=](slice* data){
on_recv(data);
return on_recv(data);
});
}

Expand All @@ -48,12 +48,6 @@ int lua_socket_node::call_data(lua_State* L) {
return 1;
}

int lua_socket_node::call_text(const char* data, uint32_t data_len) {
if (data_len > SOCKET_PACKET_MAX) return 0;
m_mgr->send(m_token, data, data_len);
return data_len;
}

int lua_socket_node::call_head(uint16_t cmd_id, uint8_t flag, uint8_t type, uint8_t crc8, uint32_t session_id, const char* data, uint32_t data_len) {
size_t length = data_len + sizeof(socket_header);
if (length <= USHRT_MAX) {
Expand Down Expand Up @@ -205,65 +199,66 @@ int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t s
return 0;
}

void lua_socket_node::on_recv(slice* slice) {
int lua_socket_node::on_recv(slice* slice) {
if (eproto_type::proto_head == m_proto_type) {
on_call_head(slice);
return;
}
if (eproto_type::proto_common == m_proto_type) {
on_call_common(slice);
return;
return on_call_head(slice);
}
if (eproto_type::proto_text == m_proto_type) {
on_call_text(slice);
return;
return on_call_text(slice);
}
if (eproto_type::proto_rpc != m_proto_type) {
return on_call_data(slice);
}

size_t data_len;
size_t header_len = sizeof(router_header);
auto hdata = slice->peek(header_len);
router_header* header = (router_header*)hdata;
rpc_type msg = (rpc_type)(header->context >> 4);
if (msg == rpc_type::transfer_call) {
header_len = sizeof(transfer_header);
}

size_t data_len;
slice->erase(header_len);
auto data = (char*)slice->data(&data_len);
if (data_len == 0) {
return;
}

switch (msg) {
case rpc_type::remote_call:
on_call(header, slice);
break;
case rpc_type::transfer_call:
on_transfer((transfer_header*)header, slice);
break;
case rpc_type::forward_target:
if (!m_router->do_forward_target(header, data, data_len))
on_forward_error(header, slice);
case rpc_type::forward_target:{
auto data = (char*)slice->data(&data_len);
if (!m_router->do_forward_target(header, data, data_len)) {
on_forward_error(header, slice);
}
}
break;
case rpc_type::forward_master:
if (!m_router->do_forward_master(header, data, data_len))
on_forward_error(header, slice);
case rpc_type::forward_master: {
auto data = (char*)slice->data(&data_len);
if (!m_router->do_forward_master(header, data, data_len)) {
on_forward_error(header, slice);
}
}
break;
case rpc_type::forward_hash:
if (!m_router->do_forward_hash(header, data, data_len))
on_forward_error(header, slice);
case rpc_type::forward_hash: {
auto data = (char*)slice->data(&data_len);
if (!m_router->do_forward_hash(header, data, data_len)) {
on_forward_error(header, slice);
}
}
break;
case rpc_type::forward_broadcast: {
size_t broadcast_num = 0;
if (m_router->do_forward_broadcast(header, m_token, data, data_len, broadcast_num))
auto data = (char*)slice->data(&data_len);
if (m_router->do_forward_broadcast(header, m_token, data, data_len, broadcast_num)) {
on_forward_broadcast(header, broadcast_num);
else
} else {
on_forward_error(header, slice);
}
}
break;
default:
break;
}
return header->len;
}

void lua_socket_node::on_forward_error(router_header* header, slice* slice) {
Expand All @@ -279,21 +274,14 @@ void lua_socket_node::on_forward_broadcast(router_header* header, size_t broadca
}
}

void lua_socket_node::on_call(router_header* header, slice* slice) {
m_codec->set_slice(slice);
uint8_t flag = header->context & 0xff;
uint32_t session_id = header->session_id;
m_luakit->object_call(this, "on_call", nullptr, m_codec, std::tie(), header->len, session_id, flag);
}

void lua_socket_node::on_transfer(transfer_header* header, slice* slice) {
uint8_t service_id = header->service_id;
uint32_t target_id = header->target_id;
uint32_t session_id = header->session_id;
m_luakit->object_call(this, "on_transfer", nullptr, std::tie(), header->len, session_id, service_id, target_id, slice);
}

void lua_socket_node::on_call_head(slice* slice) {
int lua_socket_node::on_call_head(slice* slice) {
size_t header_len = sizeof(socket_header);
auto data = slice->peek(header_len);
socket_header* header = (socket_header*)data;
Expand All @@ -306,15 +294,27 @@ void lua_socket_node::on_call_head(slice* slice) {
slice->erase(header_len);
std::string body((char*)slice->head(), slice->size());
m_luakit->object_call(this, "on_call_head", nullptr, std::tie(), header->len, cmd_id, flag, type, crc8, session_id, body);
return header->len;
}

void lua_socket_node::on_call_text(slice* slice) {
std::string body((char*)slice->head(), slice->size());
m_luakit->object_call(this, "on_call_text", nullptr, std::tie(), body.size(), body);
void lua_socket_node::on_call(router_header* header, slice* slice) {
m_codec->set_slice(slice);
uint8_t flag = header->context & 0xff;
uint32_t session_id = header->session_id;
m_luakit->object_call(this, "on_call", nullptr, m_codec, std::tie(), header->len, session_id, flag);
}

void lua_socket_node::on_call_common(slice* slice) {
int lua_socket_node::on_call_data(slice* slice) {
m_codec->set_slice(slice);
m_luakit->object_call(this, "on_call_common", nullptr, m_codec, std::tie(), slice->size());
size_t buf_size = slice->size();
m_luakit->object_call(this, "on_call_data", nullptr, m_codec, std::tie(), buf_size);
return buf_size;
}

int lua_socket_node::on_call_text(slice* slice) {
bool success = true;
m_codec->set_slice(slice);
size_t buf_size = slice->size();
m_luakit->object_call(this, "on_call_data", [&](std::string_view) { success = false; }, m_codec, std::tie(), buf_size);
return success ? (buf_size - slice->size()) : -1;
}
9 changes: 4 additions & 5 deletions core/luabus/src/lua_socket_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class lua_socket_node
void set_codec(codec_base* codec) { m_codec = codec; }

int call_data(lua_State* L);
int call_text(const char* data, uint32_t data_len);
int call(lua_State* L, uint32_t session_id, uint8_t flag);
int call_head(uint16_t cmd_id, uint8_t flag, uint8_t type, uint8_t crc8, uint32_t session_id, const char* data, uint32_t data_len);
int forward_target(lua_State* L, uint32_t session_id, uint8_t flag, uint32_t target_id);
Expand Down Expand Up @@ -60,10 +59,10 @@ class lua_socket_node
uint16_t m_sindex = 1;

private:
void on_recv(slice* slice);
void on_call_head(slice* slice);
void on_call_text(slice* slice);
void on_call_common(slice* slice);
int on_recv(slice* slice);
int on_call_head(slice* slice);
int on_call_text(slice* slice);
int on_call_data(slice* slice);
void on_call(router_header* header, slice* slice);
void on_transfer(transfer_header* header, slice* slice);
void on_forward_broadcast(router_header* header, size_t target_size);
Expand Down
4 changes: 2 additions & 2 deletions core/luabus/src/luabus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ namespace luabus {
"rpc", eproto_type::proto_rpc,
"head", eproto_type::proto_head,
"text", eproto_type::proto_text,
"common", eproto_type::proto_common
"mongo", eproto_type::proto_mongo,
"mysql", eproto_type::proto_mysql
);
kit_state.new_class<socket_udp>(
"send", &socket_udp::send,
Expand Down Expand Up @@ -78,7 +79,6 @@ namespace luabus {
"set_codec", &lua_socket_node::set_codec,
"call_head", &lua_socket_node::call_head,
"call_data", &lua_socket_node::call_data,
"call_text", &lua_socket_node::call_text,
"set_nodelay", &lua_socket_node::set_nodelay,
"set_timeout", &lua_socket_node::set_timeout,
"forward_hash", &lua_socket_node::forward_hash,
Expand Down
2 changes: 1 addition & 1 deletion core/luabus/src/socket_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ void socket_mgr::set_connect_callback(uint32_t token, const std::function<void(b
}
}

void socket_mgr::set_package_callback(uint32_t token, const std::function<void(slice*)>& cb) {
void socket_mgr::set_package_callback(uint32_t token, const std::function<int(slice*)>& cb) {
auto node = get_object(token);
if (node) {
node->set_package_callback(cb);
Expand Down
15 changes: 8 additions & 7 deletions core/luabus/src/socket_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ enum class elink_status : int
// 协议类型
enum class eproto_type : int
{
proto_rpc = 0, // rpc协议
proto_head = 1, // head协议,根据协议头解析
proto_text = 2, // text协议,根据文本协议
proto_common = 3, // 通用协议,协议前4个字节为长度
proto_max = 4, // max
proto_rpc = 0, // rpc协议,根据协议头解析
proto_head = 1, // head协议,根据协议头解析
proto_text = 2, // text协议,文本协议
proto_mongo = 3, // mongo协议,协议前4个字节为长度
proto_mysql = 4, // mysql协议,协议前3个字节为长度
proto_max = 5, // max
};

struct sendv_item
Expand All @@ -51,7 +52,7 @@ struct socket_object
virtual void set_accept_callback(const std::function<void(int, eproto_type)>& cb) { }
virtual void set_connect_callback(const std::function<void(bool, const char*)>& cb) { }
virtual void set_error_callback(const std::function<void(const char*)>& cb) { }
virtual void set_package_callback(const std::function<void(slice*)>& cb) { }
virtual void set_package_callback(const std::function<int(slice*)>& cb) { }

#ifdef _MSC_VER
virtual void on_complete(WSAOVERLAPPED* ovl) = 0;
Expand Down Expand Up @@ -94,7 +95,7 @@ class socket_mgr

void set_accept_callback(uint32_t token, const std::function<void(uint32_t, eproto_type eproto_type)>& cb);
void set_connect_callback(uint32_t token, const std::function<void(bool, const char*)>& cb);
void set_package_callback(uint32_t token, const std::function<void(slice*)>& cb);
void set_package_callback(uint32_t token, const std::function<int(slice*)>& cb);
void set_error_callback(uint32_t token, const std::function<void(const char*)>& cb);

bool watch_listen(socket_t fd, socket_object* object);
Expand Down
27 changes: 22 additions & 5 deletions core/luabus/src/socket_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ void socket_stream::do_send(size_t max_len, bool is_eof) {
return;
}
total_send += send_len;
m_send_buffer->pop_space((size_t)send_len);
m_send_buffer->pop_size((size_t)send_len);
}
if (is_eof || max_len == 0) {
on_error("connection-lost");
Expand Down Expand Up @@ -496,14 +496,22 @@ void socket_stream::dispatch_package() {
}
package_size = header->len;
}
else if (eproto_type::proto_common == m_proto_type) {
else if (eproto_type::proto_mongo == m_proto_type) {
uint32_t* length = (uint32_t*)m_recv_buffer->peek_data(sizeof(uint32_t));
if (!length) {
break;
}
//头长度只包含内容,不包括长度
//package_size = length + contents
package_size = *length;
}
else if (eproto_type::proto_mysql == m_proto_type) {
uint32_t* length = (uint32_t*)m_recv_buffer->peek_data(sizeof(uint32_t));
if (!length) {
break;
}
//package_size = length + serialize_id + contents
package_size = ((*length) >> 8) + sizeof(uint32_t);
}
else if (eproto_type::proto_text == m_proto_type) {
package_size = m_recv_buffer->size();
if (data_len == 0) break;
Expand All @@ -518,9 +526,18 @@ void socket_stream::dispatch_package() {
}
// 数据包还没有收完整
if (data_len < package_size) break;
m_package_cb(m_recv_buffer->get_slice(package_size));
int read_size = m_package_cb(m_recv_buffer->get_slice(package_size));
// 数据包还没有收完整
if (read_size == 0) {
break;
}
// 数据包解析失败
if (read_size < 0) {
on_error("package-read-err");
break;
}
// 接收缓冲读游标调整
m_recv_buffer->pop_size(package_size);
m_recv_buffer->pop_size(read_size);
m_last_recv_time = ltimer::steady_ms();
// 防止单个连接处理太久,不能大于100ms
if (m_last_recv_time - now > 100) break;
Expand Down
4 changes: 2 additions & 2 deletions core/luabus/src/socket_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct socket_stream : public socket_object
void try_connect();
void close() override;
void set_accept_callback(const std::function<void(int, eproto_type)>& cb) override { m_accept_cb = cb; }
void set_package_callback(const std::function<void(slice*)>& cb) override { m_package_cb = cb; }
void set_package_callback(const std::function<int(slice*)>& cb) override { m_package_cb = cb; }
void set_error_callback(const std::function<void(const char*)>& cb) override { m_error_cb = cb; }
void set_connect_callback(const std::function<void(bool, const char*)>& cb) override { m_connect_cb = cb; }
void set_timeout(int duration) override { m_timeout = duration; }
Expand Down Expand Up @@ -72,7 +72,7 @@ struct socket_stream : public socket_object
int m_ovl_ref = 0;
#endif

std::function<void(slice*)> m_package_cb = nullptr;
std::function<int(slice*)> m_package_cb = nullptr;
std::function<void(const char*)> m_error_cb = nullptr;
std::function<void(int, eproto_type)> m_accept_cb = nullptr;
std::function<void(bool, const char*)> m_connect_cb = nullptr;
Expand Down
Loading

0 comments on commit 295c096

Please sign in to comment.