⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transporterregistry.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
    SHM_Transporter  *t = theSHMTransporters[i];    const NodeId nodeId = t->getRemoteNodeId();    if(is_connected(nodeId))    {      if(t->isConnected())      {	t->doSend();      }    }  }#endif}intTransporterRegistry::forceSendCheck(int sendLimit){  int tSendCounter = sendCounter;  sendCounter = tSendCounter + 1;  if (tSendCounter >= sendLimit) {    performSend();    sendCounter = 1;    return 1;  }//if  return 0;}//TransporterRegistry::forceSendCheck()#ifdef DEBUG_TRANSPORTERvoidTransporterRegistry::printState(){  ndbout << "-- TransporterRegistry -- " << endl << endl	 << "Transporters = " << nTransporters << endl;  for(int i = 0; i<maxTransporters; i++)    if(theTransporters[i] != NULL){      const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();      ndbout << "Transporter: " << remoteNodeId 	     << " PerformState: " << performStates[remoteNodeId]	     << " IOState: " << ioStates[remoteNodeId] << endl;    }}#endifIOStateTransporterRegistry::ioState(NodeId nodeId) {   return ioStates[nodeId]; }voidTransporterRegistry::setIOState(NodeId nodeId, IOState state) {  DEBUG("TransporterRegistry::setIOState("	<< nodeId << ", " << state << ")");  ioStates[nodeId] = state;}static void * run_start_clients_C(void * me){  ((TransporterRegistry*) me)->start_clients_thread();  return 0;}// Run by kernel threadvoidTransporterRegistry::do_connect(NodeId node_id){  PerformState &curr_state = performStates[node_id];  switch(curr_state){  case DISCONNECTED:    break;  case CONNECTED:    return;  case CONNECTING:    return;  case DISCONNECTING:    break;  }  DBUG_ENTER("TransporterRegistry::do_connect");  DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));  curr_state= CONNECTING;  DBUG_VOID_RETURN;}voidTransporterRegistry::do_disconnect(NodeId node_id){  PerformState &curr_state = performStates[node_id];  switch(curr_state){  case DISCONNECTED:    return;  case CONNECTED:    break;  case CONNECTING:    break;  case DISCONNECTING:    return;  }  DBUG_ENTER("TransporterRegistry::do_disconnect");  DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));  curr_state= DISCONNECTING;  DBUG_VOID_RETURN;}voidTransporterRegistry::report_connect(NodeId node_id){  DBUG_ENTER("TransporterRegistry::report_connect");  DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));  performStates[node_id] = CONNECTED;  reportConnect(callbackObj, node_id);  DBUG_VOID_RETURN;}voidTransporterRegistry::report_disconnect(NodeId node_id, int errnum){  DBUG_ENTER("TransporterRegistry::report_disconnect");  DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));  performStates[node_id] = DISCONNECTED;  reportDisconnect(callbackObj, node_id, errnum);  DBUG_VOID_RETURN;}voidTransporterRegistry::update_connections(){  for (int i= 0, n= 0; n < nTransporters; i++){    Transporter * t = theTransporters[i];    if (!t)      continue;    n++;    const NodeId nodeId = t->getRemoteNodeId();    switch(performStates[nodeId]){    case CONNECTED:    case DISCONNECTED:      break;    case CONNECTING:      if(t->isConnected())	report_connect(nodeId);      break;    case DISCONNECTING:      if(!t->isConnected())	report_disconnect(nodeId, 0);      break;    }  }}// run as own threadvoidTransporterRegistry::start_clients_thread(){  DBUG_ENTER("TransporterRegistry::start_clients_thread");  while (m_run_start_clients_thread) {    NdbSleep_MilliSleep(100);    for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){      Transporter * t = theTransporters[i];      if (!t)	continue;      n++;      const NodeId nodeId = t->getRemoteNodeId();      switch(performStates[nodeId]){      case CONNECTING:	if(!t->isConnected() && !t->isServer) {	  bool connected= false;	  /**	   * First, we try to connect (if we have a port number).	   */	  if (t->get_s_port())	    connected= t->connect_client();	  /**	   * If dynamic, get the port for connecting from the management server	   */	  if( !connected && t->get_s_port() <= 0) {	// Port is dynamic	    int server_port= 0;	    struct ndb_mgm_reply mgm_reply;	    if(!ndb_mgm_is_connected(m_mgm_handle))	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);	    	    if(ndb_mgm_is_connected(m_mgm_handle))	    {	      int res=		ndb_mgm_get_connection_int_parameter(m_mgm_handle,						     t->getRemoteNodeId(),						     t->getLocalNodeId(),						     CFG_CONNECTION_SERVER_PORT,						     &server_port,						     &mgm_reply);	      DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",				 server_port,t->getRemoteNodeId(),				 t->getLocalNodeId(),res));	      if( res >= 0 )	      {		/**		 * Server_port == 0 just means that that a mgmt server		 * has not received a new port yet. Keep the old.		 */		if (server_port)		  t->set_s_port(server_port);	      }	      else if(ndb_mgm_is_connected(m_mgm_handle))	      {		ndbout_c("Failed to get dynamic port to connect to: %d", res);		ndb_mgm_disconnect(m_mgm_handle);	      }	      else	      {		ndbout_c("Management server closed connection early. "			 "It is probably being shut down (or has crashed). "			 "We will retry the connection.");	      }	    }	    /** else	     * We will not be able to get a new port unless	     * the m_mgm_handle is connected. Note that not	     * being connected is an ok state, just continue	     * until it is able to connect. Continue using the	     * old port until we can connect again and get a	     * new port.	     */	  }	}	break;      case DISCONNECTING:	if(t->isConnected())	  t->doDisconnect();	break;      default:	break;      }    }  }  DBUG_VOID_RETURN;}boolTransporterRegistry::start_clients(){  m_run_start_clients_thread= true;  m_start_clients_thread= NdbThread_Create(run_start_clients_C,					   (void**)this,					   32768,					   "ndb_start_clients",					   NDB_THREAD_PRIO_LOW);  if (m_start_clients_thread == 0) {    m_run_start_clients_thread= false;    return false;  }  return true;}boolTransporterRegistry::stop_clients(){  if (m_start_clients_thread) {    m_run_start_clients_thread= false;    void* status;    NdbThread_WaitFor(m_start_clients_thread, &status);    NdbThread_Destroy(&m_start_clients_thread);  }  return true;}voidTransporterRegistry::add_transporter_interface(NodeId remoteNodeId,					       const char *interf, 					       int s_port){  DBUG_ENTER("TransporterRegistry::add_transporter_interface");  DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));  if (interf && strlen(interf) == 0)    interf= 0;  for (unsigned i= 0; i < m_transporter_interface.size(); i++)  {    Transporter_interface &tmp= m_transporter_interface[i];    if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)      continue;    if (interf != 0 && tmp.m_interface != 0 &&	strcmp(interf, tmp.m_interface) == 0)    {      DBUG_VOID_RETURN; // found match, no need to insert    }    if (interf == 0 && tmp.m_interface == 0)    {      DBUG_VOID_RETURN; // found match, no need to insert    }  }  Transporter_interface t;  t.m_remote_nodeId= remoteNodeId;  t.m_s_service_port= s_port;  t.m_interface= interf;  m_transporter_interface.push_back(t);  DBUG_PRINT("exit",("interface and port added"));  DBUG_VOID_RETURN;}boolTransporterRegistry::start_service(SocketServer& socket_server){  struct ndb_mgm_reply mgm_reply;  DBUG_ENTER("TransporterRegistry::start_service");  if (m_transporter_interface.size() > 0 && !nodeIdSpecified)  {    ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");    DBUG_RETURN(false);  }  for (unsigned i= 0; i < m_transporter_interface.size(); i++)  {    Transporter_interface &t= m_transporter_interface[i];    unsigned short port= (unsigned short)t.m_s_service_port;    if(t.m_s_service_port<0)      port= -t.m_s_service_port; // is a dynamic port    TransporterService *transporter_service =      new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));    if(!socket_server.setup(transporter_service,			    &port, t.m_interface))    {      DBUG_PRINT("info", ("Trying new port"));      port= 0;      if(t.m_s_service_port>0	 || !socket_server.setup(transporter_service,				 &port, t.m_interface))      {	/*	 * If it wasn't a dynamically allocated port, or	 * our attempts at getting a new dynamic port failed	 */	ndbout_c("Unable to setup transporter service port: %s:%d!\n"		 "Please check if the port is already used,\n"		 "(perhaps the node is already running)",		 t.m_interface ? t.m_interface : "*", t.m_s_service_port);	delete transporter_service;	DBUG_RETURN(false);      }    }    t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic    DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));    transporter_service->setTransporterRegistry(this);  }  DBUG_RETURN(true);}#ifdef NDB_SHM_TRANSPORTERstaticRETSIGTYPE shm_sig_handler(int signo){  g_shm_counter++;}#endifvoidTransporterRegistry::startReceiving(){  DBUG_ENTER("TransporterRegistry::startReceiving");#ifdef NDB_OSE_TRANSPORTER  if(theOSEReceiver != NULL){    theOSEReceiver->createPhantom();  }#endif#ifdef NDB_OSE  theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0);#endif#if defined NDB_OSE || defined NDB_SOFTOSE  theReceiverPid = current_process();  for(int i = 0; i<nTCPTransporters; i++)    theTCPTransporters[i]->theReceiverPid = theReceiverPid;#endif#ifdef NDB_SHM_TRANSPORTER  m_shm_own_pid = getpid();  if (g_ndb_shm_signum)  {    DBUG_PRINT("info",("Install signal handler for signum %d",		       g_ndb_shm_signum));    struct sigaction sa;    sigemptyset(&sa.sa_mask);    sigaddset(&sa.sa_mask, g_ndb_shm_signum);    pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);    sa.sa_handler = shm_sig_handler;    sigemptyset(&sa.sa_mask);    sa.sa_flags = 0;    int ret;    while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);    if(ret != 0)    {      DBUG_PRINT("error",("Install failed"));      g_eventLogger.error("Failed to install signal handler for"			  " SHM transporter, signum %d, errno: %d (%s)",			  g_ndb_shm_signum, errno, strerror(errno));    }  }#endif // NDB_SHM_TRANSPORTER  DBUG_VOID_RETURN;}voidTransporterRegistry::stopReceiving(){#ifdef NDB_OSE_TRANSPORTER  if(theOSEReceiver != NULL){    theOSEReceiver->destroyPhantom();  }#endif  /**   * Disconnect all transporters, this includes detach from remote node   * and since that must be done from the same process that called attach   * it's done here in the receive thread   */  disconnectAll();#if defined NDB_OSE || defined NDB_SOFTOSE  if(theOSEJunkSocketRecv > 0)    close(theOSEJunkSocketRecv);  theOSEJunkSocketRecv = -1;#endif}voidTransporterRegistry::startSending(){#if defined NDB_OSE || defined NDB_SOFTOSE  theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0);#endif}voidTransporterRegistry::stopSending(){#if defined NDB_OSE || defined NDB_SOFTOSE  if(theOSEJunkSocketSend > 0)    close(theOSEJunkSocketSend);  theOSEJunkSocketSend = -1;#endif}NdbOut & operator <<(NdbOut & out, SignalHeader & sh){  out << "-- Signal Header --" << endl;  out << "theLength:    " << sh.theLength << endl;  out << "gsn:          " << sh.theVerId_signalNumber << endl;  out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;  out << "sendBlockRef: " << sh.theSendersBlockRef << endl;  out << "sendersSig:   " << sh.theSendersSignalId << endl;  out << "theSignalId:  " << sh.theSignalId << endl;  out << "trace:        " << (int)sh.theTrace << endl;  return out;} Transporter*TransporterRegistry::get_transporter(NodeId nodeId) {  return theTransporters[nodeId];}bool TransporterRegistry::connect_client(NdbMgmHandle *h){  DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");  Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);  if(!mgm_nodeid)  {    ndbout_c("%s: %d", __FILE__, __LINE__);    return false;  }  Transporter * t = theTransporters[mgm_nodeid];  if (!t)  {    ndbout_c("%s: %d", __FILE__, __LINE__);    return false;  }  DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));}/** * Given a connected NdbMgmHandle, turns it into a transporter * and returns the socket. */NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h){  struct ndb_mgm_reply mgm_reply;  if ( h==NULL || *h == NULL )  {    ndbout_c("%s: %d", __FILE__, __LINE__);    return NDB_INVALID_SOCKET;  }  for(unsigned int i=0;i < m_transporter_interface.size();i++)    if (m_transporter_interface[i].m_s_service_port < 0	&& ndb_mgm_set_connection_int_parameter(*h,				   get_localNodeId(),				   m_transporter_interface[i].m_remote_nodeId,				   CFG_CONNECTION_SERVER_PORT,				   m_transporter_interface[i].m_s_service_port,				   &mgm_reply) < 0)    {      ndbout_c("Error: %s: %d",	       ndb_mgm_get_latest_error_desc(*h),	       ndb_mgm_get_latest_error(*h));      ndbout_c("%s: %d", __FILE__, __LINE__);      ndb_mgm_destroy_handle(h);      return NDB_INVALID_SOCKET;    }  /**   * convert_to_transporter also disposes of the handle (i.e. we don't leak   * memory here.   */  NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);  if ( sockfd == NDB_INVALID_SOCKET)  {    ndbout_c("Error: %s: %d",	     ndb_mgm_get_latest_error_desc(*h),	     ndb_mgm_get_latest_error(*h));    ndbout_c("%s: %d", __FILE__, __LINE__);    ndb_mgm_destroy_handle(h);  }  return sockfd;}/** * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter * and returns the socket. */NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc){  NdbMgmHandle h= ndb_mgm_create_handle();  if ( h == NULL )  {    return NDB_INVALID_SOCKET;  }  /**   * Set connectstring   */  {    BaseString cs;    cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());    ndb_mgm_set_connectstring(h, cs.c_str());  }  if(ndb_mgm_connect(h, 0, 0, 0)<0)  {    ndb_mgm_destroy_handle(&h);    return NDB_INVALID_SOCKET;  }  return connect_ndb_mgmd(&h);}template class Vector<TransporterRegistry::Transporter_interface>;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -