⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.27版本源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
    SimplePropertiesSectionReader it(ptr, getSectionSegmentPool());    SimpleProperties::UnpackStatus s;  DictTabInfo::Table tableDesc; tableDesc.init();  s = SimpleProperties::unpack(it, &tableDesc, 			       DictTabInfo::TableMapping, 			       DictTabInfo::TableMappingSize, 			       true, true);    ndbrequire(s == SimpleProperties::Break);  TablePtr tabPtr;  c_tables.find(tabPtr, tableId);    if(!tabPtr.isNull() &&     tabPtr.p->m_schemaVersion != tableDesc.TableVersion){    jam();    tabPtr.p->release(* this);    // oops wrong schema version in stored tabledesc    // we need to find all subscriptions with old table desc    // and all subscribers to this    // hopefully none    c_tables.release(tabPtr);    tabPtr.setNull();    DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr;    c_subscriptions.first(i_subPtr);    SubscriptionPtr subPtr;    for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){      jam();      c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);      SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);      if (tmp == syncPtr_p) {	jam();	continue;      }      if (subPtr.p->m_tables[tableId]) {	jam();	subPtr.p->m_tables[tableId] = 0; // remove this old table reference	TableList::DataBufferIterator it;	for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {	  jam();	  if (*it.data == tableId){	    jam();	    Uint32 *pdata = it.data;	    tmp->m_tableList.next(it);	    for(;!it.isNull();tmp->m_tableList.next(it)) {	      jam();	      *pdata = *it.data;	      pdata = it.data;	    }	    *pdata = RNIL; // todo remove this last item...	    break;	  }	}      }    }  }  if (tabPtr.isNull()) {    jam();    /**     * Uninitialized table record     */    ndbrequire(c_tables.seize(tabPtr));    new (tabPtr.p) Table;    tabPtr.p->m_schemaVersion = RNIL;    tabPtr.p->m_tableId = tableId;    tabPtr.p->m_hasTriggerDefined[0] = 0;    tabPtr.p->m_hasTriggerDefined[1] = 0;    tabPtr.p->m_hasTriggerDefined[2] = 0;    tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID;    tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID;    tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;#if 0    ndbout_c("Get tab info conf %d", tableId);#endif    c_tables.add(tabPtr);  }  if(tabPtr.p->m_attributes.getSize() != 0){    jam();    return true;  }  /**   * Initialize table object   */  Uint32 noAttribs = tableDesc.NoOfAttributes;  Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);  tabPtr.p->m_schemaVersion = tableDesc.TableVersion;    // The attribute buffer  LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes);    // Temporary buffer  DataBuffer<15> theRest(c_dataBufferPool);  if(!attrBuf.seize(noAttribs)){    ndbrequire(false);    return false;  }    if(!theRest.seize(notFixed)){    ndbrequire(false);    return false;  }    DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable  DataBuffer<15>::DataBufferIterator restIt; // variable + nullable  attrBuf.first(attrIt);  theRest.first(restIt);    for(Uint32 i = 0; i < noAttribs; i++) {    DictTabInfo::Attribute attrDesc; attrDesc.init();    s = SimpleProperties::unpack(it, &attrDesc, 				 DictTabInfo::AttributeMapping, 				 DictTabInfo::AttributeMappingSize, 				 true, true);    ndbrequire(s == SimpleProperties::Break);    if (!attrDesc.AttributeNullableFlag 	/* && !attrDesc.AttributeVariableFlag */) {      jam();      * attrIt.data = attrDesc.AttributeId;      attrBuf.next(attrIt);    } else {      jam();      * restIt.data = attrDesc.AttributeId;      theRest.next(restIt);    }        // Move to next attribute    it.next();  }  /**   * Put the rest in end of attrBuf   */  theRest.first(restIt);  for(; !restIt.isNull(); theRest.next(restIt)){    * attrIt.data = * restIt.data;    attrBuf.next(attrIt);  }  theRest.release();    return true;}voidSumaParticipant::execDI_FCOUNTCONF(Signal* signal){  jamEntry();    CRASH_INSERTION(13007);  const Uint32 senderData = signal->theData[3];  SyncRecord* tmp = c_syncPool.getPtr(senderData);  tmp->runDI_FCOUNTCONF(signal);}void SumaParticipant::execDIGETPRIMCONF(Signal* signal){  jamEntry();    CRASH_INSERTION(13008);  const Uint32 senderData = signal->theData[1];  SyncRecord* tmp = c_syncPool.getPtr(senderData);  tmp->runDIGETPRIMCONF(signal);}/************************************************************************* * * */voidSumaParticipant::SyncRecord::startMeta(Signal* signal){  jam();  m_currentTable = 0;  nextMeta(signal);}/** * m_tableList only contains UserTables */voidSumaParticipant::SyncRecord::nextMeta(Signal* signal){  jam();    TableList::DataBufferIterator it;  if(!m_tableList.position(it, m_currentTable)){    completeMeta(signal);    return;  }  GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();  req->senderRef = suma.reference();  req->senderData = ptrI;  req->requestType =     GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;  req->tableId = * it.data;#if 0  ndbout_c("GET_TABINFOREQ id %d", req->tableId);#endif  suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, 		  GetTabInfoReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal){  jam();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  Uint32 type = subPtr.p->m_subscriptionType;  bool do_continue = false;  switch (type) {  case SubCreateReq::TableEvent:    jam();    break;  case SubCreateReq::DatabaseSnapshot:    jam();    do_continue = true;    break;  case SubCreateReq::SelectiveTableSnapshot:    jam();    do_continue = true;    break;  case SubCreateReq::SingleTableScan:    jam();    break;  default:    ndbrequire(false);    break;  }  if (! do_continue) {    m_error = true;    completeMeta(signal);    return;  }  m_currentTable++;  nextMeta(signal);  return;  // now we need to clean-up}voidSumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){  jam();    GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr();  //  const Uint32 gci = conf->gci;  const Uint32 tableId = conf->tableId;  TableList::DataBufferIterator it;    ndbrequire(m_tableList.position(it, m_currentTable));  ndbrequire(* it.data == tableId);    SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);    SegmentedSectionPtr ptr;  signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);  bool okToSend = m_doSendSyncData;  /*   * If it is a selectivetablesnapshot and the table is not part of the    * subscription, then do not send anything, just continue.   * If it is a tablevent, don't send regardless since the APIs are not   * interested in meta data.   */  if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot)    if(!subPtr.p->m_tables[tableId])      okToSend = false;  if(okToSend) {    if(refToNode(subPtr.p->m_subscriberRef) == 0){      jam();      suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef),			  GSN_SUB_META_DATA,			  signal, 			  SubMetaData::SignalLength);       jamEntry();      suma.releaseSections(signal);    } else {      jam();      suma.sendSignal(subPtr.p->m_subscriberRef, 		      GSN_SUB_META_DATA,		      signal, 		      SubMetaData::SignalLength, JBB);    }  }    TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);  if(fragBuf.getSize() == 0){    /**     * We need to gather fragment info     */    jam();    signal->theData[0] = RNIL;    signal->theData[1] = tableId;    signal->theData[2] = ptrI;    suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB);        return;  }    m_currentTable++;  nextMeta(signal);}void SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){  jam();  const Uint32 userPtr = signal->theData[0];  const Uint32 fragCount = signal->theData[1];  const Uint32 tableId = signal->theData[2];  ndbrequire(userPtr == RNIL && signal->length() == 5);  TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));    LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);    ndbrequire(fragBuf.getSize() == 0);    m_currentFragment = fragCount;  signal->theData[0] = RNIL;  signal->theData[1] = ptrI;  signal->theData[2] = tableId;  signal->theData[3] = 0; // Frag no  suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){  jam();  const Uint32 userPtr = signal->theData[0];  //const Uint32 senderData = signal->theData[1];  const Uint32 nodeCount = signal->theData[6];  const Uint32 tableId = signal->theData[7];  const Uint32 fragNo = signal->theData[8];    ndbrequire(userPtr == RNIL && signal->length() == 9);  ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);    TablePtr tabPtr;  ndbrequire(suma.c_tables.find(tabPtr, tableId));  LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool,  tabPtr.p->m_fragments);    /**   * Add primary node for fragment to list   */  FragmentDescriptor fd;  fd.m_fragDesc.m_nodeId = signal->theData[2];  fd.m_fragDesc.m_fragmentNo = fragNo;  signal->theData[2] = fd.m_dummy;  fragBuf.append(&signal->theData[2], 1);    const Uint32 nextFrag = fragNo + 1;  if(nextFrag == m_currentFragment){    /**     * Complete frag info for table     */    m_currentTable++;    nextMeta(signal);    return;  }  signal->theData[0] = RNIL;  signal->theData[1] = ptrI;  signal->theData[2] = tableId;  signal->theData[3] = nextFrag; // Frag no  suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::completeMeta(Signal* signal){  jam();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  ndbrequire(subPtr.p->m_syncPtrI == ptrI);  #if PRINT_ONLY  ndbout_c("GSN_SUB_SYNC_CONF (meta)");#else   suma.releaseSections(signal);  if (m_error) {    SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();    ref->subscriptionId = subPtr.p->m_subscriptionId;    ref->subscriptionKey = subPtr.p->m_subscriptionKey;    ref->part = SubscriptionData::MetaData;    ref->subscriberData = subPtr.p->m_subscriberData;    ref->errorCode = SubSyncRef::Undefined;    suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal,		    SubSyncRef::SignalLength, JBB);  } else {    SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();    conf->subscriptionId = subPtr.p->m_subscriptionId;    conf->subscriptionKey = subPtr.p->m_subscriptionKey;    conf->part = SubscriptionData::MetaData;    conf->subscriberData = subPtr.p->m_subscriberData;    suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal,		    SubSyncConf::SignalLength, JBB);  }#endif}/********************************************************** * * Scan interface * */voidSumaParticipant::SyncRecord::startScan(Signal* signal){  jam();    /**   * Get fraginfo   */  m_currentTable = 0;  m_currentFragment = 0;    nextScan(signal);}boolSumaParticipant::SyncRecord::getNextFragment(TablePtr * tab, 					     FragmentDescriptor * fd){  jam();  SubscriptionPtr subPtr;  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);  TableList::DataBufferIterator tabIt;  DataBuffer<15>::DataBufferIterator fragIt;    m_tableList.position(tabIt, m_currentTable);  for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){    TablePtr tabPtr;    ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -