⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qmgrmain.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 5 页
字号:
    return;  }//if    ndbrequire(c_start.m_gsn == GSN_CM_ADD);  c_start.m_nodes.clearWaitingFor(senderNodePtr.i);  if(!c_start.m_nodes.done()){    jam();    return;  }    switch (type) {  case CmAdd::Prepare:{    jam();    /*----------------------------------------------------------------------*/    /* ALL RUNNING NODES HAVE PREPARED THE INCLUSION OF THIS NEW NODE.      */    /*----------------------------------------------------------------------*/    c_start.m_gsn = GSN_CM_ADD;    c_start.m_nodes = c_clusterNodes;    CmAdd * const cmAdd = (CmAdd*)signal->getDataPtrSend();    cmAdd->requestType = CmAdd::AddCommit;    cmAdd->startingNodeId = addNodePtr.i;     cmAdd->startingVersion = getNodeInfo(addNodePtr.i).m_version;    NodeReceiverGroup rg(QMGR, c_clusterNodes);    sendSignal(rg, GSN_CM_ADD, signal, CmAdd::SignalLength, JBA);    DEBUG_START2(GSN_CM_ADD, rg, "AddCommit");    return;  }  case CmAdd::AddCommit:{    jam();    /****************************************/    /* Send commit to the new node so he    */    /* will change PHASE into ZRUNNING      */    /****************************************/    c_start.m_gsn = GSN_CM_ADD;    c_start.m_nodes.clearWaitingFor();    c_start.m_nodes.setWaitingFor(addNodePtr.i);    CmAdd * const cmAdd = (CmAdd*)signal->getDataPtrSend();    cmAdd->requestType = CmAdd::CommitNew;    cmAdd->startingNodeId = addNodePtr.i;     cmAdd->startingVersion = getNodeInfo(addNodePtr.i).m_version;        sendSignal(calcQmgrBlockRef(addNodePtr.i), GSN_CM_ADD, signal, 	       CmAdd::SignalLength, JBA);    DEBUG_START(GSN_CM_ADD, addNodePtr.i, "CommitNew");    return;  }  case CmAdd::CommitNew:    jam();    /**     * Tell arbitration about new node.     */    handleArbitNdbAdd(signal, addNodePtr.i);    c_start.reset();    if (c_start.m_starting_nodes.get(addNodePtr.i))    {      jam();      c_start.m_starting_nodes.clear(addNodePtr.i);      if (c_start.m_starting_nodes.isclear())      {	jam();	sendSttorryLab(signal);      }    }    return;  }//switch  ndbrequire(false);}//Qmgr::execCM_ACKADD()/**------------------------------------------------------------------------- * WE HAVE BEEN INCLUDED INTO THE CLUSTER. IT IS NOW TIME TO CALCULATE WHICH  * ARE OUR LEFT AND RIGHT NEIGHBOURS FOR THE HEARTBEAT PROTOCOL.  *--------------------------------------------------------------------------*/void Qmgr::findNeighbours(Signal* signal) {  UintR toldLeftNeighbour;  UintR tfnLeftFound;  UintR tfnMaxFound;  UintR tfnMinFound;  UintR tfnRightFound;  NodeRecPtr fnNodePtr;  NodeRecPtr fnOwnNodePtr;  toldLeftNeighbour = cneighbourl;  tfnLeftFound = 0;  tfnMaxFound = 0;  tfnMinFound = (UintR)-1;  tfnRightFound = (UintR)-1;  fnOwnNodePtr.i = getOwnNodeId();  ptrCheckGuard(fnOwnNodePtr, MAX_NDB_NODES, nodeRec);  for (fnNodePtr.i = 1; fnNodePtr.i < MAX_NDB_NODES; fnNodePtr.i++) {    jam();    ptrAss(fnNodePtr, nodeRec);    if (fnNodePtr.i != fnOwnNodePtr.i) {      if (fnNodePtr.p->phase == ZRUNNING) {        if (tfnMinFound > fnNodePtr.p->ndynamicId) {          jam();          tfnMinFound = fnNodePtr.p->ndynamicId;        }//if        if (tfnMaxFound < fnNodePtr.p->ndynamicId) {          jam();          tfnMaxFound = fnNodePtr.p->ndynamicId;        }//if        if (fnOwnNodePtr.p->ndynamicId > fnNodePtr.p->ndynamicId) {          jam();          if (fnNodePtr.p->ndynamicId > tfnLeftFound) {            jam();            tfnLeftFound = fnNodePtr.p->ndynamicId;          }//if        } else {          jam();          if (fnNodePtr.p->ndynamicId < tfnRightFound) {            jam();            tfnRightFound = fnNodePtr.p->ndynamicId;          }//if        }//if      }//if    }//if  }//for  if (tfnLeftFound == 0) {    if (tfnMinFound == (UintR)-1) {      jam();      cneighbourl = ZNIL;    } else {      jam();      cneighbourl = translateDynamicIdToNodeId(signal, tfnMaxFound);    }//if  } else {    jam();    cneighbourl = translateDynamicIdToNodeId(signal, tfnLeftFound);  }//if  if (tfnRightFound == (UintR)-1) {    if (tfnMaxFound == 0) {      jam();      cneighbourh = ZNIL;    } else {      jam();      cneighbourh = translateDynamicIdToNodeId(signal, tfnMinFound);    }//if  } else {    jam();    cneighbourh = translateDynamicIdToNodeId(signal, tfnRightFound);  }//if  if (toldLeftNeighbour != cneighbourl) {    jam();    if (cneighbourl != ZNIL) {      jam();      /**-------------------------------------------------------------------*/      /* WE ARE SUPERVISING A NEW LEFT NEIGHBOUR. WE START WITH ALARM COUNT        * EQUAL TO ZERO.       *---------------------------------------------------------------------*/      fnNodePtr.i = cneighbourl;      ptrCheckGuard(fnNodePtr, MAX_NDB_NODES, nodeRec);      setNodeInfo(fnNodePtr.i).m_heartbeat_cnt= 0;    }//if  }//if  signal->theData[0] = NDB_LE_FIND_NEIGHBOURS;  signal->theData[1] = getOwnNodeId();  signal->theData[2] = cneighbourl;  signal->theData[3] = cneighbourh;  signal->theData[4] = fnOwnNodePtr.p->ndynamicId;  UintR Tlen = 5;  for (fnNodePtr.i = 1; fnNodePtr.i < MAX_NDB_NODES; fnNodePtr.i++) {    jam();    ptrAss(fnNodePtr, nodeRec);    if (fnNodePtr.i != fnOwnNodePtr.i) {      if (fnNodePtr.p->phase == ZRUNNING) {        jam();        signal->theData[Tlen] = fnNodePtr.i;        signal->theData[Tlen + 1] = fnNodePtr.p->ndynamicId;        if (Tlen < 25) {	  /*----------------------------------------------------------------*/	  // This code can only report 11 nodes. 	  // We need to update this when increasing the number of nodes	  // supported.	  /*-----------------------------------------------------------------*/          Tlen += 2;        }      }//if    }//if  }//for  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, Tlen, JBB);}//Qmgr::findNeighbours()/*4.10.7 INIT_DATA        *//*---------------------------------------------------------------------------*//*---------------------------------------------------------------------------*/void Qmgr::initData(Signal* signal) {  NodeRecPtr nodePtr;  for (nodePtr.i = 1; nodePtr.i < MAX_NODES; nodePtr.i++) {    ptrAss(nodePtr, nodeRec);    nodePtr.p->ndynamicId = 0;	    if(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB){      nodePtr.p->phase = ZINIT;      c_definedNodes.set(nodePtr.i);    } else {      nodePtr.p->phase = ZAPI_INACTIVE;    }    setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;    nodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;    nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;    nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;    nodePtr.p->failState = NORMAL;    nodePtr.p->rcv[0] = 0;    nodePtr.p->rcv[1] = 0;  }//for  cfailureNr = 1;  ccommitFailureNr = 1;  cprepareFailureNr = 1;  cnoFailedNodes = 0;  cnoPrepFailedNodes = 0;  creadyDistCom = ZFALSE;  cpresident = ZNIL;  c_start.m_president_candidate = ZNIL;  c_start.m_president_candidate_gci = 0;  cpdistref = 0;  cneighbourh = ZNIL;  cneighbourl = ZNIL;  cdelayRegreq = ZDELAY_REGREQ;  cactivateApiCheck = 0;  ctoStatus = Q_NOT_ACTIVE;  interface_check_timer.setDelay(1000);  interface_check_timer.reset();  clatestTransactionCheck = 0;  cLqhTimeSignalCount = 0;  // catch-all for missing initializations  memset(&arbitRec, 0, sizeof(arbitRec));  /**   * Timeouts   */  const ndb_mgm_configuration_iterator * p =     theConfiguration.getOwnConfigIterator();  ndbrequire(p != 0);    Uint32 hbDBDB = 1500;  Uint32 hbDBAPI = 1500;  Uint32 arbitTimeout = 1000;  c_restartPartialTimeout = 30000;  c_restartPartionedTimeout = 60000;  c_restartFailureTimeout = ~0;  ndb_mgm_get_int_parameter(p, CFG_DB_HEARTBEAT_INTERVAL, &hbDBDB);  ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL, &hbDBAPI);  ndb_mgm_get_int_parameter(p, CFG_DB_ARBIT_TIMEOUT, &arbitTimeout);  ndb_mgm_get_int_parameter(p, CFG_DB_START_PARTIAL_TIMEOUT, 			    &c_restartPartialTimeout);  ndb_mgm_get_int_parameter(p, CFG_DB_START_PARTITION_TIMEOUT,			    &c_restartPartionedTimeout);  ndb_mgm_get_int_parameter(p, CFG_DB_START_FAILURE_TIMEOUT,			    &c_restartFailureTimeout);    if(c_restartPartialTimeout == 0)  {    c_restartPartialTimeout = ~0;  }    if (c_restartPartionedTimeout ==0)  {    c_restartPartionedTimeout = ~0;  }    if (c_restartFailureTimeout == 0)  {    c_restartFailureTimeout = ~0;  }  setHbDelay(hbDBDB);  setHbApiDelay(hbDBAPI);  setArbitTimeout(arbitTimeout);    arbitRec.state = ARBIT_NULL;          // start state for all nodes  arbitRec.apiMask[0].clear();          // prepare for ARBIT_CFG  ArbitSignalData* const sd = (ArbitSignalData*)&signal->theData[0];  for (unsigned rank = 1; rank <= 2; rank++) {    sd->sender = getOwnNodeId();    sd->code = rank;    sd->node = 0;    sd->ticket.clear();    sd->mask.clear();    ndb_mgm_configuration_iterator * iter =      theConfiguration.getClusterConfigIterator();    for (ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)) {      Uint32 tmp = 0;      if (ndb_mgm_get_int_parameter(iter, CFG_NODE_ARBIT_RANK, &tmp) == 0 && 	  tmp == rank){	Uint32 nodeId = 0;	ndbrequire(!ndb_mgm_get_int_parameter(iter, CFG_NODE_ID, &nodeId));	sd->mask.set(nodeId);      }    }        execARBIT_CFG(signal);  }  setNodeInfo(getOwnNodeId()).m_version = NDB_VERSION;}//Qmgr::initData()/**--------------------------------------------------------------------------- * HERE WE RECEIVE THE JOB TABLE SIGNAL EVERY 10 MILLISECONDS.  * WE WILL USE THIS TO CHECK IF IT IS TIME TO CHECK THE NEIGHBOUR NODE.  * WE WILL ALSO SEND A SIGNAL TO BLOCKS THAT NEED A TIME SIGNAL AND  * DO NOT WANT TO USE JOB TABLE SIGNALS. *---------------------------------------------------------------------------*/void Qmgr::timerHandlingLab(Signal* signal) {  NDB_TICKS TcurrentTime = NdbTick_CurrentMillisecond();  NodeRecPtr myNodePtr;  myNodePtr.i = getOwnNodeId();  ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec);  if (myNodePtr.p->phase == ZRUNNING) {    jam();    /**---------------------------------------------------------------------     * WE ARE ONLY PART OF HEARTBEAT CLUSTER IF WE ARE UP AND RUNNING.      *---------------------------------------------------------------------*/    if (hb_send_timer.check(TcurrentTime)) {      jam();      sendHeartbeat(signal);      hb_send_timer.reset();    }    if (hb_check_timer.check(TcurrentTime)) {      jam();      checkHeartbeat(signal);      hb_check_timer.reset();    }  }  if (interface_check_timer.check(TcurrentTime)) {    jam();    interface_check_timer.reset();    checkStartInterface(signal);  }  if (cactivateApiCheck != 0) {    jam();    if (hb_api_timer.check(TcurrentTime)) {      jam();      hb_api_timer.reset();      apiHbHandlingLab(signal);    }//if    if (clatestTransactionCheck == 0) {      //-------------------------------------------------------------      // Initialise the Transaction check timer.      //-------------------------------------------------------------      clatestTransactionCheck = TcurrentTime;    }//if    int counter = 0;    while (TcurrentTime > ((NDB_TICKS)10 + clatestTransactionCheck)) {      jam();      clatestTransactionCheck += (NDB_TICKS)10;      sendSignal(DBTC_REF, GSN_TIME_SIGNAL, signal, 1, JBB);      cLqhTimeSignalCount++;      if (cLqhTimeSignalCount >= 100) {	cLqhTimeSignalCount = 0;	sendSignal(DBLQH_REF, GSN_TIME_SIGNAL, signal, 1, JBB);                }//if      counter++;      if (counter > 1) {	jam();	break;      } else {	;      }//if    }//while  }//if    //--------------------------------------------------  // Resend this signal with 10 milliseconds delay.  //--------------------------------------------------  signal->theData[0] = ZTIMER_HANDLING;  sendSignalWithDelay(QMGR_REF, GSN_CONTINUEB, signal, 10, 1);  return;}//Qmgr::timerHandlingLab()/*---------------------------------------------------------------------------*//*       THIS MODULE HANDLES THE SENDING AND RECEIVING OF HEARTBEATS.        *//*---------------------------------------------------------------------------*/void Qmgr::sendHeartbeat(Signal* signal) {  NodeRecPtr localNodePtr;  localNodePtr.i = cneighbourh;  if (localNodePtr.i == ZNIL) {    jam();    /**---------------------------------------------------------------------     * THERE ARE NO NEIGHBOURS. THIS IS POSSIBLE IF WE ARE THE ONLY NODE IN      * THE CLUSTER.IN THIS CASE WE DO NOT NEED TO SEND ANY HEARTBEAT SIGNALS.     *-----------------------------------------------------------------------*/    return;  }//if  ptrCheckGuard(localNodePtr, MAX_NDB_NODES, nodeRec);  signal->theData[0] = getOwnNodeId();  sendSignal(localNodePtr.p->blockRef, GSN_CM_HEARTBEAT, signal, 1, JBA);#ifdef VM_TRACE  signal->theData[0] = NDB_LE_SentHeartbeat;  signal->theData[1] = localNodePtr.i;  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);  #endif}//Qmgr::sendHeartbeat()void Qmgr::checkHeartbeat(Signal* signal) {  NodeRecPtr nodePtr;  nodePtr.i = cneighbourl;  if (nodePtr.i == ZNIL) {    jam();    /**---------------------------------------------------------------------     * THERE ARE NO NEIGHBOURS. THIS IS POSSIBLE IF WE ARE THE ONLY NODE IN      * THE CLUSTER. IN THIS CASE WE DO NOT NEED TO CHECK ANY HEARTBEATS.     *-----------------------------------------------------------------------*/    return;  }//if  ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);    setNodeInfo(nodePtr.i).m_heartbeat_cnt++;  ndbrequire(nodePtr.p->phase == ZRUNNING);  ndbrequire(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB);  if(getNodeInfo(nodePtr.i).m_heartbeat_cnt > 2){    signal->theData[0] = NDB_LE_MissedHeartbeat;    signal->theData[1] = nodePtr.i;    signal->theData[2] = getNodeInfo(nodePtr.i).m_heartbeat_cnt - 1;    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);  }  if (getNode

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -