From 100630fba5968f5ae024fe74835e4d2eead704a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 24 Oct 2024 09:55:21 +0200 Subject: [PATCH 1/2] Second iteration on PostgreSQL monitoring POC This commit packs multiple fixes/improvements: - Added READONLY support for PostgreSQL. - Major rework for queries and statements used on monitoring checks: + Checks/Actions rewritten for single instance checks. + Reuse of prepared statements instead of re-preparation. - Fixed missing error handling in connection creation state machine. - Fixed connection rotation in connection pool (now FIFO). - Added support for configurable batching in scheduler thread, via '*_interval_window' variables. These variables allow defining the burstiness of the scheduling within the processing interval. - Added new config variable 'pgsql-monitor_dbname'. Allows controlling which 'db' will be targeted by monitoring connections. - Several fixes for 'poll' timeout computation for worker threads. - Fixed edge cases for current interval detection. - Reduced deviation in scheduling intervals computation. - Refactored and simplified connection event handling. - Improved error messages for monitoring actions. - Replaced several invalid uses of 'mysql_thread___monitor_*' in favor of new 'pgsql_thread___monitor_*' variables. - Honor the '-M' argument for disabling monitoring support. --- include/PgSQL_HostGroups_Manager.h | 2 +- include/PgSQL_Monitor.hpp | 10 +- include/PgSQL_Thread.h | 4 + include/proxysql_glovars.hpp | 3 +- include/proxysql_structs.h | 11 + lib/PgSQL_Data_Stream.cpp | 2 +- lib/PgSQL_HostGroups_Manager.cpp | 8 +- lib/PgSQL_Monitor.cpp | 1732 ++++++++++++++++++---------- lib/PgSQL_Protocol.cpp | 4 +- lib/PgSQL_Session.cpp | 4 +- lib/PgSQL_Thread.cpp | 51 +- lib/ProxySQL_Cluster.cpp | 2 +- lib/ProxySQL_GloVars.cpp | 6 +- src/main.cpp | 12 +- 14 files changed, 1205 insertions(+), 646 deletions(-) diff --git a/include/PgSQL_HostGroups_Manager.h b/include/PgSQL_HostGroups_Manager.h index 4662107db2..86b5b2303a 100644 --- a/include/PgSQL_HostGroups_Manager.h +++ b/include/PgSQL_HostGroups_Manager.h @@ -838,7 +838,7 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { void replication_lag_action_inner(PgSQL_HGC *, const char*, unsigned int, int); void replication_lag_action(const std::list& pgsql_servers); void read_only_action(char *hostname, int port, int read_only); - void read_only_action_v2(const std::list& pgsql_servers); + void read_only_action_v2(const std::list& pgsql_servers, bool writer_is_also_reader); unsigned int get_servers_table_version(); void wait_servers_table_version(unsigned, unsigned); bool shun_and_killall(char *hostname, int port); diff --git a/include/PgSQL_Monitor.hpp b/include/PgSQL_Monitor.hpp index 6a6b9e18dd..bd5a3b7b78 100644 --- a/include/PgSQL_Monitor.hpp +++ b/include/PgSQL_Monitor.hpp @@ -14,6 +14,8 @@ #define MONITOR_SQLITE_TABLE_PGSQL_SERVER_PING_LOG "CREATE TABLE pgsql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , ping_success_time_us INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_READ_ONLY_LOG "CREATE TABLE pgsql_server_read_only_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , read_only INT DEFAULT 1 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" + #define
MONITOR_SQLITE_TABLE_PGSQL_SERVERS "CREATE TABLE pgsql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port) )" #define MONITOR_SQLITE_TABLE_PROXYSQL_SERVERS "CREATE TABLE proxysql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostname, port) )" @@ -35,6 +37,8 @@ struct PgSQL_Monitor { uint64_t connect_check_OK { 0 }; uint64_t ping_check_ERR { 0 }; uint64_t ping_check_OK { 0 }; + uint64_t readonly_check_ERR { 0 }; + uint64_t readonly_check_OK { 0 }; /////////////////////////////////////////////////////////////////////////// std::vector tables_defs_monitor { @@ -45,7 +49,11 @@ struct PgSQL_Monitor { { const_cast("pgsql_server_ping_log"), const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_PING_LOG) - } + }, + { + const_cast("pgsql_server_read_only_log"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_READ_ONLY_LOG) + }, }; std::vector tables_defs_monitor_internal { diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 672694eaf7..7b06fe2fce 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -801,9 +801,11 @@ class PgSQL_Threads_Handler int monitor_history; int monitor_connect_interval; + int monitor_connect_interval_window; int monitor_connect_timeout; //! Monitor ping interval. Unit: 'ms'. int monitor_ping_interval; + int monitor_ping_interval_window; int monitor_ping_max_failures; //! Monitor ping timeout. Unit: 'ms'. int monitor_ping_timeout; @@ -811,6 +813,7 @@ class PgSQL_Threads_Handler int monitor_aws_rds_topology_discovery_interval; //! Monitor read only interval. Unit: 'ms'. int monitor_read_only_interval; + int monitor_read_only_interval_window; //! Monitor read only timeout. Unit: 'ms'.
int monitor_read_only_timeout; int monitor_read_only_max_timeout_count; @@ -848,6 +851,7 @@ class PgSQL_Threads_Handler int monitor_local_dns_resolver_queue_maxsize; char* monitor_username; char* monitor_password; + char* monitor_dbname; char* monitor_replication_lag_use_percona_heartbeat; int ping_interval_server_msec; int ping_timeout_server; diff --git a/include/proxysql_glovars.hpp b/include/proxysql_glovars.hpp index 68a51f25f7..23389be8e7 100644 --- a/include/proxysql_glovars.hpp +++ b/include/proxysql_glovars.hpp @@ -86,7 +86,8 @@ class ProxySQL_GlobalVariables { unsigned long long start_time; bool gdbg; bool nostart; - bool monitor; + bool my_monitor; + bool pg_monitor; bool version_check; #ifdef SO_REUSEPORT bool reuseport; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 47b149081a..ae992a4bf8 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -648,6 +648,7 @@ enum PROXYSQL_MYSQL_ERR { ER_PROXYSQL_AWS_HEALTH_CHECK_TIMEOUT = 9018, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG = 9019, ER_PROXYSQL_CONNECT_TIMEOUT = 9020, + ER_PROXYSQL_READONLY_TIMEOUT = 9021, }; enum proxysql_session_type { @@ -1084,16 +1085,21 @@ __thread int pgsql_thread___query_processor_regex; __thread bool pgsql_thread___monitor_enabled; __thread int pgsql_thread___monitor_history; __thread int pgsql_thread___monitor_connect_interval; +__thread int pgsql_thread___monitor_connect_interval_window; __thread int pgsql_thread___monitor_connect_timeout; __thread int pgsql_thread___monitor_ping_interval; +__thread int pgsql_thread___monitor_ping_interval_window; __thread int pgsql_thread___monitor_ping_max_failures; __thread int pgsql_thread___monitor_ping_timeout; __thread int pgsql_thread___monitor_read_only_interval; +__thread int pgsql_thread___monitor_read_only_interval_window; __thread int pgsql_thread___monitor_read_only_timeout; __thread int pgsql_thread___monitor_read_only_max_timeout_count; +__thread bool pgsql_thread___monitor_writer_is_also_reader; __thread int pgsql_thread___monitor_threads; __thread char* pgsql_thread___monitor_username; __thread char* pgsql_thread___monitor_password; +__thread char* pgsql_thread___monitor_dbname; //--------------------------- @@ -1371,16 +1377,21 @@ extern __thread int pgsql_thread___query_processor_regex; extern __thread bool pgsql_thread___monitor_enabled; extern __thread int pgsql_thread___monitor_history; extern __thread int pgsql_thread___monitor_connect_interval; +extern __thread int pgsql_thread___monitor_connect_interval_window; extern __thread int pgsql_thread___monitor_connect_timeout; extern __thread int pgsql_thread___monitor_ping_interval; +extern __thread int pgsql_thread___monitor_ping_interval_window; extern __thread int pgsql_thread___monitor_ping_max_failures; extern __thread int pgsql_thread___monitor_ping_timeout; extern __thread int pgsql_thread___monitor_read_only_interval; +extern __thread int pgsql_thread___monitor_read_only_interval_window; extern __thread int pgsql_thread___monitor_read_only_timeout; extern __thread int pgsql_thread___monitor_read_only_max_timeout_count; +extern __thread bool pgsql_thread___monitor_writer_is_also_reader; extern __thread int pgsql_thread___monitor_threads; extern __thread char* pgsql_thread___monitor_username; extern __thread char* pgsql_thread___monitor_password; +extern __thread char* pgsql_thread___monitor_dbname; //--------------------------- diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index f7fb1c16b0..7094fba20d 100644 --- 
a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -1414,4 +1414,4 @@ int PgSQL_Data_Stream::buffer2array() { queueIN.pkt.ptr = NULL; } return ret; -} \ No newline at end of file +} diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 99247a85ab..5f3dab7357 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -3600,7 +3600,9 @@ void PgSQL_HostGroups_Manager::read_only_action(char *hostname, int port, int re * @param pgsql_servers List of servers having hostname, port and read only value. * */ -void PgSQL_HostGroups_Manager::read_only_action_v2(const std::list& pgsql_servers) { +void PgSQL_HostGroups_Manager::read_only_action_v2( + const std::list& pgsql_servers, bool writer_is_also_reader +) { bool update_pgsql_servers_table = false; @@ -3637,7 +3639,7 @@ void PgSQL_HostGroups_Manager::read_only_action_v2(const std::listcopy_if_not_exists(HostGroup_Server_Mapping::Type::WRITER, HostGroup_Server_Mapping::Type::READER); - if (mysql_thread___monitor_writer_is_also_reader == false) { + if (writer_is_also_reader == false) { // remove node from reader host_server_mapping->clear(HostGroup_Server_Mapping::Type::READER); } @@ -3683,7 +3685,7 @@ void PgSQL_HostGroups_Manager::read_only_action_v2(const std::listcopy_if_not_exists(HostGroup_Server_Mapping::Type::WRITER, HostGroup_Server_Mapping::Type::READER); - if (mysql_thread___monitor_writer_is_also_reader == false) { + if (writer_is_also_reader == false) { // remove node from reader host_server_mapping->clear(HostGroup_Server_Mapping::Type::READER); } diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 5642d830fe..4db02cf77a 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -25,6 +25,29 @@ using std::list; extern PgSQL_Monitor* GloPgMon; extern PgSQL_Threads_Handler* GloPTH; +/** + * @brief Used for performing the PING operation. + * @details Direct use of 'libpq' isn't possible (creates new conns). + */ +const char PING_QUERY[] { "" }; +/** + * @brief Used to detect if the server is a replica in 'hot_standby'. + * @details If the server is not in this mode, it is assumed to be a primary. + */ +const char READ_ONLY_QUERY[] { "SELECT pg_is_in_recovery()" }; + +template +void append(std::vector& dest, std::vector&& src) { + dest.insert(dest.end(), + std::make_move_iterator(src.begin()), + std::make_move_iterator(src.end()) + ); +} + +/** + * @brief Only responsive servers are eligible for monitoring actions. + * @details Non-suitability is determined by 'ping_max_failures'. + */ const char RESP_SERVERS_QUERY_T[] { "SELECT 1 FROM (" "SELECT hostname,port,ping_error FROM pgsql_server_ping_log" @@ -36,30 +59,37 @@ const char RESP_SERVERS_QUERY_T[] { " GROUP BY hostname,port HAVING COUNT(*)=%d" }; -bool server_responds_to_ping( - SQLite3DB& db, const char* addr, int port, int max_fails -) { - bool res = true; - - cfmt_t query_fmt { cstr_format(RESP_SERVERS_QUERY_T, addr, port, max_fails, max_fails) }; +/** + * @brief Checks if a server is responsive (suitable for other monitoring ops). + * @param db The monitor DB against which to perform the query. + * @param addr The server address. + * @param port The server port. + * @param max_fails Maximum number of failures to consider the server non-suitable. + * @return True if the server is suitable, false otherwise.
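+ * @note Usage sketch (hypothetical values):
+ * @code
+ *   // With max_fails=3 this only returns false once the 3 most recent
+ *   // 'pgsql_server_ping_log' entries for the host all carry a ping_error.
+ *   bool ok { server_responds_to_ping(GloPgMon->monitordb, "127.0.0.1", 5432, 3) };
+ * @endcode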
+ */ +bool server_responds_to_ping(SQLite3DB& db, const char* addr, int port, int max_fails) { + cfmt_t q_fmt { cstr_format(RESP_SERVERS_QUERY_T, addr, port, max_fails, max_fails) }; char* err { nullptr }; - unique_ptr result { db.execute_statement(query_fmt.str.c_str(), &err) }; + unique_ptr result { db.execute_statement(q_fmt.str.c_str(), &err) }; - if (!err && result && result->rows_count) { - res = false; - } else if (err) { + if (err || result == nullptr) { proxy_error( - "Internal error querying 'pgsql_server_ping_log'. Aborting query=%s error=%s\n", - query_fmt.str.c_str(), err + "Internal error querying 'pgsql_server_ping_log'. Aborting query=%s error='%s'\n", + q_fmt.str.c_str(), err ); free(err); assert(0); + } else { + return !result->rows_count; } - - return res; } +/** + * @brief Helper function for building the tables for the monitoring DB. + * @param db The monitor DB in which to create the tables. + * @param tables_defs The definitions of the tables to be created. + */ void check_and_build_standard_tables(SQLite3DB& db, const vector& tables_defs) { db.execute("PRAGMA foreign_keys = OFF"); @@ -94,7 +124,6 @@ PgSQL_Monitor::PgSQL_Monitor() { // Explicit index creation monitordb.execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON pgsql_server_connect_log (time_start_us)"); monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON pgsql_server_ping_log (time_start_us)"); - // TODO: Futher investigate monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_2 ON pgsql_server_ping_log (hostname, port, time_start_us)"); } @@ -129,15 +158,23 @@ void sqlite_bind_int64(sqlite3_stmt* stmt, int index, long long value) { ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); } +void sqlite_bind_null(sqlite3_stmt* stmt, int index) { + int rc = (*proxy_sqlite3_bind_null)(stmt, index); + ASSERT_SQLITE3_OK(rc, sqlite3_db_handle(stmt)); +} + // Helper function for executing a statement -void sqlite_execute_statement(sqlite3_stmt* stmt) { +int sqlite_execute_statement(sqlite3_stmt* stmt) { int rc = 0; + do { rc = (*proxy_sqlite3_step)(stmt); if (rc == SQLITE_LOCKED || rc == SQLITE_BUSY) { usleep(100); } } while (rc == SQLITE_LOCKED || rc == SQLITE_BUSY); + + return rc; } // Helper function for clearing bindings @@ -157,6 +194,15 @@ void sqlite_finalize_statement(sqlite3_stmt* stmt) { (*proxy_sqlite3_finalize)(stmt); } +unique_ptr sqlite_fetch_and_clear(sqlite3_stmt* stmt) { + unique_ptr result { new SQLite3_result(stmt) }; + + sqlite_clear_bindings(stmt); + sqlite_reset_statement(stmt); + + return result; +} + void update_monitor_pgsql_servers(SQLite3_result* rs, SQLite3DB* db) { std::lock_guard monitor_db_guard { GloPgMon->pgsql_srvs_mutex }; @@ -225,15 +271,20 @@ struct mon_srv_t { struct mon_user_t { string user; string pass; - string schema; + string dbname; }; struct ping_params_t { int32_t interval; + double interval_window; int32_t timeout; int32_t max_failures; }; +struct readonly_res_t { + int32_t val; +}; + struct ping_conf_t { unique_ptr srvs_info; ping_params_t params; @@ -241,6 +292,7 @@ struct ping_conf_t { struct connect_params_t { int32_t interval; + double interval_window; int32_t timeout; int32_t ping_max_failures; int32_t ping_interval; @@ -253,8 +305,12 @@ struct connect_conf_t { struct readonly_params_t { int32_t interval; + double interval_window; int32_t timeout; int32_t max_timeout_count; + int32_t ping_max_failures; + int32_t ping_interval; + bool writer_is_also_reader; }; struct readonly_conf_t { @@ -262,7 +318,7 @@ struct readonly_conf_t { 
readonly_params_t params; }; -struct mon_tasks_conf_t { +struct tasks_conf_t { ping_conf_t ping; connect_conf_t connect; readonly_conf_t readonly; @@ -309,13 +365,18 @@ vector ext_srvs(const unique_ptr& srvs_info) { return srvs; } -// First part of fetchStatusConfig :: [(resulset,config)] -mon_tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager* hgm) { +/** + * @brief Fetches updated config to be used in the current monitoring interval. + * @param mon Pointer to 'PgSQL_Monitor' module instance. + * @param hgm Pointer to 'PgSQL_HostGroups_Manager' module instance. + * @return Updated config to be used for interval tasks. + */ +tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager* hgm) { // Update the 'monitor_internal.pgsql_servers' servers info. { try { std::lock_guard pgsql_srvs_guard(hgm->pgsql_servers_to_monitor_mutex); - update_monitor_pgsql_servers(hgm->pgsql_servers_to_monitor, &GloPgMon->monitordb); + update_monitor_pgsql_servers(hgm->pgsql_servers_to_monitor, &mon->monitordb); } catch (const std::exception& e) { proxy_error("Exception e=%s\n", e.what()); } @@ -339,11 +400,12 @@ mon_tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager )}; - return mon_tasks_conf_t { + return tasks_conf_t { ping_conf_t { std::move(ping_srvrs), ping_params_t { pgsql_thread___monitor_ping_interval * 1000, + pgsql_thread___monitor_ping_interval_window / 100.0, pgsql_thread___monitor_ping_timeout * 1000, pgsql_thread___monitor_ping_max_failures } @@ -352,6 +414,7 @@ mon_tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager std::move(connect_srvrs), connect_params_t { pgsql_thread___monitor_connect_interval * 1000, + pgsql_thread___monitor_connect_interval_window / 100.0, pgsql_thread___monitor_connect_timeout * 1000, // TODO: Revisit this logic; For now identical to previous // - Used for server responsiveness @@ -364,30 +427,46 @@ mon_tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager std::move(readonly_srvs), readonly_params_t { pgsql_thread___monitor_read_only_interval * 1000, + pgsql_thread___monitor_read_only_interval_window / 100.0, pgsql_thread___monitor_read_only_timeout * 1000, - pgsql_thread___monitor_read_only_max_timeout_count + pgsql_thread___monitor_read_only_max_timeout_count, + pgsql_thread___monitor_ping_max_failures, + pgsql_thread___monitor_ping_interval * 1000, + pgsql_thread___monitor_writer_is_also_reader } }, mon_user_t { pgsql_thread___monitor_username, - pgsql_thread___monitor_password + pgsql_thread___monitor_password, + pgsql_thread___monitor_dbname } }; } -using task_params_t = std::unique_ptr>; +using op_params_t = std::unique_ptr>; +using op_result_t = std::unique_ptr>; struct op_st_t { - uint64_t start; - uint64_t end; - mon_srv_t srv_info; + // :: info + mon_srv_t srv_info; mon_user_t user_info; - task_params_t task_params; + op_params_t op_params; + // :: state + uint64_t exec_time { 0 }; + op_result_t op_result; }; struct task_st_t { - uint64_t start; - uint64_t end; + // :: info + task_type_t type; + uint64_t sched_intv; + // :: state + uint64_t start { 0 }; + uint64_t end { 0 }; + op_st_t op_st; +}; + +struct task_inf_t { task_type_t type; op_st_t op_st; }; @@ -400,132 +479,289 @@ struct state_t { enum class task_status_t { success, failure }; mf_unique_ptr strdup_no_lf(const char* input) { - if (input == nullptr) return nullptr; - size_t length = std::strlen(input); + if (input == nullptr) return nullptr; + + size_t len = std::strlen(input); + 
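+	// What follows flattens a multi-line libpq error message into a single
+	// line: each newline (and the indentation run behind it) collapses into
+	// whitespace, so the output never exceeds 'len' bytes and the 'len + 1'
+	// allocation below is always sufficient.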
char* res = static_cast(malloc(len + 1)); + memset(res, 0, len + 1); + + bool in_lf = false; + size_t res_pos = 0; + + for (size_t i = 0; i < len; i++) { + if (input[i] == '\n') { + if (i < len - 1) { + res[res_pos] = ' '; + res_pos++; + } + in_lf = true; + } else if (in_lf && (input[i] == ' ' || input[i] == '\t')) { + if (input[i - 1] == '\n' && (input[i] == ' ' || input[i] == '\t')) { + res[res_pos] = ' '; + res_pos++; + } else { + continue; + } + } else { + in_lf = false; + res[res_pos] = input[i]; + res_pos++; + } + } + + res[res_pos] = '\0'; + + return mf_unique_ptr(res); +} + +void set_failed_st(state_t& st, ASYNC_ST new_st, mf_unique_ptr err) { + st.conn.state = new_st; + st.conn.err = std::move(err); + st.task.end = monotonic_time(); +} + +void set_finish_st(state_t& st, ASYNC_ST new_st, op_result_t res = {}) { + st.conn.state = new_st; + st.task.op_st.op_result = std::move(res); + st.task.end = monotonic_time(); +} + +short handle_async_check_cont(state_t& st, short _) { + pgsql_conn_t& pgconn { st.conn }; + + // Single command queries; 'PQisBusy' and 'PQconsumeInput' not required + PGresult* res { PQgetResult(pgconn.conn) }; + + // Wait for the result asynchronously + if (res == NULL) { + if (st.task.type == task_type_t::ping) { + set_finish_st(st, ASYNC_PING_END); + } else { + set_finish_st(st, ASYNC_QUERY_END); + } + } else { + // Check for errors in the query execution + ExecStatusType status = PQresultStatus(res); + + if (status == PGRES_EMPTY_QUERY) { + set_finish_st(st, ASYNC_PING_END); + // Cleanup of resultset required for conn reuse + PQclear(PQgetResult(pgconn.conn)); + } else if (status == PGRES_TUPLES_OK) { + int row_count = PQntuples(res); + + if (row_count > 0) { + const char* value_str { PQgetvalue(res, 0, 0) }; + bool value { strcmp(value_str, "t") == 0 }; + + set_finish_st(st, ASYNC_QUERY_END, + op_result_t { + new readonly_res_t { value }, + [] (void* v) { delete static_cast(v); } + } + ); + } else { + const mon_srv_t& srv { st.task.op_st.srv_info }; + const char err_t[] { "Invalid number of rows '%d'" }; + char err_b[sizeof(err_t) + 12] = { 0 }; + + cstr_format(err_b, err_t, row_count); + proxy_error( + "Monitor readonly failed addr='%s:%d' status=%d error='%s'\n", + srv.addr.c_str(), srv.port, status, err_b + ); + set_failed_st(st, ASYNC_QUERY_FAILED, mf_unique_ptr(strdup(err_b))); + } + + // Cleanup of resultset required for conn reuse + PQclear(PQgetResult(pgconn.conn)); + } else if (status != PGRES_COMMAND_OK) { + const mon_srv_t& srv { st.task.op_st.srv_info }; + auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + + if (st.task.type == task_type_t::ping) { + proxy_error( + "Monitor ping failed addr='%s:%d' status=%d error='%s'\n", + srv.addr.c_str(), srv.port, status, err.get() + ); + set_failed_st(st, ASYNC_PING_FAILED, std::move(err)); + } else if (st.task.type == task_type_t::readonly) { + proxy_error( + "Monitor readonly failed addr='%s:%d' status=%d error='%s'\n", + srv.addr.c_str(), srv.port, status, err.get() + ); + set_failed_st(st, ASYNC_QUERY_FAILED, std::move(err)); + } else { + assert(0 && "Invalid task type"); + } + } + } + + // Clear always; we assume no resultset on ping + PQclear(res); + + return POLLIN; +} + +pair handle_async_connect_cont(state_t& st, short revent) { + pgsql_conn_t& pgconn { st.conn }; + + short req_events { 0 }; + bool proc_again { false }; + + // NOTE: SCRAM-Handshake-256 may introduce an observable delay (CPU intensive). 
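+	// PQconnectPoll() advances libpq's non-blocking connect state machine one
+	// step per readiness event; libpq may switch to a different socket while
+	// the handshake progresses, hence the PQsocket() refresh right after.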
+ PostgresPollingStatusType poll_res { PQconnectPoll(pgconn.conn) }; + pgconn.fd = PQsocket(pgconn.conn); + + switch (poll_res) { + case PGRES_POLLING_WRITING: + req_events |= POLLOUT; + break; + case PGRES_POLLING_ACTIVE: + case PGRES_POLLING_READING: + req_events |= POLLIN; + break; + case PGRES_POLLING_OK: + pgconn.state = ASYNC_ST::ASYNC_CONNECT_END; + + if (st.task.type == task_type_t::connect) { + st.task.end = monotonic_time(); + } else if (st.task.type == task_type_t::ping) { + proc_again = true; + } else if (st.task.type == task_type_t::readonly) { + proc_again = true; + } else { + assert(0 && "Non-implemented task-type"); + } + break; + case PGRES_POLLING_FAILED: { + // During connection phase use `PQerrorMessage` + const mon_srv_t& srv { st.task.op_st.srv_info }; + auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + + proxy_error( + "Monitor connect failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, err.get() + ); + set_failed_st(st, ASYNC_CONNECT_FAILED, std::move(err)); + break; + } + } + + return { req_events, proc_again }; +} + +short handle_async_connect_end(state_t& st, short _) { + pgsql_conn_t& pgconn { st.conn }; + + short req_events { 0 }; + const char* QUERY { st.task.type == task_type_t::ping ? PING_QUERY : READ_ONLY_QUERY }; + + int rc = PQsendQuery(pgconn.conn, QUERY); + if (rc == 0) { + const mon_srv_t& srv { st.task.op_st.srv_info }; + auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + + if (st.task.type == task_type_t::ping) { + proxy_error( + "Monitor ping start failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, err.get() + ); + set_failed_st(st, ASYNC_PING_FAILED, std::move(err)); + } else if (st.task.type == task_type_t::readonly) { + proxy_error( + "Monitor readonly start failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, err.get() + ); + set_failed_st(st, ASYNC_QUERY_FAILED, std::move(err)); + } else { + assert(0 && "Invalid task type"); + } + } else { + int res = PQflush(pgconn.conn); - if (length > 0 && input[length - 1] == '\n') { - length--; - } + if (res < 0) { + const mon_srv_t& srv { st.task.op_st.srv_info }; + auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - char* result = static_cast(malloc(length + 1)); + if (st.task.type == task_type_t::ping) { + proxy_error( + "Monitor ping start failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, err.get() + ); + set_failed_st(st, ASYNC_PING_FAILED, std::move(err)); + } else if (st.task.type == task_type_t::readonly) { + proxy_error( + "Monitor readonly start failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, err.get() + ); + set_failed_st(st, ASYNC_QUERY_FAILED, std::move(err)); + } else { + assert(0 && "Invalid task type"); + } + } else { + req_events |= res > 0 ? 
POLLOUT : POLLIN; - std::strncpy(result, input, length); - result[length] = '\0'; + if (st.task.type == task_type_t::ping) { + pgconn.state = ASYNC_ST::ASYNC_PING_CONT; + } else if (st.task.type == task_type_t::readonly) { + pgconn.state = ASYNC_ST::ASYNC_QUERY_CONT; + } else { + assert(0 && "Invalid task type"); + } + } + } - return mf_unique_ptr(result); + return req_events; } short handle_pg_event(state_t& st, short event) { pgsql_conn_t& pgconn { st.conn }; - short req_event = 0; + short req_events = 0; #ifdef DEBUG const char* host { PQhostaddr(pgconn.conn) }; const char* port { PQport(pgconn.conn) }; proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Handling event for conn fd=%d addr='%s:%s' event=%d state=%d thread=%lu\n", - pgconn.fd, host, port, event, st.conn.state, pthread_self() + "Handling event for conn fd=%d addr='%s:%s' event=%d state=%d\n", + pgconn.fd, host, port, event, st.conn.state ); #endif next_immediate: switch (pgconn.state) { + case ASYNC_ST::ASYNC_CONNECT_FAILED: { + // Conn creation failed; no socket acquired + break; + } case ASYNC_ST::ASYNC_CONNECT_CONT: { - PostgresPollingStatusType poll_res = PQconnectPoll(pgconn.conn); - - switch (poll_res) { - case PGRES_POLLING_WRITING: - // Continue writing - req_event |= POLLOUT; - break; - case PGRES_POLLING_ACTIVE: - case PGRES_POLLING_READING: - // Switch to reading - req_event |= POLLIN; - break; - case PGRES_POLLING_OK: - pgconn.state = ASYNC_ST::ASYNC_CONNECT_END; - - if (st.task.type == task_type_t::connect) { - st.task.end = monotonic_time(); - } else if (st.task.type == task_type_t::ping) { - goto next_immediate; - } else { - assert(0 && "Non-implemented task-type"); - } - break; - case PGRES_POLLING_FAILED: { - // During connection phase use `PQerrorMessage` - auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - st.task.end = monotonic_time(); - proxy_error("Monitor connect FAILED error='%s'\n", err.get()); - - pgconn.state = ASYNC_ST::ASYNC_CONNECT_FAILED; - pgconn.err = std::move(err); - break; - } + auto [events, proc_again] = handle_async_connect_cont(st, event); + req_events = events; + + if (proc_again) { + goto next_immediate; + } break; } case ASYNC_ST::ASYNC_CONNECT_END: { - // Check if NOTHING, comment works - int rc = PQsendQuery(pgconn.conn, ""); - if (rc == 0) { - const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - proxy_error("Monitor ping start FAILED error='%s'\n", err.get()); - - pgconn.state = ASYNC_ST::ASYNC_PING_FAILED; - } else { - int res = PQflush(pgconn.conn); - - if (res < 0) { - const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - proxy_error("Monitor ping start FAILED error='%s'\n", err.get()); - - pgconn.state = ASYNC_ST::ASYNC_PING_FAILED; - } else { - req_event |= res > 0 ?
POLLOUT : POLLIN; - pgconn.state = ASYNC_ST::ASYNC_PING_CONT; - } - } + req_events = handle_async_connect_end(st, event); break; } + case ASYNC_ST::ASYNC_QUERY_CONT: case ASYNC_ST::ASYNC_PING_CONT: { - // Single command queries; 'PQisBusy' and 'PQconsumeInput' not required - PGresult* res { PQgetResult(pgconn.conn) }; - - // Wait for the result asynchronously - if (res == NULL) { - pgconn.state = ASYNC_ST::ASYNC_PING_END; - st.task.end = monotonic_time(); - } else { - // Check for errors in the query execution - ExecStatusType status = PQresultStatus(res); - - if (status == PGRES_EMPTY_QUERY) { - const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - pgconn.state = ASYNC_ST::ASYNC_PING_END; - st.task.end = monotonic_time(); - - // Cleanup of resultset required for conn reuse - PQclear(PQgetResult(pgconn.conn)); - } else if (status != PGRES_COMMAND_OK) { - const auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - proxy_error("Monitor ping FAILED status=%d error='%s'\n", status, err.get()); - pgconn.state = ASYNC_ST::ASYNC_PING_FAILED; - } - } - - // Clear always; we assume no resultset on ping - PQclear(res); + req_events = handle_async_check_cont(st, event); break; } case ASYNC_ST::ASYNC_PING_END: { pgconn.state = ASYNC_ST::ASYNC_CONNECT_END; break; } + case ASYNC_ST::ASYNC_QUERY_END: { + pgconn.state = ASYNC_ST::ASYNC_CONNECT_END; + break; + } default: { // Should not be reached assert(0 && "State matching should be exhaustive"); @@ -533,21 +769,7 @@ short handle_pg_event(state_t& st, short event) { } } - return req_event; -} - -string build_conn_str(const task_st_t& task_st) { - const mon_srv_t& srv_info { task_st.op_st.srv_info }; - const mon_user_t& user_info { task_st.op_st.user_info }; - - return string { - "host='" + srv_info.addr + "' " - + "port='" + std::to_string(srv_info.port) + "' " - + "user='" + user_info.user + "' " - + "password='" + user_info.pass + "' " - + "dbname='" + user_info.schema + "' " - + "application_name=ProxySQL-Monitor" - }; + return req_events; } struct conn_pool_t { @@ -565,7 +787,7 @@ pair get_conn( vector expired_conns {}; { - std::lock_guard lock(mon_conn_pool.mutex); + std::lock_guard lock(conn_pool.mutex); const string key { srv_info.addr + ":" + std::to_string(srv_info.port) }; auto it = mon_conn_pool.conn_map.find(key); @@ -606,7 +828,7 @@ void put_conn(conn_pool_t& conn_pool, const mon_srv_t& srv_info, pgsql_conn_t co std::lock_guard lock(conn_pool.mutex); const string key { srv_info.addr + ":" + std::to_string(srv_info.port) }; - conn_pool.conn_map[key].emplace_back(std::move(conn)); + conn_pool.conn_map[key].emplace_front(std::move(conn)); } uint64_t get_connpool_cleanup_intv(task_st_t& task) { @@ -614,16 +836,22 @@ uint64_t get_connpool_cleanup_intv(task_st_t& task) { if (task.type == task_type_t::connect) { connect_params_t* params { - static_cast(task.op_st.task_params.get()) + static_cast(task.op_st.op_params.get()) }; res = params->ping_interval; } else if (task.type == task_type_t::ping) { ping_params_t* params { - static_cast(task.op_st.task_params.get()) + static_cast(task.op_st.op_params.get()) }; res = params->interval; + } else if (task.type == task_type_t::readonly){ + readonly_params_t* params { + static_cast(task.op_st.op_params.get()) + }; + + res = params->ping_interval; } else { assert(0 && "Non-implemented task-type"); } @@ -642,60 +870,98 @@ pair get_task_conn(conn_pool_t& conn_pool, task_st_t& task_st } } -pgsql_conn_t create_conn(task_st_t& task_st) { -#ifdef DEBUG - const mon_srv_t& srv { 
task_st.op_st.srv_info }; -#endif +string build_conn_str(const task_st_t& task_st) { + const mon_srv_t& srv_info { task_st.op_st.srv_info }; + const mon_user_t& user_info { task_st.op_st.user_info }; + + return string { + "host='" + srv_info.addr + "' " + + "port='" + std::to_string(srv_info.port) + "' " + + "user='" + user_info.user + "' " + + "password='" + user_info.pass + "' " + + "dbname='" + user_info.dbname + "' " + + "application_name=ProxySQL-Monitor" + }; +} + +pgsql_conn_t create_new_conn(task_st_t& task_st) { + pgsql_conn_t pgconn {}; // Initialize connection parameters const string conn_str { build_conn_str(task_st) }; - // Count the task as already started (conn acquisition) - task_st.start = monotonic_time(); - // Get task from connpool if task types allows it - pair conn_pool_res { get_task_conn(mon_conn_pool, task_st) }; + pgconn.conn = PQconnectStart(conn_str.c_str()); - if (conn_pool_res.first) { - proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Fetched conn from pool addr='%s:%d' thread=%lu\n", - srv.addr.c_str(), srv.port, pthread_self() - ); + if (pgconn.conn == NULL || PQstatus(pgconn.conn) == CONNECTION_BAD) { + const mon_srv_t& srv { task_st.op_st.srv_info }; - return std::move(conn_pool_res.second); - } else { -#ifdef DEBUG - if (task_st.type != task_type_t::connect) { - proxy_debug(PROXY_DEBUG_MONITOR, 5, - "No suitable conn found in pool addr='%s:%d' thread=%lu\n", - srv.addr.c_str(), srv.port, pthread_self() + if (pgconn.conn) { + auto error { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + proxy_error( + "Monitor connect failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, error.get() + ); + + pgconn.err = std::move(error); + task_st.end = monotonic_time(); + } else { + mf_unique_ptr error { strdup("Out of memory") }; + proxy_error( + "Monitor connect failed addr='%s:%d' error='%s'\n", + srv.addr.c_str(), srv.port, "Out of memory" ); - } -#endif - pgsql_conn_t pg_conn {}; - pg_conn.conn = PQconnectStart(conn_str.c_str()); + pgconn.err = std::move(error); + task_st.end = monotonic_time(); + } + } else { + if (PQsetnonblocking(pgconn.conn, 1) != 0) { + auto error { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; + proxy_error("Failed to set non-blocking mode error='%s'\n", error.get()); - if (pg_conn.conn == NULL || PQstatus(pg_conn.conn) == CONNECTION_BAD) { - if (pg_conn.conn) { - // WARNING: DO NOT RELEASE this PGresult - const PGresult* result = PQgetResultFromPGconn(pg_conn.conn); - const char* error { PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY) }; - proxy_error("Monitor connect failed error='%s'\n", error); - } else { - proxy_error("Monitor connect failed error='%s'\n", "Out of memory"); - } + pgconn.err = std::move(error); + task_st.end = monotonic_time(); } else { - if (PQsetnonblocking(pg_conn.conn, 1) != 0) { - // WARNING: DO NOT RELEASE this PGresult - const PGresult* result = PQgetResultFromPGconn(pg_conn.conn); - const char* error { PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY) }; - proxy_error("Failed to set non-blocking mode error=%s\n", error); - } else { - pg_conn.state = ASYNC_ST::ASYNC_CONNECT_CONT; - pg_conn.fd = PQsocket(pg_conn.conn); - } + pgconn.state = ASYNC_ST::ASYNC_CONNECT_CONT; + pgconn.fd = PQsocket(pgconn.conn); } + } + + return pgconn; +} + +#ifdef DEBUG +uint64_t count_pool_conns(conn_pool_t& pool) { + std::lock_guard lock(pool.mutex); + uint64_t count = 0; + + for (const auto& [key, connections] : pool.conn_map) { + count += connections.size(); + } + + return count; +} +#endif + +pgsql_conn_t create_conn(task_st_t& 
task_st) { + // Count the task as already started (conn acquisition) + task_st.start = monotonic_time(); + // Fetch conn from conn_pool if the task type allows it + pair pool_res { get_task_conn(mon_conn_pool, task_st) }; + +#ifdef DEBUG + const mon_srv_t& srv { task_st.op_st.srv_info }; + uint64_t pool_conn_count { count_pool_conns(mon_conn_pool) }; + + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Fetched conn from pool task_type=%d fd=%d addr='%s:%d' pool_conn_count=%lu\n", + int(task_st.type), pool_res.second.fd, srv.addr.c_str(), srv.port, pool_conn_count + ); +#endif - return pg_conn; + if (pool_res.first) { + return std::move(pool_res.second); + } else { + return create_new_conn(task_st); } } @@ -727,9 +993,10 @@ uint32_t required_worker_threads( return req_worker_threads; } -struct next_tasks_intvs_t { +struct tasks_intvs_t { uint64_t next_ping_at; uint64_t next_connect_at; + uint64_t next_readonly_at; }; struct task_poll_t { @@ -794,8 +1061,8 @@ tasks_stats_t compute_intv_stats(result_queue_t& results) { if (results.queue.size() != 0) { stats = tasks_stats_t { - results.queue.front().task.op_st.start, - results.queue.back().task.op_st.end, + results.queue.front().task.start, + results.queue.back().task.end, results.queue.size() }; } else { @@ -807,111 +1074,43 @@ tasks_stats_t compute_intv_stats(result_queue_t& results) { return stats; } -vector create_ping_tasks( - uint64_t curtime, - const mon_user_t user_info, - const ping_conf_t& conf +template +vector create_simple_tasks( + uint64_t curtime, const mon_user_t user, const conf_t& conf, task_type_t type ) { vector tasks {}; const vector srvs_info { ext_srvs(conf.srvs_info) }; for (const auto& srv_info : srvs_info) { - tasks.push_back(task_st_t { - curtime, - 0, - task_type_t::ping, - op_st_t { - 0, - 0, - srv_info, - user_info, - task_params_t { - new ping_params_t { conf.params }, - [] (void* v) { delete static_cast(v); } - } - } - }); - } - - return tasks; -} - -vector create_connect_tasks( - uint64_t curtime, - const mon_user_t user_info, - const connect_conf_t& conf -) { - vector tasks {}; - const vector srvs_info { ext_srvs(conf.srvs_info) }; + auto op_dtor { [] (void* v) { delete static_cast(v); } }; + op_params_t op_params { new params_t { conf.params }, op_dtor }; + op_st_t op_st { srv_info, user, std::move(op_params) }; - for (const auto& srv_info : srvs_info) { - tasks.push_back(task_st_t { - curtime, - 0, - task_type_t::connect, - op_st_t { - 0, - 0, - srv_info, - user_info, - task_params_t { - new connect_params_t { conf.params }, - [] (void* v) { delete static_cast(v); } - } - } - }); + tasks.push_back(task_st_t { type, curtime, curtime, 0, std::move(op_st) }); } return tasks; } -struct thread_t { - pthread_t handle; - - thread_t(const thread_t&) = delete; - thread_t(thread_t&) = delete; - - thread_t() : handle(0) {}; - thread_t(pthread_t hndl) : handle(hndl) {}; - thread_t(thread_t&& other) : handle(other.handle) { - other.handle = 0; - }; - - ~thread_t() { - if (handle == 0) return; - - // NOTE: Not required since **right now** threads are joined by scheduler - //
//////////////////////////////////////////////////////////////////// - // Detach the thread if it's not already detached. - // int detach_result = pthread_detach(handle); - // assert(detach_result == 0 && "Failed to detach thread during destruction."); - // //////////////////////////////////////////////////////////////////// - - // Cancel the thread if it's not already canceled. - int cancel_result = pthread_cancel(handle); - assert(cancel_result == 0 && "Failed to cancel thread during destruction."); - } -}; - -using worker_queue_t = pair; -using worker_thread_t = pair>; - -std::pair create_thread(size_t stack_size, void*(*routine)(void*), void* args) { - pthread_attr_t attr; - int result = pthread_attr_init(&attr); - assert(result == 0 && "Failed to initialize thread attributes."); - - result = pthread_attr_setstacksize(&attr, stack_size); - assert(result == 0 && "Invalid stack size provided for thread creation."); +using worker_queue_t = pair; +using worker_thread_t = pair>; + +std::pair create_thread(size_t stack_size, void*(*routine)(void*), void* args) { + pthread_attr_t attr; + int result = pthread_attr_init(&attr); + assert(result == 0 && "Failed to initialize thread attributes."); + + result = pthread_attr_setstacksize(&attr, stack_size); + assert(result == 0 && "Invalid stack size provided for thread creation."); - pthread_t thread; - result = pthread_create(&thread, &attr, routine, args); + pthread_t pthread; + result = pthread_create(&pthread, &attr, routine, args); pthread_attr_destroy(&attr); if (result != 0) { - return std::make_pair(result, thread_t {}); + return std::make_pair(result, pthread_t {}); } else { - return std::make_pair(result, thread_t { thread }); + return std::make_pair(result, pthread_t { pthread }); } } @@ -956,23 +1155,23 @@ uint8_t read_signal(int fd) { } /** - * @brief At worst ⌊A/B⌋ + (B - 1) extra elements for the final thread. - * @details TODO: Improve batch scheduling to avoid network burst. + * @brief Add the supplied tasks to the worker threads' queues. + * @details Scheduling to avoid network burst is config-dependent. Task distribution is + * even between workers with the exception of the last thread, which at worst could + * receive ⌊A/B⌋ + (B - 1) extra elements. + * + * @param workers Worker threads for even task distribution. + * @param tasks The tasks to be moved to the worker queues.
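+ * @note Worked example (hypothetical sizes): with A=10 tasks and B=4 workers,
+ *       the first three queues receive ⌊10/4⌋ = 2 tasks each, while the last
+ *       queue also drains the remainder, 2 + (10 mod 4) = 4 tasks in total,
+ *       within the stated ⌊A/B⌋ + (B - 1) bound.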
*/ -void schedule_tasks( - vector& worker_threads, vector tasks -) { - size_t tasks_per_thread { tasks.size() / worker_threads.size() }; +void schedule_tasks(vector& workers, vector&& tasks) { + size_t tasks_per_thread { tasks.size() / workers.size() }; size_t task_idx = 0; - for (size_t i = 0; i < worker_threads.size(); i++) { - task_queue_t& task_queue { worker_threads[i].second->first }; + for (size_t i = 0; i < workers.size(); i++) { + task_queue_t& task_queue { workers[i].second->first }; std::lock_guard lock_queue { task_queue.mutex }; - if (i == worker_threads.size() - 1) { + if (i == workers.size() - 1) { for (size_t j = task_idx; j < tasks.size(); j++) { task_queue.queue.push(std::move(tasks[j])); } @@ -984,66 +1183,75 @@ void schedule_tasks( } // Signal all threads to process queues - for (size_t i = 0; i < worker_threads.size(); i++) { - task_queue_t& task_queue { worker_threads[i].second->first }; + for (size_t i = 0; i < workers.size(); i++) { + task_queue_t& task_queue { workers[i].second->first }; write_signal(task_queue.comm_fd[1], 0); } } -uint64_t CONN_RATE_LIMIT = 50; - -void schedule_tasks_batches( - vector& worker_threads, vector tasks +pair compute_task_rate( + uint64_t workers, uint64_t tasks, uint64_t intv_us, double intv_pct ) { - size_t batch_c = tasks.size() / CONN_RATE_LIMIT; - size_t f_batch = tasks.size() % CONN_RATE_LIMIT; + uint64_t intv_pct_us { uint64_t(ceil(intv_us * intv_pct)) }; + double tasks_per_worker { ceil(tasks / double(workers)) }; + uint64_t delay_per_bat { uint64_t(floor(intv_pct_us / tasks_per_worker)) }; -#ifdef DEBUG - // TODO: Should give info about the kind/count of tasks scheduled - proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Scheduling tasks batches batch_count=%lu final_batch=%lu\n", - batch_c, f_batch - ); -#endif + return { workers, delay_per_bat }; +} - vector> batches {}; +uint64_t compute_sched_sleep(uint64_t curtime, uint64_t closest_intv, uint64_t next_batch_wait) { + const uint64_t next_intv_diff { closest_intv < curtime ? 0 : closest_intv - curtime }; + const uint64_t max_wait_us { std::min({ next_batch_wait, next_intv_diff }) }; - for (size_t i = 0; i <= batch_c; i++) { - vector new_batch {}; + return max_wait_us; +} - if (i < batch_c) { - for (size_t j = i * CONN_RATE_LIMIT; j < CONN_RATE_LIMIT * (i + 1); j++) { - new_batch.push_back(std::move(tasks[j])); - } - } else { - for (size_t j = i * CONN_RATE_LIMIT; j < f_batch; j++) { - new_batch.push_back(std::move(tasks[j])); - } - } +struct task_batch_t { + // :: info + task_type_t type; + uint64_t batch_sz; + int32_t intv_us; + double intv_window; + // :: state + uint64_t next_sched; + vector tasks; +}; - batches.push_back(std::move(new_batch)); - } +vector get_from_batch(task_batch_t& batch, uint64_t tasks) { + vector new_bat {}; - for (size_t i = 0; i < batches.size(); i++) { - schedule_tasks(worker_threads, std::move(batches[i])); - usleep(CONN_RATE_LIMIT * 1000); + if (batch.tasks.size()) { + uint64_t batch_size { tasks > batch.tasks.size() ? 
batch.tasks.size() : tasks }; + + new_bat.insert( + new_bat.end(), + std::make_move_iterator(batch.tasks.begin()), + std::make_move_iterator(batch.tasks.begin() + batch_size) + ); + batch.tasks.erase(batch.tasks.begin(), batch.tasks.begin() + batch_size); } + + return new_bat; } -bool check_success(pgsql_conn_t& c, task_st_t& st) { +bool is_task_success(pgsql_conn_t& c, task_st_t& st) { return ((c.state != ASYNC_ST::ASYNC_CONNECT_FAILED && c.state != ASYNC_CONNECT_TIMEOUT) - || (c.state != ASYNC_ST::ASYNC_PING_FAILED && c.state != ASYNC_PING_TIMEOUT)) + || (c.state != ASYNC_ST::ASYNC_PING_FAILED && c.state != ASYNC_PING_TIMEOUT) + || (c.state != ASYNC_ST::ASYNC_QUERY_FAILED && c.state != ASYNC_QUERY_TIMEOUT)) && ((c.state == ASYNC_ST::ASYNC_CONNECT_END && st.type == task_type_t::connect) - || (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping)); + || (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping) + || (c.state == ASYNC_ST::ASYNC_QUERY_END && st.type == task_type_t::readonly)); } bool is_task_finish(pgsql_conn_t& c, task_st_t& st) { return ((c.state == ASYNC_ST::ASYNC_CONNECT_FAILED || c.state == ASYNC_ST::ASYNC_CONNECT_TIMEOUT) - || (c.state == ASYNC_ST::ASYNC_PING_FAILED || c.state == ASYNC_ST::ASYNC_PING_TIMEOUT)) + || (c.state == ASYNC_ST::ASYNC_PING_FAILED || c.state == ASYNC_ST::ASYNC_PING_TIMEOUT) + || (c.state == ASYNC_ST::ASYNC_QUERY_FAILED || c.state == ASYNC_ST::ASYNC_QUERY_TIMEOUT)) || (c.state == ASYNC_ST::ASYNC_CONNECT_END && st.type == task_type_t::connect) - || (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping); + || (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping) + || (c.state == ASYNC_ST::ASYNC_QUERY_END && st.type == task_type_t::readonly); } void update_connect_table(SQLite3DB* db, state_t& state) { @@ -1053,16 +1261,16 @@ void update_connect_table(SQLite3DB* db, state_t& state) { ); ASSERT_SQLITE_OK(rc, db); + uint64_t op_dur_us { state.task.end - state.task.start }; + sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str()); sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port); - uint64_t op_dur_us = state.task.end - state.task.start; - // TODO: Revisit this; maybe a better way? uint64_t time_start_us = realtime_time() - op_dur_us; sqlite_bind_int64(stmt, 3, time_start_us); - uint64_t conn_succ_time_us { check_success(state.conn, state.task) ? op_dur_us : 0 }; - sqlite_bind_int64(stmt, 4, conn_succ_time_us); + uint64_t succ_time_us { is_task_success(state.conn, state.task) ? 
op_dur_us : 0 }; + sqlite_bind_int64(stmt, 4, succ_time_us); sqlite_bind_text(stmt, 5, state.conn.err.get()); SAFE_SQLITE3_STEP2(stmt); @@ -1073,6 +1281,7 @@ void update_connect_table(SQLite3DB* db, state_t& state) { if (state.conn.err) { const mon_srv_t& srv { state.task.op_st.srv_info }; + char* srv_addr { const_cast(srv.addr.c_str()) }; int err_code { 0 }; if (state.conn.state != ASYNC_ST::ASYNC_CONNECT_TIMEOUT) { @@ -1082,11 +1291,7 @@ void update_connect_table(SQLite3DB* db, state_t& state) { }; PgHGM->p_update_pgsql_error_counter( - p_pgsql_error_type::proxysql, - 0, - const_cast(srv.addr.c_str()), - srv.port, - err_code + p_pgsql_error_type::proxysql, 0, srv_addr, srv.port, err_code ); __sync_fetch_and_add(&GloPgMon->connect_check_ERR, 1); } else { @@ -1097,20 +1302,20 @@ void update_connect_table(SQLite3DB* db, state_t& state) { void update_ping_table(SQLite3DB* db, state_t& state) { sqlite3_stmt* stmt = nullptr; int rc = db->prepare_v2( - "INSERT OR REPLACE INTO pgsql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)", &stmt + "INSERT OR REPLACE INTO pgsql_server_ping_log VALUES (?1, ?2, ?3, ?4, ?5)", &stmt ); ASSERT_SQLITE_OK(rc, db); + uint64_t op_dur_us { state.task.end - state.task.start }; + sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str()); sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port); - uint64_t op_dur_us = state.task.end - state.task.start; - // TODO: Revisit this; maybe a better way? - uint64_t time_start_us = realtime_time() - op_dur_us; + uint64_t time_start_us { realtime_time() - op_dur_us }; sqlite_bind_int64(stmt, 3, time_start_us); + uint64_t succ_time_us { is_task_success(state.conn, state.task) ? op_dur_us : 0 }; + sqlite_bind_int64(stmt, 4, succ_time_us); - uint64_t conn_succ_time_us { check_success(state.conn, state.task) ? 
op_dur_us : 0 }; - sqlite_bind_int64(stmt, 4, conn_succ_time_us); sqlite_bind_text(stmt, 5, state.conn.err.get()); SAFE_SQLITE3_STEP2(stmt); @@ -1121,6 +1326,7 @@ void update_ping_table(SQLite3DB* db, state_t& state) { if (state.conn.err) { const mon_srv_t& srv { state.task.op_st.srv_info }; + char* srv_addr { const_cast(srv.addr.c_str()) }; int err_code { 0 }; if (state.conn.state != ASYNC_ST::ASYNC_PING_TIMEOUT) { @@ -1130,11 +1336,7 @@ void update_ping_table(SQLite3DB* db, state_t& state) { }; PgHGM->p_update_pgsql_error_counter( - p_pgsql_error_type::proxysql, - 0, - const_cast(srv.addr.c_str()), - srv.port, - err_code + p_pgsql_error_type::proxysql, 0, srv_addr, srv.port, err_code ); __sync_fetch_and_add(&GloPgMon->ping_check_ERR, 1); } else { @@ -1142,130 +1344,123 @@ void update_ping_table(SQLite3DB* db, state_t& state) { } } -const char MAINT_PING_LOG_QUERY[] { - "DELETE FROM pgsql_server_ping_log WHERE time_start_us < ?1" -}; - -const char MAINT_CONNECT_LOG_QUERY[] { - "DELETE FROM pgsql_server_connect_log WHERE time_start_us < ?1" -}; +void update_readonly_table(SQLite3DB* db, state_t& state) { + readonly_res_t* op_result { + static_cast(state.task.op_st.op_result.get()) + }; -void maint_monitor_table(SQLite3DB* db, const char query[], const ping_params_t& params) { - sqlite3_stmt* stmt { nullptr }; - int rc = db->prepare_v2(query, &stmt); + sqlite3_stmt* stmt = nullptr; + int rc = db->prepare_v2( + "INSERT OR REPLACE INTO pgsql_server_read_only_log VALUES (?1, ?2, ?3, ?4, ?5, ?6)", &stmt + ); ASSERT_SQLITE_OK(rc, db); - if (pgsql_thread___monitor_history < (params.interval * (params.max_failures + 1)) / 1000) { - if (static_cast(params.interval) < uint64_t(3600000) * 1000) { - pgsql_thread___monitor_history = (params.interval * (params.max_failures + 1)) / 1000; - } + uint64_t op_dur_us { state.task.end - state.task.start }; + + sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str()); + sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port); + + uint64_t time_start_us { realtime_time() - op_dur_us }; + sqlite_bind_int64(stmt, 3, time_start_us); + + uint64_t succ_time_us { is_task_success(state.conn, state.task) ? 
op_dur_us : 0 }; + sqlite_bind_int64(stmt, 4, succ_time_us); + + if (op_result) { + sqlite_bind_int64(stmt, 5, op_result->val); + } else { + sqlite_bind_null(stmt, 5); } - uint64_t max_history_age { realtime_time() - uint64_t(pgsql_thread___monitor_history)*1000 }; - sqlite_bind_int64(stmt, 1, max_history_age); + sqlite_bind_text(stmt, 6, state.conn.err.get()); + SAFE_SQLITE3_STEP2(stmt); sqlite_clear_bindings(stmt); sqlite_reset_statement(stmt); sqlite_finalize_statement(stmt); -} -const char PING_MON_HOSTS_QUERY[] { - "SELECT DISTINCT" - " a.hostname," - " a.port" - " FROM" - " monitor_internal.pgsql_servers a" - " JOIN pgsql_server_ping_log b ON a.hostname = b.hostname" - " WHERE" - " b.ping_error IS NOT NULL" - " AND b.ping_error NOT LIKE '%%password authentication failed for user%%'" -}; + if (state.conn.err) { + const mon_srv_t& srv { state.task.op_st.srv_info }; + char* srv_addr { const_cast(srv.addr.c_str()) }; + int err_code { 0 }; + + if (state.conn.state != ASYNC_ST::ASYNC_QUERY_TIMEOUT) { + err_code = 9100 + state.conn.state; + } else { + err_code = ER_PROXYSQL_READONLY_TIMEOUT; + }; + + PgHGM->p_update_pgsql_error_counter( + p_pgsql_error_type::proxysql, 0, srv_addr, srv.port, err_code + ); + __sync_fetch_and_add(&GloPgMon->readonly_check_ERR, 1); + } else { + __sync_fetch_and_add(&GloPgMon->readonly_check_OK, 1); + } +} -const char HOST_TO_SHUNN_QUERY[] { +const char CHECK_HOST_ERR_LIMIT_QUERY[] { "SELECT 1" " FROM" " (" " SELECT hostname, port, ping_error" " FROM pgsql_server_ping_log" - " WHERE hostname = '%s' AND port = '%s'" + " WHERE hostname = ? AND port = ?" " ORDER BY time_start_us DESC" - " LIMIT % d" + " LIMIT ?" " ) a" " WHERE" " ping_error IS NOT NULL" - " AND ping_error NOT LIKE '%%password authentication failed for user%%'" + " AND ping_error NOT LIKE '%password authentication failed for user%'" " GROUP BY" - " hostname," - " port" + " hostname, port" " HAVING" - " COUNT(*) = %d" + " COUNT(*) = ?" }; -void shunn_non_resp_srvs(SQLite3DB* db, state_t& state) { - ping_params_t* params { static_cast(state.task.op_st.task_params.get()) }; - char* err { nullptr }; +thread_local sqlite3_stmt* CHECK_HOST_ERR_LIMIT_STMT { nullptr }; - unique_ptr resultset { db->execute_statement(PING_MON_HOSTS_QUERY, &err) }; - if (err) { - proxy_error( - "Internal query error. 
Aborting query=%s error=%s\n", PING_MON_HOSTS_QUERY, err - ); - free(err); - assert(0); - } +void shunn_non_resp_srv(SQLite3DB* db, state_t& state) { + ping_params_t* params { static_cast(state.task.op_st.op_params.get()) }; - vector> addr_port_p {}; + const mon_srv_t& srv { state.task.op_st.srv_info }; + char* addr { const_cast(srv.addr.c_str()) }; + int port { srv.port }; + int32_t max_fails { params->max_failures }; - for (const SQLite3_row* row : resultset->rows) { - char* addr { row->fields[0] }; - char* port { row->fields[1] }; - int32_t max_fails { params->max_failures }; + if (CHECK_HOST_ERR_LIMIT_STMT == nullptr) { + int rc = db->prepare_v2(CHECK_HOST_ERR_LIMIT_QUERY, &CHECK_HOST_ERR_LIMIT_STMT); + ASSERT_SQLITE_OK(rc, db); + } - cfmt_t query_fmt { - cstr_format(HOST_TO_SHUNN_QUERY, addr, port, max_fails, max_fails) - }; - char* err { nullptr }; - unique_ptr resultset { - db->execute_statement(query_fmt.str.c_str(), &err) - }; + sqlite_bind_text(CHECK_HOST_ERR_LIMIT_STMT, 1, addr); + sqlite_bind_int(CHECK_HOST_ERR_LIMIT_STMT, 2, port); + sqlite_bind_int(CHECK_HOST_ERR_LIMIT_STMT, 3, max_fails); + sqlite_bind_int(CHECK_HOST_ERR_LIMIT_STMT, 4, max_fails); - if (!err && resultset && resultset->rows_count) { - bool shunned { PgHGM->shun_and_killall(addr, atoi(port)) }; - if (shunned) { - proxy_error( - "Server %s:%s missed %d heartbeats, shunning it and killing all the connections." - " Disabling other checks until the node comes back online.\n", - addr, port, max_fails - ); - } - } else if (err) { + unique_ptr limit_set { sqlite_fetch_and_clear(CHECK_HOST_ERR_LIMIT_STMT) }; + + if (limit_set && limit_set->rows_count) { + bool shunned { PgHGM->shun_and_killall(addr, port) }; + if (shunned) { proxy_error( - "Internal query error. Aborting query=%s error=%s\n", - query_fmt.str.c_str(), err + "Server %s:%d missed %d heartbeats, shunning it and killing all the connections." + " Disabling other checks until the node comes back online.\n", + addr, port, max_fails ); - free(err); - assert(0); } } } -const char PING_SRVS_NO_ERRORS[] { - "SELECT DISTINCT a.hostname, a.port" - " FROM" - " monitor_internal.pgsql_servers a" - " JOIN pgsql_server_ping_log b ON a.hostname = b.hostname" - " WHERE b.ping_error IS NULL" -}; - -const char UPD_SRVS_LATENCY_QUERY[] { +const char HOST_FETCH_UPD_LATENCY_QUERY[] { "SELECT" " hostname, port, COALESCE(CAST(AVG(ping_success_time_us) AS INTEGER), 10000)" " FROM" " (" " SELECT hostname, port, ping_success_time_us, ping_error" " FROM pgsql_server_ping_log" - " WHERE hostname = '%s' AND port = '%s'" + " WHERE hostname = ? AND port = ?" " ORDER BY time_start_us DESC" " LIMIT 3" " ) a" @@ -1273,39 +1468,27 @@ const char UPD_SRVS_LATENCY_QUERY[] { " GROUP BY hostname, port" }; -void upd_srvs_latency(SQLite3DB* db, state_t& state) { - char* err { nullptr }; +thread_local sqlite3_stmt* FETCH_HOST_LATENCY_STMT { nullptr }; - unique_ptr resultset { db->execute_statement(PING_SRVS_NO_ERRORS, &err) }; - if (err) { - proxy_error( - "Internal query error. 
Aborting query=%s error=%s\n", PING_SRVS_NO_ERRORS, err - ); - free(err); - assert(0); + } - for (const SQLite3_row* row : resultset->rows) { - char* addr { row->fields[0] }; - char* port { row->fields[1] }; + sqlite_bind_text(FETCH_HOST_LATENCY_STMT, 1, addr); + sqlite_bind_int(FETCH_HOST_LATENCY_STMT, 2, port); - cfmt_t query_fmt { cstr_format(UPD_SRVS_LATENCY_QUERY, addr, port) }; - char* err { nullptr }; - unique_ptr resultset { - db->execute_statement(query_fmt.str.c_str(), &err) - }; + unique_ptr resultset { sqlite_fetch_and_clear(FETCH_HOST_LATENCY_STMT) }; - if (!err && resultset && resultset->rows_count) { - for (const SQLite3_row* srv : resultset->rows) { - char* cur_latency { srv->fields[2] }; - PgHGM->set_server_current_latency_us(addr, atoi(port), atoi(cur_latency)); - } - } else if (err) { - proxy_error( - "Internal query error. Aborting query=%s error=%s\n", query_fmt.str.c_str(), err - ); - free(err); - assert(0); + if (resultset && resultset->rows_count) { + for (const SQLite3_row* srv : resultset->rows) { + char* cur_latency { srv->fields[2] }; + PgHGM->set_server_current_latency_us(addr, port, atoi(cur_latency)); } } } @@ -1314,55 +1497,176 @@ void perf_ping_actions(SQLite3DB* db, state_t& state) { // Update table entries update_ping_table(db, state); - // TODO: Checks for the following potential actions take most of the processing time. - // The actions should be redesign so the checks themselves are cheap operations, - // actions could remain expensive, as they should be the exception, not the norm. + // TODO: Checks could be redesigned so the checks themselves are cheap operations. + // Actions could remain expensive, as they should be the exception, not the norm.
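+	// Both per-server actions below rely on thread_local prepared statements
+	// (CHECK_HOST_ERR_LIMIT_STMT / FETCH_HOST_LATENCY_STMT), bound to this
+	// task's (hostname, port), instead of re-formatting a scan over all hosts.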
///////////////////////////////////////////////////////////////////////////////////// + // Shunn all problematic hosts - shunn_non_resp_srvs(db, state); + shunn_non_resp_srv(db, state); // Update 'current_lantency_ms' - upd_srvs_latency(db, state); + update_srv_latency(db, state); ///////////////////////////////////////////////////////////////////////////////////// } -void proc_task_state(state_t& state) { - pgsql_conn_t& pg_conn { state.conn }; +const char READONLY_HOSTS_QUERY_T[] { + "SELECT 1 FROM (" + " SELECT hostname, port, read_only, error FROM pgsql_server_read_only_log" + " WHERE hostname = '%s' AND port = '%d'" + " ORDER BY time_start_us DESC" + " LIMIT %d" + ") a WHERE" + " read_only IS NULL AND error LIKE '%%Operation timed out%%'" + " GROUP BY" + " hostname, port" + " HAVING" + " COUNT(*) = %d" +}; - if (state.task.type == task_type_t::connect) { +void perf_readonly_actions(SQLite3DB* db, state_t& state) { + // Update table entries + update_readonly_table(db, state); + + // Perform the readonly actions + { + const op_st_t& op_st { state.task.op_st }; + const mon_srv_t& srv { state.task.op_st.srv_info }; + readonly_params_t* params { static_cast(op_st.op_params.get()) }; + + cfmt_t q_fmt { + cstr_format( + READONLY_HOSTS_QUERY_T, + srv.addr.c_str(), + srv.port, + params->max_timeout_count, + params->max_timeout_count + ) + }; + + if (is_task_success(state.conn, state.task)) { + readonly_res_t* op_result { static_cast(op_st.op_result.get()) }; + PgHGM->read_only_action_v2( + {{ srv.addr, srv.port, op_result->val }}, params->writer_is_also_reader + ); + } else { + char* err { nullptr }; + unique_ptr resultset { db->execute_statement(q_fmt.str.c_str(), &err) }; + + if (!err && resultset && resultset->rows_count) { + proxy_error( + "Server %s:%d missed %d read_only checks. Assuming read_only=1\n", + srv.addr.c_str(), srv.port, params->max_timeout_count + ); + PgHGM->read_only_action_v2( + {{ srv.addr, srv.port, 1 }}, params->writer_is_also_reader + ); + } else if (err) { + proxy_error( + "Internal query error. 
Aborting query=%s error='%s'\n", q_fmt.str.c_str(), err + ); + free(err); + assert(0); + } + } + } +} + +uint64_t get_task_timeout(task_st_t& task) { + uint64_t task_to = 0; + + if (task.type == task_type_t::connect) { connect_params_t* params { - static_cast(state.task.op_st.task_params.get()) + static_cast(task.op_st.op_params.get()) + }; + + task_to = params->timeout; + } else if (task.type == task_type_t::ping) { + ping_params_t* params { + static_cast(task.op_st.op_params.get()) }; - if (monotonic_time() - state.task.start > static_cast(params->timeout)) { + task_to = params->timeout; + } else if (task.type == task_type_t::readonly) { + readonly_params_t* params { + static_cast(task.op_st.op_params.get()) + }; + + task_to = params->timeout; + } else { + assert(0 && "Non-implemented task-type"); + } + + return task_to; +} + +uint64_t get_task_max_ping_fails(task_st_t& task) { + uint64_t max_fails { 0 }; + + if (task.type == task_type_t::connect) { + connect_params_t* params { + static_cast(task.op_st.op_params.get()) + }; + + max_fails = params->ping_max_failures; + } else if (task.type == task_type_t::ping) { + ping_params_t* params { + static_cast(task.op_st.op_params.get()) + }; + + max_fails = params->max_failures; + } else if (task.type == task_type_t::readonly) { + readonly_params_t* params { + static_cast(task.op_st.op_params.get()) + }; + + max_fails = params->ping_max_failures; + } else { + assert(0 && "Non-implemented task-type"); + } + + return max_fails; +} + +void proc_task_state(state_t& state, uint64_t task_start) { + pgsql_conn_t& pg_conn { state.conn }; + state.task.op_st.exec_time += monotonic_time() - task_start; + + if (state.task.type == task_type_t::connect) { + if (monotonic_time() - state.task.start > get_task_timeout(state.task)) { // TODO: Unified state processing pg_conn.state = ASYNC_ST::ASYNC_CONNECT_TIMEOUT; - state.task.end = monotonic_time(); pg_conn.err = mf_unique_ptr(strdup("Operation timed out")); + state.task.end = monotonic_time(); // TODO: proxy_error + metrics update update_connect_table(&GloPgMon->monitordb, state); } else if (is_task_finish(state.conn, state.task)) { - // Perform the dumping update_connect_table(&GloPgMon->monitordb, state); } } else if (state.task.type == task_type_t::ping) { - ping_params_t* params { - static_cast(state.task.op_st.task_params.get()) - }; - - if (monotonic_time() - state.task.start > static_cast(params->timeout)) { + if (monotonic_time() - state.task.start > get_task_timeout(state.task)) { // TODO: Unified state processing pg_conn.state = ASYNC_ST::ASYNC_PING_TIMEOUT; - state.task.end = monotonic_time(); pg_conn.err = mf_unique_ptr(strdup("Operation timed out")); + state.task.end = monotonic_time(); // TODO: proxy_error + metrics update perf_ping_actions(&GloPgMon->monitordb, state); } else if (is_task_finish(state.conn, state.task)) { - // Perform the dumping perf_ping_actions(&GloPgMon->monitordb, state); } + } else if (state.task.type == task_type_t::readonly) { + if (monotonic_time() - state.task.start > get_task_timeout(state.task)) { + // TODO: Unified state processing + pg_conn.state = ASYNC_ST::ASYNC_QUERY_TIMEOUT; + pg_conn.err = mf_unique_ptr(strdup("Operation timed out")); + state.task.end = monotonic_time(); + + // TODO: proxy_error + metrics update + perf_readonly_actions(&GloPgMon->monitordb, state); + } else if (is_task_finish(state.conn, state.task)) { + perf_readonly_actions(&GloPgMon->monitordb, state); + } } else { assert(0 && "Non-implemented task-type"); } @@ -1383,53 +1687,30 @@ void 
add_scheduler_comm_task(const task_queue_t& tasks_queue, task_poll_t& task_ add_task(task_poll, POLLIN, std::move(dummy_state)); } -uint64_t MAX_CHECK_DELAY_US = 500000; - -uint64_t get_task_timeout(state_t& state) { - uint64_t task_to = 0; - - if (state.task.type == task_type_t::connect) { - connect_params_t* params { - static_cast(state.task.op_st.task_params.get()) - }; - - task_to = params->timeout; - } else if (state.task.type == task_type_t::ping) { - ping_params_t* params { - static_cast(state.task.op_st.task_params.get()) - }; - - task_to = params->timeout; - } else { - assert(0 && "Non-implemented task-type"); - } - - return task_to; -} +const uint64_t MAX_CHECK_DELAY_US { 500000 }; void* worker_thread(void* args) { pair* queues { static_cast*>(args) }; - - pthread_t self = pthread_self(); - task_queue_t& tasks_queue = queues->first; - // TODO: Not used for now; results should be used by scheduler - // result_queue_t& _ = queues->second; - bool recv_stop_signal = 0; + task_queue_t& tasks_queue { queues->first }; queue next_tasks {}; task_poll_t task_poll {}; + + bool recv_stop_signal = 0; + uint64_t prev_it_time = 0; + // Insert dummy task for scheduler comms add_scheduler_comm_task(tasks_queue, task_poll); while (recv_stop_signal == false) { - // Process wakup signal from scheduler + // Process wakeup signal from scheduler if (task_poll.fds[0].revents & POLLIN) { recv_stop_signal = read_signal(task_poll.fds[0].fd); if (recv_stop_signal == 1) { - proxy_info("Received exit signal, stopping worker thread=%ld\n", self); + proxy_info("Received exit signal. Stopping worker thread=%ld\n", pthread_self()); continue; } } @@ -1440,8 +1721,7 @@ void* worker_thread(void* args) { #ifdef DEBUG if (tasks_queue.queue.size()) { proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Fetching tasks from queue size=%lu thread=%lu\n", - tasks_queue.queue.size(), self + "Fetching tasks from queue size=%lu\n", tasks_queue.queue.size() ); } #endif @@ -1456,39 +1736,38 @@ void* worker_thread(void* args) { task_st_t task { std::move(next_tasks.front()) }; next_tasks.pop(); - if (task.type != task_type_t::ping) { // Check if server is responsive; if not, only ping tasks are processed const mon_srv_t& srv { task.op_st.srv_info }; + uint64_t max_fails { get_task_max_ping_fails(task) }; - connect_params_t* params { - static_cast(task.op_st.task_params.get()) - }; - int32_t max_fails = params->ping_max_failures; - - bool srv_resp { + bool resp_srv { server_responds_to_ping( GloPgMon->monitordb, srv.addr.c_str(), srv.port, max_fails ) }; - if (srv_resp == false) { + if (resp_srv == false) { proxy_debug(PROXY_DEBUG_MONITOR, 6, - "Skipping unresponsive server addr='%s:%d' thread=%lu\n", - srv.addr.c_str(), srv.port, self + "Skipping unresponsive server addr='%s:%d'\n", + srv.addr.c_str(), srv.port ); continue; } } + // Acquire new conn, update task on failure + uint64_t t1 { monotonic_time() }; pgsql_conn_t conn { create_conn(task) }; + task.op_st.exec_time += monotonic_time() - t1; + state_t init_st { std::move(conn), std::move(task) }; #ifdef DEBUG const mon_srv_t& srv { init_st.task.op_st.srv_info }; proxy_debug(PROXY_DEBUG_MONITOR, 6, - "Adding new task to poll addr='%s:%d' fd=%d thread=%lu\n", - srv.addr.c_str(), srv.port, conn.fd, self + "Adding new task to poll fd=%d type=%d addr='%s:%d'\n", + conn.fd, int(init_st.task.type), srv.addr.c_str(), srv.port ); #endif @@ -1496,19 +1775,22 @@ void* worker_thread(void* args) { } uint64_t next_timeout_at = ULONG_LONG_MAX; + uint64_t tasks_start = monotonic_time(); // Continue 
processing tasks; Next async operation for (size_t i = 1; i < task_poll.size; i++) { + uint64_t task_start = monotonic_time(); + #if DEBUG pollfd& pfd { task_poll.fds[i] }; state_t& task_st { task_poll.tasks[i] }; proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Processing task fd=%d revents=%d type=%d state=%d thread=%ld\n", - pfd.fd, pfd.revents, int(task_st.task.type), task_st.conn.state, self + "Processing task fd=%d revents=%d type=%d state=%d\n", + pfd.fd, pfd.revents, int(task_st.task.type), task_st.conn.state ); #endif - // filtering is possible here for the task + // Filtering is possible here for the task if (task_poll.fds[i].revents) { task_poll.fds[i].events = handle_pg_event( task_poll.tasks[i], task_poll.fds[i].revents @@ -1520,28 +1802,25 @@ void* worker_thread(void* args) { // TODO: Dump all relevant task state and changes due 'pg_event' proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Updating task state fd=%d conn_st=%d thread=%lu\n", - conn.fd, static_cast(conn.state), self + "Updating task state fd=%d conn_st=%d\n", conn.fd, conn.state ); // Process task status; Update final state if finished - proc_task_state(task_poll.tasks[i]); + proc_task_state(task_poll.tasks[i], task_start); // TODO: Dump all relevant task state proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Updated task state fd=%d conn_st=%d thread=%lu\n", - conn.fd, static_cast(conn.state), self + "Updated task state fd=%d conn_st=%d\n", conn.fd, conn.state ); // Failed/finished task; resuse conn / cleanup resources if (is_task_finish(conn, task_poll.tasks[i].task)) { // TODO: Dump all relevant task state proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Finished task fd=%d conn_st=%d thread=%ld\n", - conn.fd, static_cast(conn.state), pthread_self() + "Finished task fd=%d conn_st=%d\n", conn.fd, conn.state ); - if (check_success(task_poll.tasks[i].conn, task_poll.tasks[i].task)) { + if (is_task_success(task_poll.tasks[i].conn, task_poll.tasks[i].task)) { const mon_srv_t& srv { task_poll.tasks[i].task.op_st.srv_info }; // TODO: Better unified design to update state @@ -1551,36 +1830,50 @@ void* worker_thread(void* args) { put_conn(mon_conn_pool, srv, std::move(task_poll.tasks[i].conn)); proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Succeed task conn returned to pool fd=%d conn_st=%d thread=%ld\n", - conn.fd, static_cast(conn.state), pthread_self() + "Succeed task conn returned to pool fd=%d conn_st=%d\n", + conn.fd, conn.state ); } else { PQfinish(task_poll.tasks[i].conn.conn); proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Failed task conn killed fd=%d conn_st=%d thread=%ld\n", - conn.fd, static_cast(conn.state), pthread_self() + "Failed task conn killed fd=%d conn_st=%d\n", conn.fd, conn.state ); } // Remove from poll; after conn cleanup rm_task_fast(task_poll, i); } else { - uint64_t task_to = get_task_timeout(task_poll.tasks[i]); + uint64_t task_to = get_task_timeout(task_poll.tasks[i].task); uint64_t task_due_to = task_poll.tasks[i].task.start + task_to; next_timeout_at = next_timeout_at > task_due_to ? task_due_to : next_timeout_at; } } - uint64_t curtime = monotonic_time(); - uint64_t next_to_wait = next_timeout_at - curtime; - uint64_t poll_wait = next_to_wait > MAX_CHECK_DELAY_US ? MAX_CHECK_DELAY_US : next_to_wait; + const uint64_t tasks_end { monotonic_time() }; + prev_it_time = tasks_end - tasks_start; + + uint64_t to_timeout_us { next_timeout_at - tasks_end }; + uint64_t poll_timeout_us { + to_timeout_us > MAX_CHECK_DELAY_US ? 
MAX_CHECK_DELAY_US : to_timeout_us + }; proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Waiting for poll fds_len=%lu wait=%lu thread=%ld\n", task_poll.size, poll_wait, self + "Waiting for poll fds_len=%lu poll_to=%lu\n", task_poll.size, poll_timeout_us ); - int rc = poll(task_poll.fds.data(), task_poll.size, poll_wait); + + int rc = poll(task_poll.fds.data(), task_poll.size, poll_timeout_us/1000); + uint64_t poll_waited = monotonic_time() - tasks_end; + + for (size_t i = 1; i < task_poll.size; i++) { + if (!task_poll.fds[i].revents) { + task_poll.tasks[i].task.op_st.exec_time += prev_it_time; + } + + task_poll.tasks[i].task.op_st.exec_time += poll_waited; + } + proxy_debug(PROXY_DEBUG_MONITOR, 5, - "Wokeup from poll fds_len=%lu thread=%ld\n", task_poll.size, self + "Wokeup from poll fds_len=%lu\n", task_poll.size ); if (rc == -1 && errno == EINTR) @@ -1591,9 +1884,184 @@ void* worker_thread(void* args) { } } + sqlite_finalize_statement(CHECK_HOST_ERR_LIMIT_STMT); + sqlite_finalize_statement(FETCH_HOST_LATENCY_STMT); + return NULL; } +void maint_monitor_table(SQLite3DB* db, const char query[], const ping_params_t& params) { + sqlite3_stmt* stmt { nullptr }; + int rc = db->prepare_v2(query, &stmt); + ASSERT_SQLITE_OK(rc, db); + + if (pgsql_thread___monitor_history < (params.interval * (params.max_failures + 1)) / 1000) { + if (static_cast(params.interval) < uint64_t(3600000) * 1000) { + pgsql_thread___monitor_history = (params.interval * (params.max_failures + 1)) / 1000; + } + } + + uint64_t max_history_age { realtime_time() - uint64_t(pgsql_thread___monitor_history)*1000 }; + sqlite_bind_int64(stmt, 1, max_history_age); + SAFE_SQLITE3_STEP2(stmt); + + sqlite_clear_bindings(stmt); + sqlite_reset_statement(stmt); + sqlite_finalize_statement(stmt); +} + +const char MAINT_PING_LOG_QUERY[] { + "DELETE FROM pgsql_server_ping_log WHERE time_start_us < ?1" +}; + +const char MAINT_CONNECT_LOG_QUERY[] { + "DELETE FROM pgsql_server_connect_log WHERE time_start_us < ?1" +}; + +const char MAINT_READONLY_LOG_QUERY[] { + "DELETE FROM pgsql_server_read_only_log WHERE time_start_us < ?1" +}; + +/** + * @brief Performs the required maintenance in the monitor log tables. + * @param tasks_conf The updated tasks config for the interval. + * @param next_intvs Timestamps of each operation next interval. + * @param intv_start Timestamp of current interval start. + */ +void maint_mon_tables( + const tasks_conf_t& tasks_conf, const tasks_intvs_t& next_intvs, uint64_t intv_start +) { + if (next_intvs.next_ping_at <= intv_start) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Performed PING table maintenance intv_start=%lu\n", intv_start + ); + maint_monitor_table( + &GloPgMon->monitordb, MAINT_PING_LOG_QUERY, tasks_conf.ping.params + ); + } + + if (next_intvs.next_connect_at <= intv_start) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Performed CONNECT table maintenance intv_start=%lu\n", intv_start + ); + maint_monitor_table( + &GloPgMon->monitordb, MAINT_CONNECT_LOG_QUERY, tasks_conf.ping.params + ); + } + + if (next_intvs.next_readonly_at <= intv_start) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Performed READONLY table maintenance intv_start=%lu\n", intv_start + ); + maint_monitor_table( + &GloPgMon->monitordb, MAINT_READONLY_LOG_QUERY, tasks_conf.ping.params + ); + } +} + +/** + * @brief Builds the tasks batches for the current interval. + * @param tasks_conf The updated tasks config for the interval. + * @param next_intvs Timestamps of each operation next interval. + * @param intv_start Timestamp of current interval start. 
+ * @return The new tasks batches to be queued for the worker threads. + */ +vector build_intv_batches( + const tasks_conf_t& tasks_conf, const tasks_intvs_t& next_intvs, uint64_t intv_start +) { + vector intv_tasks {}; + + if (next_intvs.next_ping_at <= intv_start && tasks_conf.ping.srvs_info->rows_count) { + intv_tasks.push_back({ + task_type_t::ping, + uint64_t(tasks_conf.ping.srvs_info->rows_count), + tasks_conf.ping.params.interval, + tasks_conf.ping.params.interval_window, + intv_start, + create_simple_tasks( + intv_start, tasks_conf.user_info, tasks_conf.ping, task_type_t::ping + ) + }); + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Created PING tasks tasks=%lu intv_start=%lu\n", + intv_tasks.back().tasks.size(), intv_start + ); + } + + if (next_intvs.next_connect_at <= intv_start && tasks_conf.connect.srvs_info->rows_count) { + intv_tasks.push_back({ + task_type_t::connect, + uint64_t(tasks_conf.connect.srvs_info->rows_count), + tasks_conf.connect.params.interval, + tasks_conf.connect.params.interval_window, + intv_start, + create_simple_tasks( + intv_start, tasks_conf.user_info, tasks_conf.connect, task_type_t::connect + ) + }); + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Created CONNECT tasks tasks=%lu intv_start=%lu\n", + intv_tasks.back().tasks.size(), intv_start + ); + } + + if (next_intvs.next_readonly_at <= intv_start && tasks_conf.readonly.srvs_info->rows_count) { + intv_tasks.push_back({ + task_type_t::readonly, + uint64_t(tasks_conf.readonly.srvs_info->rows_count), + tasks_conf.readonly.params.interval, + tasks_conf.readonly.params.interval_window, + intv_start, + create_simple_tasks( + intv_start, tasks_conf.user_info, tasks_conf.readonly, task_type_t::readonly + ) + }); + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Created READONLY tasks tasks=%lu intv_start=%lu\n", + intv_tasks.back().tasks.size(), intv_start + ); + } + + return intv_tasks; +} + +/** + * @brief Computes new tasks intervals using current ones and interval start. + * @param conf The updated tasks config for the interval. + * @param next_intvs Timestamps of each operation next interval. + * @param intv_start Timestamp of current interval start. + * @return The new next intervals for the tasks. + */ +tasks_intvs_t compute_next_intvs( + const tasks_conf_t& conf, const tasks_intvs_t& next_intvs, uint64_t intv_start +) { + tasks_intvs_t upd_intvs { next_intvs }; + + if (next_intvs.next_ping_at <= intv_start && conf.ping.params.interval != 0) { + if (conf.ping.params.interval != 0) { + upd_intvs.next_ping_at = intv_start + conf.ping.params.interval; + } else { + upd_intvs.next_ping_at = ULONG_MAX; + } + } + if (next_intvs.next_connect_at <= intv_start && conf.connect.params.interval != 0) { + if (conf.connect.params.interval != 0) { + upd_intvs.next_connect_at = intv_start + conf.connect.params.interval; + } else { + upd_intvs.next_connect_at = ULONG_MAX; + } + } + if (next_intvs.next_readonly_at <= intv_start && conf.readonly.params.interval != 0) { + if (conf.readonly.params.interval != 0) { + upd_intvs.next_readonly_at = intv_start + conf.readonly.params.interval; + } else { + upd_intvs.next_readonly_at = ULONG_MAX; + } + } + + return upd_intvs; +} + void* PgSQL_monitor_scheduler_thread() { proxy_info("Started Monitor scheduler thread for PgSQL servers\n"); @@ -1609,126 +2077,158 @@ void* PgSQL_monitor_scheduler_thread() { result_queue_t conn_results {}; uint32_t worker_threads_count = pgsql_thread___monitor_threads; - vector worker_threads {}; + vector workers {}; - // TODO: Threads are right now fixed on startup. 
After startup, they should be dynamically - // resized based on the processing rate of the queues. We need to fix contingency in the - // current approach before this scaling is a viable option. + // TODO: Threads are right now fixed on startup. for (uint32_t i = 0; i < worker_threads_count; i++) { unique_ptr worker_queue { new worker_queue_t {} }; auto [err, th] { create_thread(2048 * 1024, worker_thread, worker_queue.get()) }; assert(err == 0 && "Thread creation failed"); - worker_threads.emplace_back(worker_thread_t { std::move(th), std::move(worker_queue) }); + workers.emplace_back(worker_thread_t { std::move(th), std::move(worker_queue) }); } uint64_t cur_intv_start = 0; - next_tasks_intvs_t next_tasks_intvs {}; + tasks_intvs_t next_intvs {}; + vector tasks_batches {}; while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true) { cur_intv_start = monotonic_time(); - if ( - cur_intv_start < next_tasks_intvs.next_ping_at - && cur_intv_start < next_tasks_intvs.next_connect_at - ) { - uint64_t closest_intv = std::min( - next_tasks_intvs.next_connect_at, next_tasks_intvs.next_ping_at + uint64_t closest_intv { + std::min({ + next_intvs.next_ping_at, + next_intvs.next_connect_at, + next_intvs.next_readonly_at + }) + }; + + if (cur_intv_start >= closest_intv) { + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Scheduling interval time=%lu delta=%lu ping=%lu connect=%lu readonly=%lu\n", + cur_intv_start, + cur_intv_start - closest_intv, + next_intvs.next_ping_at, + next_intvs.next_connect_at, + next_intvs.next_readonly_at ); - uint64_t next_check_delay = 0; - if (closest_intv > MAX_CHECK_DELAY_US) { - next_check_delay = MAX_CHECK_DELAY_US; - } else { - next_check_delay = closest_intv; + // Quick exit during shutdown/restart + if (!GloPTH) { return NULL; } + + // Check variable version changes; refresh if needed + unsigned int glover = GloPTH->get_global_version(); + if (PgSQL_Thread__variables_version < glover) { + PgSQL_Thread__variables_version = glover; + pgsql_thread->refresh_variables(); } - usleep(next_check_delay); - continue; - } + // Fetch config for next task scheduling + tasks_conf_t tasks_conf { fetch_updated_conf(GloPgMon, PgHGM) }; - // Quick exit during shutdown/restart - if (!GloPTH) { return NULL; } + // Perform table maintenance + maint_mon_tables(tasks_conf, next_intvs, cur_intv_start); - // Check variable version changes; refresh if needed - unsigned int glover = GloPTH->get_global_version(); - if (PgSQL_Thread__variables_version < glover) { - PgSQL_Thread__variables_version = glover; - pgsql_thread->refresh_variables(); - // TODO: Invalidate the connection pool? Changed monitor username / password? 
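// NOTE: Two scheduler details worth double-checking. In compute_next_intvs()
// above, the outer 'interval != 0' guard makes the inner 'else' (which parks a
// disabled check at ULONG_MAX) unreachable, so a check disabled at runtime
// keeps a stale 'next_*_at'. And in the batch loop below, the first
// 'if (batches_max_wait < wait)' can never fire because batches_max_wait
// starts at ULONG_MAX; the later update takes a minimum, so presumably this
// one was meant to as well (the MAX_CHECK_DELAY_US cap on the final usleep
// bounds the impact either way). A reduced sketch of the presumably intended
// wakeup bookkeeping, with the batch struct stubbed down for illustration:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct batch_stub_t { uint64_t next_sched; size_t tasks_left; }; // stub, not the patch's type

// Soonest wakeup across batches: not-yet-due batches contribute their
// remaining delay, due-but-undrained batches contribute the pacing wait.
uint64_t soonest_wakeup(
    const std::vector<batch_stub_t>& batches, uint64_t now, uint64_t pace_wait_us
) {
    uint64_t wait_us { UINT64_MAX };
    for (const batch_stub_t& b : batches) {
        if (b.next_sched > now) {
            wait_us = std::min(wait_us, b.next_sched - now);
        } else if (b.tasks_left != 0) {
            wait_us = std::min(wait_us, pace_wait_us);
        }
    }
    return wait_us;
}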
- } + // Create the tasks from config for this interval + vector next_batches { + build_intv_batches(tasks_conf, next_intvs, cur_intv_start) + }; - // Fetch config for next task scheduling - mon_tasks_conf_t tasks_conf { fetch_updated_conf(GloPgMon, PgHGM) }; + if (next_batches.size()) { + append(tasks_batches, std::move(next_batches)); + } - // TODO: Compute metrics from worker queues from previous processing interval - // tasks_stats_t prev_intv_stats { compute_intv_stats(worker_queues->second) }; + // Compute the next intervals for the tasks + next_intvs = compute_next_intvs(tasks_conf, next_intvs, cur_intv_start); + } - // Schedule next tasks / Compute next task interval - uint64_t cur_intv_start = monotonic_time(); + uint64_t batches_max_wait { ULONG_MAX }; - // Create the tasks from config for this interval - vector intv_tasks {}; + for (task_batch_t& batch : tasks_batches) { + if (batch.next_sched > cur_intv_start) { + uint64_t wait { batch.next_sched - cur_intv_start }; - if (next_tasks_intvs.next_ping_at < cur_intv_start) { - maint_monitor_table( - &GloPgMon->monitordb, MAINT_PING_LOG_QUERY, tasks_conf.ping.params - ); + if (batches_max_wait < wait) { + batches_max_wait = wait; + } + continue; + } - vector ping_tasks { - create_ping_tasks(cur_intv_start, tasks_conf.user_info, tasks_conf.ping), - }; - intv_tasks.insert( - intv_tasks.end(), - std::make_move_iterator(ping_tasks.begin()), - std::make_move_iterator(ping_tasks.end()) + const auto [rate, wait] = compute_task_rate( + workers.size(), batch.batch_sz, batch.intv_us, batch.intv_window ); - // Schedule next interval - next_tasks_intvs.next_ping_at = cur_intv_start + tasks_conf.ping.params.interval; - } - - if (next_tasks_intvs.next_connect_at < cur_intv_start) { - maint_monitor_table( - &GloPgMon->monitordb, MAINT_CONNECT_LOG_QUERY, tasks_conf.ping.params + proxy_debug(PROXY_DEBUG_MONITOR, 5, + "Scheduling tasks batch type=%d workers=%lu rate=%lu wait=%lu\n", + int(batch.type), workers.size(), rate, wait ); - vector conn_tasks { - create_connect_tasks(cur_intv_start, tasks_conf.user_info, tasks_conf.connect) - }; + // Schedule tasks between the worker threads; simple even distribution + vector tasks { get_from_batch(batch, rate) }; + schedule_tasks(workers, std::move(tasks)); - intv_tasks.insert( - intv_tasks.end(), - std::make_move_iterator(conn_tasks.begin()), - std::make_move_iterator(conn_tasks.end()) - ); + // Only set if there are tasks remaining + if (wait < batches_max_wait && batch.tasks.size() != 0) { + batches_max_wait = wait; + } - // Schedule next interval - next_tasks_intvs.next_connect_at = cur_intv_start + tasks_conf.connect.params.interval; + batch.next_sched = cur_intv_start + wait; } - // TODO: With previous stats compute/resize number of working threads - // uint32_t _ = required_worker_threads( - // prev_intv_stats, - // worker_threads_count, - // tasks_conf.ping.params.interval, - // intv_tasks.size() - // ); + // Remove finished batches + tasks_batches.erase( + std::remove_if(tasks_batches.begin(), tasks_batches.end(), + [] (const task_batch_t& batch) -> bool { + return batch.tasks.empty(); + } + ), + tasks_batches.end() + ); + + { + const uint64_t curtime { monotonic_time() }; + uint64_t upd_closest_intv { + std::min({ + next_intvs.next_ping_at, + next_intvs.next_connect_at, + next_intvs.next_readonly_at + }) + }; + const uint64_t next_intv_diff { upd_closest_intv < curtime ? 
0 : upd_closest_intv - curtime }; + const uint64_t sched_wait_us { std::min({ batches_max_wait, next_intv_diff }) }; - // Schedule the tasks for the worker threads; dummy even distribution - schedule_tasks_batches(worker_threads, std::move(intv_tasks)); + usleep(sched_wait_us > MAX_CHECK_DELAY_US ? MAX_CHECK_DELAY_US : sched_wait_us); + } } proxy_info("Exiting PgSQL_Monitor scheduling thread\n"); // Wakeup workers for shutdown { - for (worker_thread_t& worker : worker_threads) { + for (worker_thread_t& worker : workers) { write_signal(worker.second->first.comm_fd[1], 1); } - for (worker_thread_t& worker : worker_threads) { - pthread_join(worker.first.handle, NULL); + + // Give some time for a clean exit + usleep(500 * 1000); + + // Force the exit on the remaining threads + for (worker_thread_t& worker : workers) { + pthread_cancel(worker.first); + } + + // Wait for the threads to actually exit + for (worker_thread_t& worker : workers) { + pthread_join(worker.first, NULL); + } + + // Cleanup the global connection pool; no mutex, threads joined + for (auto& entry : mon_conn_pool.conn_map) { + for (auto& conn : entry.second) { + PQfinish(conn.conn); + } } + mon_conn_pool.conn_map.clear(); } return nullptr; diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index b152a4b491..99c5ea120b 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -785,14 +785,14 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char* || ((*myds)->sess->session_type == PROXYSQL_SESSION_SQLITE) ) { - if (strcmp((const char*)user, mysql_thread___monitor_username) == 0) { + if (strcmp((const char*)user, pgsql_thread___monitor_username) == 0) { (*myds)->sess->default_hostgroup = STATS_HOSTGROUP; (*myds)->sess->default_schema = strdup((char*)"main"); // just the pointer is passed (*myds)->sess->schema_locked = false; (*myds)->sess->transaction_persistent = false; (*myds)->sess->session_fast_forward = false; (*myds)->sess->user_max_connections = 0; - password = l_strdup(mysql_thread___monitor_password); + password = l_strdup(pgsql_thread___monitor_password); } } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index c454982d14..9234f69f07 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -4369,12 +4369,12 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( ( client_myds->encrypted == false && - strncmp(client_myds->myconn->userinfo->username, mysql_thread___monitor_username, strlen(mysql_thread___monitor_username)) == 0 + strncmp(client_myds->myconn->userinfo->username, pgsql_thread___monitor_username, strlen(pgsql_thread___monitor_username)) == 0 ) ) // Do not delete this line. 
See bug #492 ) { if (session_type == PROXYSQL_SESSION_ADMIN) { - if ((default_hostgroup < 0) || (strncmp(client_myds->myconn->userinfo->username, mysql_thread___monitor_username, strlen(mysql_thread___monitor_username)) == 0)) { + if ((default_hostgroup < 0) || (strncmp(client_myds->myconn->userinfo->username, pgsql_thread___monitor_username, strlen(pgsql_thread___monitor_username)) == 0)) { if (default_hostgroup == STATS_HOSTGROUP) { session_type = PROXYSQL_SESSION_STATS; } diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 372357c149..e5e82236e5 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -311,15 +311,18 @@ static char* pgsql_thread_variables_names[] = { (char*)"monitor_enabled", (char*)"monitor_history", (char*)"monitor_connect_interval", + (char*)"monitor_connect_interval_window", (char*)"monitor_connect_timeout", (char*)"monitor_ping_interval", + (char*)"monitor_ping_interval_window", (char*)"monitor_ping_max_failures", (char*)"monitor_ping_timeout", -/* - (char*)"monitor_aws_rds_topology_discovery_interval", (char*)"monitor_read_only_interval", + (char*)"monitor_read_only_interval_window", (char*)"monitor_read_only_timeout", (char*)"monitor_read_only_max_timeout_count", +/* + (char*)"monitor_aws_rds_topology_discovery_interval", (char*)"monitor_replication_lag_group_by_host", (char*)"monitor_replication_lag_interval", (char*)"monitor_replication_lag_timeout", @@ -335,6 +338,7 @@ static char* pgsql_thread_variables_names[] = { */ (char*)"monitor_username", (char*)"monitor_password", + (char*)"monitor_dbname", /* (char*)"monitor_replication_lag_use_percona_heartbeat", (char*)"monitor_query_interval", @@ -350,8 +354,8 @@ static char* pgsql_thread_variables_names[] = { (char*)"monitor_local_dns_cache_refresh_interval", (char*)"monitor_local_dns_resolver_queue_maxsize", (char*)"monitor_wait_timeout", - (char*)"monitor_writer_is_also_reader", */ + (char*)"monitor_writer_is_also_reader", (char*)"max_allowed_packet", (char*)"tcp_keepalive_time", (char*)"use_tcp_keepalive", @@ -932,12 +936,15 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.monitor_enabled = true; variables.monitor_history = 7200000; // changed in 2.6.0 : was 600000 variables.monitor_connect_interval = 120000; + variables.monitor_connect_interval_window = 50; variables.monitor_connect_timeout = 600; variables.monitor_ping_interval = 8000; + variables.monitor_ping_interval_window = 10; variables.monitor_ping_max_failures = 3; variables.monitor_ping_timeout = 1000; variables.monitor_aws_rds_topology_discovery_interval=1000; variables.monitor_read_only_interval = 1000; + variables.monitor_read_only_interval_window = 10; variables.monitor_read_only_timeout = 800; variables.monitor_read_only_max_timeout_count = 3; variables.monitor_replication_lag_group_by_host = false; @@ -966,6 +973,7 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.monitor_local_dns_resolver_queue_maxsize = 128; variables.monitor_username = strdup((char*)"monitor"); variables.monitor_password = strdup((char*)"monitor"); + variables.monitor_dbname = strdup((char*)"postgres"); /* TODO: Remove variables.monitor_replication_lag_use_percona_heartbeat = strdup((char*)""); */ @@ -1181,6 +1189,7 @@ char* PgSQL_Threads_Handler::get_variable_string(char* name) { if (!strncmp(name, "monitor_", 8)) { if (!strcmp(name, "monitor_username")) return strdup(variables.monitor_username); if (!strcmp(name, "monitor_password")) return strdup(variables.monitor_password); + if (!strcmp(name, "monitor_dbname")) return 
strdup(variables.monitor_dbname); /* if (!strcmp(name, "monitor_replication_lag_use_percona_heartbeat")) return strdup(variables.monitor_replication_lag_use_percona_heartbeat); */ @@ -1504,6 +1513,7 @@ char* PgSQL_Threads_Handler::get_variable(char* name) { // this is the public fu if (!strncasecmp(name, "monitor_", 8)) { if (!strcasecmp(name, "monitor_username")) return strdup(variables.monitor_username); if (!strcasecmp(name, "monitor_password")) return strdup(variables.monitor_password); + if (!strcasecmp(name, "monitor_dbname")) return strdup(variables.monitor_dbname); /* if (!strcasecmp(name, "monitor_replication_lag_use_percona_heartbeat")) return strdup(variables.monitor_replication_lag_use_percona_heartbeat); */ @@ -1615,6 +1625,11 @@ bool PgSQL_Threads_Handler::set_variable(char* name, const char* value) { // thi variables.monitor_password = strdup(value); return true; } + if (!strcasecmp(name, "monitor_dbname")) { + free(variables.monitor_dbname); + variables.monitor_dbname = strdup(value); + return true; + } if (!strcasecmp(name, "monitor_replication_lag_use_percona_heartbeat")) { if (vallen == 0) { // empty string free(variables.monitor_replication_lag_use_percona_heartbeat); @@ -2079,9 +2094,11 @@ char** PgSQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_history"] = make_tuple(&variables.monitor_history, 1000, 7 * 24 * 3600 * 1000, false); VariablesPointers_int["monitor_connect_interval"] = make_tuple(&variables.monitor_connect_interval, 100, 7 * 24 * 3600 * 1000, false); + VariablesPointers_int["monitor_connect_interval_window"] = make_tuple(&variables.monitor_connect_interval_window, 0, 100, false); VariablesPointers_int["monitor_connect_timeout"] = make_tuple(&variables.monitor_connect_timeout, 100, 600 * 1000, false); VariablesPointers_int["monitor_ping_interval"] = make_tuple(&variables.monitor_ping_interval, 100, 7 * 24 * 3600 * 1000, false); + VariablesPointers_int["monitor_ping_interval_window"] = make_tuple(&variables.monitor_ping_interval_window, 0, 100, false); VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600 * 1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000 * 1000, false); @@ -2089,6 +2106,7 @@ char** PgSQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false); */ VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7 * 24 * 3600 * 1000, false); + VariablesPointers_int["monitor_read_only_interval_window"] = make_tuple(&variables.monitor_read_only_interval_window, 0, 100, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600 * 1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000 * 1000, false); /* @@ -2590,6 +2608,7 @@ void PgSQL_Threads_Handler::flush_client_host_cache() { PgSQL_Threads_Handler::~PgSQL_Threads_Handler() { if (variables.monitor_username) { free(variables.monitor_username); variables.monitor_username = NULL; } if (variables.monitor_password) { free(variables.monitor_password); variables.monitor_password = NULL; } + if (variables.monitor_dbname) { free(variables.monitor_dbname); variables.monitor_dbname = NULL; } if 
(variables.monitor_replication_lag_use_percona_heartbeat) { free(variables.monitor_replication_lag_use_percona_heartbeat); variables.monitor_replication_lag_use_percona_heartbeat = NULL; @@ -2722,13 +2741,16 @@ PgSQL_Thread::~PgSQL_Thread() { if (pgsql_thread___monitor_username) { free(pgsql_thread___monitor_username); pgsql_thread___monitor_username = NULL; } if (pgsql_thread___monitor_password) { free(pgsql_thread___monitor_password); pgsql_thread___monitor_password = NULL; } + if (pgsql_thread___monitor_dbname) { free(pgsql_thread___monitor_dbname); pgsql_thread___monitor_dbname = NULL; } + /* if (mysql_thread___monitor_username) { free(mysql_thread___monitor_username); mysql_thread___monitor_username = NULL; } if (mysql_thread___monitor_password) { free(mysql_thread___monitor_password); mysql_thread___monitor_password = NULL; } if (mysql_thread___monitor_replication_lag_use_percona_heartbeat) { free(mysql_thread___monitor_replication_lag_use_percona_heartbeat); mysql_thread___monitor_replication_lag_use_percona_heartbeat = NULL; } + */ //if (pgsql_thread___default_schema) { free(pgsql_thread___default_schema); pgsql_thread___default_schema = NULL; } if (pgsql_thread___keep_multiplexing_variables) { free(pgsql_thread___keep_multiplexing_variables); pgsql_thread___keep_multiplexing_variables = NULL; } if (pgsql_thread___firewall_whitelist_errormsg) { free(pgsql_thread___firewall_whitelist_errormsg); pgsql_thread___firewall_whitelist_errormsg = NULL; } @@ -3799,36 +3821,39 @@ void PgSQL_Thread::refresh_variables() { mysql_thread___max_stmts_per_connection = GloPTH->get_variable_int((char*)"max_stmts_per_connection"); mysql_thread___max_stmts_cache = GloPTH->get_variable_int((char*)"max_stmts_cache"); - */ if (mysql_thread___monitor_username) free(mysql_thread___monitor_username); mysql_thread___monitor_username = GloPTH->get_variable_string((char*)"monitor_username"); if (mysql_thread___monitor_password) free(mysql_thread___monitor_password); mysql_thread___monitor_password = GloPTH->get_variable_string((char*)"monitor_password"); - /*if (mysql_thread___monitor_replication_lag_use_percona_heartbeat) free(mysql_thread___monitor_replication_lag_use_percona_heartbeat); + if (mysql_thread___monitor_replication_lag_use_percona_heartbeat) free(mysql_thread___monitor_replication_lag_use_percona_heartbeat); mysql_thread___monitor_replication_lag_use_percona_heartbeat = GloPTH->get_variable_string((char*)"monitor_replication_lag_use_percona_heartbeat"); mysql_thread___monitor_wait_timeout = (bool)GloPTH->get_variable_int((char*)"monitor_wait_timeout"); - mysql_thread___monitor_writer_is_also_reader = (bool)GloPTH->get_variable_int((char*)"monitor_writer_is_also_reader"); */ - + pgsql_thread___monitor_writer_is_also_reader = (bool)GloPTH->get_variable_int((char*)"monitor_writer_is_also_reader"); pgsql_thread___monitor_enabled = (bool)GloPTH->get_variable_int((char*)"monitor_enabled"); pgsql_thread___monitor_history = GloPTH->get_variable_int((char*)"monitor_history"); pgsql_thread___monitor_connect_interval = GloPTH->get_variable_int((char*)"monitor_connect_interval"); + pgsql_thread___monitor_connect_interval_window = GloPTH->get_variable_int((char*)"monitor_connect_interval_window"); pgsql_thread___monitor_connect_timeout = GloPTH->get_variable_int((char*)"monitor_connect_timeout"); pgsql_thread___monitor_ping_interval = GloPTH->get_variable_int((char*)"monitor_ping_interval"); + pgsql_thread___monitor_ping_interval_window = GloPTH->get_variable_int((char*)"monitor_ping_interval_window"); 
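// NOTE: The '*_interval_window' variables read here are clamped to 0..100 in
// get_variables_list(), which reads like a percentage of the check interval
// over which a batch is spread. compute_task_rate() is not part of this hunk,
// so the following is only a plausible sketch of that contract (names, policy
// and constants are assumptions): window 0 dispatches the batch as a single
// burst, larger windows pace it across the first N% of the interval.

#include <algorithm>
#include <cstdint>
#include <utility>

constexpr uint64_t SKETCH_MAX_DELAY_US { 500000 };

// Returns {tasks per wakeup, wait between wakeups} for one batch.
std::pair<uint64_t, uint64_t> sketch_task_rate(
    uint64_t batch_sz, uint64_t intv_us, uint64_t window /* 0..100 */
) {
    if (window == 0 || batch_sz == 0) {
        return { batch_sz, intv_us }; // single burst, next one a full interval away
    }
    const uint64_t span_us { intv_us * window / 100 };
    const uint64_t waves { std::max<uint64_t>(1, span_us / SKETCH_MAX_DELAY_US) };
    const uint64_t rate { (batch_sz + waves - 1) / waves }; // ceil division
    return { rate, span_us / waves };
}

// E.g. a 120s connect interval with window 50 gives a 60s span and 120
// wakeups, so 64 servers would be probed roughly one connect every 500ms
// until the batch drains.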
pgsql_thread___monitor_ping_max_failures = GloPTH->get_variable_int((char*)"monitor_ping_max_failures"); pgsql_thread___monitor_ping_timeout = GloPTH->get_variable_int((char*)"monitor_ping_timeout"); pgsql_thread___monitor_read_only_interval = GloPTH->get_variable_int((char*)"monitor_read_only_interval"); + pgsql_thread___monitor_read_only_interval_window = GloPTH->get_variable_int((char*)"monitor_read_only_interval_window"); pgsql_thread___monitor_read_only_timeout = GloPTH->get_variable_int((char*)"monitor_read_only_timeout"); + pgsql_thread___monitor_read_only_max_timeout_count = GloPTH->get_variable_int((char*)"monitor_read_only_max_timeout_count"); pgsql_thread___monitor_threads = GloPTH->get_variable_int((char*)"monitor_threads"); if (pgsql_thread___monitor_username) free(pgsql_thread___monitor_username); pgsql_thread___monitor_username = GloPTH->get_variable_string((char*)"monitor_username"); if (pgsql_thread___monitor_password) free(pgsql_thread___monitor_password); pgsql_thread___monitor_password = GloPTH->get_variable_string((char*)"monitor_password"); + if (pgsql_thread___monitor_dbname) free(pgsql_thread___monitor_dbname); + pgsql_thread___monitor_dbname = GloPTH->get_variable_string((char*)"monitor_dbname"); /* mysql_thread___monitor_aws_rds_topology_discovery_interval = GloPTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval"); - mysql_thread___monitor_read_only_max_timeout_count = GloPTH->get_variable_int((char*)"monitor_read_only_max_timeout_count"); mysql_thread___monitor_replication_lag_group_by_host = (bool)GloPTH->get_variable_int((char*)"monitor_replication_lag_group_by_host"); mysql_thread___monitor_replication_lag_interval = GloPTH->get_variable_int((char*)"monitor_replication_lag_interval"); mysql_thread___monitor_replication_lag_timeout = GloPTH->get_variable_int((char*)"monitor_replication_lag_timeout"); @@ -4437,19 +4462,19 @@ SQLite3_result* PgSQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) { pta[1] = buf; result->add_row(pta); } - /* { - pta[0] = (char*)"MySQL_Monitor_read_only_check_OK"; - sprintf(buf, "%llu", GloMyMon->read_only_check_OK); + pta[0] = (char*)"PgSQL_Monitor_read_only_check_OK"; + sprintf(buf, "%lu", GloPgMon->readonly_check_OK); pta[1] = buf; result->add_row(pta); } { - pta[0] = (char*)"MySQL_Monitor_read_only_check_ERR"; - sprintf(buf, "%llu", GloMyMon->read_only_check_ERR); + pta[0] = (char*)"PgSQL_Monitor_read_only_check_ERR"; + sprintf(buf, "%lu", GloPgMon->readonly_check_ERR); pta[1] = buf; result->add_row(pta); } + /* { pta[0] = (char*)"MySQL_Monitor_replication_lag_check_OK"; sprintf(buf, "%llu", GloMyMon->replication_lag_check_OK); diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index f4139446ab..8409475052 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -869,7 +869,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { mysql_servers_sync_algorithm mysql_server_sync_algo = (mysql_servers_sync_algorithm)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_servers_sync_algorithm, 0); if (mysql_server_sync_algo == mysql_servers_sync_algorithm::auto_select) { - mysql_server_sync_algo = (GloVars.global.monitor == false) ? + mysql_server_sync_algo = (GloVars.global.my_monitor == false) ? 
mysql_servers_sync_algorithm::runtime_mysql_servers_and_mysql_servers_v2 : mysql_servers_sync_algorithm::mysql_servers_v2; } diff --git a/lib/ProxySQL_GloVars.cpp b/lib/ProxySQL_GloVars.cpp index b14aedbbfe..7273fd15bf 100644 --- a/lib/ProxySQL_GloVars.cpp +++ b/lib/ProxySQL_GloVars.cpp @@ -197,7 +197,8 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() : global.gdbg=false; global.nostart=false; global.foreground=false; - global.monitor=true; + global.my_monitor=true; + global.pg_monitor=true; #ifdef IDLE_THREADS global.idle_threads=false; #endif /* IDLE_THREADS */ @@ -481,7 +482,8 @@ void ProxySQL_GlobalVariables::process_opts_post() { } if (opt->isSet("-M")) { - global.monitor=false; + global.my_monitor=false; + global.pg_monitor=false; } #ifdef SO_REUSEPORT diff --git a/src/main.cpp b/src/main.cpp index 8cb5f854b1..68bed74b9c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1414,12 +1414,20 @@ void ProxySQL_Main_init_phase3___start_all() { std::cerr << "Main phase3 : SQLite3 Server initialized in "; #endif } - if (GloVars.global.monitor==true) + if (GloVars.global.my_monitor==true) { cpu_timer t; ProxySQL_Main_init_MySQL_Monitor_module(); #ifdef DEBUG std::cerr << "Main phase3 : MySQL Monitor initialized in "; +#endif + } + if (GloVars.global.pg_monitor==true) + { + cpu_timer t; + pgsql_monitor_thread = new std::thread(&PgSQL_monitor_scheduler_thread); +#ifdef DEBUG + std::cerr << "Main phase3 : PgSQL Monitor initialized in "; #endif } #ifdef PROXYSQLCLICKHOUSE @@ -1447,8 +1455,6 @@ void ProxySQL_Main_init_phase3___start_all() { // Load the config not previously loaded for these modules GloAdmin->load_http_server(); GloAdmin->load_restapi_server(); - - pgsql_monitor_thread = new std::thread(&PgSQL_monitor_scheduler_thread); } From 12a5c31884f67d3c6641de415bf73264adcbc27f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Thu, 24 Oct 2024 09:56:45 +0200 Subject: [PATCH 2/2] Fix read-after-free in 'proxy_debug' for 'process_startup_packet' --- lib/PgSQL_Protocol.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index 99c5ea120b..86ca1901ea 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -668,7 +668,7 @@ bool PgSQL_Protocol::process_startup_packet(unsigned char* pkt, unsigned int len (*myds)->sess->writeout(); (*myds)->encrypted = have_ssl; ssl_request = true; - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 8, "Session=%p , DS=%p. SSL_REQUEST:'%c'\n", (*myds)->sess, (*myds), *ssl_supported); + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 8, "Session=%p , DS=%p. SSL_REQUEST:'%c'\n", (*myds)->sess, (*myds), have_ssl ? 'S' : 'N'); return true; }
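The one-liner in PATCH 2/2 deserves a note: `ssl_supported` appears to point into the startup packet buffer, and by the time `proxy_debug()` runs, `(*myds)->sess->writeout()` has already had a chance to recycle that buffer, hence the read-after-free; logging the already-captured `have_ssl` decision avoids touching the packet again. The bug class, reduced to a runnable sketch (names here are illustrative, not ProxySQL's types):

#include <cstdio>
#include <cstdlib>
#include <cstring>

int main() {
    char* pkt { static_cast<char*>(malloc(16)) };  // startup packet analogue
    memcpy(pkt, "S", 2);                           // 'S' == SSL requested
    const char* ssl_supported { pkt };             // alias into the buffer

    const bool have_ssl { *ssl_supported == 'S' }; // capture the value early
    free(pkt);                                     // buffer recycled (writeout() analogue)

    // BAD:  printf("SSL_REQUEST:'%c'\n", *ssl_supported);  // read-after-free
    printf("SSL_REQUEST:'%c'\n", have_ssl ? 'S' : 'N');     // fix: use the copy
    return 0;
}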