📄 suma.cpp
字号:
SimplePropertiesSectionReader it(ptr, getSectionSegmentPool()); SimpleProperties::UnpackStatus s; DictTabInfo::Table tableDesc; tableDesc.init(); s = SimpleProperties::unpack(it, &tableDesc, DictTabInfo::TableMapping, DictTabInfo::TableMappingSize, true, true); ndbrequire(s == SimpleProperties::Break); TablePtr tabPtr; c_tables.find(tabPtr, tableId); if(!tabPtr.isNull() && tabPtr.p->m_schemaVersion != tableDesc.TableVersion){ jam(); tabPtr.p->release(* this); // oops wrong schema version in stored tabledesc // we need to find all subscriptions with old table desc // and all subscribers to this // hopefully none c_tables.release(tabPtr); tabPtr.setNull(); DLHashTable<SumaParticipant::Subscription>::Iterator i_subPtr; c_subscriptions.first(i_subPtr); SubscriptionPtr subPtr; for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){ jam(); c_subscriptions.getPtr(subPtr, i_subPtr.curr.i); SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI); if (tmp == syncPtr_p) { jam(); continue; } if (subPtr.p->m_tables[tableId]) { jam(); subPtr.p->m_tables[tableId] = 0; // remove this old table reference TableList::DataBufferIterator it; for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) { jam(); if (*it.data == tableId){ jam(); Uint32 *pdata = it.data; tmp->m_tableList.next(it); for(;!it.isNull();tmp->m_tableList.next(it)) { jam(); *pdata = *it.data; pdata = it.data; } *pdata = RNIL; // todo remove this last item... break; } } } } } if (tabPtr.isNull()) { jam(); /** * Uninitialized table record */ ndbrequire(c_tables.seize(tabPtr)); new (tabPtr.p) Table; tabPtr.p->m_schemaVersion = RNIL; tabPtr.p->m_tableId = tableId; tabPtr.p->m_hasTriggerDefined[0] = 0; tabPtr.p->m_hasTriggerDefined[1] = 0; tabPtr.p->m_hasTriggerDefined[2] = 0; tabPtr.p->m_triggerIds[0] = ILLEGAL_TRIGGER_ID; tabPtr.p->m_triggerIds[1] = ILLEGAL_TRIGGER_ID; tabPtr.p->m_triggerIds[2] = ILLEGAL_TRIGGER_ID;#if 0 ndbout_c("Get tab info conf %d", tableId);#endif c_tables.add(tabPtr); } if(tabPtr.p->m_attributes.getSize() != 0){ jam(); return true; } /** * Initialize table object */ Uint32 noAttribs = tableDesc.NoOfAttributes; Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable); tabPtr.p->m_schemaVersion = tableDesc.TableVersion; // The attribute buffer LocalDataBuffer<15> attrBuf(c_dataBufferPool, tabPtr.p->m_attributes); // Temporary buffer DataBuffer<15> theRest(c_dataBufferPool); if(!attrBuf.seize(noAttribs)){ ndbrequire(false); return false; } if(!theRest.seize(notFixed)){ ndbrequire(false); return false; } DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable DataBuffer<15>::DataBufferIterator restIt; // variable + nullable attrBuf.first(attrIt); theRest.first(restIt); for(Uint32 i = 0; i < noAttribs; i++) { DictTabInfo::Attribute attrDesc; attrDesc.init(); s = SimpleProperties::unpack(it, &attrDesc, DictTabInfo::AttributeMapping, DictTabInfo::AttributeMappingSize, true, true); ndbrequire(s == SimpleProperties::Break); if (!attrDesc.AttributeNullableFlag /* && !attrDesc.AttributeVariableFlag */) { jam(); * attrIt.data = attrDesc.AttributeId; attrBuf.next(attrIt); } else { jam(); * restIt.data = attrDesc.AttributeId; theRest.next(restIt); } // Move to next attribute it.next(); } /** * Put the rest in end of attrBuf */ theRest.first(restIt); for(; !restIt.isNull(); theRest.next(restIt)){ * attrIt.data = * restIt.data; attrBuf.next(attrIt); } theRest.release(); return true;}voidSumaParticipant::execDI_FCOUNTCONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13007); const Uint32 senderData = signal->theData[3]; SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDI_FCOUNTCONF(signal);}void SumaParticipant::execDIGETPRIMCONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13008); const Uint32 senderData = signal->theData[1]; SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDIGETPRIMCONF(signal);}/************************************************************************* * * */voidSumaParticipant::SyncRecord::startMeta(Signal* signal){ jam(); m_currentTable = 0; nextMeta(signal);}/** * m_tableList only contains UserTables */voidSumaParticipant::SyncRecord::nextMeta(Signal* signal){ jam(); TableList::DataBufferIterator it; if(!m_tableList.position(it, m_currentTable)){ completeMeta(signal); return; } GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend(); req->senderRef = suma.reference(); req->senderData = ptrI; req->requestType = GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf; req->tableId = * it.data;#if 0 ndbout_c("GET_TABINFOREQ id %d", req->tableId);#endif suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, GetTabInfoReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal){ jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); Uint32 type = subPtr.p->m_subscriptionType; bool do_continue = false; switch (type) { case SubCreateReq::TableEvent: jam(); break; case SubCreateReq::DatabaseSnapshot: jam(); do_continue = true; break; case SubCreateReq::SelectiveTableSnapshot: jam(); do_continue = true; break; case SubCreateReq::SingleTableScan: jam(); break; default: ndbrequire(false); break; } if (! do_continue) { m_error = true; completeMeta(signal); return; } m_currentTable++; nextMeta(signal); return; // now we need to clean-up}voidSumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){ jam(); GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr(); // const Uint32 gci = conf->gci; const Uint32 tableId = conf->tableId; TableList::DataBufferIterator it; ndbrequire(m_tableList.position(it, m_currentTable)); ndbrequire(* it.data == tableId); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); SegmentedSectionPtr ptr; signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO); bool okToSend = m_doSendSyncData; /* * If it is a selectivetablesnapshot and the table is not part of the * subscription, then do not send anything, just continue. * If it is a tablevent, don't send regardless since the APIs are not * interested in meta data. */ if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) if(!subPtr.p->m_tables[tableId]) okToSend = false; if(okToSend) { if(refToNode(subPtr.p->m_subscriberRef) == 0){ jam(); suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef), GSN_SUB_META_DATA, signal, SubMetaData::SignalLength); jamEntry(); suma.releaseSections(signal); } else { jam(); suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal, SubMetaData::SignalLength, JBB); } } TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); if(fragBuf.getSize() == 0){ /** * We need to gather fragment info */ jam(); signal->theData[0] = RNIL; signal->theData[1] = tableId; signal->theData[2] = ptrI; suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB); return; } m_currentTable++; nextMeta(signal);}void SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){ jam(); const Uint32 userPtr = signal->theData[0]; const Uint32 fragCount = signal->theData[1]; const Uint32 tableId = signal->theData[2]; ndbrequire(userPtr == RNIL && signal->length() == 5); TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); ndbrequire(fragBuf.getSize() == 0); m_currentFragment = fragCount; signal->theData[0] = RNIL; signal->theData[1] = ptrI; signal->theData[2] = tableId; signal->theData[3] = 0; // Frag no suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){ jam(); const Uint32 userPtr = signal->theData[0]; //const Uint32 senderData = signal->theData[1]; const Uint32 nodeCount = signal->theData[6]; const Uint32 tableId = signal->theData[7]; const Uint32 fragNo = signal->theData[8]; ndbrequire(userPtr == RNIL && signal->length() == 9); ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS); TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); /** * Add primary node for fragment to list */ FragmentDescriptor fd; fd.m_fragDesc.m_nodeId = signal->theData[2]; fd.m_fragDesc.m_fragmentNo = fragNo; signal->theData[2] = fd.m_dummy; fragBuf.append(&signal->theData[2], 1); const Uint32 nextFrag = fragNo + 1; if(nextFrag == m_currentFragment){ /** * Complete frag info for table */ m_currentTable++; nextMeta(signal); return; } signal->theData[0] = RNIL; signal->theData[1] = ptrI; signal->theData[2] = tableId; signal->theData[3] = nextFrag; // Frag no suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::completeMeta(Signal* signal){ jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); #if PRINT_ONLY ndbout_c("GSN_SUB_SYNC_CONF (meta)");#else suma.releaseSections(signal); if (m_error) { SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend(); ref->subscriptionId = subPtr.p->m_subscriptionId; ref->subscriptionKey = subPtr.p->m_subscriptionKey; ref->part = SubscriptionData::MetaData; ref->subscriberData = subPtr.p->m_subscriberData; ref->errorCode = SubSyncRef::Undefined; suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal, SubSyncRef::SignalLength, JBB); } else { SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->part = SubscriptionData::MetaData; conf->subscriberData = subPtr.p->m_subscriberData; suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal, SubSyncConf::SignalLength, JBB); }#endif}/********************************************************** * * Scan interface * */voidSumaParticipant::SyncRecord::startScan(Signal* signal){ jam(); /** * Get fraginfo */ m_currentTable = 0; m_currentFragment = 0; nextScan(signal);}boolSumaParticipant::SyncRecord::getNextFragment(TablePtr * tab, FragmentDescriptor * fd){ jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); TableList::DataBufferIterator tabIt; DataBuffer<15>::DataBufferIterator fragIt; m_tableList.position(tabIt, m_currentTable); for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){ TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -