⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.27版本源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
  convertNameToId(subPtr, signal);}void SumaParticipant::execGET_TABLEID_REF(Signal * signal){  jamEntry();  GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr();  Uint32 senderData         = ref->senderData;  //  Uint32 err                = ref->err;    SubscriptionPtr subPtr;  c_subscriptions.getPtr(subPtr, senderData);  Uint32 subData = subPtr.p->m_subscriberData;  SubCreateRef * reff = (SubCreateRef*)ref;  /**   * @todo: map ref->err to GrepError.   */  reff->err = GrepError::SELECTED_TABLE_NOT_FOUND;  reff->subscriberData = subData;  sendSignal(subPtr.p->m_subscriberRef,	     GSN_SUB_CREATE_REF, 	     signal, 	     SubCreateRef::SignalLength,	     JBB);}#endif/************************************************************* * * Creation of subscription id's * ************************************************************/void Suma::execCREATE_SUBID_REQ(Signal* signal) {  jamEntry();  CRASH_INSERTION(13001);  CreateSubscriptionIdReq const * req =    (CreateSubscriptionIdReq*)signal->getDataPtr();  SubscriberPtr subbPtr;  if(!c_subscriberPool.seize(subbPtr)){    jam();    sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM);    return;  }  subbPtr.p->m_subscriberRef  = signal->getSendersBlockRef();   subbPtr.p->m_senderData     = req->senderData;  subbPtr.p->m_subscriberData = subbPtr.i;  UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();     utilReq->senderData  = subbPtr.p->m_subscriberData;  utilReq->sequenceId  = SUMA_SEQUENCE;  utilReq->requestType = UtilSequenceReq::NextVal;  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 	     signal, UtilSequenceReq::SignalLength, JBB);}voidSuma::execUTIL_SEQUENCE_CONF(Signal* signal){  jamEntry();  DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");  CRASH_INSERTION(13002);  UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();  if(conf->requestType == UtilSequenceReq::Create) {    jam();    createSequenceReply(signal, conf, NULL);    DBUG_VOID_RETURN;  }  Uint64 subId;  memcpy(&subId,conf->sequenceValue,8);  Uint32 subData = conf->senderData;  SubscriberPtr subbPtr;  c_subscriberPool.getPtr(subbPtr,subData);    CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;  subconf->subscriptionId = (Uint32)subId;  subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);  subconf->subscriberData = subbPtr.p->m_senderData;    sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal,	     CreateSubscriptionIdConf::SignalLength, JBB);  c_subscriberPool.release(subbPtr);  DBUG_VOID_RETURN;}voidSuma::execUTIL_SEQUENCE_REF(Signal* signal){  jamEntry();  DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");  UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();  if(ref->requestType == UtilSequenceReq::Create) {    jam();    createSequenceReply(signal, NULL, ref);    DBUG_VOID_RETURN;  }  Uint32 subData = ref->senderData;  SubscriberPtr subbPtr;  c_subscriberPool.getPtr(subbPtr,subData);  sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);  c_subscriberPool.release(subbPtr);  DBUG_VOID_RETURN;}//execUTIL_SEQUENCE_REF()voidSumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){  jam();  CreateSubscriptionIdRef  * ref =     (CreateSubscriptionIdRef *)signal->getDataPtrSend();  ref->err = errCode;  sendSignal(signal->getSendersBlockRef(), 	     GSN_CREATE_SUBID_REF,	     signal, 	     CreateSubscriptionIdRef::SignalLength,	     JBB);    releaseSections(signal);    return;}/********************************************************** * Suma participant interface * * Creation of subscriptions */voidSumaParticipant::execSUB_CREATE_REQ(Signal* signal) {#ifdef NODEFAIL_DEBUG  ndbout_c("SumaParticipant::execSUB_CREATE_REQ");#endif  jamEntry();                              CRASH_INSERTION(13003);  const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();        const Uint32 subId   = req.subscriptionId;  const Uint32 subKey  = req.subscriptionKey;  const Uint32 subRef  = req.subscriberRef;  const Uint32 subData = req.subscriberData;  const Uint32 type    = req.subscriptionType & SubCreateReq::RemoveFlags;  const Uint32 flags   = req.subscriptionType & SubCreateReq::GetFlags;  const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;  const bool restartFlag  = (flags & SubCreateReq::RestartFlag)  != 0;  const Uint32 sender = signal->getSendersBlockRef();  Subscription key;  key.m_subscriptionId  = subId;  key.m_subscriptionKey = subKey;  SubscriptionPtr subPtr;  Ptr<SyncRecord> syncPtr;    if (addTableFlag) {    ndbrequire(restartFlag);  //TODO remove this    if(!c_subscriptions.find(subPtr, key)) {      jam();      sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND);      return;    }    jam();    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  } else {    // Check that id/key is unique    if(c_subscriptions.find(subPtr, key)) {      jam();      sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE);      return;    }    if(!c_subscriptions.seize(subPtr)) {      jam();      sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);      return;    }    if(!c_syncPool.seize(syncPtr)) {      jam();      sendSubCreateRef(signal, req, GrepError::NOSPACE_IN_POOL);      return;    }    jam();    subPtr.p->m_subscriberRef    = subRef;    subPtr.p->m_subscriberData   = subData;    subPtr.p->m_subscriptionId   = subId;    subPtr.p->m_subscriptionKey  = subKey;    subPtr.p->m_subscriptionType = type;      /**     * ok to memset? Support on all compilers     * @todo find out if memset is supported by all compilers     */    memset(subPtr.p->m_tables,0,MAX_TABLES);    subPtr.p->m_maxTables    = 0;    subPtr.p->m_currentTable = 0;    subPtr.p->m_syncPtrI   = syncPtr.i;    subPtr.p->m_markRemove = false;    subPtr.p->m_nSubscribers = 0;    c_subscriptions.add(subPtr);    syncPtr.p->m_subscriptionPtrI = subPtr.i;    syncPtr.p->m_doSendSyncData   = true;    syncPtr.p->ptrI               = syncPtr.i;    syncPtr.p->m_locked           = false;    syncPtr.p->m_error            = false;  }  if (restartFlag ||       type == SubCreateReq::TableEvent) {    syncPtr.p->m_doSendSyncData = false;    ndbrequire(type != SubCreateReq::SingleTableScan);    jam();    if (subPtr.p->m_tables[req.tableId] != 0) {      ndbrequire(false); //TODO remove      jam();      sendSubCreateRef(signal, req, GrepError::SELECTED_TABLE_ALREADY_ADDED);      return;    }    if (addTableFlag) {      ndbrequire(type != SubCreateReq::TableEvent);      jam();    }    subPtr.p->m_maxTables++;    addTableId(req.tableId, subPtr, syncPtr.p);  } else {    switch(type){    case SubCreateReq::SingleTableScan:      {	jam();	syncPtr.p->m_tableList.append(&req.tableId, 1);	if(signal->getNoOfSections() > 0){	  SegmentedSectionPtr ptr;	  signal->getSection(ptr, SubCreateReq::ATTRIBUTE_LIST);	  LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);	  append(attrBuf, ptr, getSectionSegmentPool());	}      }    break;#if 0    case SubCreateReq::SelectiveTableSnapshot:      /**       * Tables specified by the user that does not exist       * in the database are just ignored. No error message       * is given, nor does the db nodes crash       * @todo: Memory is not release here (used tableBuf)       */      {	if(signal->getNoOfSections() == 0 ){	  jam();	  sendSubCreateRef(signal, req, GrepError::WRONG_NO_OF_SECTIONS);	  return;	}	jam();      	SegmentedSectionPtr ptr;	signal->getSection(ptr,0);// SubCreateReq::TABLE_LIST);	SimplePropertiesSectionReader r0(ptr, getSectionSegmentPool());	Uint32 i=0;	char table[MAX_TAB_NAME_SIZE];	r0.reset();	r0.first();	while(true){	  if ((r0.getValueType() != SimpleProperties::StringValue) ||	      (r0.getValueLen() <= 0)) {	    releaseSections(signal);	    ndbrequire(false);	  }	  r0.getString(table);	  strcpy(subPtr.p->m_tableNames[i],table);	  i++;	  if(!r0.next())	    break;	}	releaseSections(signal);	subPtr.p->m_maxTables    = i;	subPtr.p->m_currentTable = 0;	releaseSections(signal);	convertNameToId(subPtr, signal);	return;      }    break;#endif    case SubCreateReq::DatabaseSnapshot:      {	jam();      }    break;    default:      ndbrequire(false);    }  }  sendSubCreateConf(signal, sender, subPtr);  return;}voidSumaParticipant::sendSubCreateConf(Signal* signal, Uint32 sender,				   SubscriptionPtr subPtr){  SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();        conf->subscriptionId       = subPtr.p->m_subscriptionId;  conf->subscriptionKey      = subPtr.p->m_subscriptionKey;  conf->subscriberData       = subPtr.p->m_subscriberData;  sendSignal(sender, GSN_SUB_CREATE_CONF, signal,	     SubCreateConf::SignalLength, JBB);}voidSumaParticipant::sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errCode){  jam();  SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();  ref->subscriberRef  = reference();  ref->subscriberData = req.subscriberData;  ref->err = errCode;  releaseSections(signal);  sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal, 	     SubCreateRef::SignalLength, JBB);  return;}/********************************************************** * * Setting upp trigger for subscription * */void SumaParticipant::execSUB_SYNC_REQ(Signal* signal) {  jamEntry();  CRASH_INSERTION(13004);#ifdef EVENT_PH3_DEBUG  ndbout_c("SumaParticipant::execSUB_SYNC_REQ");#endif  SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();  SubscriptionPtr subPtr;  Subscription key;   key.m_subscriptionId = req->subscriptionId;  key.m_subscriptionKey = req->subscriptionKey;    if(!c_subscriptions.find(subPtr, key)){    jam();    sendSubSyncRef(signal, GrepError::SUBSCRIPTION_ID_NOT_FOUND);    return;  }  /**   * @todo Tomas, do you really need to do this?   */  if(subPtr.p->m_subscriptionType == SubCreateReq::TableEvent) {    jam();    subPtr.p->m_subscriberData = req->subscriberData;  }  bool ok = false;  SubscriptionData::Part part = (SubscriptionData::Part)req->part;    Ptr<SyncRecord> syncPtr;  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  switch(part){  case SubscriptionData::MetaData:    ok = true;    jam();    syncPtr.p->startMeta(signal);    break;  case SubscriptionData::TableData: {    ok = true;    jam();    syncPtr.p->startScan(signal);    break;  }  }  ndbrequire(ok);}voidSumaParticipant::sendSubSyncRef(Signal* signal, Uint32 errCode){  jam();  SubSyncRef  * ref =     (SubSyncRef *)signal->getDataPtrSend();  ref->err = errCode;  sendSignal(signal->getSendersBlockRef(), 	     GSN_SUB_SYNC_REF, 	     signal, 	     SubSyncRef::SignalLength,	     JBB);	       releaseSections(signal);    return;}/********************************************************** * Dict interface */voidSumaParticipant::execGET_TABINFOREF(Signal* signal){  jamEntry();  GetTabInfoRef* const ref = (GetTabInfoRef*)signal->getDataPtr();  SyncRecord* tmp = c_syncPool.getPtr(ref->senderData);  tmp->runGET_TABINFOREF(signal);}voidSumaParticipant::execGET_TABINFO_CONF(Signal* signal){  jamEntry();  CRASH_INSERTION(13006);  if(!assembleFragments(signal)){    return;  }    GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();    Uint32 tableId = conf->tableId;  Uint32 senderData = conf->senderData;  SyncRecord* tmp = c_syncPool.getPtr(senderData);  ndbrequire(parseTable(signal, conf, tableId, tmp));  tmp->runGET_TABINFO_CONF(signal);}boolSumaParticipant::parseTable(Signal* signal, GetTabInfoConf* conf, Uint32 tableId,			    SyncRecord* syncPtr_p){  SegmentedSectionPtr ptr;  signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -