⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transporterfacade.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
 * Note that this function need no locking since its * only called from the destructor of Ndb (the NdbObject) *  * Which is protected by a mutex */voidTransporterFacade::stop_instance(){  DBUG_ENTER("TransporterFacade::stop_instance");  if(theFacadeInstance)    theFacadeInstance->doStop();  DBUG_VOID_RETURN;}voidTransporterFacade::doStop(){  DBUG_ENTER("TransporterFacade::doStop");  /**   * First stop the ClusterMgr because it needs to send one more signal   * and also uses theFacadeInstance to lock/unlock theMutexPtr   */  if (theClusterMgr != NULL) theClusterMgr->doStop();  if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);    /**   * Now stop the send and receive threads   */  void *status;  theStopReceive = 1;  if (theReceiveThread) {    NdbThread_WaitFor(theReceiveThread, &status);    NdbThread_Destroy(&theReceiveThread);  }  if (theSendThread) {    NdbThread_WaitFor(theSendThread, &status);    NdbThread_Destroy(&theSendThread);  }  DBUG_VOID_RETURN;}extern "C" void* runSendRequest_C(void * me){  ((TransporterFacade*) me)->threadMainSend();  return 0;}void TransporterFacade::threadMainSend(void){  theTransporterRegistry->startSending();  if (!theTransporterRegistry->start_clients()){    ndbout_c("Unable to start theTransporterRegistry->start_clients");    exit(0);  }  m_socket_server.startServer();  while(!theStopReceive) {    NdbSleep_MilliSleep(10);    NdbMutex_Lock(theMutexPtr);    if (sendPerformedLastInterval == 0) {      theTransporterRegistry->performSend();    }    sendPerformedLastInterval = 0;    NdbMutex_Unlock(theMutexPtr);  }  theTransporterRegistry->stopSending();  m_socket_server.stopServer();  m_socket_server.stopSessions(true);  theTransporterRegistry->stop_clients();}extern "C" void* runReceiveResponse_C(void * me){  ((TransporterFacade*) me)->threadMainReceive();  return 0;}void TransporterFacade::threadMainReceive(void){  theTransporterRegistry->startReceiving();  NdbMutex_Lock(theMutexPtr);  theTransporterRegistry->update_connections();  NdbMutex_Unlock(theMutexPtr);  while(!theStopReceive) {    for(int i = 0; i<10; i++){      const int res = theTransporterRegistry->pollReceive(10);      if(res > 0){        NdbMutex_Lock(theMutexPtr);        theTransporterRegistry->performReceive();        NdbMutex_Unlock(theMutexPtr);      }    }    NdbMutex_Lock(theMutexPtr);    theTransporterRegistry->update_connections();    NdbMutex_Unlock(theMutexPtr);  }//while  theTransporterRegistry->stopReceiving();}TransporterFacade::TransporterFacade() :  theTransporterRegistry(0),  theStopReceive(0),  theSendThread(NULL),  theReceiveThread(NULL),  m_fragmented_signal_id(0){  DBUG_ENTER("TransporterFacade::TransporterFacade");  theOwnId = 0;  theMutexPtr = NdbMutex_Create();  sendPerformedLastInterval = 0;  checkCounter = 4;  currentSendLimit = 1;  theClusterMgr = NULL;  theArbitMgr = NULL;  theStartNodeId = 1;  m_scan_batch_size= MAX_SCAN_BATCH_SIZE;  m_batch_byte_size= SCAN_BATCH_SIZE;  m_batch_size= DEF_BATCH_SIZE;  m_max_trans_id = 0;  theClusterMgr = new ClusterMgr(* this);  DBUG_VOID_RETURN;}boolTransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props){  DBUG_ENTER("TransporterFacade::init");  theOwnId = nodeId;  theTransporterRegistry = new TransporterRegistry(this);  const int res = IPCConfig::configureTransporters(nodeId, 						   * props, 						   * theTransporterRegistry);  if(res <= 0){    TRP_DEBUG( "configureTransporters returned 0 or less" );    DBUG_RETURN(false);  }    ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);  iter.first();  theClusterMgr->init(iter);    iter.first();  if(iter.find(CFG_NODE_ID, nodeId)){    TRP_DEBUG( "Node info missing from config." );    DBUG_RETURN(false);  }    Uint32 rank = 0;  if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){    theArbitMgr = new ArbitMgr(* this);    theArbitMgr->setRank(rank);    Uint32 delay = 0;    iter.get(CFG_NODE_ARBIT_DELAY, &delay);    theArbitMgr->setDelay(delay);  }  Uint32 scan_batch_size= 0;  if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {    m_scan_batch_size= scan_batch_size;  }  Uint32 batch_byte_size= 0;  if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {    m_batch_byte_size= batch_byte_size;  }  Uint32 batch_size= 0;  if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {    m_batch_size= batch_size;  }    Uint32 timeout = 120000;  iter.first();  for (iter.first(); iter.valid(); iter.next())  {    Uint32 tmp1 = 0, tmp2 = 0;    iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);    iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);    tmp1 += tmp2;    if (tmp1 > timeout)      timeout = tmp1;  }  m_waitfor_timeout = timeout;    if (!theTransporterRegistry->start_service(m_socket_server)){    ndbout_c("Unable to start theTransporterRegistry->start_service");    DBUG_RETURN(false);  }  theReceiveThread = NdbThread_Create(runReceiveResponse_C,                                      (void**)this,                                      32768,                                      "ndb_receive",                                      NDB_THREAD_PRIO_LOW);  theSendThread = NdbThread_Create(runSendRequest_C,                                   (void**)this,                                   32768,                                   "ndb_send",                                   NDB_THREAD_PRIO_LOW);  theClusterMgr->startThread();  #ifdef API_TRACE  signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);#endif    DBUG_RETURN(true);}voidTransporterFacade::connected(){  DBUG_ENTER("TransporterFacade::connected");  Uint32 sz = m_threads.m_statusNext.size();  for (Uint32 i = 0; i < sz ; i ++) {    if (m_threads.getInUse(i)){      void * obj = m_threads.m_objectExecute[i].m_object;      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];      (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);    }  }  DBUG_VOID_RETURN;}voidTransporterFacade::ReportNodeDead(NodeId tNodeId){  /**   * When a node fails we must report this to each Ndb object.    * The function that is used for communicating node failures is called.   * This is to ensure that the Ndb objects do not think their connections    * are correct after a failure followed by a restart.    * After the restart the node is up again and the Ndb object    * might not have noticed the failure.   */  Uint32 sz = m_threads.m_statusNext.size();  for (Uint32 i = 0; i < sz ; i ++) {    if (m_threads.getInUse(i)){      void * obj = m_threads.m_objectExecute[i].m_object;      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];      (*RegPC) (obj, tNodeId, false, false);    }  }}voidTransporterFacade::ReportNodeFailureComplete(NodeId tNodeId){  /**   * When a node fails we must report this to each Ndb object.    * The function that is used for communicating node failures is called.   * This is to ensure that the Ndb objects do not think their connections    * are correct after a failure followed by a restart.    * After the restart the node is up again and the Ndb object    * might not have noticed the failure.   */  DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");  DBUG_PRINT("enter",("nodeid= %d", tNodeId));  Uint32 sz = m_threads.m_statusNext.size();  for (Uint32 i = 0; i < sz ; i ++) {    if (m_threads.getInUse(i)){      void * obj = m_threads.m_objectExecute[i].m_object;      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];      (*RegPC) (obj, tNodeId, false, true);    }  }  DBUG_VOID_RETURN;}voidTransporterFacade::ReportNodeAlive(NodeId tNodeId){  /**   * When a node fails we must report this to each Ndb object.    * The function that is used for communicating node failures is called.   * This is to ensure that the Ndb objects do not think there connections    * are correct after a failure   * followed by a restart.    * After the restart the node is up again and the Ndb object    * might not have noticed the failure.   */  Uint32 sz = m_threads.m_statusNext.size();  for (Uint32 i = 0; i < sz ; i ++) {    if (m_threads.getInUse(i)){      void * obj = m_threads.m_objectExecute[i].m_object;      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];      (*RegPC) (obj, tNodeId, true, false);    }  }}int TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id){  NdbMutex_Lock(theMutexPtr);  Uint32 low_bits = (Uint32)trans_id;  m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;  close_local(blockNumber);  NdbMutex_Unlock(theMutexPtr);  return 0;}int TransporterFacade::close_local(BlockNumber blockNumber){  m_threads.close(blockNumber);  return 0;}intTransporterFacade::open(void* objRef,                         ExecuteFunction fun,                         NodeStatusFunction statusFun){  DBUG_ENTER("TransporterFacade::open");  int r= m_threads.open(objRef, fun, statusFun);  if (r < 0)    DBUG_RETURN(r);#if 1  if (theOwnId > 0) {    (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);  }#endif  DBUG_RETURN(r);}TransporterFacade::~TransporterFacade(){    DBUG_ENTER("TransporterFacade::~TransporterFacade");  NdbMutex_Lock(theMutexPtr);  delete theClusterMgr;    delete theArbitMgr;  delete theTransporterRegistry;  NdbMutex_Unlock(theMutexPtr);  NdbMutex_Destroy(theMutexPtr);#ifdef API_TRACE  signalLogger.setOutputStream(0);#endif  DBUG_VOID_RETURN;}void TransporterFacade::calculateSendLimit(){  Uint32 Ti;  Uint32 TthreadCount = 0;    Uint32 sz = m_threads.m_statusNext.size();  for (Ti = 0; Ti < sz; Ti++) {    if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){      TthreadCount++;      m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;    }  }  currentSendLimit = TthreadCount;  if (currentSendLimit == 0) {    currentSendLimit = 1;  }  checkCounter = currentSendLimit << 2;}//-------------------------------------------------// Force sending but still report the sending to the// adaptive algorithm.//-------------------------------------------------void TransporterFacade::forceSend(Uint32 block_number) {  checkCounter--;  m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;  sendPerformedLastInterval = 1;  if (checkCounter < 0) {    calculateSendLimit();  }  theTransporterRegistry->forceSendCheck(0);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -