⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 5 页
字号:
  tmp->runCREATE_TRIG_CONF(signal);    /**   * dodido   * @todo: I (Johan) dont know what to do here. Jonas, what do you mean?   */  DBUG_VOID_RETURN;}voidSumaParticipant::execCREATE_TRIG_REF(Signal* signal){  jamEntry();  ndbrequire(false);}voidSumaParticipant::execDROP_TRIG_CONF(Signal* signal){  jamEntry();  DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");  CRASH_INSERTION(13010);  DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();  const Uint32 senderData = conf->getConnectionPtr();  SyncRecord* tmp = c_syncPool.getPtr(senderData);  tmp->runDROP_TRIG_CONF(signal);  DBUG_VOID_RETURN;}voidSumaParticipant::execDROP_TRIG_REF(Signal* signal){  jamEntry();  DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF");  DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();  const Uint32 senderData = ref->getConnectionPtr();  SyncRecord* tmp = c_syncPool.getPtr(senderData);  tmp->runDROP_TRIG_CONF(signal);  DBUG_VOID_RETURN;}/************************************************************************* * * */voidSumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){  jam();  ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();  const Uint32 len = signal->length() - ListTablesConf::HeaderLength;  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  for (unsigned i = 0; i < len; i++) {    subPtr.p->m_maxTables++;    suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this);  }  //  for (unsigned i = 0; i < len; i++)  //    conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);  //  m_tableList.append(&conf->tableData[0], len);#if 0   TableList::DataBufferIterator it;  int i = 0;  for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {    ndbout_c("%u listtableconf tableid %d", i++, *it.data);  }#endif  if(len == ListTablesConf::DataLength){    jam();    // we expect more LIST_TABLE_CONF    return;  }#if 0  subPtr.p->m_currentTable = 0;  subPtr.p->m_maxTables    = 0;  TableList::DataBufferIterator it;  for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {    subPtr.p->m_maxTables++;    suma.addTableId(*it.data, subPtr, NULL);#ifdef NODEFAIL_DEBUG    ndbout_c(" listtableconf tableid %d",*it.data);#endif  }#endif    startMeta(signal);}voidSumaParticipant::SyncRecord::startMeta(Signal* signal){  jam();  m_currentTable = 0;  nextMeta(signal);}/** * m_tableList only contains UserTables */voidSumaParticipant::SyncRecord::nextMeta(Signal* signal){  jam();    TableList::DataBufferIterator it;  if(!m_tableList.position(it, m_currentTable)){    completeMeta(signal);    return;  }  GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();  req->senderRef = suma.reference();  req->senderData = ptrI;  req->requestType =     GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;  req->tableId = * it.data;#if 0  ndbout_c("GET_TABINFOREQ id %d", req->tableId);#endif  suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 		  GetTabInfoReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal){  jam();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  Uint32 type = subPtr.p->m_subscriptionType;  bool do_continue = false;  switch (type) {  case SubCreateReq::TableEvent:    jam();    break;  case SubCreateReq::DatabaseSnapshot:    jam();    do_continue = true;    break;  case SubCreateReq::SelectiveTableSnapshot:    jam();    do_continue = true;    break;  case SubCreateReq::SingleTableScan:    jam();    break;  default:    ndbrequire(false);    break;  }  if (! do_continue) {    m_error = true;    completeMeta(signal);    return;  }  m_currentTable++;  nextMeta(signal);  return;  // now we need to clean-up}voidSumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){  jam();    GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr();  //  const Uint32 gci = conf->gci;  const Uint32 tableId = conf->tableId;  TableList::DataBufferIterator it;    ndbrequire(m_tableList.position(it, m_currentTable));  ndbrequire(* it.data == tableId);    SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);    SegmentedSectionPtr ptr;  signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);  SubMetaData * data = (SubMetaData*)signal->getDataPtrSend();  /**    * sending lastCompleteGCI. Used by Lars in interval calculations   * incremenet by one, since last_CompleteGCI is the not the current gci.   */  data->gci = suma.c_lastCompleteGCI + 1;  data->tableId = tableId;  data->senderData = subPtr.p->m_subscriberData;#if PRINT_ONLY  ndbout_c("GSN_SUB_META_DATA Table %d", tableId);#else  bool okToSend = m_doSendSyncData;  /*   * If it is a selectivetablesnapshot and the table is not part of the    * subscription, then do not send anything, just continue.   * If it is a tablevent, don't send regardless since the APIs are not   * interested in meta data.   */  if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)    if(!subPtr.p->m_tables[tableId])      okToSend = false;  if(okToSend) {    if(refToNode(subPtr.p->m_subscriberRef) == 0){      jam();      suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef),			  GSN_SUB_META_DATA,			  signal, 			  SubMetaData::SignalLength);       jamEntry();      suma.releaseSections(signal);    } else {      jam();      suma.sendSignal(subPtr.p->m_subscriberRef, 		      GSN_SUB_META_DATA,		      signal, 		      SubMetaData::SignalLength, JBB);    }  }#endif    TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);  if(fragBuf.getSize() == 0){    /**     * We need to gather fragment info     */    jam();    signal->theData[0] = RNIL;    signal->theData[1] = tableId;    signal->theData[2] = ptrI;    suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);        return;  }    m_currentTable++;  nextMeta(signal);}void SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){  jam();  const Uint32 userPtr = signal->theData[0];  const Uint32 fragCount = signal->theData[1];  const Uint32 tableId = signal->theData[2];  ndbrequire(userPtr == RNIL && signal->length() == 5);  TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);    ndbrequire(fragBuf.getSize() == 0);    m_currentFragment = fragCount;  signal->theData[0] = RNIL;  signal->theData[1] = ptrI;  signal->theData[2] = tableId;  signal->theData[3] = 0; // Frag no  suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){  jam();  const Uint32 userPtr = signal->theData[0];  //const Uint32 senderData = signal->theData[1];  const Uint32 nodeCount = signal->theData[6];  const Uint32 tableId = signal->theData[7];  const Uint32 fragNo = signal->theData[8];    ndbrequire(userPtr == RNIL && signal->length() == 9);  ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);    TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);    /**   * Add primary node for fragment to list   */  FragmentDescriptor fd;  fd.m_fragDesc.m_nodeId = signal->theData[2];  fd.m_fragDesc.m_fragmentNo = fragNo;  signal->theData[2] = fd.m_dummy;  fragBuf.append(&signal->theData[2], 1);    const Uint32 nextFrag = fragNo + 1;  if(nextFrag == m_currentFragment){    /**     * Complete frag info for table     */    m_currentTable++;    nextMeta(signal);    return;  }  signal->theData[0] = RNIL;  signal->theData[1] = ptrI;  signal->theData[2] = tableId;  signal->theData[3] = nextFrag; // Frag no  suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::completeMeta(Signal* signal){  jam();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  #if PRINT_ONLY  ndbout_c("GSN_SUB_SYNC_CONF (meta)");#else   suma.releaseSections(signal);  if (m_error) {    SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();    ref->subscriptionId = subPtr.p->m_subscriptionId;    ref->subscriptionKey = subPtr.p->m_subscriptionKey;    ref->part = SubscriptionData::MetaData;    ref->subscriberData = subPtr.p->m_subscriberData;    ref->errorCode = SubSyncRef::Undefined;    suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal,		    SubSyncRef::SignalLength, JBB);  } else {    SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();    conf->subscriptionId = subPtr.p->m_subscriptionId;    conf->subscriptionKey = subPtr.p->m_subscriptionKey;    conf->part = SubscriptionData::MetaData;    conf->subscriberData = subPtr.p->m_subscriberData;    suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,		    SubSyncConf::SignalLength, JBB);  }#endif}/********************************************************** * * Scan interface * */voidSumaParticipant::SyncRecord::startScan(Signal* signal){  jam();    /**   * Get fraginfo   */  m_currentTable = 0;  m_currentFragment = 0;    nextScan(signal);}boolSumaParticipant::SyncRecord::getNextFragment(TablePtr * tab, 					     FragmentDescriptor * fd){  jam();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  TableList::DataBufferIterator tabIt;  DataBuffer<15>::DataBufferIterator fragIt;    m_tableList.position(tabIt, m_currentTable);  for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){    TablePtr tabPtr;    ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));    if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)       {	if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {	  *tab = tabPtr;	  return true;	}      }    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);        fragBuf.position(fragIt, m_currentFragment);    for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){      FragmentDescriptor tmp;      tmp.m_dummy = * fragIt.data;      if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){	* fd = tmp;	* tab = tabPtr;	return true;      }    }    m_currentFragment = 0;  }  return false;}voidSumaParticipant::SyncRecord::nextScan(Signal* signal){  jam();  TablePtr tabPtr;  FragmentDescriptor fd;  SubscriptionPtr subPtr;  if(!getNextFragment(&tabPtr, &fd)){    jam();    completeScan(signal);    return;  }  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);   if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) {    jam();    if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) {      /*       * table is not part of the subscription. Check next table       */      m_currentTable++;      nextScan(signal);      return;    }  }  DataBuffer<15>::Head head = m_attributeList;  if(head.getSize() == 0){    head = tabPtr.p->m_attributes;  }  LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);    ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();  const Uint32 parallelism = 16;  const Uint32 attrLen = 5 + attrBuf.getSize();  req->senderData = m_subscriptionPtrI;  req->resultRef = suma.reference();  req->tableId = tabPtr.p->m_tableId;  req->requestInfo = 0;  req->savePointId = 0;  ScanFragReq::setLockMode(req->requestInfo, 0);  ScanFragReq::setHoldLockFlag(req->requestInfo, 1);  ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);  ScanFragReq::setAttrLen(req->requestInfo, attrLen);  req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;  req->schemaVersion = tabPtr.p->m_schemaVersion;  req->transId1 = 0;  req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);  req->clientOpPtr = (ptrI << 16);  req->batch_size_rows= 16;  req->batch_size_bytes= 0;  suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 		  ScanFragReq::SignalLength, JBB);    signal->theData[0] = ptrI;  signal->theData[1] = 0;  signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);    // Return all  signal->theData[3] = attrBuf.getSize();  signal->theData[4] = 0;  signal->theData[5] = 0;  signal->theData[6] = 0;  signal->theData[7] = 0;    Uint32 dataPos = 8;  DataBuffer<15>::DataBufferIterator it;  for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){    AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);    if(dataPos == 25){      suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB);	dataPos = 3;    }  }  if(dataPos != 3){    suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);  }    m_currentTableId = tabPtr.p->m_tableId;  m_currentNoOfAttributes = attrBuf.getSize();        }voidSumaParticipant::execSCAN_FRAGREF(Signal* signal){  jamEntry();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -