diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index 8620f3e50..68e1fb0cf 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -655,10 +655,16 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder { PgSQL_Query_Result* query_result; PgSQL_Query_Result* query_result_reuse; bool new_result; + bool is_copy_out; //PgSQL_SrvC* parent; //PgSQL_Connection_userinfo* userinfo; //PgSQL_Data_Stream* myds; //int fd; + +private: + // Handles the COPY OUT response from the server. + // Returns true if it consumes all buffer data, or false if the threshold for result size is reached + bool handle_copy_out(const PGresult* result, uint64_t* processed_bytes); }; #endif /* __CLASS_PGSQL_CONNECTION_H */ diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 25f485bd7..38ba95894 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -290,6 +290,7 @@ class PgSQL_Protocol; #define PGSQL_QUERY_RESULT_READY 0x04 #define PGSQL_QUERY_RESULT_ERROR 0x08 #define PGSQL_QUERY_RESULT_EMPTY 0x10 +#define PGSQL_QUERY_RESULT_COPY_OUT 0x20 class PgSQL_Query_Result { public: @@ -435,6 +436,40 @@ class PgSQL_Query_Result { */ unsigned int add_ready_status(PGTransactionStatusType txn_status); + /** + * @brief Adds the start of a COPY OUT response to the packet. + * + * This function adds the initial part of a COPY OUT response to the packet. + * It uses the provided PGresult object to determine the necessary information + * to include in the response. + * + * @param result A pointer to the PGresult object containing the response data. + * @return The number of bytes added to the packet. + */ + unsigned int add_copy_out_response_start(const PGresult* result); + + /** + * @brief Adds a row of data to the COPY OUT response. + * + * This function adds a row of data to the ongoing COPY OUT response. The data + * is provided as a pointer to the row data and its length. + * + * @param data A pointer to the row data to be added. + * @param len The length of the row data in bytes. + * @return The number of bytes added to the packet. + */ + unsigned int add_copy_out_row(const void* data, unsigned int len); + + /** + * @brief Adds the end of a COPY OUT response to the packet. + * + * This function adds the final part of a COPY OUT response to the packet, + * indicating the end of the response. + * + * @return The number of bytes added to the packet. + */ + unsigned int add_copy_out_response_end(); + /** * @brief Retrieves the query result set and copies it to a PtrSizeArray. * @@ -870,6 +905,45 @@ class PgSQL_Protocol : public MySQL_Protocol { */ unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result); + /** + * @brief Copies the start of a response to a PgSQL_Query_Result. + * + * This function copies the initial part of a response to the provided + * PgSQL_Query_Result object. It can optionally send the response. + * + * @param send Whether to send the response. + * @param pg_query_result The PgSQL_Query_Result object to copy the response to. + * @param result The PGresult object containing the response data. + * @return The number of bytes copied. + */ + unsigned int copy_out_response_start_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + + /** + * @brief Copies a row to a PgSQL_Query_Result. + * + * This function copies a single row of data to the provided PgSQL_Query_Result + * object. It can optionally send the row data. + * + * @param send Whether to send the row data. + * @param pg_query_result The PgSQL_Query_Result object to copy the row to. + * @param data The row data to copy. + * @param len The length of the row data. + * @return The number of bytes copied. + */ + unsigned int copy_out_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const unsigned char* data, unsigned int len); + + /** + * @brief Copies the end of a response to a PgSQL_Query_Result. + * + * This function copies the final part of a response to the provided + * PgSQL_Query_Result object. It can optionally send the response. + * + * @param send Whether to send the response. + * @param pg_query_result The PgSQL_Query_Result object to copy the response to. + * @return The number of bytes copied. + */ + unsigned int copy_out_response_end_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result); + private: /** diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index fa665e312..6e190187c 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1535,6 +1535,7 @@ PgSQL_Connection::PgSQL_Connection() { query_result = NULL; query_result_reuse = NULL; new_result = true; + is_copy_out = false; reset_error(); } @@ -1609,7 +1610,7 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) { #if ENABLE_TIMER Timer timer(myds->sess->thread->Timers.Connections_Handlers); #endif // ENABLE_TIMER - unsigned long long processed_bytes = 0; // issue #527 : this variable will store the amount of bytes processed during this event + uint64_t processed_bytes = 0; // issue #527 : this variable will store the amount of bytes processed during this event if (pgsql_conn == NULL) { // it is the first time handler() is being called async_state_machine = ASYNC_CONNECT_START; @@ -1804,6 +1805,12 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) { case PGRES_SINGLE_TUPLE: break; case PGRES_COPY_OUT: + if (handle_copy_out(result.get(), &processed_bytes) == false) { + next_event(ASYNC_USE_RESULT_CONT); + return async_state_machine; // Threashold for result size reached. Pause temporarily + } + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + break; case PGRES_COPY_IN: case PGRES_COPY_BOTH: // NOT IMPLEMENTED @@ -1920,6 +1927,7 @@ PG_ASYNC_ST PgSQL_Connection::handler(short event) { } // should be NULL assert(!pgsql_result); + assert(!is_copy_out); break; case ASYNC_RESET_SESSION_START: reset_session_start(); @@ -2188,19 +2196,21 @@ void PgSQL_Connection::fetch_result_cont(short event) { // This situation can happen when a multi-statement query has been executed. if (pgsql_result) return; - - switch (PShandleRowData(pgsql_conn, &ps_result)) { - case 0: - result_type = 2; - return; - case 1: - // we already have data available in buffer - if (PQisBusy(pgsql_conn) == 0) { - result_type = 1; - pgsql_result = PQgetResult(pgsql_conn); + + if (is_copy_out == false) { + switch (PShandleRowData(pgsql_conn, new_result, &ps_result)) { + case 0: + result_type = 2; return; + case 1: + // we already have data available in buffer + if (PQisBusy(pgsql_conn) == 0) { + result_type = 1; + pgsql_result = PQgetResult(pgsql_conn); + return; + } + break; } - break; } if (PQconsumeInput(pgsql_conn) == 0) { @@ -2217,7 +2227,7 @@ void PgSQL_Connection::fetch_result_cont(short event) { return; } - switch (PShandleRowData(pgsql_conn, &ps_result)) { + switch (PShandleRowData(pgsql_conn, new_result, &ps_result)) { case 0: result_type = 2; return; @@ -2936,3 +2946,47 @@ const char* PgSQL_Connection::get_pg_transaction_status_str() { } return "INVALID"; } + +bool PgSQL_Connection::handle_copy_out(const PGresult* result, uint64_t* processed_bytes) { + + if (new_result == true) { + const unsigned int bytes_recv = query_result->add_copy_out_response_start(result); + update_bytes_recv(bytes_recv); + new_result = false; + is_copy_out = true; + } + + char* buffer = NULL; + int copy_data_len = 0; + + while ((copy_data_len = PQgetCopyData(pgsql_conn, &buffer, 1)) > 0) { + const unsigned int bytes_recv = query_result->add_copy_out_row(buffer, copy_data_len); + update_bytes_recv(bytes_recv); + PQfreemem(buffer); + buffer = NULL; + *processed_bytes += bytes_recv; // issue #527 : this variable will store the amount of bytes processed during this event + if ( + (*processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) + || + (pgsql_thread___throttle_ratio_server_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (*processed_bytes > (uint64_t)pgsql_thread___throttle_max_bytes_per_second_to_client / 10 * (uint64_t)pgsql_thread___throttle_ratio_server_to_client)) + ) + { + return false; + } + } + + if (copy_data_len == -1) { + const unsigned int bytes_recv = query_result->add_copy_out_response_end(); + update_bytes_recv(bytes_recv); + is_copy_out = false; + } else if (copy_data_len < 0) { + const PGresult* result = PQgetResultFromPGconn(pgsql_conn); + if (result || is_error_present() == false) { + set_error_from_result(result); + proxy_error("PQgetCopyData failed. %s\n", get_error_code_with_message().c_str()); + } + is_copy_out = false; + } + + return true; +} diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index b152a4b49..6f894fb49 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1808,6 +1808,139 @@ unsigned int PgSQL_Protocol::copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_ return size; } +unsigned int PgSQL_Protocol::copy_out_response_start_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result) { + assert(pg_query_result); + assert(result); + + const int fields_cnt = PQnfields(result); + unsigned int size = 1 + 4 + 1 + 2 + (fields_cnt * 2); + + bool alloced_new_buffer = false; + unsigned char* _ptr = pg_query_result->buffer_reserve_space(size); + + // buffer is not enough to store the new row description. Remember we have already pushed data to PSarrayOUT + if (_ptr == NULL) { + _ptr = (unsigned char*)l_alloc(size); + alloced_new_buffer = true; + } + + PG_pkt pgpkt(_ptr, size); + uint8_t format = 0; // Format: Text + pgpkt.put_char('H'); + pgpkt.put_uint32(size - 1); + pgpkt.put_char(format); + pgpkt.put_uint16(fields_cnt); + + for (int i = 0; i < fields_cnt; i++) { + int format_code = PQfformat(result, i); + pgpkt.put_uint16(format_code); + + if (format_code != 0) + format = format_code; + } + + if (format != 0) { + _ptr[1 + 4] = format; + } + + + if (send == true) { + // not supported + //(*myds)->PSarrayOUT->add((void*)_ptr, size); + } + + //#ifdef DEBUG + // if (dump_pkt) { __dump_pkt(__func__, _ptr, size); } + //#endif + + pg_query_result->resultset_size = size; + + if (alloced_new_buffer) { + // we created new buffer + //pg_query_result->buffer_to_PSarrayOut(); + pg_query_result->PSarrayOUT.add(_ptr, size); + } + + pg_query_result->num_fields = fields_cnt; + pg_query_result->pkt_count++; + return size; +} + +unsigned int PgSQL_Protocol::copy_out_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, + const unsigned char* data, unsigned int len) { + assert(pg_query_result); + //assert(result); + assert(pg_query_result->num_fields); + + unsigned int size = 1 + 4 + len; // 'd', length, packet length + + bool alloced_new_buffer = false; + unsigned char* _ptr = pg_query_result->buffer_reserve_space(size); + + // buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT + if (_ptr == NULL) { + _ptr = (unsigned char*)l_alloc(size); + alloced_new_buffer = true; + } + + PG_pkt pgpkt(_ptr, size); + + pgpkt.put_char('d'); + pgpkt.put_uint32(size - 1); + pgpkt.put_bytes(data, len); + + if (send == true) { + // not supported + //(*myds)->PSarrayOUT->add((void*)_ptr, size); + } + + pg_query_result->resultset_size += size; + + if (alloced_new_buffer) { + // we created new buffer + //pg_query_result->buffer_to_PSarrayOut(); + pg_query_result->PSarrayOUT.add(_ptr, size); + } + pg_query_result->pkt_count++; + pg_query_result->num_rows += 1; + return size; +} + +unsigned int PgSQL_Protocol::copy_out_response_end_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result) { + assert(pg_query_result); + + const unsigned int size = 1 + 4; // 'c', length + bool alloced_new_buffer = false; + + unsigned char* _ptr = pg_query_result->buffer_reserve_space(size); + + // buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT + if (_ptr == NULL) { + _ptr = (unsigned char*)l_alloc(size); + alloced_new_buffer = true; + } + + PG_pkt pgpkt(_ptr, size); + + pgpkt.put_char('c'); + pgpkt.put_uint32(size - 1); + + if (send == true) { + // not supported + //(*myds)->PSarrayOUT->add((void*)_ptr, size); + } + + pg_query_result->resultset_size += size; + + if (alloced_new_buffer) { + // we created new buffer + //pg_query_result->buffer_to_PSarrayOut(); + pg_query_result->PSarrayOUT.add(_ptr, size); + } + pg_query_result->pkt_count++; + return size; +} + PgSQL_Query_Result::PgSQL_Query_Result() { buffer = NULL; transfer_started = false; @@ -1872,6 +2005,26 @@ unsigned int PgSQL_Query_Result::add_row(const PSresult* result) { return res; } +unsigned int PgSQL_Query_Result::add_copy_out_response_start(const PGresult* result) { + const unsigned int res = proto->copy_out_response_start_to_PgSQL_Query_Result(false, this, result); + result_packet_type |= PGSQL_QUERY_RESULT_COPY_OUT; + return res; +} + +unsigned int PgSQL_Query_Result::add_copy_out_row(const void* data, unsigned int len) { + const unsigned int res = proto->copy_out_row_to_PgSQL_Query_Result(false, this, (const unsigned char*)data, len); + result_packet_type |= PGSQL_QUERY_RESULT_COPY_OUT; + num_rows += 1; + return res; +} + +unsigned int PgSQL_Query_Result::add_copy_out_response_end() { + const unsigned int res = proto->copy_out_response_end_to_PgSQL_Query_Result(false, this); + result_packet_type |= PGSQL_QUERY_RESULT_COPY_OUT; + return res; +} + + unsigned int PgSQL_Query_Result::add_error(const PGresult* result) { unsigned int size = 0; diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 0131b1afd..c5a24f25c 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -961,7 +961,7 @@ bool PgSQL_Session::handler_special_queries(PtrSize_t* pkt) { } // Unsupported Features: // COPY - if (pkt->size > (5 + 5) && strncasecmp((char*)"COPY ", (char*)pkt->ptr + 5, 5) == 0) { + /*if (pkt->size > (5 + 5) && strncasecmp((char*)"COPY ", (char*)pkt->ptr + 5, 5) == 0) { client_myds->DSS = STATE_QUERY_SENT_NET; client_myds->myprot.generate_error_packet(true, true, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED, false, true); @@ -975,7 +975,7 @@ bool PgSQL_Session::handler_special_queries(PtrSize_t* pkt) { } l_free(pkt->size, pkt->ptr); return true; - } + }*/ // if (pkt->size > (5 + 18) && strncasecmp((char*)"PROXYSQL INTERNAL ", (char*)pkt->ptr + 5, 18) == 0) { return_proxysql_internal(pkt); @@ -6231,7 +6231,7 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da if (query_result && query_result->get_result_packet_type() != PGSQL_QUERY_RESULT_NO_DATA) { bool transfer_started = query_result->is_transfer_started(); // if there is an error, it will be false so results are not cached - bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); + bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); CurrentQuery.rows_sent = query_result->get_num_rows(); const auto _affected_rows = query_result->get_affected_rows(); if (_affected_rows != static_cast(-1)) {