📄 suma.cpp
字号:
if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) { if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) { *tab = tabPtr; return true; } } LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments); fragBuf.position(fragIt, m_currentFragment); for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++){ FragmentDescriptor tmp; tmp.m_dummy = * fragIt.data; if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){ * fd = tmp; * tab = tabPtr; return true; } } m_currentFragment = 0; } return false;}voidSumaParticipant::SyncRecord::nextScan(Signal* signal){ jam(); TablePtr tabPtr; FragmentDescriptor fd; SubscriptionPtr subPtr; if(!getNextFragment(&tabPtr, &fd)){ jam(); completeScan(signal); return; } suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); if(subPtr.p->m_subscriptionType == SubCreateReq::SelectiveTableSnapshot) { jam(); if(!subPtr.p->m_tables[tabPtr.p->m_tableId]) { /* * table is not part of the subscription. Check next table */ m_currentTable++; nextScan(signal); return; } } DataBuffer<15>::Head head = m_attributeList; if(head.getSize() == 0){ head = tabPtr.p->m_attributes; } LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head); ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend(); const Uint32 parallelism = 16; const Uint32 attrLen = 5 + attrBuf.getSize(); req->senderData = m_subscriptionPtrI; req->resultRef = suma.reference(); req->tableId = tabPtr.p->m_tableId; req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; req->schemaVersion = tabPtr.p->m_schemaVersion; req->transId1 = 0; req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); req->clientOpPtr = (ptrI << 16); req->batch_size_rows= 16; req->batch_size_bytes= 0; suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); signal->theData[0] = ptrI; signal->theData[1] = 0; signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8); // Return all signal->theData[3] = attrBuf.getSize(); signal->theData[4] = 0; signal->theData[5] = 0; signal->theData[6] = 0; signal->theData[7] = 0; Uint32 dataPos = 8; DataBuffer<15>::DataBufferIterator it; for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){ AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0); if(dataPos == 25){ suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB); dataPos = 3; } } if(dataPos != 3){ suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB); } m_currentTableId = tabPtr.p->m_tableId; m_currentNoOfAttributes = attrBuf.getSize(); }voidSumaParticipant::execSCAN_FRAGREF(Signal* signal){ jamEntry();// ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr(); ndbrequire(false);}voidSumaParticipant::execSCAN_FRAGCONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13011); ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr(); const Uint32 completed = conf->fragmentCompleted; const Uint32 senderData = conf->senderData; const Uint32 completedOps = conf->completedOps; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, senderData); if(completed != 2){ jam(); #if PRINT_ONLY SubSyncContinueConf * const conf = (SubSyncContinueConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; execSUB_SYNC_CONTINUE_CONF(signal);#else SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend(); req->subscriberData = subPtr.p->m_subscriberData; req->noOfRowsSent = completedOps; sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal, SubSyncContinueReq::SignalLength, JBB);#endif return; } ndbrequire(completedOps == 0); SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI); tmp->m_currentFragment++; tmp->nextScan(signal);}voidSumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13012); SubSyncContinueConf * const conf = (SubSyncContinueConf*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = conf->subscriptionId; key.m_subscriptionKey = conf->subscriptionKey; ndbrequire(c_subscriptions.find(subPtr, key)); ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend(); req->senderData = subPtr.i; req->closeFlag = 0; req->transId1 = 0; req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8); req->batch_size_rows = 16; req->batch_size_bytes = 0; sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::completeScan(Signal* signal){ jam(); // m_tableList.release(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); #if PRINT_ONLY ndbout_c("GSN_SUB_SYNC_CONF (data)");#else SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->part = SubscriptionData::TableData; conf->subscriberData = subPtr.p->m_subscriberData; suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal, SubSyncConf::SignalLength, JBB);#endif}voidSumaParticipant::execSCAN_HBREP(Signal* signal){ jamEntry();#if 0 ndbout << "execSCAN_HBREP" << endl << hex; for(int i = 0; i<signal->length(); i++){ ndbout << signal->theData[i] << " "; if(((i + 1) % 8) == 0) ndbout << endl << hex; } ndbout << endl;#endif}/********************************************************** * Scan data interface * * Assumption: one execTRANSID_AI contains all attr info * */#define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS#define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1static Uint32 f_bufferLock = 0;static Uint32 f_buffer[SUMA_BUF_SZ];static Uint32 f_trigBufferSize = 0;static Uint32 b_bufferLock = 0;static Uint32 b_buffer[SUMA_BUF_SZ];static Uint32 b_trigBufferSize = 0;voidSumaParticipant::execTRANSID_AI(Signal* signal){ jamEntry(); CRASH_INSERTION(13015); TransIdAI * const data = (TransIdAI*)signal->getDataPtr(); const Uint32 opPtrI = data->connectPtr; const Uint32 length = signal->length() - 3; if(f_bufferLock == 0){ f_bufferLock = opPtrI; } else { ndbrequire(f_bufferLock == opPtrI); } Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, (opPtrI >> 16)); Uint32 sum = 0; Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE; Uint32 * headers = f_buffer; const Uint32 * src = &data->attrData[0]; const Uint32 * const end = &src[length]; const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes; for(Uint32 i = 0; i<attribs; i++){ Uint32 tmp = * src++; * headers++ = tmp; Uint32 len = AttributeHeader::getDataSize(tmp); memcpy(dst, src, 4 * len); dst += len; src += len; sum += len; } ndbrequire(src == end); /** * Send data to subscriber */ LinearSectionPtr ptr[3]; ptr[0].p = f_buffer; ptr[0].sz = attribs; ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE; ptr[1].sz = sum; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI); /** * Initialize signal */ SubTableData * sdata = (SubTableData*)signal->getDataPtrSend(); Uint32 ref = subPtr.p->m_subscriberRef; sdata->tableId = syncPtr.p->m_currentTableId; sdata->senderData = subPtr.p->m_subscriberData; sdata->operation = 3; // Scan sdata->gci = 1; // Undefined#if PRINT_ONLY ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);#else sendSignal(ref, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB, ptr, 2);#endif /** * Reset f_bufferLock */ f_bufferLock = 0;}/************************************************************** * * Removing subscription * */voidSumaParticipant::execSUB_REMOVE_REQ(Signal* signal) { jamEntry(); Uint32 senderRef = signal->getSendersBlockRef(); CRASH_INSERTION(13021); const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = req.subscriptionId; key.m_subscriptionKey = req.subscriptionKey; if(!c_subscriptions.find(subPtr, key)) { jam(); sendSubRemoveRef(signal, req, (Uint32) GrepError::SUBSCRIPTION_ID_NOT_FOUND); return; } int count = 0; { jam(); SubscriberPtr i_subbPtr; c_metaSubscribers.first(i_subbPtr); while(!i_subbPtr.isNull()){ jam(); SubscriberPtr subbPtr = i_subbPtr; c_metaSubscribers.next(i_subbPtr); if( subbPtr.p->m_subPtrI == subPtr.i ){ jam(); c_metaSubscribers.release(subbPtr); } } } subPtr.p->m_senderRef = senderRef; subPtr.p->m_senderData = req.senderData; completeSubRemoveReq(signal, subPtr);}voidSumaParticipant::completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr) { Uint32 subscriptionId = subPtr.p->m_subscriptionId; Uint32 subscriptionKey = subPtr.p->m_subscriptionKey; Uint32 senderRef = subPtr.p->m_senderRef; Uint32 senderData = subPtr.p->m_senderData; { Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); syncPtr.p->release(); c_syncPool.release(syncPtr); } // if (subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) { // jam(); // senderRef = subPtr.p->m_subscriberRef; // } c_subscriptions.release(subPtr); /** * I was the last subscription to be remove so clear c_tables */#if 0 ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d", c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());#endif if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) { jam();#if 0 ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");#endif KeyTable<Table>::Iterator it; for(c_tables.first(it); !it.isNull(); ){ it.curr.p->release(* this); TablePtr tabPtr = it.curr; c_tables.next(it); c_tables.release(tabPtr); } } SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; conf->subscriptionId = subscriptionId; conf->subscriptionKey = subscriptionKey; sendSignal(senderRef, GSN_SUB_REMOVE_CONF, signal, SubRemoveConf::SignalLength, JBB);}voidSumaParticipant::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req, Uint32 errCode, bool temporary){ jam(); SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend(); ref->senderRef = reference(); ref->subscriptionId = req.subscriptionId; ref->subscriptionKey = req.subscriptionKey; ref->senderData = req.senderData; ref->err = errCode; if (temporary) ref->setTemporary(); releaseSections(signal); sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF, signal, SubRemoveRef::SignalLength, JBB); return;}voidSumaParticipant::Table::release(SumaParticipant & suma){ jam(); LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes); attrBuf.release(); LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments); fragBuf.release();}voidSumaParticipant::SyncRecord::release(){ jam(); m_tableList.release(); LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList); attrBuf.release(); }template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -