Skip to content

Commit

Permalink
[coro_http][fix][feat]fix and support ws deflate (#662)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Apr 23, 2024
1 parent 9641d2a commit 0f05b4f
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 78 deletions.
163 changes: 137 additions & 26 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "async_simple/Unit.h"
#include "async_simple/coro/FutureAwaiter.h"
#include "async_simple/coro/Lazy.h"
#ifdef CINATRA_ENABLE_GZIP
#include "gzip.hpp"
#endif
#include "cinatra_log_wrapper.hpp"
#include "http_parser.hpp"
#include "multipart.hpp"
Expand Down Expand Up @@ -273,6 +276,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
return std::move(body_);
}

#ifdef CINATRA_ENABLE_GZIP
void set_ws_deflate(bool enable_ws_deflate) {
enable_ws_deflate_ = enable_ws_deflate;
}
#endif

// only make socket connet(or handshake) to the host
async_simple::coro::Lazy<resp_data> connect(std::string uri) {
resp_data data{};
Expand All @@ -298,10 +307,30 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
add_header("Sec-WebSocket-Key", ws_sec_key_);
add_header("Sec-WebSocket-Version", "13");

#ifdef CINATRA_ENABLE_GZIP
if (enable_ws_deflate_)
add_header("Sec-WebSocket-Extensions",
"permessage-deflate; client_max_window_bits");
#endif
req_context<> ctx{};
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));

#ifdef CINATRA_ENABLE_GZIP
if (enable_ws_deflate_) {
for (auto c : data.resp_headers) {
if (c.name == "Sec-WebSocket-Extensions") {
if (c.value.find("permessage-deflate;") != std::string::npos) {
is_server_support_ws_deflate_ = true;
}
else {
is_server_support_ws_deflate_ = false;
}
break;
}
}
}
#endif
co_return data;
}
data = co_await connect(u);
Expand Down Expand Up @@ -382,37 +411,91 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

if constexpr (is_span_v<Source>) {
std::string encode_header = ws.encode_frame(source, op, true);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(source.data(), source.size())};

auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
#ifdef CINATRA_ENABLE_GZIP
if (enable_ws_deflate_ && is_server_support_ws_deflate_) {
std::string dest_buf;
if (cinatra::gzip_codec::deflate({source.data(), source.size()},
dest_buf)) {
std::span<char> msg(dest_buf.data(), dest_buf.size());
auto header = ws.encode_frame(msg, op, true, true);
std::vector<asio::const_buffer> buffers{asio::buffer(header),
asio::buffer(dest_buf)};

auto [ec, sz] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
}
}
else {
CINATRA_LOG_ERROR << "compuress data error, data: "
<< std::string(source.begin(), source.end());
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
}
}
}
else {
while (true) {
auto result = co_await source();

std::span<char> msg(result.buf.data(), result.buf.size());
std::string encode_header = ws.encode_frame(msg, op, result.eof);
else {
#endif
std::string encode_header = ws.encode_frame(source, op, true);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(msg.data(), msg.size())};
asio::buffer(source.data(), source.size())};

auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
break;
}
#ifdef CINATRA_ENABLE_GZIP
}
#endif
}
else {
while (true) {
auto result = co_await source();
#ifdef CINATRA_ENABLE_GZIP
if (enable_ws_deflate_ && is_server_support_ws_deflate_) {
std::string dest_buf;
if (cinatra::gzip_codec::deflate(
{result.buf.data(), result.buf.size()}, dest_buf)) {
std::span<char> msg(dest_buf.data(), dest_buf.size());
std::string header = ws.encode_frame(msg, op, result.eof, true);
std::vector<asio::const_buffer> buffers{asio::buffer(header),
asio::buffer(dest_buf)};
auto [ec, sz] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
}
}
else {
CINATRA_LOG_ERROR << "compuress data error, data: "
<< std::string(result.buf.data());
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
}
}
else {
#endif
std::span<char> msg(result.buf.data(), result.buf.size());
std::string encode_header = ws.encode_frame(msg, op, result.eof);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(msg.data(), msg.size())};

if (result.eof) {
break;
auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
break;
}

if (result.eof) {
break;
}
#ifdef CINATRA_ENABLE_GZIP
}
#endif
}
}

Expand Down Expand Up @@ -903,7 +986,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t rd_size =
source->read(file_data.data(), file_data.size()).gcount();
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(bufs, {file_data.data(), rd_size},
std::string size_str;
cinatra::to_chunked_buffers(bufs, size_str, {file_data.data(), rd_size},
source->eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
Expand All @@ -922,7 +1006,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
auto [rd_ec, rd_size] =
co_await file.async_read(file_data.data(), file_data.size());
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(bufs, {file_data.data(), rd_size},
std::string size_str;
cinatra::to_chunked_buffers(bufs, size_str, {file_data.data(), rd_size},
file.eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
Expand All @@ -933,8 +1018,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
while (true) {
auto result = co_await source();
std::vector<asio::const_buffer> bufs;
std::string size_str;
cinatra::to_chunked_buffers(
bufs, {result.buf.data(), result.buf.size()}, result.eof);
bufs, size_str, {result.buf.data(), result.buf.size()}, result.eof);
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
Expand Down Expand Up @@ -1839,9 +1925,28 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}

data.status = 200;
data.resp_body = {data_ptr, payload_len};
#ifdef CINATRA_ENABLE_GZIP
if (!is_close_frame && is_server_support_ws_deflate_ &&
enable_ws_deflate_) {
inflate_str_.clear();
if (!cinatra::gzip_codec::inflate({data_ptr, payload_len},
inflate_str_)) {
CINATRA_LOG_ERROR << "uncompuress data error";
data.status = 404;
data.net_err = std::make_error_code(std::errc::protocol_error);
co_return data;
}
data.status = 200;
data.resp_body = {inflate_str_.data(), inflate_str_.size()};
}
else {
#endif

data.status = 200;
data.resp_body = {data_ptr, payload_len};
#ifdef CINATRA_ENABLE_GZIP
}
#endif
read_buf.consume(read_buf.size());
header_size = 2;

Expand Down Expand Up @@ -2024,6 +2129,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string resp_chunk_str_;
std::span<char> out_buf_;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
bool is_server_support_ws_deflate_ = false;
std::string inflate_str_;
#endif

#ifdef BENCHMARK_TEST
std::string req_str_;
bool stop_bench_ = false;
Expand Down
Loading

0 comments on commit 0f05b4f

Please sign in to comment.