Skip to content

Commit

Permalink
Implemented handling of COPY OUT
Browse files Browse the repository at this point in the history
Added threshold checks to manage result size
  • Loading branch information
rahim-kanji committed Nov 6, 2024
1 parent 3118c43 commit 616a8e0
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 16 deletions.
6 changes: 6 additions & 0 deletions include/PgSQL_Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,10 +655,16 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder {
PgSQL_Query_Result* query_result;
PgSQL_Query_Result* query_result_reuse;
bool new_result;
bool is_copy_out;
//PgSQL_SrvC* parent;
//PgSQL_Connection_userinfo* userinfo;
//PgSQL_Data_Stream* myds;
//int fd;

private:
// Handles the COPY OUT response from the server.
// Returns true if it consumes all buffer data, or false if the threshold for result size is reached
bool handle_copy_out(const PGresult* result, uint64_t* processed_bytes);
};

#endif /* __CLASS_PGSQL_CONNECTION_H */
74 changes: 74 additions & 0 deletions include/PgSQL_Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class PgSQL_Protocol;
#define PGSQL_QUERY_RESULT_READY 0x04
#define PGSQL_QUERY_RESULT_ERROR 0x08
#define PGSQL_QUERY_RESULT_EMPTY 0x10
#define PGSQL_QUERY_RESULT_COPY_OUT 0x20

class PgSQL_Query_Result {
public:
Expand Down Expand Up @@ -435,6 +436,40 @@ class PgSQL_Query_Result {
*/
unsigned int add_ready_status(PGTransactionStatusType txn_status);

/**
* @brief Adds the start of a COPY OUT response to the packet.
*
* This function adds the initial part of a COPY OUT response to the packet.
* It uses the provided PGresult object to determine the necessary information
* to include in the response.
*
* @param result A pointer to the PGresult object containing the response data.
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_response_start(const PGresult* result);

/**
* @brief Adds a row of data to the COPY OUT response.
*
* This function adds a row of data to the ongoing COPY OUT response. The data
* is provided as a pointer to the row data and its length.
*
* @param data A pointer to the row data to be added.
* @param len The length of the row data in bytes.
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_row(const void* data, unsigned int len);

/**
* @brief Adds the end of a COPY OUT response to the packet.
*
* This function adds the final part of a COPY OUT response to the packet,
* indicating the end of the response.
*
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_response_end();

/**
* @brief Retrieves the query result set and copies it to a PtrSizeArray.
*
Expand Down Expand Up @@ -870,6 +905,45 @@ class PgSQL_Protocol : public MySQL_Protocol {
*/
unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result);

/**
* @brief Copies the start of a response to a PgSQL_Query_Result.
*
* This function copies the initial part of a response to the provided
* PgSQL_Query_Result object. It can optionally send the response.
*
* @param send Whether to send the response.
* @param pg_query_result The PgSQL_Query_Result object to copy the response to.
* @param result The PGresult object containing the response data.
* @return The number of bytes copied.
*/
unsigned int copy_out_response_start_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);

/**
* @brief Copies a row to a PgSQL_Query_Result.
*
* This function copies a single row of data to the provided PgSQL_Query_Result
* object. It can optionally send the row data.
*
* @param send Whether to send the row data.
* @param pg_query_result The PgSQL_Query_Result object to copy the row to.
* @param data The row data to copy.
* @param len The length of the row data.
* @return The number of bytes copied.
*/
unsigned int copy_out_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const unsigned char* data, unsigned int len);

/**
* @brief Copies the end of a response to a PgSQL_Query_Result.
*
* This function copies the final part of a response to the provided
* PgSQL_Query_Result object. It can optionally send the response.
*
* @param send Whether to send the response.
* @param pg_query_result The PgSQL_Query_Result object to copy the response to.
* @return The number of bytes copied.
*/
unsigned int copy_out_response_end_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result);

private:

/**
Expand Down
80 changes: 67 additions & 13 deletions lib/PgSQL_Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,7 @@ PgSQL_Connection::PgSQL_Connection() {
query_result = NULL;
query_result_reuse = NULL;
new_result = true;
is_copy_out = false;
reset_error();
}

Expand Down Expand Up @@ -1609,7 +1610,7 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) {
#if ENABLE_TIMER
Timer timer(myds->sess->thread->Timers.Connections_Handlers);
#endif // ENABLE_TIMER
unsigned long long processed_bytes = 0; // issue #527 : this variable will store the amount of bytes processed during this event
uint64_t processed_bytes = 0; // issue #527 : this variable will store the amount of bytes processed during this event
if (pgsql_conn == NULL) {
// it is the first time handler() is being called
async_state_machine = ASYNC_CONNECT_START;
Expand Down Expand Up @@ -1804,6 +1805,12 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) {
case PGRES_SINGLE_TUPLE:
break;
case PGRES_COPY_OUT:
if (handle_copy_out(result.get(), &processed_bytes) == false) {
next_event(ASYNC_USE_RESULT_CONT);
return async_state_machine; // Threashold for result size reached. Pause temporarily
}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
// NOT IMPLEMENTED
Expand Down Expand Up @@ -1920,6 +1927,7 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) {
}
// should be NULL
assert(!pgsql_result);
assert(!is_copy_out);
break;
case ASYNC_RESET_SESSION_START:
reset_session_start();
Expand Down Expand Up @@ -2188,19 +2196,21 @@ void PgSQL_Connection::fetch_result_cont(short event) {
// This situation can happen when a multi-statement query has been executed.
if (pgsql_result)
return;

switch (PShandleRowData(pgsql_conn, &ps_result)) {
case 0:
result_type = 2;
return;
case 1:
// we already have data available in buffer
if (PQisBusy(pgsql_conn) == 0) {
result_type = 1;
pgsql_result = PQgetResult(pgsql_conn);

if (is_copy_out == false) {
switch (PShandleRowData(pgsql_conn, new_result, &ps_result)) {
case 0:
result_type = 2;
return;
case 1:
// we already have data available in buffer
if (PQisBusy(pgsql_conn) == 0) {
result_type = 1;
pgsql_result = PQgetResult(pgsql_conn);
return;
}
break;
}
break;
}

if (PQconsumeInput(pgsql_conn) == 0) {
Expand All @@ -2217,7 +2227,7 @@ void PgSQL_Connection::fetch_result_cont(short event) {
return;
}

switch (PShandleRowData(pgsql_conn, &ps_result)) {
switch (PShandleRowData(pgsql_conn, new_result, &ps_result)) {
case 0:
result_type = 2;
return;
Expand Down Expand Up @@ -2936,3 +2946,47 @@ const char* PgSQL_Connection::get_pg_transaction_status_str() {
}
return "INVALID";
}

bool PgSQL_Connection::handle_copy_out(const PGresult* result, uint64_t* processed_bytes) {

if (new_result == true) {
const unsigned int bytes_recv = query_result->add_copy_out_response_start(result);
update_bytes_recv(bytes_recv);
new_result = false;
is_copy_out = true;
}

char* buffer = NULL;
int copy_data_len = 0;

while ((copy_data_len = PQgetCopyData(pgsql_conn, &buffer, 1)) > 0) {
const unsigned int bytes_recv = query_result->add_copy_out_row(buffer, copy_data_len);
update_bytes_recv(bytes_recv);
PQfreemem(buffer);
buffer = NULL;
*processed_bytes += bytes_recv; // issue #527 : this variable will store the amount of bytes processed during this event
if (
(*processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8)
||
(pgsql_thread___throttle_ratio_server_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (*processed_bytes > (uint64_t)pgsql_thread___throttle_max_bytes_per_second_to_client / 10 * (uint64_t)pgsql_thread___throttle_ratio_server_to_client))
)
{
return false;
}
}

if (copy_data_len == -1) {
const unsigned int bytes_recv = query_result->add_copy_out_response_end();
update_bytes_recv(bytes_recv);
is_copy_out = false;
} else if (copy_data_len < 0) {
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
if (result || is_error_present() == false) {
set_error_from_result(result);
proxy_error("PQgetCopyData failed. %s\n", get_error_code_with_message().c_str());
}
is_copy_out = false;
}

return true;
}
Loading

0 comments on commit 616a8e0

Please sign in to comment.