Skip to content

Commit

Permalink
合并主干代码
Browse files Browse the repository at this point in the history
  • Loading branch information
xiyoo0812 committed Sep 14, 2023
2 parents 295c096 + 5f066c4 commit 749263b
Show file tree
Hide file tree
Showing 33 changed files with 2,224 additions and 451 deletions.
1,784 changes: 1,784 additions & 0 deletions bin/proto/pbschema.json

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions core/luabus/src/lua_socket_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ int lua_socket_mgr::listen(lua_State* L, const char* ip, int port) {
}

std::string err;
eproto_type proto_type = (eproto_type)luaL_optinteger(L, 3, 0);
int token = m_mgr->listen(err, ip, port, proto_type);
int token = m_mgr->listen(err, ip, port);
if (token == 0) {
return luakit::variadic_return(L, nullptr, err);
}

auto listener = new lua_socket_node(token, m_lvm, m_mgr, m_router, true, proto_type);
auto listener = new lua_socket_node(token, m_lvm, m_mgr, m_router, true);
listener->set_codec(m_codec.get());
return luakit::variadic_return(L, listener, "ok");
}
Expand All @@ -33,13 +32,12 @@ int lua_socket_mgr::connect(lua_State* L, const char* ip, const char* port, int
}

std::string err;
eproto_type proto_type = (eproto_type)luaL_optinteger(L, 4, 0);
int token = m_mgr->connect(err, ip, port, timeout, proto_type);
int token = m_mgr->connect(err, ip, port, timeout);
if (token == 0) {
return luakit::variadic_return(L, nullptr, err);
}

auto socket_node = new lua_socket_node(token, m_lvm, m_mgr, m_router, false, proto_type);
auto socket_node = new lua_socket_node(token, m_lvm, m_mgr, m_router, false);
socket_node->set_codec(m_codec.get());
return luakit::variadic_return(L, socket_node, "ok");
}
Expand All @@ -52,7 +50,10 @@ int lua_socket_mgr::get_recvbuf_size(uint32_t token) {
return m_mgr->get_recvbuf_size(token);
}

void lua_socket_mgr::set_proto_type(uint32_t token, eproto_type type) {
return m_mgr->set_proto_type(token, type);
}

int lua_socket_mgr::map_token(uint32_t node_id, uint32_t token) {
return m_router->map_token(node_id, token);
}

1 change: 1 addition & 0 deletions core/luabus/src/lua_socket_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct lua_socket_mgr final
int map_token(uint32_t node_id, uint32_t token);
int listen(lua_State* L, const char* ip, int port);
int connect(lua_State* L, const char* ip, const char* port, int timeout);
void set_proto_type(uint32_t token, eproto_type type);

std::shared_ptr<socket_router> get_router() { return m_router; }

Expand Down
20 changes: 10 additions & 10 deletions core/luabus/src/lua_socket_node.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#include "stdafx.h"
#include "lua_socket_node.h"

lua_socket_node::lua_socket_node(uint32_t token, lua_State* L, std::shared_ptr<socket_mgr>& mgr, std::shared_ptr<socket_router>& router
, bool blisten, eproto_type proto_type) : m_token(token), m_mgr(mgr), m_router(router), m_proto_type(proto_type) {
lua_socket_node::lua_socket_node(uint32_t token, lua_State* L, std::shared_ptr<socket_mgr>& mgr, std::shared_ptr<socket_router>& router, bool blisten)
: m_token(token), m_mgr(mgr), m_router(router) {
m_stoken = (m_token & 0xffff) << 16;
m_luakit = std::make_shared<luakit::kit_state>(L);
m_mgr->get_remote_ip(m_token, m_ip);
if (blisten) {
m_mgr->set_accept_callback(token, [=](uint32_t steam_token, eproto_type proto_type) {
auto node = new lua_socket_node(steam_token, m_luakit->L(), m_mgr, m_router, false, proto_type);
m_mgr->set_accept_callback(token, [=](uint32_t steam_token) {
auto node = new lua_socket_node(steam_token, m_luakit->L(), m_mgr, m_router, false);
node->set_codec(m_codec);
m_luakit->object_call(this, "on_accept", nullptr, std::tie(), node);
});
Expand All @@ -23,8 +23,8 @@ lua_socket_node::lua_socket_node(uint32_t token, lua_State* L, std::shared_ptr<s
m_luakit->object_call(this, "on_error", nullptr, std::tie(), token, err);
});

m_mgr->set_package_callback(token, [=](slice* data){
return on_recv(data);
m_mgr->set_package_callback(token, [=](slice* data, eproto_type proto_type){
return on_recv(data, proto_type);
});
}

Expand Down Expand Up @@ -199,14 +199,14 @@ int lua_socket_node::transfer_hash(lua_State* L, uint32_t session_id, uint16_t s
return 0;
}

int lua_socket_node::on_recv(slice* slice) {
if (eproto_type::proto_head == m_proto_type) {
int lua_socket_node::on_recv(slice* slice, eproto_type proto_type) {
if (eproto_type::proto_head == proto_type) {
return on_call_head(slice);
}
if (eproto_type::proto_text == m_proto_type) {
if (eproto_type::proto_text == proto_type) {
return on_call_text(slice);
}
if (eproto_type::proto_rpc != m_proto_type) {
if (eproto_type::proto_rpc != proto_type) {
return on_call_data(slice);
}

Expand Down
9 changes: 4 additions & 5 deletions core/luabus/src/lua_socket_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
class lua_socket_node
{
public:
lua_socket_node(uint32_t token, lua_State* L, std::shared_ptr<socket_mgr>& mgr, std::shared_ptr<socket_router>& router
, bool blisten = false, eproto_type proto_type = eproto_type::proto_rpc);
lua_socket_node(uint32_t token, lua_State* L, std::shared_ptr<socket_mgr>& mgr, std::shared_ptr<socket_router>& router, bool blisten = false);
~lua_socket_node();

void close();

uint32_t build_session_id() { return m_stoken | m_sindex++; }
uint32_t get_route_count() { return m_router->get_route_count(); }
void set_codec(codec_base* codec) { m_codec = codec; }
void set_timeout(int ms) { m_mgr->set_timeout(m_token, ms); }
void set_nodelay(bool flag) { m_mgr->set_nodelay(m_token, flag); }
void set_codec(codec_base* codec) { m_codec = codec; }
void set_proto_type(eproto_type proto_type) { m_mgr->set_proto_type(m_token, proto_type); }

int call_data(lua_State* L);
int call(lua_State* L, uint32_t session_id, uint8_t flag);
Expand Down Expand Up @@ -59,10 +59,10 @@ class lua_socket_node
uint16_t m_sindex = 1;

private:
int on_recv(slice* slice);
int on_call_head(slice* slice);
int on_call_text(slice* slice);
int on_call_data(slice* slice);
int on_recv(slice* slice, eproto_type proto_type);
void on_call(router_header* header, slice* slice);
void on_transfer(transfer_header* header, slice* slice);
void on_forward_broadcast(router_header* header, size_t target_size);
Expand All @@ -72,5 +72,4 @@ class lua_socket_node
std::shared_ptr<socket_mgr> m_mgr;
std::shared_ptr<kit_state> m_luakit;
std::shared_ptr<socket_router> m_router;
eproto_type m_proto_type;
};
2 changes: 2 additions & 0 deletions core/luabus/src/luabus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace luabus {
lluabus.set_function("create_socket_mgr", create_socket_mgr);
lluabus.new_enum("eproto_type",
"rpc", eproto_type::proto_rpc,
"wss", eproto_type::proto_wss,
"head", eproto_type::proto_head,
"text", eproto_type::proto_text,
"mongo", eproto_type::proto_mongo,
Expand Down Expand Up @@ -85,6 +86,7 @@ namespace luabus {
"transfer_call", &lua_socket_node::transfer_call,
"transfer_hash", &lua_socket_node::transfer_hash,
"forward_target", &lua_socket_node::forward_target,
"set_proto_type", &lua_socket_node::set_proto_type,
"get_route_count", &lua_socket_node::get_route_count,
"build_session_id", &lua_socket_node::build_session_id,
"forward_transfer", &lua_socket_node::forward_transfer,
Expand Down
15 changes: 7 additions & 8 deletions core/luabus/src/socket_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#ifdef _MSC_VER
socket_listener::socket_listener(socket_mgr* mgr,
LPFN_ACCEPTEX accept_func, LPFN_GETACCEPTEXSOCKADDRS addrs_func, eproto_type proto_type) : m_proto_type(proto_type) {
LPFN_ACCEPTEX accept_func, LPFN_GETACCEPTEXSOCKADDRS addrs_func) {
mgr->increase_count();
m_mgr = mgr;
m_accept_func = accept_func;
Expand All @@ -17,8 +17,7 @@ socket_listener::socket_listener(socket_mgr* mgr,
#endif

#if defined(__linux) || defined(__APPLE__)
socket_listener::socket_listener(socket_mgr* mgr, eproto_type proto_type) :
m_proto_type(proto_type) {
socket_listener::socket_listener(socket_mgr* mgr) {
mgr->increase_count();
m_mgr = mgr;
}
Expand Down Expand Up @@ -103,12 +102,12 @@ void socket_listener::on_complete(WSAOVERLAPPED* ovl) {

set_no_block(node->fd);

auto token = m_mgr->accept_stream(node->fd, ip, m_accept_cb, m_proto_type);
auto token = m_mgr->accept_stream(node->fd, ip, m_proto_type);
if (token == 0) {
closesocket(node->fd);
}
else {
m_accept_cb(token, m_proto_type);
m_accept_cb(token);
}

node->fd = INVALID_SOCKET;
Expand Down Expand Up @@ -164,7 +163,7 @@ void socket_listener::queue_accept(WSAOVERLAPPED* ovl) {
(*m_addrs_func)(node->buffer, 0, sizeof(node->buffer[0]), sizeof(node->buffer[2]), &local_addr, &local_addr_len, &remote_addr, &remote_addr_len);
get_ip_string(ip, sizeof(ip), remote_addr, (size_t)remote_addr_len);

auto token = m_mgr->accept_stream(node->fd, ip, m_accept_cb, m_proto_type);
auto token = m_mgr->accept_stream(node->fd, ip, m_proto_type);
if (token == 0) {
closesocket(node->fd);
node->fd = INVALID_SOCKET;
Expand All @@ -174,7 +173,7 @@ void socket_listener::queue_accept(WSAOVERLAPPED* ovl) {
}

node->fd = INVALID_SOCKET;
m_accept_cb(token, m_proto_type);
m_accept_cb(token);
}
}
#endif
Expand Down Expand Up @@ -202,7 +201,7 @@ void socket_listener::on_can_recv(size_t max_len, bool is_eof) {
set_no_delay(fd, 1);
set_close_on_exec(fd);

auto token = m_mgr->accept_stream(fd, ip, m_accept_cb, m_proto_type);
auto token = m_mgr->accept_stream(fd, ip);
if (token != 0) {
m_accept_cb(token, m_proto_type);
}
Expand Down
10 changes: 4 additions & 6 deletions core/luabus/src/socket_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@ struct socket_listener : public socket_object
#ifdef _MSC_VER
socket_listener(socket_mgr* mgr,
LPFN_ACCEPTEX accept_func,
LPFN_GETACCEPTEXSOCKADDRS addrs_func,
eproto_type proto_type = eproto_type::proto_rpc);
LPFN_GETACCEPTEXSOCKADDRS addrs_func);
#endif

#if defined(__linux) || defined(__APPLE__)
socket_listener(socket_mgr* mgr, eproto_type proto_type = eproto_type::proto_rpc);
socket_listener(socket_mgr* mgr);
#endif

~socket_listener();
bool setup(socket_t fd);
bool get_remote_ip(std::string& ip) override { return false; }
bool update(int64_t now) override;
void set_accept_callback(const std::function<void(int, eproto_type eproto_type)>& cb) override { m_accept_cb = cb; }
void set_accept_callback(const std::function<void(int)>& cb) override { m_accept_cb = cb; }
void set_error_callback(const std::function<void(const char*)>& cb) override { m_error_cb = cb; }

#ifdef _MSC_VER
Expand All @@ -39,10 +38,9 @@ struct socket_listener : public socket_object

private:
socket_mgr* m_mgr = nullptr;
eproto_type m_proto_type;
socket_t m_socket = INVALID_SOCKET;
std::function<void(int)> m_accept_cb;
std::function<void(const char*)> m_error_cb;
std::function<void(int, eproto_type eproto_type)> m_accept_cb;

#ifdef _MSC_VER
struct listen_node
Expand Down
28 changes: 18 additions & 10 deletions core/luabus/src/socket_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,18 @@ int socket_mgr::wait(int64_t now, int timeout) {
return (int)event_count;
}

int socket_mgr::listen(std::string& err, const char ip[], int port, eproto_type proto_type) {
int socket_mgr::listen(std::string& err, const char ip[], int port) {
int ret = false;
socket_t fd = INVALID_SOCKET;
sockaddr_storage addr;
size_t addr_len = 0;

#ifdef _MSC_VER
auto* listener = new socket_listener(this, m_accept_func, m_addrs_func, proto_type);
auto* listener = new socket_listener(this, m_accept_func, m_addrs_func);
#endif

#if defined(__linux) || defined(__APPLE__)
auto* listener = new socket_listener(this, proto_type);
auto* listener = new socket_listener(this);
#endif

ret = make_ip_addr(&addr, &addr_len, ip, port);
Expand Down Expand Up @@ -206,18 +206,18 @@ int socket_mgr::listen(std::string& err, const char ip[], int port, eproto_type
return 0;
}

int socket_mgr::connect(std::string& err, const char node_name[], const char service_name[], int timeout, eproto_type proto_type) {
int socket_mgr::connect(std::string& err, const char node_name[], const char service_name[], int timeout) {
if (is_full()) {
err = "too-many-connection";
return 0;
}

#ifdef _MSC_VER
socket_stream* stm = new socket_stream(this, m_connect_func, proto_type);
socket_stream* stm = new socket_stream(this, m_connect_func);
#endif

#if defined(__linux) || defined(__APPLE__)
socket_stream* stm = new socket_stream(this, proto_type);
socket_stream* stm = new socket_stream(this);
#endif

stm->connect(node_name, service_name, timeout);
Expand All @@ -241,6 +241,13 @@ void socket_mgr::set_nodelay(uint32_t token, int flag) {
}
}

void socket_mgr::set_proto_type(uint32_t token, eproto_type type) {
auto node = get_object(token);
if (node) {
node->set_proto_type(type);
}
}

void socket_mgr::send(uint32_t token, const void* data, size_t data_len) {
auto node = get_object(token);
if (node) {
Expand Down Expand Up @@ -286,7 +293,7 @@ int socket_mgr::get_recvbuf_size(uint32_t token){
return 0;
}

void socket_mgr::set_accept_callback(uint32_t token, const std::function<void(uint32_t, eproto_type eproto_type)>& cb) {
void socket_mgr::set_accept_callback(uint32_t token, const std::function<void(int)>& cb) {
auto node = get_object(token);
if (node) {
node->set_accept_callback(cb);
Expand All @@ -300,7 +307,7 @@ void socket_mgr::set_connect_callback(uint32_t token, const std::function<void(b
}
}

void socket_mgr::set_package_callback(uint32_t token, const std::function<int(slice*)>& cb) {
void socket_mgr::set_package_callback(uint32_t token, const std::function<int(slice*, eproto_type)>& cb) {
auto node = get_object(token);
if (node) {
node->set_package_callback(cb);
Expand Down Expand Up @@ -411,10 +418,11 @@ bool socket_mgr::watch_send(socket_t fd, socket_object* object, bool enable) {
#endif
}

int socket_mgr::accept_stream(socket_t fd, const char ip[], const std::function<void(int, eproto_type)>& cb, eproto_type proto_type) {
auto* stm = new socket_stream(this, proto_type);
int socket_mgr::accept_stream(socket_t fd, const char ip[], eproto_type proto_type) {
auto* stm = new socket_stream(this);
if (watch_accepted(fd, stm) && stm->accept_socket(fd, ip)) {
auto token = new_token();
stm->set_proto_type(proto_type);
m_objects[token] = stm;
return token;
}
Expand Down
Loading

0 comments on commit 749263b

Please sign in to comment.