📄 transporterregistry.cpp
字号:
SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)) { if(t->isConnected()) { t->doSend(); } } }#endif}intTransporterRegistry::forceSendCheck(int sendLimit){ int tSendCounter = sendCounter; sendCounter = tSendCounter + 1; if (tSendCounter >= sendLimit) { performSend(); sendCounter = 1; return 1; }//if return 0;}//TransporterRegistry::forceSendCheck()#ifdef DEBUG_TRANSPORTERvoidTransporterRegistry::printState(){ ndbout << "-- TransporterRegistry -- " << endl << endl << "Transporters = " << nTransporters << endl; for(int i = 0; i<maxTransporters; i++) if(theTransporters[i] != NULL){ const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId(); ndbout << "Transporter: " << remoteNodeId << " PerformState: " << performStates[remoteNodeId] << " IOState: " << ioStates[remoteNodeId] << endl; }}#endifIOStateTransporterRegistry::ioState(NodeId nodeId) { return ioStates[nodeId]; }voidTransporterRegistry::setIOState(NodeId nodeId, IOState state) { DEBUG("TransporterRegistry::setIOState(" << nodeId << ", " << state << ")"); ioStates[nodeId] = state;}static void * run_start_clients_C(void * me){ ((TransporterRegistry*) me)->start_clients_thread(); return 0;}// Run by kernel threadvoidTransporterRegistry::do_connect(NodeId node_id){ PerformState &curr_state = performStates[node_id]; switch(curr_state){ case DISCONNECTED: break; case CONNECTED: return; case CONNECTING: return; case DISCONNECTING: break; } DBUG_ENTER("TransporterRegistry::do_connect"); DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id)); curr_state= CONNECTING; DBUG_VOID_RETURN;}voidTransporterRegistry::do_disconnect(NodeId node_id){ PerformState &curr_state = performStates[node_id]; switch(curr_state){ case DISCONNECTED: return; case CONNECTED: break; case CONNECTING: break; case DISCONNECTING: return; } DBUG_ENTER("TransporterRegistry::do_disconnect"); DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id)); curr_state= DISCONNECTING; DBUG_VOID_RETURN;}voidTransporterRegistry::report_connect(NodeId node_id){ DBUG_ENTER("TransporterRegistry::report_connect"); DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id)); performStates[node_id] = CONNECTED; reportConnect(callbackObj, node_id); DBUG_VOID_RETURN;}voidTransporterRegistry::report_disconnect(NodeId node_id, int errnum){ DBUG_ENTER("TransporterRegistry::report_disconnect"); DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id)); performStates[node_id] = DISCONNECTED; reportDisconnect(callbackObj, node_id, errnum); DBUG_VOID_RETURN;}voidTransporterRegistry::update_connections(){ for (int i= 0, n= 0; n < nTransporters; i++){ Transporter * t = theTransporters[i]; if (!t) continue; n++; const NodeId nodeId = t->getRemoteNodeId(); switch(performStates[nodeId]){ case CONNECTED: case DISCONNECTED: break; case CONNECTING: if(t->isConnected()) report_connect(nodeId); break; case DISCONNECTING: if(!t->isConnected()) report_disconnect(nodeId, 0); break; } }}// run as own threadvoidTransporterRegistry::start_clients_thread(){ DBUG_ENTER("TransporterRegistry::start_clients_thread"); while (m_run_start_clients_thread) { NdbSleep_MilliSleep(100); for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){ Transporter * t = theTransporters[i]; if (!t) continue; n++; const NodeId nodeId = t->getRemoteNodeId(); switch(performStates[nodeId]){ case CONNECTING: if(!t->isConnected() && !t->isServer) { bool connected= false; /** * First, we try to connect (if we have a port number). */ if (t->get_s_port()) connected= t->connect_client(); /** * If dynamic, get the port for connecting from the management server */ if( !connected && t->get_s_port() <= 0) { // Port is dynamic int server_port= 0; struct ndb_mgm_reply mgm_reply; if(!ndb_mgm_is_connected(m_mgm_handle)) ndb_mgm_connect(m_mgm_handle, 0, 0, 0); if(ndb_mgm_is_connected(m_mgm_handle)) { int res= ndb_mgm_get_connection_int_parameter(m_mgm_handle, t->getRemoteNodeId(), t->getLocalNodeId(), CFG_CONNECTION_SERVER_PORT, &server_port, &mgm_reply); DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)", server_port,t->getRemoteNodeId(), t->getLocalNodeId(),res)); if( res >= 0 ) { /** * Server_port == 0 just means that that a mgmt server * has not received a new port yet. Keep the old. */ if (server_port) t->set_s_port(server_port); } else if(ndb_mgm_is_connected(m_mgm_handle)) { ndbout_c("Failed to get dynamic port to connect to: %d", res); ndb_mgm_disconnect(m_mgm_handle); } else { ndbout_c("Management server closed connection early. " "It is probably being shut down (or has crashed). " "We will retry the connection."); } } /** else * We will not be able to get a new port unless * the m_mgm_handle is connected. Note that not * being connected is an ok state, just continue * until it is able to connect. Continue using the * old port until we can connect again and get a * new port. */ } } break; case DISCONNECTING: if(t->isConnected()) t->doDisconnect(); break; default: break; } } } DBUG_VOID_RETURN;}boolTransporterRegistry::start_clients(){ m_run_start_clients_thread= true; m_start_clients_thread= NdbThread_Create(run_start_clients_C, (void**)this, 32768, "ndb_start_clients", NDB_THREAD_PRIO_LOW); if (m_start_clients_thread == 0) { m_run_start_clients_thread= false; return false; } return true;}boolTransporterRegistry::stop_clients(){ if (m_start_clients_thread) { m_run_start_clients_thread= false; void* status; NdbThread_WaitFor(m_start_clients_thread, &status); NdbThread_Destroy(&m_start_clients_thread); } return true;}voidTransporterRegistry::add_transporter_interface(NodeId remoteNodeId, const char *interf, int s_port){ DBUG_ENTER("TransporterRegistry::add_transporter_interface"); DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port)); if (interf && strlen(interf) == 0) interf= 0; for (unsigned i= 0; i < m_transporter_interface.size(); i++) { Transporter_interface &tmp= m_transporter_interface[i]; if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0) continue; if (interf != 0 && tmp.m_interface != 0 && strcmp(interf, tmp.m_interface) == 0) { DBUG_VOID_RETURN; // found match, no need to insert } if (interf == 0 && tmp.m_interface == 0) { DBUG_VOID_RETURN; // found match, no need to insert } } Transporter_interface t; t.m_remote_nodeId= remoteNodeId; t.m_s_service_port= s_port; t.m_interface= interf; m_transporter_interface.push_back(t); DBUG_PRINT("exit",("interface and port added")); DBUG_VOID_RETURN;}boolTransporterRegistry::start_service(SocketServer& socket_server){ struct ndb_mgm_reply mgm_reply; DBUG_ENTER("TransporterRegistry::start_service"); if (m_transporter_interface.size() > 0 && !nodeIdSpecified) { ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified"); DBUG_RETURN(false); } for (unsigned i= 0; i < m_transporter_interface.size(); i++) { Transporter_interface &t= m_transporter_interface[i]; unsigned short port= (unsigned short)t.m_s_service_port; if(t.m_s_service_port<0) port= -t.m_s_service_port; // is a dynamic port TransporterService *transporter_service = new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd")); if(!socket_server.setup(transporter_service, &port, t.m_interface)) { DBUG_PRINT("info", ("Trying new port")); port= 0; if(t.m_s_service_port>0 || !socket_server.setup(transporter_service, &port, t.m_interface)) { /* * If it wasn't a dynamically allocated port, or * our attempts at getting a new dynamic port failed */ ndbout_c("Unable to setup transporter service port: %s:%d!\n" "Please check if the port is already used,\n" "(perhaps the node is already running)", t.m_interface ? t.m_interface : "*", t.m_s_service_port); delete transporter_service; DBUG_RETURN(false); } } t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port)); transporter_service->setTransporterRegistry(this); } DBUG_RETURN(true);}#ifdef NDB_SHM_TRANSPORTERstaticRETSIGTYPE shm_sig_handler(int signo){ g_shm_counter++;}#endifvoidTransporterRegistry::startReceiving(){ DBUG_ENTER("TransporterRegistry::startReceiving");#ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != NULL){ theOSEReceiver->createPhantom(); }#endif#ifdef NDB_OSE theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0);#endif#if defined NDB_OSE || defined NDB_SOFTOSE theReceiverPid = current_process(); for(int i = 0; i<nTCPTransporters; i++) theTCPTransporters[i]->theReceiverPid = theReceiverPid;#endif#ifdef NDB_SHM_TRANSPORTER m_shm_own_pid = getpid(); if (g_ndb_shm_signum) { DBUG_PRINT("info",("Install signal handler for signum %d", g_ndb_shm_signum)); struct sigaction sa; sigemptyset(&sa.sa_mask); sigaddset(&sa.sa_mask, g_ndb_shm_signum); pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0); sa.sa_handler = shm_sig_handler; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; int ret; while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR); if(ret != 0) { DBUG_PRINT("error",("Install failed")); g_eventLogger.error("Failed to install signal handler for" " SHM transporter, signum %d, errno: %d (%s)", g_ndb_shm_signum, errno, strerror(errno)); } }#endif // NDB_SHM_TRANSPORTER DBUG_VOID_RETURN;}voidTransporterRegistry::stopReceiving(){#ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != NULL){ theOSEReceiver->destroyPhantom(); }#endif /** * Disconnect all transporters, this includes detach from remote node * and since that must be done from the same process that called attach * it's done here in the receive thread */ disconnectAll();#if defined NDB_OSE || defined NDB_SOFTOSE if(theOSEJunkSocketRecv > 0) close(theOSEJunkSocketRecv); theOSEJunkSocketRecv = -1;#endif}voidTransporterRegistry::startSending(){#if defined NDB_OSE || defined NDB_SOFTOSE theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0);#endif}voidTransporterRegistry::stopSending(){#if defined NDB_OSE || defined NDB_SOFTOSE if(theOSEJunkSocketSend > 0) close(theOSEJunkSocketSend); theOSEJunkSocketSend = -1;#endif}NdbOut & operator <<(NdbOut & out, SignalHeader & sh){ out << "-- Signal Header --" << endl; out << "theLength: " << sh.theLength << endl; out << "gsn: " << sh.theVerId_signalNumber << endl; out << "recBlockNo: " << sh.theReceiversBlockNumber << endl; out << "sendBlockRef: " << sh.theSendersBlockRef << endl; out << "sendersSig: " << sh.theSendersSignalId << endl; out << "theSignalId: " << sh.theSignalId << endl; out << "trace: " << (int)sh.theTrace << endl; return out;} Transporter*TransporterRegistry::get_transporter(NodeId nodeId) { return theTransporters[nodeId];}bool TransporterRegistry::connect_client(NdbMgmHandle *h){ DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)"); Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h); if(!mgm_nodeid) { ndbout_c("%s: %d", __FILE__, __LINE__); return false; } Transporter * t = theTransporters[mgm_nodeid]; if (!t) { ndbout_c("%s: %d", __FILE__, __LINE__); return false; } DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));}/** * Given a connected NdbMgmHandle, turns it into a transporter * and returns the socket. */NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h){ struct ndb_mgm_reply mgm_reply; if ( h==NULL || *h == NULL ) { ndbout_c("%s: %d", __FILE__, __LINE__); return NDB_INVALID_SOCKET; } for(unsigned int i=0;i < m_transporter_interface.size();i++) if (m_transporter_interface[i].m_s_service_port < 0 && ndb_mgm_set_connection_int_parameter(*h, get_localNodeId(), m_transporter_interface[i].m_remote_nodeId, CFG_CONNECTION_SERVER_PORT, m_transporter_interface[i].m_s_service_port, &mgm_reply) < 0) { ndbout_c("Error: %s: %d", ndb_mgm_get_latest_error_desc(*h), ndb_mgm_get_latest_error(*h)); ndbout_c("%s: %d", __FILE__, __LINE__); ndb_mgm_destroy_handle(h); return NDB_INVALID_SOCKET; } /** * convert_to_transporter also disposes of the handle (i.e. we don't leak * memory here. */ NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h); if ( sockfd == NDB_INVALID_SOCKET) { ndbout_c("Error: %s: %d", ndb_mgm_get_latest_error_desc(*h), ndb_mgm_get_latest_error(*h)); ndbout_c("%s: %d", __FILE__, __LINE__); ndb_mgm_destroy_handle(h); } return sockfd;}/** * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter * and returns the socket. */NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc){ NdbMgmHandle h= ndb_mgm_create_handle(); if ( h == NULL ) { return NDB_INVALID_SOCKET; } /** * Set connectstring */ { BaseString cs; cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port()); ndb_mgm_set_connectstring(h, cs.c_str()); } if(ndb_mgm_connect(h, 0, 0, 0)<0) { ndb_mgm_destroy_handle(&h); return NDB_INVALID_SOCKET; } return connect_ndb_mgmd(&h);}template class Vector<TransporterRegistry::Transporter_interface>;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -