⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.27版本源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
    if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)       {	if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {	  *tab = tabPtr;	  return true;	}      }    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);        fragBuf.position(fragIt, m_currentFragment);    for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){      FragmentDescriptor tmp;      tmp.m_dummy = * fragIt.data;      if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){	* fd = tmp;	* tab = tabPtr;	return true;      }    }    m_currentFragment = 0;  }  return false;}voidSumaParticipant::SyncRecord::nextScan(Signal* signal){  jam();  TablePtr tabPtr;  FragmentDescriptor fd;  SubscriptionPtr subPtr;  if(!getNextFragment(&tabPtr, &fd)){    jam();    completeScan(signal);    return;  }  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);   if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) {    jam();    if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {      /*       * table is not part of the subscription. Check next table       */      m_currentTable++;      nextScan(signal);      return;    }  }  DataBuffer<15>::Head head = m_attributeList;  if(head.getSize() == 0){    head = tabPtr.p->m_attributes;  }  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);    ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();  const Uint32 parallelism = 16;  const Uint32 attrLen = 5 + attrBuf.getSize();  req->senderData = m_subscriptionPtrI;  req->resultRef = suma.reference();  req->tableId = tabPtr.p->m_tableId;  req->requestInfo = 0;  req->savePointId = 0;  ScanFragReq::setLockMode(req->requestInfo, 0);  ScanFragReq::setHoldLockFlag(req->requestInfo, 1);  ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);  ScanFragReq::setAttrLen(req->requestInfo, attrLen);  req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;  req->schemaVersion = tabPtr.p->m_schemaVersion;  req->transId1 = 0;  req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);  req->clientOpPtr = (ptrI << 16);  req->batch_size_rows= 16;  req->batch_size_bytes= 0;  suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 		  ScanFragReq::SignalLength, JBB);    signal->theData[0] = ptrI;  signal->theData[1] = 0;  signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);    // Return all  signal->theData[3] = attrBuf.getSize();  signal->theData[4] = 0;  signal->theData[5] = 0;  signal->theData[6] = 0;  signal->theData[7] = 0;    Uint32 dataPos = 8;  DataBuffer<15>::DataBufferIterator it;  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){    AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);    if(dataPos == 25){      suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB);	dataPos = 3;    }  }  if(dataPos != 3){    suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);  }    m_currentTableId = tabPtr.p->m_tableId;  m_currentNoOfAttributes = attrBuf.getSize();        }voidSumaParticipant::execSCAN_FRAGREF(Signal* signal){  jamEntry();//  ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();  ndbrequire(false);}voidSumaParticipant::execSCAN_FRAGCONF(Signal* signal){  jamEntry();  CRASH_INSERTION(13011);  ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();    const Uint32 completed = conf->fragmentCompleted;  const Uint32 senderData = conf->senderData;  const Uint32 completedOps = conf->completedOps;  SubscriptionPtr subPtr;  c_subscriptions.getPtr(subPtr, senderData);    if(completed != 2){    jam();    #if PRINT_ONLY    SubSyncContinueConf * const conf =       (SubSyncContinueConf*)signal->getDataPtrSend();      conf->subscriptionId = subPtr.p->m_subscriptionId;    conf->subscriptionKey = subPtr.p->m_subscriptionKey;    execSUB_SYNC_CONTINUE_CONF(signal);#else    SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();    req->subscriberData = subPtr.p->m_subscriberData;    req->noOfRowsSent = completedOps;    sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,	       SubSyncContinueReq::SignalLength, JBB);#endif    return;  }  ndbrequire(completedOps == 0);    SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);    tmp->m_currentFragment++;  tmp->nextScan(signal);}voidSumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){  jamEntry();    CRASH_INSERTION(13012);  SubSyncContinueConf * const conf =     (SubSyncContinueConf*)signal->getDataPtr();      SubscriptionPtr subPtr;  Subscription key;   key.m_subscriptionId = conf->subscriptionId;  key.m_subscriptionKey = conf->subscriptionKey;    ndbrequire(c_subscriptions.find(subPtr, key));  ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();  req->senderData = subPtr.i;  req->closeFlag = 0;  req->transId1 = 0;  req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);  req->batch_size_rows = 16;  req->batch_size_bytes = 0;  sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, 	     ScanFragNextReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::completeScan(Signal* signal){  jam();  //  m_tableList.release();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  #if PRINT_ONLY  ndbout_c("GSN_SUB_SYNC_CONF (data)");#else  SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();  conf->subscriptionId = subPtr.p->m_subscriptionId;  conf->subscriptionKey = subPtr.p->m_subscriptionKey;  conf->part = SubscriptionData::TableData;  conf->subscriberData = subPtr.p->m_subscriberData;  suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,		  SubSyncConf::SignalLength, JBB);#endif}voidSumaParticipant::execSCAN_HBREP(Signal* signal){  jamEntry();#if 0  ndbout << "execSCAN_HBREP" << endl << hex;  for(int i = 0; i<signal->length(); i++){    ndbout << signal->theData[i] << " ";    if(((i + 1) % 8) == 0)      ndbout << endl << hex;  }  ndbout << endl;#endif}/********************************************************** * Scan data interface * * Assumption: one execTRANSID_AI contains all attr info * */#define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS#define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1static Uint32 f_bufferLock = 0;static Uint32 f_buffer[SUMA_BUF_SZ];static Uint32 f_trigBufferSize = 0;static Uint32 b_bufferLock = 0;static Uint32 b_buffer[SUMA_BUF_SZ];static Uint32 b_trigBufferSize = 0;voidSumaParticipant::execTRANSID_AI(Signal* signal){  jamEntry();  CRASH_INSERTION(13015);  TransIdAI * const data = (TransIdAI*)signal->getDataPtr();  const Uint32 opPtrI = data->connectPtr;  const Uint32 length = signal->length() - 3;  if(f_bufferLock == 0){    f_bufferLock = opPtrI;  } else {    ndbrequire(f_bufferLock == opPtrI);  }    Ptr<SyncRecord> syncPtr;  c_syncPool.getPtr(syncPtr, (opPtrI >> 16));    Uint32 sum = 0;  Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;  Uint32 * headers = f_buffer;  const Uint32 * src = &data->attrData[0];  const Uint32 * const end = &src[length];    const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;  for(Uint32 i = 0; i<attribs; i++){    Uint32 tmp = * src++;    * headers++ = tmp;    Uint32 len = AttributeHeader::getDataSize(tmp);        memcpy(dst, src, 4 * len);    dst += len;    src += len;    sum += len;  }    ndbrequire(src == end);  /**   * Send data to subscriber   */  LinearSectionPtr ptr[3];  ptr[0].p = f_buffer;  ptr[0].sz = attribs;    ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;  ptr[1].sz = sum;  SubscriptionPtr subPtr;  c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);    /**   * Initialize signal   */    SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();  Uint32 ref = subPtr.p->m_subscriberRef;  sdata->tableId = syncPtr.p->m_currentTableId;  sdata->senderData = subPtr.p->m_subscriberData;  sdata->operation = 3; // Scan  sdata->gci = 1; // Undefined#if PRINT_ONLY  ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);#else  sendSignal(ref,	     GSN_SUB_TABLE_DATA,	     signal, 	     SubTableData::SignalLength, JBB,	     ptr, 2);#endif    /**   * Reset f_bufferLock   */  f_bufferLock = 0;}/************************************************************** * * Removing subscription * */voidSumaParticipant::execSUB_REMOVE_REQ(Signal* signal) {  jamEntry();  Uint32 senderRef = signal->getSendersBlockRef();  CRASH_INSERTION(13021);  const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();  SubscriptionPtr subPtr;  Subscription key;  key.m_subscriptionId  = req.subscriptionId;  key.m_subscriptionKey = req.subscriptionKey;    if(!c_subscriptions.find(subPtr, key)) {    jam();    sendSubRemoveRef(signal, req, (Uint32) GrepError::SUBSCRIPTION_ID_NOT_FOUND);    return;  }    int count = 0;  {    jam();    SubscriberPtr i_subbPtr;    c_metaSubscribers.first(i_subbPtr);    while(!i_subbPtr.isNull()){      jam();      SubscriberPtr subbPtr = i_subbPtr;      c_metaSubscribers.next(i_subbPtr);      if( subbPtr.p->m_subPtrI == subPtr.i ){	jam();	c_metaSubscribers.release(subbPtr);      }    }  }  subPtr.p->m_senderRef  = senderRef;  subPtr.p->m_senderData = req.senderData;  completeSubRemoveReq(signal, subPtr);}voidSumaParticipant::completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr) {  Uint32 subscriptionId  = subPtr.p->m_subscriptionId;  Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;  Uint32 senderRef       = subPtr.p->m_senderRef;  Uint32 senderData      = subPtr.p->m_senderData;  {    Ptr<SyncRecord> syncPtr;    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);	    syncPtr.p->release();    c_syncPool.release(syncPtr);  }  //  if (subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {  //    jam();  //    senderRef = subPtr.p->m_subscriberRef;  //  }  c_subscriptions.release(subPtr);  /**   * I was the last subscription to be remove so clear c_tables   */#if 0  ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",	   c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());#endif  if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) {    jam();#if 0    ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");#endif    KeyTable<Table>::Iterator it;    for(c_tables.first(it); !it.isNull(); ){            it.curr.p->release(* this);            TablePtr tabPtr = it.curr;            c_tables.next(it);      c_tables.release(tabPtr);    }  }    SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();  conf->senderRef            = reference();  conf->senderData           = senderData;  conf->subscriptionId       = subscriptionId;  conf->subscriptionKey      = subscriptionKey;  sendSignal(senderRef, GSN_SUB_REMOVE_CONF, signal,	     SubRemoveConf::SignalLength, JBB);}voidSumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,				  Uint32 errCode, bool temporary){  jam();  SubRemoveRef  * ref = (SubRemoveRef *)signal->getDataPtrSend();  ref->senderRef  = reference();  ref->subscriptionId = req.subscriptionId;  ref->subscriptionKey = req.subscriptionKey;  ref->senderData = req.senderData;  ref->err = errCode;  if (temporary)    ref->setTemporary();  releaseSections(signal);  sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF, 	     signal, SubRemoveRef::SignalLength, JBB);  return;}voidSumaParticipant::Table::release(SumaParticipant & suma){  jam();  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);  attrBuf.release();  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);  fragBuf.release();}voidSumaParticipant::SyncRecord::release(){  jam();  m_tableList.release();  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);  attrBuf.release();  }template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -