📄 suma.cpp
字号:
// ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr(); ndbrequire(false);}voidSumaParticipant::execSCAN_FRAGCONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13011); ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr(); const Uint32 completed = conf->fragmentCompleted; const Uint32 senderData = conf->senderData; const Uint32 completedOps = conf->completedOps; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, senderData); if(completed != 2){ jam(); #if PRINT_ONLY SubSyncContinueConf * const conf = (SubSyncContinueConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; execSUB_SYNC_CONTINUE_CONF(signal);#else SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend(); req->subscriberData = subPtr.p->m_subscriberData; req->noOfRowsSent = completedOps; sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONTINUE_REQ, signal, SubSyncContinueReq::SignalLength, JBB);#endif return; } ndbrequire(completedOps == 0); SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI); tmp->m_currentFragment++; tmp->nextScan(signal);}voidSumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){ jamEntry(); CRASH_INSERTION(13012); SubSyncContinueConf * const conf = (SubSyncContinueConf*)signal->getDataPtr(); SubscriptionPtr subPtr; Subscription key; key.m_subscriptionId = conf->subscriptionId; key.m_subscriptionKey = conf->subscriptionKey; ndbrequire(c_subscriptions.find(subPtr, key)); ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend(); req->senderData = subPtr.i; req->closeFlag = 0; req->transId1 = 0; req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8); req->batch_size_rows = 16; req->batch_size_bytes = 0; sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB);}voidSumaParticipant::SyncRecord::completeScan(Signal* signal){ jam(); // m_tableList.release(); SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); #if PRINT_ONLY ndbout_c("GSN_SUB_SYNC_CONF (data)");#else SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend(); conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->part = SubscriptionData::TableData; conf->subscriberData = subPtr.p->m_subscriberData; suma.sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_SYNC_CONF, signal, SubSyncConf::SignalLength, JBB);#endif}voidSumaParticipant::execSCAN_HBREP(Signal* signal){ jamEntry();#if 0 ndbout << "execSCAN_HBREP" << endl << hex; for(int i = 0; i<signal->length(); i++){ ndbout << signal->theData[i] << " "; if(((i + 1) % 8) == 0) ndbout << endl << hex; } ndbout << endl;#endif}/********************************************************** * * Suma participant interface * * Creation of subscriber * */voidSumaParticipant::execSUB_START_REQ(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execSUB_START_REQ"); CRASH_INSERTION(13013); if (c_restartLock) { jam(); // ndbout_c("c_restartLock"); if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { jam(); sendSubStartRef(signal, /** Error Code */ 0, true); DBUG_VOID_RETURN; } // only allow other Suma's in the nodegroup to come through for restart purposes } Subscription key; SubStartReq * const req = (SubStartReq*)signal->getDataPtr(); Uint32 senderRef = req->senderRef; Uint32 senderData = req->senderData; Uint32 subscriberData = req->subscriberData; Uint32 subscriberRef = req->subscriberRef; SubscriptionData::Part part = (SubscriptionData::Part)req->part; key.m_subscriptionId = req->subscriptionId; key.m_subscriptionKey = req->subscriptionKey; SubscriptionPtr subPtr; if(!c_subscriptions.find(subPtr, key)){ jam(); sendSubStartRef(signal, /** Error Code */ 0); DBUG_VOID_RETURN; } Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); if (syncPtr.p->m_locked) { jam();#if 0 ndbout_c("Locked");#endif sendSubStartRef(signal, /** Error Code */ 0, true); DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; SubscriberPtr subbPtr; if(!c_subscriberPool.seize(subbPtr)){ jam(); syncPtr.p->m_locked = false; sendSubStartRef(signal, /** Error Code */ 0); DBUG_VOID_RETURN; } Uint32 type = subPtr.p->m_subscriptionType; subbPtr.p->m_senderRef = senderRef; subbPtr.p->m_senderData = senderData; switch (type) { case SubCreateReq::TableEvent: jam(); // we want the data to return to the API not DICT subbPtr.p->m_subscriberRef = subscriberRef; // ndbout_c("start ref = %u", signal->getSendersBlockRef()); // ndbout_c("ref = %u", subbPtr.p->m_subscriberRef); // we use the subscription id for now, should really be API choice subbPtr.p->m_subscriberData = subscriberData;#if 0 if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { jam(); for (Uint32 i = 0; i < c_noNodesInGroup; i++) { Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]); if (ref != reference()) { jam(); sendSubStartReq(subPtr, subbPtr, signal, ref); } else jam(); } }#endif break; case SubCreateReq::DatabaseSnapshot: case SubCreateReq::SelectiveTableSnapshot: jam(); ndbrequire(false); //subbPtr.p->m_subscriberRef = GREP_REF; subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData; break; case SubCreateReq::SingleTableScan: jam(); subbPtr.p->m_subscriberRef = subPtr.p->m_subscriberRef; subbPtr.p->m_subscriberData = subPtr.p->m_subscriberData; } subbPtr.p->m_subPtrI = subPtr.i; subbPtr.p->m_firstGCI = RNIL; if (type == SubCreateReq::TableEvent) subbPtr.p->m_lastGCI = 0; else subbPtr.p->m_lastGCI = RNIL; // disable usage of m_lastGCI bool ok = false; switch(part){ case SubscriptionData::MetaData: ok = true; jam(); c_metaSubscribers.add(subbPtr); sendSubStartComplete(signal, subbPtr, 0, part); break; case SubscriptionData::TableData: ok = true; jam(); c_prepDataSubscribers.add(subbPtr); syncPtr.p->startTrigger(signal); break; } ndbrequire(ok); DBUG_VOID_RETURN;}voidSumaParticipant::sendSubStartComplete(Signal* signal, SubscriberPtr subbPtr, Uint32 firstGCI, SubscriptionData::Part part){ jam(); SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI); Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); syncPtr.p->m_locked = false; SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = subbPtr.p->m_senderData; conf->subscriptionId = subPtr.p->m_subscriptionId; conf->subscriptionKey = subPtr.p->m_subscriptionKey; conf->firstGCI = firstGCI; conf->part = (Uint32) part; conf->subscriberData = subPtr.p->m_subscriberData; sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_CONF, signal, SubStartConf::SignalLength, JBB);}#if 0voidSumaParticipant::sendSubStartRef(SubscriptionPtr subPtr, Signal* signal, Uint32 errCode, bool temporary){ jam(); SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); xxx ref->senderRef = reference(); xxx ref->senderData = subPtr.p->m_senderData; ref->subscriptionId = subPtr.p->m_subscriptionId; ref->subscriptionKey = subPtr.p->m_subscriptionKey; ref->part = (Uint32) subPtr.p->m_subscriptionType; ref->subscriberData = subPtr.p->m_subscriberData; ref->err = errCode; if (temporary) { jam(); ref->setTemporary(); } releaseSections(signal); sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_START_REF, signal, SubStartRef::SignalLength, JBB);}#endifvoidSumaParticipant::sendSubStartRef(Signal* signal, Uint32 errCode, bool temporary){ jam(); SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend(); ref->senderRef = reference(); ref->err = errCode; if (temporary) { jam(); ref->setTemporary(); } releaseSections(signal); sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal, SubStartRef::SignalLength, JBB);}/********************************************************** * * Trigger admin interface * */voidSumaParticipant::SyncRecord::startTrigger(Signal* signal){ jam(); m_currentTable = 0; m_latestTriggerId = RNIL; nextTrigger(signal);}voidSumaParticipant::SyncRecord::nextTrigger(Signal* signal){ jam(); TableList::DataBufferIterator it; if(!m_tableList.position(it, m_currentTable)){ completeTrigger(signal); return; } SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); const Uint32 RT_BREAK = 48; Uint32 latestTriggerId = 0; for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){ TablePtr tabPtr;#if 0 ndbout_c("nextTrigger tableid %u", *it.data);#endif ndbrequire(suma.c_tables.find(tabPtr, *it.data)); AttributeMask attrMask; createAttributeMask(attrMask, tabPtr.p); for(Uint32 j = 0; j<3; j++){ i++; latestTriggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | tabPtr.p->m_tableId; if(tabPtr.p->m_hasTriggerDefined[j] == 0) { ndbrequire(tabPtr.p->m_triggerIds[j] == ILLEGAL_TRIGGER_ID);#if 0 ndbout_c("DEFINING trigger on table %u[%u]", tabPtr.p->m_tableId, j);#endif CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend(); req->setUserRef(SUMA_REF); req->setConnectionPtr(ptrI); req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE); req->setTriggerActionTime(TriggerActionTime::TA_DETACHED); req->setMonitorReplicas(true); req->setMonitorAllAttributes(false); req->setReceiverRef(SUMA_REF); req->setTriggerId(latestTriggerId); req->setTriggerEvent((TriggerEvent::Value)j); req->setTableId(tabPtr.p->m_tableId); req->setAttributeMask(attrMask); suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ, signal, CreateTrigReq::SignalLength, JBB); } else { /** * Faking that a trigger has been created in order to * simulate the proper behaviour. * Perhaps this should be a dummy signal instead of * (ab)using CREATE_TRIG_CONF. */ CreateTrigConf * conf = (CreateTrigConf*)signal->getDataPtrSend(); conf->setConnectionPtr(ptrI); conf->setTableId(tabPtr.p->m_tableId); conf->setTriggerId(latestTriggerId); suma.sendSignal(SUMA_REF,GSN_CREATE_TRIG_CONF, signal, CreateTrigConf::SignalLength, JBB); } } m_currentTable++; } m_latestTriggerId = latestTriggerId;}voidSumaParticipant::SyncRecord::createAttributeMask(AttributeMask& mask, Table * table){ jam(); mask.clear(); DataBuffer<15>::DataBufferIterator it; LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, table->m_attributes); for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){ mask.set(* it.data); }}voidSumaParticipant::SyncRecord::runCREATE_TRIG_CONF(Signal* signal){ jam(); CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr(); const Uint32 triggerId = conf->getTriggerId(); Uint32 type = (triggerId >> 16) & 0x3; Uint32 tableId = conf->getTableId(); TablePtr tabPtr; ndbrequire(suma.c_tables.find(tabPtr, tableId)); ndbrequire(type < 3); tabPtr.p->m_triggerIds[type] = triggerId; tabPtr.p->m_hasTriggerDefined[type]++; if(triggerId == m_latestTriggerId){ jam(); nextTrigger(signal); }}voidSumaParticipant::SyncRecord::completeTrigger(Signal* signal){ jam(); SubscriptionPtr subPtr; CRASH_INSERTION(13013);#ifdef EVENT_PH3_DEBUG ndbout_c("SumaParticipant: trigger completed");#endif Uint32 gci; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); SubscriberPtr subbPtr; { bool found = false; for(suma.c_prepDataSubscribers.first(subbPtr); !subbPtr.isNull(); suma.c_prepDataSubscribers.next(subbPtr)) { jam(); if(subbPtr.p->m_subPtrI == subPtr.i) { jam(); found = true; break; } } ndbrequire(found); gci = suma.getFirstGCI(signal); subbPtr.p->m_firstGCI = gci; suma.c_prepDataSubscribers.remove(subbPtr); suma.c_dataSubscribers.add(subbPtr); } suma.sendSubStartComplete(signal, subbPtr, gci, SubscriptionData::TableData);}voidSumaParticipant::SyncRecord::startDropTrigger(Signal* signal){ jam(); m_currentTable = 0; m_latestTriggerId = RNIL; nextDropTrigger(signal);}voidSumaParticipant::SyncRecord::nextDropTrigger(Signal* signal){ jam(); TableList::DataBufferIterator it; if(!m_tableList.position(it, m_currentTable)){ completeDropTrigger(signal); return; } SubscriptionPtr subPtr; suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI); ndbrequire(subPtr.p->m_syncPtrI == ptrI); const Uint32 RT_BREAK = 48; Uint32 latestTriggerId = 0; for(Uint32 i = 0; i<RT_BREAK && !it.isNull(); i++, m_tableList.next(it)){ jam(); TablePtr tabPtr;#if 0 ndbout_c("nextDropTrigger tableid %u", *it.data);#endif ndbrequire(suma.c_tables.find(tabPtr, * it.data)); for(Uint32 j = 0; j<3; j++){ jam(); ndbrequire(tab
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -