⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 5 页
字号:
//  ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();  ndbrequire(false);}voidSumaParticipant::execSCAN_FRAGCONF(Signal* signal){  jamEntry();  CRASH_INSERTION(13011);  ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();    const Uint32 completed = conf->fragmentCompleted;  const Uint32 senderData = conf->senderData;  const Uint32 completedOps = conf->completedOps;  SubscriptionPtr subPtr;  c_subscriptions.getPtr(subPtr, senderData);    if(completed != 2){    jam();    #if PRINT_ONLY    SubSyncContinueConf * const conf =       (SubSyncContinueConf*)signal->getDataPtrSend();      conf->subscriptionId = subPtr.p->m_subscriptionId;    conf->subscriptionKey = subPtr.p->m_subscriptionKey;    execSUB_SYNC_CONTINUE_CONF(signal);#else    SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();    req->subscriberData = subPtr.p->m_subscriberData;    req->noOfRowsSent = completedOps;    sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,	       SubSyncContinueReq::SignalLength, JBB);#endif    return;  }  ndbrequire(completedOps == 0);    SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);    tmp->m_currentFragment++;  tmp->nextScan(signal);}voidSumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){  jamEntry();    CRASH_INSERTION(13012);  SubSyncContinueConf * const conf =     (SubSyncContinueConf*)signal->getDataPtr();      SubscriptionPtr subPtr;  Subscription key;   key.m_subscriptionId = conf->subscriptionId;  key.m_subscriptionKey = conf->subscriptionKey;    ndbrequire(c_subscriptions.find(subPtr, key));  ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();  req->senderData = subPtr.i;  req->closeFlag = 0;  req->transId1 = 0;  req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);  req->batch_size_rows = 16;  req->batch_size_bytes = 0;  sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 	     ScanFragNextReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::completeScan(Signal* signal){  jam();  //  m_tableList.release();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  #if PRINT_ONLY  ndbout_c("GSN_SUB_SYNC_CONF (data)");#else  SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();  conf->subscriptionId = subPtr.p->m_subscriptionId;  conf->subscriptionKey = subPtr.p->m_subscriptionKey;  conf->part = SubscriptionData::TableData;  conf->subscriberData = subPtr.p->m_subscriberData;  suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,		  SubSyncConf::SignalLength, JBB);#endif}voidSumaParticipant::execSCAN_HBREP(Signal* signal){  jamEntry();#if 0  ndbout << "execSCAN_HBREP" << endl << hex;  for(int i = 0; i<signal->length(); i++){    ndbout << signal->theData[i] << " ";    if(((i + 1) % 8) == 0)      ndbout << endl << hex;  }  ndbout << endl;#endif}/********************************************************** * * Suma participant interface * * Creation of subscriber * */voidSumaParticipant::execSUB_START_REQ(Signal* signal){  jamEntry();  DBUG_ENTER("SumaParticipant::execSUB_START_REQ");  CRASH_INSERTION(13013);  if (c_restartLock) {    jam();    //    ndbout_c("c_restartLock");    if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {      jam();      sendSubStartRef(signal, /** Error Code */ 0, true);      DBUG_VOID_RETURN;    }    // only allow other Suma's in the nodegroup to come through for restart purposes  }  Subscription key;   SubStartReq * const req = (SubStartReq*)signal->getDataPtr();  Uint32 senderRef            = req->senderRef;  Uint32 senderData           = req->senderData;  Uint32 subscriberData       = req->subscriberData;  Uint32 subscriberRef        = req->subscriberRef;  SubscriptionData::Part part = (SubscriptionData::Part)req->part;  key.m_subscriptionId        = req->subscriptionId;  key.m_subscriptionKey       = req->subscriptionKey;  SubscriptionPtr subPtr;  if(!c_subscriptions.find(subPtr, key)){    jam();    sendSubStartRef(signal, /** Error Code */ 0);    DBUG_VOID_RETURN;  }    Ptr<SyncRecord> syncPtr;  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  if (syncPtr.p->m_locked) {    jam();#if 0    ndbout_c("Locked");#endif    sendSubStartRef(signal, /** Error Code */ 0, true);    DBUG_VOID_RETURN;  }  syncPtr.p->m_locked = true;  SubscriberPtr subbPtr;  if(!c_subscriberPool.seize(subbPtr)){    jam();    syncPtr.p->m_locked = false;    sendSubStartRef(signal, /** Error Code */ 0);    DBUG_VOID_RETURN;  }  Uint32 type = subPtr.p->m_subscriptionType;  subbPtr.p->m_senderRef  = senderRef;  subbPtr.p->m_senderData = senderData;  switch (type) {  case SubCreateReq::TableEvent:    jam();    // we want the data to return to the API not DICT    subbPtr.p->m_subscriberRef = subscriberRef;    //    ndbout_c("start ref = %u", signal->getSendersBlockRef());    //    ndbout_c("ref = %u", subbPtr.p->m_subscriberRef);    // we use the subscription id for now, should really be API choice    subbPtr.p->m_subscriberData = subscriberData;#if 0    if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {      jam();      for (Uint32 i = 0; i < c_noNodesInGroup; i++) {	Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);	if (ref != reference()) {	  jam();	  sendSubStartReq(subPtr, subbPtr, signal, ref);	} else	  jam();      }    }#endif    break;  case SubCreateReq::DatabaseSnapshot:  case SubCreateReq::SelectiveTableSnapshot:    jam();    ndbrequire(false);    //subbPtr.p->m_subscriberRef = GREP_REF;    subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;    break;  case SubCreateReq::SingleTableScan:    jam();    subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef;    subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData;  }    subbPtr.p->m_subPtrI = subPtr.i;  subbPtr.p->m_firstGCI = RNIL;  if (type == SubCreateReq::TableEvent)    subbPtr.p->m_lastGCI = 0;  else    subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI  bool ok = false;    switch(part){  case SubscriptionData::MetaData:    ok = true;    jam();    c_metaSubscribers.add(subbPtr);    sendSubStartComplete(signal, subbPtr, 0, part);    break;  case SubscriptionData::TableData:     ok = true;    jam();    c_prepDataSubscribers.add(subbPtr);    syncPtr.p->startTrigger(signal);    break;  }  ndbrequire(ok);  DBUG_VOID_RETURN;}voidSumaParticipant::sendSubStartComplete(Signal* signal,				      SubscriberPtr subbPtr, 				      Uint32 firstGCI,				      SubscriptionData::Part part){  jam();  SubscriptionPtr subPtr;  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);  Ptr<SyncRecord> syncPtr;  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);  syncPtr.p->m_locked = false;  SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();        conf->senderRef       = reference();  conf->senderData      = subbPtr.p->m_senderData;  conf->subscriptionId  = subPtr.p->m_subscriptionId;  conf->subscriptionKey = subPtr.p->m_subscriptionKey;  conf->firstGCI = firstGCI;  conf->part = (Uint32) part;    conf->subscriberData = subPtr.p->m_subscriberData;  sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal,	     SubStartConf::SignalLength, JBB);}#if 0voidSumaParticipant::sendSubStartRef(SubscriptionPtr subPtr,				 Signal* signal, Uint32 errCode,				 bool temporary){  jam();  SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();  xxx ref->senderRef       = reference();  xxx ref->senderData      = subPtr.p->m_senderData;  ref->subscriptionId  = subPtr.p->m_subscriptionId;  ref->subscriptionKey = subPtr.p->m_subscriptionKey;  ref->part            = (Uint32) subPtr.p->m_subscriptionType;  ref->subscriberData  = subPtr.p->m_subscriberData;  ref->err             = errCode;  if (temporary) {    jam();    ref->setTemporary();  }  releaseSections(signal);  sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal, 	     SubStartRef::SignalLength, JBB);}#endifvoidSumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode,				 bool temporary){  jam();  SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();  ref->senderRef  = reference();  ref->err = errCode;  if (temporary) {    jam();    ref->setTemporary();  }  releaseSections(signal);  sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, 	     SubStartRef::SignalLength, JBB);}/********************************************************** * * Trigger admin interface * */voidSumaParticipant::SyncRecord::startTrigger(Signal* signal){  jam();  m_currentTable = 0;  m_latestTriggerId = RNIL;  nextTrigger(signal);}voidSumaParticipant::SyncRecord::nextTrigger(Signal* signal){  jam();  TableList::DataBufferIterator it;    if(!m_tableList.position(it, m_currentTable)){    completeTrigger(signal);    return;  }  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  const Uint32 RT_BREAK = 48;  Uint32 latestTriggerId = 0;  for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){       TablePtr tabPtr;#if 0    ndbout_c("nextTrigger tableid %u", *it.data);#endif    ndbrequire(suma.c_tables.find(tabPtr, *it.data));    AttributeMask attrMask;    createAttributeMask(attrMask, tabPtr.p);    for(Uint32 j = 0; j<3; j++){      i++;      latestTriggerId = (tabPtr.p->m_schemaVersion << 18) |	(j << 16) | tabPtr.p->m_tableId;      if(tabPtr.p->m_hasTriggerDefined[j] == 0) {	ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID);#if 0	ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j);#endif	CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();	req->setUserRef(SUMA_REF);	req->setConnectionPtr(ptrI);	req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);	req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);	req->setMonitorReplicas(true);	req->setMonitorAllAttributes(false);	req->setReceiverRef(SUMA_REF);	req->setTriggerId(latestTriggerId);	req->setTriggerEvent((TriggerEvent::Value)j);	req->setTableId(tabPtr.p->m_tableId);	req->setAttributeMask(attrMask);	suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, 			signal, CreateTrigReq::SignalLength, JBB);      } else {	/**	 * Faking that a trigger has been created in order to	 * simulate the proper behaviour.	 * Perhaps this should be a dummy signal instead of 	 * (ab)using CREATE_TRIG_CONF.	 */ 	CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend();	conf->setConnectionPtr(ptrI);	conf->setTableId(tabPtr.p->m_tableId);	conf->setTriggerId(latestTriggerId);	suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF,			signal, CreateTrigConf::SignalLength, JBB);	        }    }    m_currentTable++;  }  m_latestTriggerId = latestTriggerId;}voidSumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask, 						 Table * table){  jam();  mask.clear();  DataBuffer<15>::DataBufferIterator it;  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes);  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){    mask.set(* it.data);  }}voidSumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){  jam();    CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();  const Uint32 triggerId = conf->getTriggerId();  Uint32 type = (triggerId >> 16) & 0x3;  Uint32 tableId = conf->getTableId();    TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));  ndbrequire(type < 3);  tabPtr.p->m_triggerIds[type] = triggerId;  tabPtr.p->m_hasTriggerDefined[type]++;  if(triggerId == m_latestTriggerId){    jam();    nextTrigger(signal);  }}voidSumaParticipant::SyncRecord::completeTrigger(Signal* signal){  jam();  SubscriptionPtr subPtr;  CRASH_INSERTION(13013);#ifdef EVENT_PH3_DEBUG  ndbout_c("SumaParticipant: trigger completed");#endif  Uint32 gci;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  SubscriberPtr subbPtr;  {    bool found = false;    for(suma.c_prepDataSubscribers.first(subbPtr);	!subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) {      jam();      if(subbPtr.p->m_subPtrI == subPtr.i) {	jam();	found = true;	break;      }    }    ndbrequire(found);    gci = suma.getFirstGCI(signal);    subbPtr.p->m_firstGCI = gci;    suma.c_prepDataSubscribers.remove(subbPtr);    suma.c_dataSubscribers.add(subbPtr);  }  suma.sendSubStartComplete(signal, subbPtr, gci,  SubscriptionData::TableData);}voidSumaParticipant::SyncRecord::startDropTrigger(Signal* signal){  jam();  m_currentTable = 0;  m_latestTriggerId = RNIL;  nextDropTrigger(signal);}voidSumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){  jam();  TableList::DataBufferIterator it;    if(!m_tableList.position(it, m_currentTable)){    completeDropTrigger(signal);    return;  }  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  const Uint32 RT_BREAK = 48;  Uint32 latestTriggerId = 0;  for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){    jam();    TablePtr tabPtr;#if 0    ndbout_c("nextDropTrigger tableid %u", *it.data);#endif    ndbrequire(suma.c_tables.find(tabPtr, * it.data));    for(Uint32 j = 0; j<3; j++){      jam();      ndbrequire(tab

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -