From 0c5eabf5c7586ac3b4f21e979ffdb649e15053ba Mon Sep 17 00:00:00 2001 From: Niels Vogell Date: Wed, 21 May 2025 15:02:24 +0000 Subject: [PATCH 1/2] Refactoring rds_topology discovery to support Multi-AZ and Blue/Green Deployment topologies --- include/MySQL_HostGroups_Manager.h | 2 +- include/MySQL_Monitor.hpp | 31 +++- lib/MySQL_HostGroups_Manager.cpp | 18 +- lib/MySQL_Monitor.cpp | 270 ++++++++++++++++++++--------- 4 files changed, 223 insertions(+), 98 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 9bdf7b5e89..5705ac942c 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -1077,7 +1077,7 @@ class MySQL_HostGroups_Manager : public Base_HostGroups_Manager { void set_Readyset_status(char *hostname, int port, enum MySerStatus status); unsigned long long Get_Memory_Stats(); - void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector>& new_servers); + void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector>& new_servers); void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 6ce9b83ffe..621959b5a8 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -59,8 +59,12 @@ struct cmp_str { #define N_L_ASE 16 #define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com" -#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology" +#define QUERY_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology" +#define QUERY_INNODB_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY "SELECT @@global.innodb_read_only read_only, id, endpoint, port from mysql.rds_topology" +#define QUERY_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY "SELECT @@global.read_only AS read_only, id, endpoint, port, role, status, version FROM mysql.rds_topology" +#define QUERY_INNODB_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY "SELECT @@global.innodb_read_only AS read_only, id, endpoint, port, role, status, version FROM mysql.rds_topology" +#define SUPPORTED_AWS_RDS_TOPOLOGY_VERSION "1.0" /* Implementation of monitoring in AWS Aurora will be different than previous modules @@ -203,7 +207,16 @@ enum MySQL_Monitor_State_Data_Task_Type { MON_REPLICATION_LAG, MON_GALERA, MON_AWS_AURORA, - MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY + MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY, + MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY, + MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY, + MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY, +}; + +enum MySQL_Monitor_Aws_Metadata_Check { + AWS_RDS_TOPOLOGY_CHECK, + AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK, + NONE }; enum class MySQL_Monitor_State_Data_Task_Result { @@ -442,7 +455,6 @@ struct DNS_Resolve_Data { unsigned int refresh_intv = 0; }; - class MySQL_Monitor { public: static std::string dns_lookup(const std::string& hostname, bool return_hostname_if_lookup_fails = true, size_t* ip_count = NULL); @@ -450,8 +462,13 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); - void process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, int reader_hostgroup); - bool is_aws_rds_multi_az_db_cluster_topology(const std::vector& discovered_servers); + void process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, const MySQL_Monitor_State_Data* mmsd, int num_fields); + bool is_aws_rds_multi_az_db_cluster_topology(const string& originating_server_hostname, const vector>& discovered_servers); + bool is_aws_rds_topology_query_task(const MySQL_Monitor_State_Data_Task_Type& task_type); + bool mysql_row_matches_query_task(const unordered_set &field_names, const MySQL_Monitor_State_Data_Task_Type &task_type); + void add_topology_query_to_task(MySQL_Monitor_State_Data_Task_Type &task_type); + bool is_aws_rds_topology_version_supported(const string& version); + private: std::vector *tables_defs_monitor; @@ -501,6 +518,8 @@ class MySQL_Monitor { bool shutdown; pthread_mutex_t mon_en_mutex; bool monitor_enabled; + MySQL_Monitor_Aws_Metadata_Check rds_topology_check_type = MySQL_Monitor_Aws_Metadata_Check::AWS_RDS_TOPOLOGY_CHECK; + int topology_loop = 0; SQLite3DB *admindb; // internal database SQLite3DB *monitordb; // internal database SQLite3DB *monitor_internal_db; // internal database @@ -563,7 +582,7 @@ class MySQL_Monitor { * Note: Calling init_async is mandatory before executing tasks asynchronously. */ void monitor_ping_async(SQLite3_result* resultset); - void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check); + void monitor_read_only_async(SQLite3_result* resultset); void monitor_replication_lag_async(SQLite3_result* resultset); void monitor_group_replication_async(); void monitor_galera_async(); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 7b579a2310..edcf64efc9 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -7355,7 +7355,7 @@ MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *h * @param new_servers A vector of tuples where each tuple contains the values needed to add each new server. */ void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups( - const vector>& new_servers + const vector>& new_servers ) { int added_new_server = -1; @@ -7363,15 +7363,19 @@ void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_repli wrlock(); // Add the discovered server with default values - for (const tuple& s : new_servers) { + for (const tuple& s : new_servers) { string host = std::get<0>(s); - uint16_t port = std::get<1>(s); + int port = std::get<1>(s); long int hostgroup_id = std::get<2>(s); - - srv_info_t srv_info { host.c_str(), port, "AWS RDS" }; - srv_opts_t srv_opts { -1, -1, -1 }; + int weight = std::get<3>(s); + + srv_info_t srv_info { host.c_str(), (uint16_t)port, "AWS RDS" }; + srv_opts_t srv_opts { weight, -1, -1 }; - added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts); + int res = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts); + if (added_new_server < 0) { + added_new_server = res; + } } // If servers were added, perform necessary updates to internal structures diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index d589ad89f1..488cb8b9d7 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -667,7 +667,25 @@ void MySQL_Monitor_State_Data::init_async() { task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; break; case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY: - query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY; + query_ = QUERY_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY; + async_state_machine_ = ASYNC_QUERY_START; + task_timeout_ = mysql_thread___monitor_read_only_timeout; + task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; + break; + case MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY: + query_ = QUERY_INNODB_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY; + async_state_machine_ = ASYNC_QUERY_START; + task_timeout_ = mysql_thread___monitor_read_only_timeout; + task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; + break; + case MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY: + query_ = QUERY_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY; + async_state_machine_ = ASYNC_QUERY_START; + task_timeout_ = mysql_thread___monitor_read_only_timeout; + task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; + break; + case MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY: + query_ = QUERY_INNODB_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY; async_state_machine_ = ASYNC_QUERY_START; task_timeout_ = mysql_thread___monitor_read_only_timeout; task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; @@ -1703,7 +1721,13 @@ void * monitor_read_only_thread(void *arg) { } else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only"); } else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { - mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY); + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY); + } else if (mmsd->get_task_type() == MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_INNODB_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY); + } else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY) { + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY); + } else if (mmsd->get_task_type() == MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY) { + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_INNODB_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY); } else { // default mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only"); } @@ -1799,7 +1823,7 @@ void * monitor_read_only_thread(void *arg) { int j=-1; num_fields = mysql_num_fields(mmsd->result); fields = mysql_fetch_fields(mmsd->result); - if (fields && num_fields == 1) { + if (fields && num_fields >= 1) { for(k = 0; k < num_fields; k++) { if (strcmp((char *)"read_only", (char *)fields[k].name)==0) { j=k; @@ -3447,7 +3471,29 @@ VALGRIND_ENABLE_ERROR_REPORTING; * @param discovered_servers A vector of servers discovered when querying the cluster's topology. * @param reader_hostgroup Reader hostgroup to which we will add the discovered servers. */ -void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, int reader_hostgroup) { +void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, const MySQL_Monitor_State_Data* mmsd, int num_fields) { + // Check if the query result matches the query task type: exactly 3 for Multi-AZ DB Clusters, even number for blue/green deployment + if (rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() % 2 == 0) { + // With the AWS_RDS_TOPOLOGY_CHECK, we didn't get the role and status data, so we should retry on the next read_only check with the correct query + rds_topology_check_type = AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK; + topology_loop = mysql_thread___monitor_aws_rds_topology_discovery_interval; + return; + } else if ((rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() != 3) + || (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && discovered_servers.size() % 2 != 0)) { + // Query result matches neither a Multi_AZ DB Cluster nor a Blue/Green deployment + rds_topology_check_type = AWS_RDS_TOPOLOGY_CHECK; // Set back to default rds_topology check + proxy_debug(PROXY_DEBUG_MONITOR, 7, "Got a query result for the rds_topology metadata table but it matches neither Multi-AZ DB Clusters, nor a blue/green deployment. Number of records: %d\n", discovered_servers.size()); + return; + } + + + if (num_fields < 4) { + proxy_error("Received row with too few fields. num_field = %d\n", num_fields); + return; + } + + int reader_hostgroup = mmsd->reader_hostgroup; + char *error = NULL; int cols = 0; int affected_rows = 0; @@ -3456,30 +3502,40 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers); + assert(runtime_mysql_servers); if (error) { proxy_error("Error on %s : %s\n", query, error); } else { - vector> new_servers; - vector saved_hostnames; - saved_hostnames.push_back(originating_server_hostname); + unordered_set saved_hostnames; + saved_hostnames.insert(originating_server_hostname); + vector> new_servers; - // Do an initial loop through the query results to save existing runtime server hostnames + // Do a loop through the query results to save existing runtime server hostnames for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { SQLite3_row *r1 = *it; string current_runtime_hostname = r1->fields[0]; - - saved_hostnames.push_back(current_runtime_hostname); + saved_hostnames.insert(current_runtime_hostname); } // Loop through discovered servers and process the ones we haven't saved yet - for (MYSQL_ROW s : discovered_servers) { - string current_discovered_hostname = s[2]; - string current_discovered_port_string = s[3]; - int current_discovered_port_int; - + for (MYSQL_ROW row : discovered_servers) { + if ( !row ) { + proxy_warning("Received empty RDS topology record from %s.\n", originating_server_hostname.c_str()); + continue; + } + int current_discovered_read_only = 1; +VALGRIND_DISABLE_ERROR_REPORTING; + if (row[0]) { + if (!strcmp(row[0], "0") || !strcasecmp(row[0], "OFF")) + current_discovered_read_only = 0; + } +VALGRIND_ENABLE_ERROR_REPORTING; + string current_discovered_hostname = row[2]; + string current_discovered_port_string = row[3]; + int current_discovered_port; try { - current_discovered_port_int = stoi(s[3]); + current_discovered_port = stoi(current_discovered_port_string); } catch (...) { proxy_error( "Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n", @@ -3487,16 +3543,29 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s ); return; } - - if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) { - tuple new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup); - new_servers.push_back(new_server); - saved_hostnames.push_back(current_discovered_hostname); + string current_discovered_role, current_discovered_status, current_discovered_version; + if (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && num_fields >= 7) { + current_discovered_role = row[4]; + current_discovered_status = row[5]; + current_discovered_version = row[6]; + } + if (!current_discovered_version.empty() && !is_aws_rds_topology_version_supported(current_discovered_version)) { + proxy_warning("Discovered topology version (%s) is not compatible with supported version (%s)\n", + current_discovered_version.c_str(), SUPPORTED_AWS_RDS_TOPOLOGY_VERSION); + return; } + + int current_determined_weight = -1; // TODO: Add logic for selecting a different weight based on discovered role and status + tuple discovered_server(current_discovered_hostname, current_discovered_port, reader_hostgroup, current_determined_weight); + if (!saved_hostnames.count(current_discovered_hostname)) { + // Server isn't in either hostgroup yet, adding as reader + proxy_info("%d: Adding new host '%s' to new server list in hostgroup [%ld].\n", __LINE__, std::get<0>(discovered_server).c_str(), std::get<2>(discovered_server)); + new_servers.push_back(discovered_server); + } } // Add the new servers if any - if (!new_servers.empty()) { + if (!new_servers.empty() && (rds_topology_check_type != AWS_RDS_TOPOLOGY_CHECK || is_aws_rds_multi_az_db_cluster_topology(originating_server_hostname, new_servers))) { MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); } } @@ -3508,27 +3577,65 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s * @param discovered_servers A vector of servers discovered when querying the cluster's topology. * @return Returns 'true' if all conditions are met and 'false' otherwise. */ -bool MySQL_Monitor::is_aws_rds_multi_az_db_cluster_topology(const std::vector& discovered_servers) { - if (discovered_servers.size() != 3) { +bool MySQL_Monitor::is_aws_rds_multi_az_db_cluster_topology(const string& originating_servername, const std::vector>& discovered_servers) { + if (discovered_servers.size() != 2) { return false; } - const std::vector instance_names = {"-instance-1", "-instance-2", "-instance-3"}; - int identified_hosts = 0; - for (const std::string& instance_str : instance_names) { - for (MYSQL_ROW server : discovered_servers) { - if (server[2] == NULL || (server[2][0] == '\0')) { - continue; - } + vector hostnames(1, originating_servername); + for (tuple server : discovered_servers) { + string hostname = std::get<0>(server); + if (hostname.empty()) { + continue; + } + hostnames.push_back(hostname); + } - std::string current_discovered_hostname = server[2]; - if (current_discovered_hostname.find(instance_str) != std::string::npos) { - ++identified_hosts; - break; - } + const unordered_set expected_instance_names = {"-instance-1", "-instance-2", "-instance-3"}; + unordered_set discovered_instance_names; + for( string hostname : hostnames) { + size_t domain_start = hostname.find('.'); + if (domain_start != string::npos && domain_start > 11) { + string prospect_instance_suffix = hostname.substr(domain_start - 11, 11); + discovered_instance_names.insert(prospect_instance_suffix); } } - return (identified_hosts == 3); + return (expected_instance_names == discovered_instance_names); +} + +bool MySQL_Monitor::is_aws_rds_topology_query_task(const MySQL_Monitor_State_Data_Task_Type &task_type) { + return task_type == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY + || task_type == MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY + || task_type == MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY + || task_type == MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY; +} + +bool MySQL_Monitor::mysql_row_matches_query_task(const unordered_set &field_names, const MySQL_Monitor_State_Data_Task_Type &task_type) +{ + if (task_type == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY || task_type == MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + return field_names == unordered_set({ "read_only", "id", "endpoint", "port"}); + } + return field_names == unordered_set({"read_only", "id", "endpoint", "port", "role", "status", "version"}); +} + +void MySQL_Monitor::add_topology_query_to_task(MySQL_Monitor_State_Data_Task_Type &task_type) +{ + switch (rds_topology_check_type) { + case AWS_RDS_TOPOLOGY_CHECK: + if (task_type == MON_READ_ONLY) + task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY; + else if (task_type == MON_INNODB_READ_ONLY) + task_type = MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY; + break; + case AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK: + if (task_type == MON_READ_ONLY) + task_type = MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY; + else if (task_type == MON_INNODB_READ_ONLY) + task_type = MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY; + break; + default: + proxy_warning("Attempting to add rds_topology query to unsupported read_only check."); + } } void * MySQL_Monitor::monitor_read_only() { @@ -3544,12 +3651,9 @@ void * MySQL_Monitor::monitor_read_only() { unsigned long long t1; unsigned long long t2; unsigned long long next_loop_at=0; - int topology_loop = 0; while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) { int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval; - bool do_discovery_check = false; - unsigned int glover; char *error=NULL; SQLite3_result *resultset=NULL; @@ -3565,7 +3669,6 @@ void * MySQL_Monitor::monitor_read_only() { next_loop_at=0; } - if (t1 < next_loop_at) { goto __sleep_monitor_read_only; } @@ -3584,14 +3687,19 @@ void * MySQL_Monitor::monitor_read_only() { if (topology_loop_max > 0) { // if the discovery interval is set to zero, do not query for the topology if (topology_loop >= topology_loop_max) { - do_discovery_check = true; + if (rds_topology_check_type == NONE) { + proxy_info("Setting topology check to aws_rds_topology_check\n"); + rds_topology_check_type = AWS_RDS_TOPOLOGY_CHECK; + } topology_loop = 0; } topology_loop += 1; + } else { + rds_topology_check_type = NONE; } // resultset must be initialized before calling monitor_read_only_async - monitor_read_only_async(resultset, do_discovery_check); + monitor_read_only_async(resultset); if (shutdown) return NULL; __end_monitor_read_only_loop: @@ -7564,21 +7672,25 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vectorresult); - int j = -1; + int i_ro = -1; num_fields = mysql_num_fields(mmsd->result); fields = mysql_fetch_fields(mmsd->result); - if (fields && num_fields == 1) { + unordered_set field_names; + MYSQL_ROW row; + if (fields && num_fields >= 1) { for (k = 0; k < num_fields; k++) { if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) { - j = k; + i_ro = k; } + // For multiple task types we add extra fields. To make sure all the expected fields are there, we store them + field_names.insert(string(fields[k].name)); } - if (j > -1) { - MYSQL_ROW row = mysql_fetch_row(mmsd->result); + if (i_ro > -1) { + row = mysql_fetch_row(mmsd->result); if (row) { VALGRIND_DISABLE_ERROR_REPORTING; - if (row[j]) { - if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF")) + if (row[i_ro]) { + if (!strcmp(row[i_ro], "0") || !strcasecmp(row[i_ro], "OFF")) read_only = 0; } VALGRIND_ENABLE_ERROR_REPORTING; @@ -7586,41 +7698,26 @@ VALGRIND_ENABLE_ERROR_REPORTING; } rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb); - } else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { - // Process the read_only field as above and store the first server - vector discovered_servers; - for (k = 0; k < num_fields; k++) { - if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) { - j = k; - } - } - if (j > -1) { - MYSQL_ROW row = mysql_fetch_row(mmsd->result); - if (row) { + if (fields && is_aws_rds_topology_query_task(mmsd->get_task_type()) && mysql_row_matches_query_task(field_names, mmsd->get_task_type())) { + // Process the read_only field as above and store the first server + vector discovered_servers; + discovered_servers.push_back(row); + + // Store the remaining servers + int num_rows = mysql_num_rows(mmsd->result); + for (int i = 1; i < num_rows; i++) { + row = mysql_fetch_row(mmsd->result); discovered_servers.push_back(row); -VALGRIND_DISABLE_ERROR_REPORTING; - if (row[j]) { - if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF")) - read_only = 0; - } -VALGRIND_ENABLE_ERROR_REPORTING; } - } - // Store the remaining servers - int num_rows = mysql_num_rows(mmsd->result); - for (int i = 1; i < num_rows; i++) { - MYSQL_ROW row = mysql_fetch_row(mmsd->result); - discovered_servers.push_back(row); - } - - // Process the discovered servers and add them to 'runtime_mysql_servers' (process only for AWS RDS Multi-AZ DB Clusters) - if (!discovered_servers.empty() && is_aws_rds_multi_az_db_cluster_topology(discovered_servers)) { - process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup); + // Process the discovered servers and add them to 'runtime_mysql_servers' + if (!discovered_servers.empty()) { + process_discovered_topology(originating_server_hostname, discovered_servers, mmsd, num_fields); + } + } else { + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); + rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); } - } else { - proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); - rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); } mysql_free_result(mmsd->result); mmsd->result = NULL; @@ -7678,7 +7775,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; return true; } -void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) { +void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { assert(resultset); std::vector> mmsds; @@ -7704,8 +7801,8 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d // Change task type if it's time to do discovery check. Only for aws rds endpoints string hostname = r->fields[0]; - if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { - task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY; + if (hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos && rds_topology_check_type != NONE) { + add_topology_query_to_task(task_type); } } @@ -8514,3 +8611,8 @@ void MySQL_Monitor::monitor_galera_async() { template class WorkItem; template class WorkItem; + +bool MySQL_Monitor::is_aws_rds_topology_version_supported(const string& version) { + // TODO: implement better check that considers minor and major versions + return version == SUPPORTED_AWS_RDS_TOPOLOGY_VERSION; +} From a31838d77683daa65bd7bd7887c46f3695f097c4 Mon Sep 17 00:00:00 2001 From: Niels Vogell Date: Wed, 18 Jun 2025 13:54:01 +0000 Subject: [PATCH 2/2] Removing unsafe assert and moving error logging to correct if-else block --- lib/MySQL_Monitor.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 488cb8b9d7..09f96fda8c 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3502,7 +3502,6 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers); - assert(runtime_mysql_servers); if (error) { proxy_error("Error on %s : %s\n", query, error); @@ -7698,7 +7697,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; } rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb); - if (fields && is_aws_rds_topology_query_task(mmsd->get_task_type()) && mysql_row_matches_query_task(field_names, mmsd->get_task_type())) { + if (is_aws_rds_topology_query_task(mmsd->get_task_type()) && mysql_row_matches_query_task(field_names, mmsd->get_task_type())) { // Process the read_only field as above and store the first server vector discovered_servers; discovered_servers.push_back(row); @@ -7714,10 +7713,10 @@ VALGRIND_ENABLE_ERROR_REPORTING; if (!discovered_servers.empty()) { process_discovered_topology(originating_server_hostname, discovered_servers, mmsd, num_fields); } - } else { - proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); - rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); } + } else { + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); + rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); } mysql_free_result(mmsd->result); mmsd->result = NULL;