⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 5 页
字号:
  }  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();  req->subscriberRef = subscriberRef;  req->subscriberData = subscriberData;  req->subscriptionId = subscriptionId;  req->subscriptionKey = subscriptionKey;  req->part = part;  sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);  DBUG_VOID_RETURN;}voidSuma::execNODE_FAILREP(Signal* signal){  jamEntry();  DBUG_ENTER("Suma::execNODE_FAILREP");  NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr();    bool changed = false;  NodePtr nodePtr;#ifdef NODEFAIL_DEBUG  ndbout_c("Suma: nodefailrep");#endif  c_nodeFailGCI = getFirstGCI(signal);  for(c_nodes.first(nodePtr); nodePtr.i != RNIL; c_nodes.next(nodePtr)){    if(NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)){      if(nodePtr.p->alive){	ndbassert(c_aliveNodes.get(nodePtr.p->nodeId));	changed = true;	jam();      } else {	ndbassert(!c_aliveNodes.get(nodePtr.p->nodeId));	jam();      }            if (c_preparingNodes.get(nodePtr.p->nodeId)) {	jam();	// we are currently preparing this node that died	// it's ok just to clear and go back to waiting for it to start up	Restart.resetNode(calcSumaBlockRef(nodePtr.p->nodeId));	c_preparingNodes.clear(nodePtr.p->nodeId);      } else if (c_handoverToDo) {	jam();	// TODO what if I'm a SUMA that is currently restarting and the SUMA	// responsible for restarting me is the one that died?	// a node has failed whilst handover is going on	// let's check if we're in the process of handover with that node	c_handoverToDo = false;	for( int i = 0; i < NO_OF_BUCKETS; i++) {	  if (c_buckets[i].handover) {	    // I'm doing handover, but is it with the dead node?	    if (getResponsibleSumaNodeId(i) == nodePtr.p->nodeId) {	      // so it was the dead node, has handover started?	      if (c_buckets[i].handover_started) {		jam();		// we're not ok and will have lost data!		// set not active to indicate this -		// this will generate takeover behaviour		c_buckets[i].active = false;		c_buckets[i].handover_started = false;	      } // else we're ok to revert back to state before 	      c_buckets[i].handover = false;	    } else {	      jam();	      // ok, we're doing handover with a different node	      c_handoverToDo = true;	    }	  }	}      }      c_failoverBuffer.nodeFailRep();      nodePtr.p->alive = 0;      c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above    }  }  DBUG_VOID_RETURN;}voidSuma::execINCL_NODEREQ(Signal* signal){  jamEntry();    //const Uint32 senderRef = signal->theData[0];  const Uint32 inclNode  = signal->theData[1];  NodePtr node;  for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)){    jam();    const Uint32 nodeId = node.p->nodeId;    if(inclNode == nodeId){      jam();            ndbrequire(node.p->alive == 0);      ndbrequire(!c_aliveNodes.get(nodeId));            for (Uint32 j = 0; j < c_noNodesInGroup; j++) {        jam();	if (c_nodesInGroup[j] == nodeId) {	  // the starting node is part of my node group          jam();	  c_preparingNodes.set(nodeId); // set as being prepared	  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {            jam();	    if (i == c_idInNodeGroup) {              jam();	      // I'm responsible for restarting this SUMA	      // ALL dict's should have meta data info so it is ok to start	      Restart.startNode(signal, calcSumaBlockRef(nodeId));	      break;	    }//if	    if (c_aliveNodes.get(c_nodesInGroup[i])) {              jam();	      break; // another Suma takes care of this	    }//if	  }//for	  break;	}//if      }//for      node.p->alive = 1;      c_aliveNodes.set(nodeId);      break;    }//if  }//for#if 0 // if we include this DIH's got to be prepared, later if needed...  signal->theData[0] = reference();    sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);#endif}voidSuma::execSIGNAL_DROPPED_REP(Signal* signal){  jamEntry();  ndbrequire(false);}/******************************************************************** * * Dump state * */static unsignedcount_subscribers(const DLList<SumaParticipant::Subscriber> &subs){  unsigned n= 0;  SumaParticipant::SubscriberPtr i_subbPtr;  subs.first(i_subbPtr);  while(!i_subbPtr.isNull()){    n++;    subs.next(i_subbPtr);  }  return n;}voidSuma::execDUMP_STATE_ORD(Signal* signal){  jamEntry();  Uint32 tCase = signal->theData[0];  if(tCase >= 8000 && tCase <= 8003){    SubscriptionPtr subPtr;    c_subscriptions.getPtr(subPtr, g_subPtrI);        Ptr<SyncRecord> syncPtr;    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);        if(tCase == 8000){      syncPtr.p->startMeta(signal);    }        if(tCase == 8001){      syncPtr.p->startScan(signal);    }    if(tCase == 8002){      syncPtr.p->startTrigger(signal);    }        if(tCase == 8003){      subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;      LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);      Uint32 tab = 0;      Uint32 att[] = { 0, 1, 1 };      syncPtr.p->m_tableList.append(&tab, 1);      attrs.append(att, 3);    }  }  if(tCase == 8004){    infoEvent("Suma: c_subscriberPool  size: %d free: %d",	      c_subscriberPool.getSize(),	      c_subscriberPool.getNoOfFree());    infoEvent("Suma: c_tablePool  size: %d free: %d",	      c_tablePool_.getSize(),	      c_tablePool_.getNoOfFree());    infoEvent("Suma: c_subscriptionPool  size: %d free: %d",	      c_subscriptionPool.getSize(),	      c_subscriptionPool.getNoOfFree());    infoEvent("Suma: c_syncPool  size: %d free: %d",	      c_syncPool.getSize(),	      c_syncPool.getNoOfFree());    infoEvent("Suma: c_dataBufferPool  size: %d free: %d",	      c_dataBufferPool.getSize(),	      c_dataBufferPool.getNoOfFree());    infoEvent("Suma: c_metaSubscribers count: %d",	      count_subscribers(c_metaSubscribers));    infoEvent("Suma: c_dataSubscribers count: %d",	      count_subscribers(c_dataSubscribers));    infoEvent("Suma: c_prepDataSubscribers count: %d",	      count_subscribers(c_prepDataSubscribers));    infoEvent("Suma: c_removeDataSubscribers count: %d",	      count_subscribers(c_removeDataSubscribers));  }}/******************************************************************** * * Convert a table name (db+schema+tablename) to tableId * */#if 0voidSumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal){  jam();  if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) {    jam();    GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend();    char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable];    const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated    req->senderRef  = reference();    req->senderData = subPtr.i;    req->len        = strLen;    LinearSectionPtr ptr[1];    ptr[0].p  = (Uint32*)tableName;    ptr[0].sz = strLen;    sendSignal(DBDICT_REF,	       GSN_GET_TABLEID_REQ, 	       signal, 	       GetTableIdReq::SignalLength,	       JBB,	       ptr,	       1);  } else {    jam();    sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr);  }}#endifvoid SumaParticipant::addTableId(Uint32 tableId,			    SubscriptionPtr subPtr, SyncRecord *psyncRec){#ifdef NODEFAIL_DEBUG  ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u",	   tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);#endif  subPtr.p->m_tables[tableId] = 1;  subPtr.p->m_currentTable++;  if(psyncRec != NULL)    psyncRec->m_tableList.append(&tableId, 1);  }#if 0void SumaParticipant::execGET_TABLEID_CONF(Signal * signal){  jamEntry();  GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr();  Uint32 tableId = conf->tableId;  //Uint32 schemaVersion = conf->schemaVersion;    Uint32 senderData = conf->senderData;  SubscriptionPtr subPtr;  Ptr<SyncRecord> syncPtr;  c_subscriptions.getPtr(subPtr, senderData);  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);    /*   * add to m_tableList   */  addTableId(tableId, subPtr, syncPtr.p);  convertNameToId(subPtr, signal);}void SumaParticipant::execGET_TABLEID_REF(Signal * signal){  jamEntry();  GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr();  Uint32 senderData         = ref->senderData;  //  Uint32 err                = ref->err;    SubscriptionPtr subPtr;  c_subscriptions.getPtr(subPtr, senderData);  Uint32 subData = subPtr.p->m_subscriberData;  SubCreateRef * reff = (SubCreateRef*)ref;  /**   * @todo: map ref->err to GrepError.   */  reff->err = GrepError::SELECTED_TABLE_NOT_FOUND;  reff->subscriberData = subData;  sendSignal(subPtr.p->m_subscriberRef,	     GSN_SUB_CREATE_REF, 	     signal, 	     SubCreateRef::SignalLength,	     JBB);}#endif/************************************************************* * * Creation of subscription id's * ************************************************************/void Suma::execCREATE_SUBID_REQ(Signal* signal) {  jamEntry();  CRASH_INSERTION(13001);  CreateSubscriptionIdReq const * req =    (CreateSubscriptionIdReq*)signal->getDataPtr();  SubscriberPtr subbPtr;  if(!c_subscriberPool.seize(subbPtr)){    jam();    sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM);    return;  }  subbPtr.p->m_subscriberRef  = signal->getSendersBlockRef();   subbPtr.p->m_senderData     = req->senderData;  subbPtr.p->m_subscriberData = subbPtr.i;  UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();     utilReq->senderData  = subbPtr.p->m_subscriberData;  utilReq->sequenceId  = SUMA_SEQUENCE;  utilReq->requestType = UtilSequenceReq::NextVal;  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 	     signal, UtilSequenceReq::SignalLength, JBB);}voidSuma::execUTIL_SEQUENCE_CONF(Signal* signal){  jamEntry();  DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");  CRASH_INSERTION(13002);  UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();  if(conf->requestType == UtilSequenceReq::Create) {    jam();    createSequenceReply(signal, conf, NULL);    DBUG_VOID_RETURN;  }  Uint64 subId;  memcpy(&subId,conf->sequenceValue,8);  Uint32 subData = conf->senderData;  SubscriberPtr subbPtr;  c_subscriberPool.getPtr(subbPtr,subData);    CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;  subconf->subscriptionId = (Uint32)subId;  subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);  subconf->subscriberData = subbPtr.p->m_senderData;    sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal,	     CreateSubscriptionIdConf::SignalLength, JBB);  c_subscriberPool.release(subbPtr);  DBUG_VOID_RETURN;}voidSuma::execUTIL_SEQUENCE_REF(Signal* signal){  jamEntry();  DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");  UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();  if(ref->requestType == UtilSequenceReq::Create) {    jam();    createSequenceReply(signal, NULL, ref);    DBUG_VOID_RETURN;  }  Uint32 subData = ref->senderData;  SubscriberPtr subbPtr;  c_subscriberPool.getPtr(subbPtr,subData);  sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);  c_subscriberPool.release(subbPtr);  DBUG_VOID_RETURN;}//execUTIL_SEQUENCE_REF()voidSumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){  jam();  CreateSubscriptionIdRef  * ref =     (CreateSubscriptionIdRef *)signal->getDataPtrSend();  ref->err = errCode;  sendSignal(signal->getSendersBlockRef(), 	     GSN_CREATE_SUBID_REF,	     signal, 	     CreateSubscriptionIdRef::SignalLength,	     JBB);    releaseSections(signal);    return;}/********************************************************** * Suma participant interface * * Creation of subscriptions */voidSumaParticipant::execSUB_CREATE_REQ(Signal* signal) {#ifdef NODEFAIL_DEBUG  ndbout_c("SumaParticipant::execSUB_CREATE_REQ");#endif  jamEntry();                              CRASH_INSERTION(13003);  const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();        const Uint32 subId   = req.subscriptionId;  const Uint32 subKey  = req.subscriptionKey;  const Uint32 subRef  = req.subscriberRef;  const Uint32 subData = req.subscriberData;  const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;  const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;  const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;  const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;  const Uint32 sender = signal->getSendersBlockRef();  Subscription key;  key.m_subscriptionId  = subId;  key.m_subscriptionKey = subKey;  SubscriptionPtr subPtr;  Ptr<SyncRecord> syncPtr;    if (addTableFlag) {    ndbrequire(restartFlag);  //TODO remove this    if(!c_subscriptions.find(subPtr, key)) {      jam();      sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND);      return;    }    jam();    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  } else {    // Check that id/key is unique    if(c_subscriptions.find(subPtr, key)) {      jam();      sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE);      return;    }    if(!c_subscriptions.seize(subPtr)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -