⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qmgrmain.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 5 页
字号:
      goto start_report;    }  }  ndbrequire(false);start_report:  jam();  {    Uint32 sz = NdbNodeBitmask::Size;    signal->theData[0] = NDB_LE_StartReport;    signal->theData[3] = sz;    Uint32* ptr = signal->theData+4;    c_definedNodes.copyto(sz, ptr); ptr += sz;    c_start.m_starting_nodes.copyto(sz, ptr); ptr += sz;    c_start.m_skip_nodes.copyto(sz, ptr); ptr += sz;    report_mask.copyto(sz, ptr); ptr+= sz;    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 	       4+4*NdbNodeBitmask::Size, JBB);  }  return retVal;  missing_nodegroup:  jam();  char buf[100], mask1[100], mask2[100];  c_start.m_starting_nodes.getText(mask1);  tmp.assign(c_start.m_starting_nodes);  tmp.bitANDC(c_start.m_starting_nodes_w_log);  tmp.getText(mask2);  BaseString::snprintf(buf, sizeof(buf),		       "Unable to start missing node group! "		       " starting: %s (missing fs for: %s)",		       mask1, mask2);  progError(__LINE__, NDBD_EXIT_SR_RESTARTCONFLICT, buf);}voidQmgr::electionWon(Signal* signal){  NodeRecPtr myNodePtr;  cpresident = getOwnNodeId(); /* This node becomes president. */  myNodePtr.i = getOwnNodeId();  ptrCheckGuard(myNodePtr, MAX_NDB_NODES, nodeRec);    myNodePtr.p->phase = ZRUNNING;  cpdistref = reference();  cneighbourl = ZNIL;  cneighbourh = ZNIL;  myNodePtr.p->ndynamicId = 1;  c_maxDynamicId = 1;  c_clusterNodes.clear();  c_clusterNodes.set(getOwnNodeId());    cpresidentAlive = ZTRUE;  c_start_election_time = ~0;  c_start.reset();  signal->theData[0] = NDB_LE_CM_REGCONF;  signal->theData[1] = getOwnNodeId();  signal->theData[2] = cpresident;  signal->theData[3] = 1;  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB);  c_start.m_starting_nodes.clear(getOwnNodeId());  if (c_start.m_starting_nodes.isclear())  {    jam();    sendSttorryLab(signal);  }}/*4.4.11 CONTINUEB *//*--------------------------------------------------------------------------*//*                                                                          *//*--------------------------------------------------------------------------*//****************************>---------------------------------------------*//* CONTINUEB                 >        SENDER: Own block, Own node          *//****************************>-------+INPUT : TCONTINUEB_TYPE              *//*--------------------------------------------------------------*/void Qmgr::regreqTimeLimitLab(Signal* signal) {  if(cpresident == ZNIL)  {    if (c_start.m_president_candidate == ZNIL)    {      jam();      c_start.m_president_candidate = getOwnNodeId();    }        cmInfoconf010Lab(signal);  }}//Qmgr::regreqTimelimitLab()/**--------------------------------------------------------------------------- * The new node will take care of giving information about own node and ask  * all other nodes for nodeinfo. The new node will use CM_NODEINFOREQ for  * that purpose. When the setup of connections to all running, the president  * will send a commit to all running nodes + the new node  * INPUT: NODE_PTR1, must be set as ZNIL if we don't enter CONNECT_NODES)  *                   from signal CM_NODEINFOCONF.  *---------------------------------------------------------------------------*//*******************************//* CM_NODEINFOCONF            *//*******************************/void Qmgr::execCM_NODEINFOCONF(Signal* signal) {  DEBUG_START3(signal, "");  jamEntry();  CmNodeInfoConf * const conf = (CmNodeInfoConf*)signal->getDataPtr();  const Uint32 nodeId = conf->nodeId;  const Uint32 dynamicId = conf->dynamicId;  const Uint32 version = conf->version;  NodeRecPtr nodePtr;    nodePtr.i = getOwnNodeId();  ptrAss(nodePtr, nodeRec);    ndbrequire(nodePtr.p->phase == ZSTARTING);  ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ);  c_start.m_nodes.clearWaitingFor(nodeId);  /**   * Update node info   */  NodeRecPtr replyNodePtr;  replyNodePtr.i = nodeId;  ptrCheckGuard(replyNodePtr, MAX_NDB_NODES, nodeRec);  replyNodePtr.p->ndynamicId = dynamicId;  replyNodePtr.p->blockRef = signal->getSendersBlockRef();  setNodeInfo(replyNodePtr.i).m_version = version;  if(!c_start.m_nodes.done()){    jam();    return;  }  /**********************************************<*/  /* Send an ack. back to the president.          */  /* CM_ACKADD                                    */  /* The new node has been registered by all      */  /* running nodes and has stored nodeinfo about  */  /* all running nodes. The new node has to wait  */  /* for CM_ADD (commit) from president to become */  /* a running node in the cluster.               */  /**********************************************<*/  sendCmAckAdd(signal, getOwnNodeId(), CmAdd::Prepare);  return;}//Qmgr::execCM_NODEINFOCONF()/**--------------------------------------------------------------------------- * A new node sends nodeinfo about himself. The new node asks for  * corresponding nodeinfo back in the  CM_NODEINFOCONF.   *---------------------------------------------------------------------------*//*******************************//* CM_NODEINFOREQ             *//*******************************/void Qmgr::execCM_NODEINFOREQ(Signal* signal) {  jamEntry();  const Uint32 Tblockref = signal->getSendersBlockRef();  NodeRecPtr nodePtr;    nodePtr.i = getOwnNodeId();  ptrAss(nodePtr, nodeRec);    if(nodePtr.p->phase != ZRUNNING){    jam();    signal->theData[0] = reference();    signal->theData[1] = getOwnNodeId();    signal->theData[2] = ZNOT_RUNNING;    sendSignal(Tblockref, GSN_CM_NODEINFOREF, signal, 3, JBB);    return;  }  NodeRecPtr addNodePtr;  CmNodeInfoReq * const req = (CmNodeInfoReq*)signal->getDataPtr();  addNodePtr.i = req->nodeId;  ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec);  addNodePtr.p->ndynamicId = req->dynamicId;  addNodePtr.p->blockRef = signal->getSendersBlockRef();  setNodeInfo(addNodePtr.i).m_version = req->version;  c_maxDynamicId = req->dynamicId;  cmAddPrepare(signal, addNodePtr, nodePtr.p);}//Qmgr::execCM_NODEINFOREQ()voidQmgr::cmAddPrepare(Signal* signal, NodeRecPtr nodePtr, const NodeRec * self){  jam();  switch(nodePtr.p->phase){  case ZINIT:    jam();    nodePtr.p->phase = ZSTARTING;    return;  case ZFAIL_CLOSING:    jam();    #if 1    warningEvent("Recieved request to incorperate node %u, "		 "while error handling has not yet completed",		 nodePtr.i);        ndbrequire(getOwnNodeId() != cpresident);    ndbrequire(signal->header.theVerId_signalNumber == GSN_CM_ADD);    c_start.m_nodes.clearWaitingFor();    c_start.m_nodes.setWaitingFor(nodePtr.i);    c_start.m_gsn = GSN_CM_NODEINFOCONF;#else    warningEvent("Enabling communication to CM_ADD node %u state=%d", 		 nodePtr.i,		 nodePtr.p->phase);    nodePtr.p->phase = ZSTARTING;    nodePtr.p->failState = NORMAL;    signal->theData[0] = 0;    signal->theData[1] = nodePtr.i;    sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA);#endif    return;  case ZSTARTING:    break;  case ZRUNNING:  case ZPREPARE_FAIL:  case ZAPI_ACTIVE:  case ZAPI_INACTIVE:    ndbrequire(false);  }    sendCmAckAdd(signal, nodePtr.i, CmAdd::Prepare);  /* President have prepared us */  CmNodeInfoConf * conf = (CmNodeInfoConf*)signal->getDataPtrSend();  conf->nodeId = getOwnNodeId();  conf->dynamicId = self->ndynamicId;  conf->version = getNodeInfo(getOwnNodeId()).m_version;  sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal,	     CmNodeInfoConf::SignalLength, JBB);  DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), "");}voidQmgr::sendCmAckAdd(Signal * signal, Uint32 nodeId, CmAdd::RequestType type){    CmAckAdd * cmAckAdd = (CmAckAdd*)signal->getDataPtrSend();  cmAckAdd->requestType = type;  cmAckAdd->startingNodeId = nodeId;  cmAckAdd->senderNodeId = getOwnNodeId();  sendSignal(cpdistref, GSN_CM_ACKADD, signal, CmAckAdd::SignalLength, JBA);  DEBUG_START(GSN_CM_ACKADD, cpresident, "");  switch(type){  case CmAdd::Prepare:    return;  case CmAdd::AddCommit:  case CmAdd::CommitNew:    break;  }  signal->theData[0] = nodeId;  EXECUTE_DIRECT(NDBCNTR, GSN_CM_ADD_REP, signal, 1);  jamEntry();}/*4.4.11 CM_ADD *//**-------------------------------------------------------------------------- * Prepare a running node to add a new node to the cluster. The running node  * will change phase of the new node fron ZINIT to ZWAITING. The running node  * will also mark that we have received a prepare. When the new node has sent  * us nodeinfo we can send an acknowledgement back to the president. When all  * running nodes has acknowledged the new node, the president will send a  * commit and we can change phase of the new node to ZRUNNING. The president  * will also send CM_ADD to himself.     *---------------------------------------------------------------------------*//*******************************//* CM_ADD                     *//*******************************/void Qmgr::execCM_ADD(Signal* signal) {  NodeRecPtr addNodePtr;  jamEntry();  NodeRecPtr nodePtr;  nodePtr.i = getOwnNodeId();  ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);  CmAdd * const cmAdd = (CmAdd*)signal->getDataPtr();  const CmAdd::RequestType type = (CmAdd::RequestType)cmAdd->requestType;  addNodePtr.i = cmAdd->startingNodeId;  //const Uint32 startingVersion = cmAdd->startingVersion;  ptrCheckGuard(addNodePtr, MAX_NDB_NODES, nodeRec);  DEBUG_START3(signal, type);  if(nodePtr.p->phase == ZSTARTING){    jam();    /**     * We are joining...     */    ndbrequire(addNodePtr.i == nodePtr.i);    switch(type){    case CmAdd::Prepare:      ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ);      /**       * Wait for CM_NODEINFO_CONF       */      return;    case CmAdd::CommitNew:      /**       * Tata. we're in the cluster       */      joinedCluster(signal, addNodePtr);      return;    case CmAdd::AddCommit:      ndbrequire(false);    }  }  switch (type) {  case CmAdd::Prepare:    cmAddPrepare(signal, addNodePtr, nodePtr.p);    break;  case CmAdd::AddCommit:{    jam();    ndbrequire(addNodePtr.p->phase == ZSTARTING);    addNodePtr.p->phase = ZRUNNING;    setNodeInfo(addNodePtr.i).m_heartbeat_cnt= 0;    c_clusterNodes.set(addNodePtr.i);    findNeighbours(signal);    /**     * SEND A HEARTBEAT IMMEDIATELY TO DECREASE THE RISK THAT WE MISS EARLY     * HEARTBEATS.      */    sendHeartbeat(signal);    /**     *  ENABLE COMMUNICATION WITH ALL BLOCKS WITH THE NEWLY ADDED NODE     */    signal->theData[0] = addNodePtr.i;    sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA);    sendCmAckAdd(signal, addNodePtr.i, CmAdd::AddCommit);    if(getOwnNodeId() != cpresident){      jam();      c_start.reset();    }    break;  }  case CmAdd::CommitNew:    jam();    ndbrequire(false);  }}//Qmgr::execCM_ADD()voidQmgr::joinedCluster(Signal* signal, NodeRecPtr nodePtr){  /**   * WE HAVE BEEN INCLUDED IN THE CLUSTER WE CAN START BEING PART OF THE    * HEARTBEAT PROTOCOL AND WE WILL ALSO ENABLE COMMUNICATION WITH ALL    * NODES IN THE CLUSTER.   */  nodePtr.p->phase = ZRUNNING;  setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;  findNeighbours(signal);  c_clusterNodes.set(nodePtr.i);  c_start.reset();  /**   * SEND A HEARTBEAT IMMEDIATELY TO DECREASE THE RISK    * THAT WE MISS EARLY HEARTBEATS.    */  sendHeartbeat(signal);  /**   * ENABLE COMMUNICATION WITH ALL BLOCKS IN THE CURRENT CLUSTER AND SET    * THE NODES IN THE CLUSTER TO BE RUNNING.    */  for (nodePtr.i = 1; nodePtr.i < MAX_NDB_NODES; nodePtr.i++) {    jam();    ptrAss(nodePtr, nodeRec);    if ((nodePtr.p->phase == ZRUNNING) && (nodePtr.i != getOwnNodeId())) {      /*-------------------------------------------------------------------*/      // Enable full communication to all other nodes. Not really necessary       // to open communication to ourself.      /*-------------------------------------------------------------------*/      jam();      signal->theData[0] = nodePtr.i;      sendSignal(CMVMI_REF, GSN_ENABLE_COMORD, signal, 1, JBA);    }//if  }//for    sendSttorryLab(signal);    /**   * Start timer handling    */  signal->theData[0] = ZTIMER_HANDLING;  sendSignal(QMGR_REF, GSN_CONTINUEB, signal, 10, JBB);    sendCmAckAdd(signal, getOwnNodeId(), CmAdd::CommitNew);}/*  4.10.7 CM_ACKADD        - PRESIDENT IS RECEIVER -       *//*---------------------------------------------------------------------------*//* Entry point for an ack add signal.  * The TTYPE defines if it is a prepare or a commit.                         *//*---------------------------------------------------------------------------*/void Qmgr::execCM_ACKADD(Signal* signal) {  NodeRecPtr addNodePtr;  NodeRecPtr senderNodePtr;  jamEntry();  CmAckAdd * const cmAckAdd = (CmAckAdd*)signal->getDataPtr();  const CmAdd::RequestType type = (CmAdd::RequestType)cmAckAdd->requestType;  addNodePtr.i = cmAckAdd->startingNodeId;  senderNodePtr.i = cmAckAdd->senderNodeId;  DEBUG_START3(signal, type);  if (cpresident != getOwnNodeId()) {    jam();    /*-----------------------------------------------------------------------*/    /* IF WE ARE NOT PRESIDENT THEN WE SHOULD NOT RECEIVE THIS MESSAGE.      */    /*------------------------------------------------------------_----------*/    warningEvent("Received CM_ACKADD from %d president=%d",		 senderNodePtr.i, cpresident);    return;  }//if  if (addNodePtr.i != c_start.m_startNode) {    jam();    /*----------------------------------------------------------------------*/    /* THIS IS NOT THE STARTING NODE. WE ARE ACTIVE NOW WITH ANOTHER START. */    /*----------------------------------------------------------------------*/    warningEvent("Received CM_ACKADD from %d with startNode=%d != own %d",		 senderNodePtr.i, addNodePtr.i, c_start.m_startNode);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -