📄 ndb_cluster_connection.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ndb_global.h>#include <my_pthread.h>#include <my_sys.h>#include "ndb_cluster_connection_impl.hpp"#include <mgmapi_configuration.hpp>#include <mgmapi_config_parameters.h>#include <TransporterFacade.hpp>#include <NdbOut.hpp>#include <NdbSleep.h>#include <NdbThread.h>#include <ndb_limits.h>#include <ConfigRetriever.hpp>#include <ndb_version.h>#include <mgmapi_debug.h>#include <mgmapi_internal.h>#include <md5_hash.hpp>#include <EventLogger.hpp>EventLogger g_eventLogger;static int g_run_connect_thread= 0;#include <NdbMutex.h>NdbMutex *ndb_global_event_buffer_mutex= NULL;#ifdef VM_TRACENdbMutex *ndb_print_state_mutex= NULL;#endif/* * Ndb_cluster_connection */Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) : m_impl(* new Ndb_cluster_connection_impl(connect_string)){}Ndb_cluster_connection::Ndb_cluster_connection(Ndb_cluster_connection_impl& impl) : m_impl(impl){}Ndb_cluster_connection::~Ndb_cluster_connection(){ Ndb_cluster_connection_impl *tmp = &m_impl; if (this != tmp) delete tmp;}int Ndb_cluster_connection::get_connected_port() const{ if (m_impl.m_config_retriever) return m_impl.m_config_retriever->get_mgmd_port(); return -1;}const char *Ndb_cluster_connection::get_connected_host() const{ if (m_impl.m_config_retriever) return m_impl.m_config_retriever->get_mgmd_host(); return 0;}const char *Ndb_cluster_connection::get_connectstring(char *buf, int buf_sz) const{ if (m_impl.m_config_retriever) return m_impl.m_config_retriever->get_connectstring(buf,buf_sz); return 0;}pthread_handler_t run_ndb_cluster_connection_connect_thread(void *me){ g_run_connect_thread= 1; ((Ndb_cluster_connection_impl*) me)->connect_thread(); return me;}int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void)){ int r; DBUG_ENTER("Ndb_cluster_connection::start_connect_thread"); m_impl.m_connect_callback= connect_callback; if ((r = connect(0,0,0)) == 1) { DBUG_PRINT("info",("starting thread")); m_impl.m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread, (void**)&m_impl, 32768, "ndb_cluster_connection", NDB_THREAD_PRIO_LOW); } else if (r < 0) { DBUG_RETURN(-1); } else if (m_impl.m_connect_callback) { (*m_impl.m_connect_callback)(); } DBUG_RETURN(0);}void Ndb_cluster_connection::set_optimized_node_selection(int val){ m_impl.m_optimized_node_selection= val;}voidNdb_cluster_connection_impl::init_get_next_node(Ndb_cluster_connection_node_iter &iter){ if (iter.scan_state != (Uint8)~0) iter.cur_pos= iter.scan_state; if (iter.cur_pos >= no_db_nodes()) iter.cur_pos= 0; iter.init_pos= iter.cur_pos; iter.scan_state= 0; // fprintf(stderr,"[init %d]",iter.init_pos); return;}Uint32Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter){ Uint32 cur_pos= iter.cur_pos; if (cur_pos >= no_db_nodes()) return 0; Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase(); Ndb_cluster_connection_impl::Node &node= nodes[cur_pos]; if (iter.scan_state != (Uint8)~0) { assert(iter.scan_state < no_db_nodes()); if (nodes[iter.scan_state].group == node.group) iter.scan_state= ~0; else return nodes[iter.scan_state++].id; } // fprintf(stderr,"[%d]",node.id); cur_pos++; Uint32 init_pos= iter.init_pos; if (cur_pos == node.next_group) { cur_pos= nodes[init_pos].this_group; } // fprintf(stderr,"[cur_pos %d]",cur_pos); if (cur_pos != init_pos) iter.cur_pos= cur_pos; else { iter.cur_pos= node.next_group; iter.init_pos= node.next_group; } return node.id;}unsignedNdb_cluster_connection::no_db_nodes(){ return m_impl.m_all_nodes.size();}unsignedNdb_cluster_connection::node_id(){ return m_impl.m_transporter_facade->ownId();}intNdb_cluster_connection::wait_until_ready(int timeout, int timeout_after_first_alive){ DBUG_ENTER("Ndb_cluster_connection::wait_until_ready"); TransporterFacade *tp = TransporterFacade::instance(); if (tp == 0) { DBUG_RETURN(-1); } if (tp->ownId() == 0) { DBUG_RETURN(-1); } int secondsCounter = 0; int milliCounter = 0; int noChecksSinceFirstAliveFound = 0; do { unsigned int foundAliveNode = 0; tp->lock_mutex(); for(unsigned i= 0; i < no_db_nodes(); i++) { //************************************************ // If any node is answering, ndb is answering //************************************************ if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) { foundAliveNode++; } } tp->unlock_mutex(); if (foundAliveNode == no_db_nodes()) { DBUG_RETURN(0); } else if (foundAliveNode > 0) { noChecksSinceFirstAliveFound++; // 100 ms delay -> 10* if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive) DBUG_RETURN(1); } else if (secondsCounter >= timeout) { // no alive nodes and timed out DBUG_RETURN(-1); } NdbSleep_MilliSleep(100); milliCounter += 100; if (milliCounter >= 1000) { secondsCounter++; milliCounter = 0; }//if } while (1);}/* * Ndb_cluster_connection_impl */Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * connect_string) : Ndb_cluster_connection(*this), m_optimized_node_selection(1), m_name(0){ DBUG_ENTER("Ndb_cluster_connection"); DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this)); g_eventLogger.createConsoleHandler(); g_eventLogger.setCategory("NdbApi"); g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR); m_connect_thread= 0; m_connect_callback= 0; if (ndb_global_event_buffer_mutex == NULL) ndb_global_event_buffer_mutex= NdbMutex_Create();#ifdef VM_TRACE if (ndb_print_state_mutex == NULL) ndb_print_state_mutex= NdbMutex_Create();#endif m_config_retriever= new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API); if (m_config_retriever->hasError()) { printf("Could not connect initialize handle to management server: %s", m_config_retriever->getErrorString()); delete m_config_retriever; m_config_retriever= 0; } if (m_name) { NdbMgmHandle h= m_config_retriever->get_mgmHandle(); ndb_mgm_set_name(h, m_name); } m_transporter_facade= TransporterFacade::theFacadeInstance= new TransporterFacade(); DBUG_VOID_RETURN;}Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl(){ DBUG_ENTER("~Ndb_cluster_connection"); TransporterFacade::stop_instance(); if (m_connect_thread) { void *status; g_run_connect_thread= 0; NdbThread_WaitFor(m_connect_thread, &status); NdbThread_Destroy(&m_connect_thread); m_connect_thread= 0; } if (m_transporter_facade != 0) { delete m_transporter_facade; if (m_transporter_facade != TransporterFacade::theFacadeInstance) abort(); TransporterFacade::theFacadeInstance= 0; } if (m_config_retriever) { delete m_config_retriever; m_config_retriever= NULL; } if (ndb_global_event_buffer_mutex != NULL) { NdbMutex_Destroy(ndb_global_event_buffer_mutex); ndb_global_event_buffer_mutex= NULL; }#ifdef VM_TRACE if (ndb_print_state_mutex != NULL) { NdbMutex_Destroy(ndb_print_state_mutex); ndb_print_state_mutex= NULL; }#endif if (m_name) free(m_name); DBUG_VOID_RETURN;}voidNdb_cluster_connection_impl::set_name(const char *name){ if (m_name) free(m_name); m_name= strdup(name); if (m_config_retriever && m_name) { NdbMgmHandle h= m_config_retriever->get_mgmHandle(); ndb_mgm_set_name(h, m_name); }}voidNdb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid, const ndb_mgm_configuration &config){ DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector"); ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION); for(iter.first(); iter.valid(); iter.next()) { Uint32 nodeid1, nodeid2, remoteNodeId, group= 5; const char * remoteHostName= 0, * localHostName= 0; if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue; if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue; if(nodeid1 != nodeid && nodeid2 != nodeid) continue; remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1); iter.get(CFG_CONNECTION_GROUP, &group); { const char * host1= 0, * host2= 0; iter.get(CFG_CONNECTION_HOSTNAME_1, &host1); iter.get(CFG_CONNECTION_HOSTNAME_2, &host2); localHostName = (nodeid == nodeid1 ? host1 : host2); remoteHostName = (nodeid == nodeid1 ? host2 : host1); } Uint32 type = ~0; if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; switch(type){ case CONNECTION_TYPE_SHM:{ break; } case CONNECTION_TYPE_SCI:{ break; } case CONNECTION_TYPE_TCP:{ // connecting through localhost // check if config_hostname is local if (SocketServer::tryBind(0,remoteHostName)) group--; // upgrade group value break; } case CONNECTION_TYPE_OSE:{ break; } } m_impl.m_all_nodes.push_back(Node(group,remoteNodeId)); DBUG_PRINT("info",("saved %d %d", group,remoteNodeId)); for (int i= m_impl.m_all_nodes.size()-2; i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group; i--) { Node tmp= m_impl.m_all_nodes[i]; m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1]; m_impl.m_all_nodes[i+1]= tmp; } } int i; Uint32 cur_group, i_group= 0; cur_group= ~0; for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--) { if (m_impl.m_all_nodes[i].group != cur_group) { cur_group= m_impl.m_all_nodes[i].group; i_group= i+1; } m_impl.m_all_nodes[i].next_group= i_group; } cur_group= ~0; for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++) { if (m_impl.m_all_nodes[i].group != cur_group) { cur_group= m_impl.m_all_nodes[i].group; i_group= i; } m_impl.m_all_nodes[i].this_group= i_group; }#if 0 for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++) { fprintf(stderr, "[%d] %d %d %d %d\n", i, m_impl.m_all_nodes[i].id, m_impl.m_all_nodes[i].group, m_impl.m_all_nodes[i].this_group, m_impl.m_all_nodes[i].next_group); } do_test();#endif DBUG_VOID_RETURN;}voidNdb_cluster_connection_impl::do_test(){ Ndb_cluster_connection_node_iter iter; int n= no_db_nodes()+5; Uint32 *nodes= new Uint32[n+1]; for (int g= 0; g < n; g++) { for (int h= 0; h < n; h++) { Uint32 id; Ndb_cluster_connection_node_iter iter2; { for (int j= 0; j < g; j++) { nodes[j]= get_next_node(iter2); } } for (int i= 0; i < n; i++) { init_get_next_node(iter); fprintf(stderr, "%d dead:(", g); id= 0; while (id == 0) { if ((id= get_next_node(iter)) == 0) break; for (int j= 0; j < g; j++) { if (nodes[j] == id) { fprintf(stderr, " %d", id); id= 0; break; } } } fprintf(stderr, ")"); if (id == 0) { break; } fprintf(stderr, " %d\n", id); } fprintf(stderr, "\n"); } } delete [] nodes;}void Ndb_cluster_connection::set_name(const char *name){ m_impl.set_name(name);}int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, int verbose){ struct ndb_mgm_reply mgm_reply; DBUG_ENTER("Ndb_cluster_connection::connect"); const char* error = 0; do { if (m_impl.m_config_retriever == 0) DBUG_RETURN(-1); if (m_impl.m_config_retriever->do_connect(no_retries, retry_delay_in_seconds, verbose)) DBUG_RETURN(1); // mgmt server not up yet Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/, 3/*delay*/); if(nodeId == 0) break; ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig(); if(props == 0) break; m_impl.m_transporter_facade->start_instance(nodeId, props); m_impl.init_nodes_vector(nodeId, *props); for(unsigned i=0; i<m_impl.m_transporter_facade->get_registry()->m_transporter_interface.size(); i++) ndb_mgm_set_connection_int_parameter(m_impl.m_config_retriever->get_mgmHandle(), nodeId, m_impl.m_transporter_facade->get_registry() ->m_transporter_interface[i] .m_remote_nodeId, CFG_CONNECTION_SERVER_PORT, m_impl.m_transporter_facade->get_registry() ->m_transporter_interface[i] .m_s_service_port, &mgm_reply); ndb_mgm_destroy_configuration(props); m_impl.m_transporter_facade->connected(); DBUG_RETURN(0); } while(0); ndbout << "Configuration error: "; const char* erString = m_impl.m_config_retriever->getErrorString(); if (erString == 0) { erString = "No error specified!"; } ndbout << erString << endl; DBUG_RETURN(-1);}void Ndb_cluster_connection_impl::connect_thread(){ DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread"); int r; do { NdbSleep_SecSleep(1); if ((r = connect(0,0,0)) == 0) break; if (r == -1) { printf("Ndb_cluster_connection::connect_thread error\n"); DBUG_ASSERT(false); g_run_connect_thread= 0; } else { // Wait before making a new connect attempt NdbSleep_SecSleep(1); } } while (g_run_connect_thread); if (m_connect_callback) (*m_connect_callback)(); DBUG_VOID_RETURN;}template class Vector<Ndb_cluster_connection_impl::Node>;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -