📄 suma.cpp
字号:
tmp->runCREATE_TRIG_CONF(signal); /** * dodido * @todo: I (Johan) dont know what to do here. Jonas, what do you mean? */ DBUG_VOID_RETURN;}voidSumaParticipant::execCREATE_TRIG_REF(Signal* signal){ jamEntry(); ndbrequire(false);}voidSumaParticipant::execDROP_TRIG_CONF(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF"); CRASH_INSERTION(13010); DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr(); const Uint32 senderData = conf->getConnectionPtr(); SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDROP_TRIG_CONF(signal); DBUG_VOID_RETURN;}voidSumaParticipant::execDROP_TRIG_REF(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execDROP_TRIG_CONF"); DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr(); const Uint32 senderData = ref->getConnectionPtr(); SyncRecord* tmp = c_syncPool.getPtr(senderData); tmp->runDROP_TRIG_CONF(signal); DBUG_VOID_RETURN;}/************************************************************************* * * */voidSumaParticipant::SyncRecord::runLIST_TABLES_CONF(Signal* signal){ jam(); ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr(); const Uint32 len = signal->length() - ListTablesConf::HeaderLength; SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); for (unsigned i = 0; i < len; i++) { subPtr.p->m_maxTables++; suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]), subPtr, this); } // for (unsigned i = 0; i < len; i++) // conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]); // m_tableList.append(&conf->tableData[0], len);#if 0 TableList::DataBufferIterator it; int i = 0; for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) { ndbout_c("%u listtableconf tableid %d", i++, *it.data); }#endif if(len == ListTablesConf::DataLength){ jam(); // we expect more LIST_TABLE_CONF return; }#if 0 subPtr.p->m_currentTable = 0; subPtr.p->m_maxTables = 0; TableList::DataBufferIterator it; for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) { subPtr.p->m_maxTables++; suma.addTableId(*it.data, subPtr, NULL);#ifdef NODEFAIL_DEBUG ndbout_c(" listtableconf tableid %d",*it.data);#endif }#endif startMeta(signal);}voidSumaParticipant::SyncRecord::startMeta(Signal* signal){ jam(); m_currentTable = 0; nextMeta(signal);}/** * m_tableList only contains UserTables */voidSumaParticipant::SyncRecord::nextMeta(Signal* signal){ jam(); TableList::DataBufferIterator it; if(!m_tableList.position(it, m_currentTable)){ completeMeta(signal); return; } GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend(); req->senderRef = suma.reference(); req->senderData = ptrI; req->requestType = GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf; req->tableId = * it.data;#if 0 ndbout_c("GET_TABINFOREQ id %d", req->tableId);#endif suma.sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal, GetTabInfoReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::runGET_TABINFOREF(Signal* signal){ jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); Uint32 type = subPtr.p->m_subscriptionType; bool do_continue = false; switch (type) { case SubCreateReq::TableEvent: jam(); break; case SubCreateReq::DatabaseSnapshot: jam(); do_continue = true; break; case SubCreateReq::SelectiveTableSnapshot: jam(); do_continue = true; break; case SubCreateReq::SingleTableScan: jam(); break; default: ndbrequire(false); break; } if (! do_continue) { m_error = true; completeMeta(signal); return; } m_currentTable++; nextMeta(signal); return; // now we need to clean-up}voidSumaParticipant::SyncRecord::runGET_TABINFO_CONF(Signal* signal){ jam(); GetTabInfoConf * const conf = (GetTabInfoConf*)signal->getDataPtr(); // const Uint32 gci = conf->gci; const Uint32 tableId = conf->tableId; TableList::DataBufferIterator it; ndbrequire(m_tableList.position(it, m_currentTable)); ndbrequire(* it.data == tableId); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); SegmentedSectionPtr ptr; signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO); SubMetaData * data = (SubMetaData*)signal->getDataPtrSend(); /** * sending lastCompleteGCI. Used by Lars in interval calculations * incremenet by one, since last_CompleteGCI is the not the current gci. */ data->gci = suma.c_lastCompleteGCI + 1; data->tableId = tableId; data->senderData = subPtr.p->m_subscriberData;#if PRINT_ONLY ndbout_c("GSN_SUB_META_DATA Table %d", tableId);#else bool okToSend = m_doSendSyncData; /* * If it is a selectivetablesnapshot and the table is not part of the * subscription, then do not send anything, just continue. * If it is a tablevent, don't send regardless since the APIs are not * interested in meta data. */ if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) if(!subPtr.p->m_tables[tableId]) okToSend = false; if(okToSend) { if(refToNode(subPtr.p->m_subscriberRef) == 0){ jam(); suma.EXECUTE_DIRECT(refToBlock(subPtr.p->m_subscriberRef), GSN_SUB_META_DATA, signal, SubMetaData::SignalLength); jamEntry(); suma.releaseSections(signal); } else { jam(); suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal, SubMetaData::SignalLength, JBB); } }#endif TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); if(fragBuf.getSize() == 0){ /** * We need to gather fragment info */ jam(); signal->theData[0] = RNIL; signal->theData[1] = tableId; signal->theData[2] = ptrI; suma.sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal, 3, JBB); return; } m_currentTable++; nextMeta(signal);}void SumaParticipant::SyncRecord::runDI_FCOUNTCONF(Signal* signal){ jam(); const Uint32 userPtr = signal->theData[0]; const Uint32 fragCount = signal->theData[1]; const Uint32 tableId = signal->theData[2]; ndbrequire(userPtr == RNIL && signal->length() == 5); TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); ndbrequire(fragBuf.getSize() == 0); m_currentFragment = fragCount; signal->theData[0] = RNIL; signal->theData[1] = ptrI; signal->theData[2] = tableId; signal->theData[3] = 0; // Frag no suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::runDIGETPRIMCONF(Signal* signal){ jam(); const Uint32 userPtr = signal->theData[0]; //const Uint32 senderData = signal->theData[1]; const Uint32 nodeCount = signal->theData[6]; const Uint32 tableId = signal->theData[7]; const Uint32 fragNo = signal->theData[8]; ndbrequire(userPtr == RNIL && signal->length() == 9); ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS); TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); /** * Add primary node for fragment to list */ FragmentDescriptor fd; fd.m_fragDesc.m_nodeId = signal->theData[2]; fd.m_fragDesc.m_fragmentNo = fragNo; signal->theData[2] = fd.m_dummy; fragBuf.append(&signal->theData[2], 1); const Uint32 nextFrag = fragNo + 1; if(nextFrag == m_currentFragment){ /** * Complete frag info for table */ m_currentTable++; nextMeta(signal); return; } signal->theData[0] = RNIL; signal->theData[1] = ptrI; signal->theData[2] = tableId; signal->theData[3] = nextFrag; // Frag no suma.sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);}voidSumaParticipant::SyncRecord::completeMeta(Signal* signal){ jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); #if PRINT_ONLY ndbout_c("GSN_SUB_SYNC_CONF (meta)");#else suma.releaseSections(signal); if (m_error) { SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend(); ref->subscriptionId = subPtr.p->m_subscriptionId; ref->subscriptionKey = subPtr.p->m_subscriptionKey; ref->part = SubscriptionData::MetaData; ref->subscriberData = subPtr.p->m_subscriberData; ref->errorCode = SubSyncRef::Undefined; suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_REF, signal, SubSyncRef::SignalLength, JBB); } else { SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->part = SubscriptionData::MetaData; conf->subscriberData = subPtr.p->m_subscriberData; suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal, SubSyncConf::SignalLength, JBB); }#endif}/********************************************************** * * Scan interface * */voidSumaParticipant::SyncRecord::startScan(Signal* signal){ jam(); /** * Get fraginfo */ m_currentTable = 0; m_currentFragment = 0; nextScan(signal);}boolSumaParticipant::SyncRecord::getNextFragment(TablePtr * tab, FragmentDescriptor * fd){ jam(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); TableList::DataBufferIterator tabIt; DataBuffer<15>::DataBufferIterator fragIt; m_tableList.position(tabIt, m_currentTable); for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++){ TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data)); if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) { if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) { *tab = tabPtr; return true; } } LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); fragBuf.position(fragIt, m_currentFragment); for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){ FragmentDescriptor tmp; tmp.m_dummy = * fragIt.data; if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){ * fd = tmp; * tab = tabPtr; return true; } } m_currentFragment = 0; } return false;}voidSumaParticipant::SyncRecord::nextScan(Signal* signal){ jam(); TablePtr tabPtr; FragmentDescriptor fd; SubscriptionPtr subPtr; if(!getNextFragment(&tabPtr, &fd)){ jam(); completeScan(signal); return; } suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) { jam(); if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) { /* * table is not part of the subscription. Check next table */ m_currentTable++; nextScan(signal); return; } } DataBuffer<15>::Head head = m_attributeList; if(head.getSize() == 0){ head = tabPtr.p->m_attributes; } LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head); ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend(); const Uint32 parallelism = 16; const Uint32 attrLen = 5 + attrBuf.getSize(); req->senderData = m_subscriptionPtrI; req->resultRef = suma.reference(); req->tableId = tabPtr.p->m_tableId; req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; req->schemaVersion = tabPtr.p->m_schemaVersion; req->transId1 = 0; req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); req->clientOpPtr = (ptrI << 16); req->batch_size_rows= 16; req->batch_size_bytes= 0; suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); signal->theData[0] = ptrI; signal->theData[1] = 0; signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8); // Return all signal->theData[3] = attrBuf.getSize(); signal->theData[4] = 0; signal->theData[5] = 0; signal->theData[6] = 0; signal->theData[7] = 0; Uint32 dataPos = 8; DataBuffer<15>::DataBufferIterator it; for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){ AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0); if(dataPos == 25){ suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB); dataPos = 3; } } if(dataPos != 3){ suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB); } m_currentTableId = tabPtr.p->m_tableId; m_currentNoOfAttributes = attrBuf.getSize(); }voidSumaParticipant::execSCAN_FRAGREF(Signal* signal){ jamEntry();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -