📄 suma.cpp
字号:
} SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend(); req->subscriberRef = subscriberRef; req->subscriberData = subscriberData; req->subscriptionId = subscriptionId; req->subscriptionKey = subscriptionKey; req->part = part; sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB); DBUG_VOID_RETURN;}voidSuma::execNODE_FAILREP(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execNODE_FAILREP"); NodeFailRep * const rep = (NodeFailRep*)signal->getDataPtr(); bool changed = false; NodePtr nodePtr;#ifdef NODEFAIL_DEBUG ndbout_c("Suma: nodefailrep");#endif c_nodeFailGCI = getFirstGCI(signal); for(c_nodes.first(nodePtr); nodePtr.i != RNIL; c_nodes.next(nodePtr)){ if(NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)){ if(nodePtr.p->alive){ ndbassert(c_aliveNodes.get(nodePtr.p->nodeId)); changed = true; jam(); } else { ndbassert(!c_aliveNodes.get(nodePtr.p->nodeId)); jam(); } if (c_preparingNodes.get(nodePtr.p->nodeId)) { jam(); // we are currently preparing this node that died // it's ok just to clear and go back to waiting for it to start up Restart.resetNode(calcSumaBlockRef(nodePtr.p->nodeId)); c_preparingNodes.clear(nodePtr.p->nodeId); } else if (c_handoverToDo) { jam(); // TODO what if I'm a SUMA that is currently restarting and the SUMA // responsible for restarting me is the one that died? // a node has failed whilst handover is going on // let's check if we're in the process of handover with that node c_handoverToDo = false; for( int i = 0; i < NO_OF_BUCKETS; i++) { if (c_buckets[i].handover) { // I'm doing handover, but is it with the dead node? if (getResponsibleSumaNodeId(i) == nodePtr.p->nodeId) { // so it was the dead node, has handover started? if (c_buckets[i].handover_started) { jam(); // we're not ok and will have lost data! // set not active to indicate this - // this will generate takeover behaviour c_buckets[i].active = false; c_buckets[i].handover_started = false; } // else we're ok to revert back to state before c_buckets[i].handover = false; } else { jam(); // ok, we're doing handover with a different node c_handoverToDo = true; } } } } c_failoverBuffer.nodeFailRep(); nodePtr.p->alive = 0; c_aliveNodes.clear(nodePtr.p->nodeId); // this has to be done after the loop above } } DBUG_VOID_RETURN;}voidSuma::execINCL_NODEREQ(Signal* signal){ jamEntry(); //const Uint32 senderRef = signal->theData[0]; const Uint32 inclNode = signal->theData[1]; NodePtr node; for(c_nodes.first(node); node.i != RNIL; c_nodes.next(node)){ jam(); const Uint32 nodeId = node.p->nodeId; if(inclNode == nodeId){ jam(); ndbrequire(node.p->alive == 0); ndbrequire(!c_aliveNodes.get(nodeId)); for (Uint32 j = 0; j < c_noNodesInGroup; j++) { jam(); if (c_nodesInGroup[j] == nodeId) { // the starting node is part of my node group jam(); c_preparingNodes.set(nodeId); // set as being prepared for (Uint32 i = 0; i < c_noNodesInGroup; i++) { jam(); if (i == c_idInNodeGroup) { jam(); // I'm responsible for restarting this SUMA // ALL dict's should have meta data info so it is ok to start Restart.startNode(signal, calcSumaBlockRef(nodeId)); break; }//if if (c_aliveNodes.get(c_nodesInGroup[i])) { jam(); break; // another Suma takes care of this }//if }//for break; }//if }//for node.p->alive = 1; c_aliveNodes.set(nodeId); break; }//if }//for#if 0 // if we include this DIH's got to be prepared, later if needed... signal->theData[0] = reference(); sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);#endif}voidSuma::execSIGNAL_DROPPED_REP(Signal* signal){ jamEntry(); ndbrequire(false);}/******************************************************************** * * Dump state * */static unsignedcount_subscribers(const DLList<SumaParticipant::Subscriber> &subs){ unsigned n= 0; SumaParticipant::SubscriberPtr i_subbPtr; subs.first(i_subbPtr); while(!i_subbPtr.isNull()){ n++; subs.next(i_subbPtr); } return n;}voidSuma::execDUMP_STATE_ORD(Signal* signal){ jamEntry(); Uint32 tCase = signal->theData[0]; if(tCase >= 8000 && tCase <= 8003){ SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, g_subPtrI); Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); if(tCase == 8000){ syncPtr.p->startMeta(signal); } if(tCase == 8001){ syncPtr.p->startScan(signal); } if(tCase == 8002){ syncPtr.p->startTrigger(signal); } if(tCase == 8003){ subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan; LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList); Uint32 tab = 0; Uint32 att[] = { 0, 1, 1 }; syncPtr.p->m_tableList.append(&tab, 1); attrs.append(att, 3); } } if(tCase == 8004){ infoEvent("Suma: c_subscriberPool size: %d free: %d", c_subscriberPool.getSize(), c_subscriberPool.getNoOfFree()); infoEvent("Suma: c_tablePool size: %d free: %d", c_tablePool_.getSize(), c_tablePool_.getNoOfFree()); infoEvent("Suma: c_subscriptionPool size: %d free: %d", c_subscriptionPool.getSize(), c_subscriptionPool.getNoOfFree()); infoEvent("Suma: c_syncPool size: %d free: %d", c_syncPool.getSize(), c_syncPool.getNoOfFree()); infoEvent("Suma: c_dataBufferPool size: %d free: %d", c_dataBufferPool.getSize(), c_dataBufferPool.getNoOfFree()); infoEvent("Suma: c_metaSubscribers count: %d", count_subscribers(c_metaSubscribers)); infoEvent("Suma: c_dataSubscribers count: %d", count_subscribers(c_dataSubscribers)); infoEvent("Suma: c_prepDataSubscribers count: %d", count_subscribers(c_prepDataSubscribers)); infoEvent("Suma: c_removeDataSubscribers count: %d", count_subscribers(c_removeDataSubscribers)); }}/******************************************************************** * * Convert a table name (db+schema+tablename) to tableId * */#if 0voidSumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal){ jam(); if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) { jam(); GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend(); char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable]; const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated req->senderRef = reference(); req->senderData = subPtr.i; req->len = strLen; LinearSectionPtr ptr[1]; ptr[0].p = (Uint32*)tableName; ptr[0].sz = strLen; sendSignal(DBDICT_REF, GSN_GET_TABLEID_REQ, signal, GetTableIdReq::SignalLength, JBB, ptr, 1); } else { jam(); sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr); }}#endifvoid SumaParticipant::addTableId(Uint32 tableId, SubscriptionPtr subPtr, SyncRecord *psyncRec){#ifdef NODEFAIL_DEBUG ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u", tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);#endif subPtr.p->m_tables[tableId] = 1; subPtr.p->m_currentTable++; if(psyncRec != NULL) psyncRec->m_tableList.append(&tableId, 1); }#if 0void SumaParticipant::execGET_TABLEID_CONF(Signal * signal){ jamEntry(); GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr(); Uint32 tableId = conf->tableId; //Uint32 schemaVersion = conf->schemaVersion; Uint32 senderData = conf->senderData; SubscriptionPtr subPtr; Ptr<SyncRecord> syncPtr; c_subscriptions.getPtr(subPtr, senderData); c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); /* * add to m_tableList */ addTableId(tableId, subPtr, syncPtr.p); convertNameToId(subPtr, signal);}void SumaParticipant::execGET_TABLEID_REF(Signal * signal){ jamEntry(); GetTableIdRef const * ref = (GetTableIdRef *)signal->getDataPtr(); Uint32 senderData = ref->senderData; // Uint32 err = ref->err; SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, senderData); Uint32 subData = subPtr.p->m_subscriberData; SubCreateRef * reff = (SubCreateRef*)ref; /** * @todo: map ref->err to GrepError. */ reff->err = GrepError::SELECTED_TABLE_NOT_FOUND; reff->subscriberData = subData; sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_CREATE_REF, signal, SubCreateRef::SignalLength, JBB);}#endif/************************************************************* * * Creation of subscription id's * ************************************************************/void Suma::execCREATE_SUBID_REQ(Signal* signal) { jamEntry(); CRASH_INSERTION(13001); CreateSubscriptionIdReq const * req = (CreateSubscriptionIdReq*)signal->getDataPtr(); SubscriberPtr subbPtr; if(!c_subscriberPool.seize(subbPtr)){ jam(); sendSubIdRef(signal, GrepError::SUBSCRIPTION_ID_NOMEM); return; } subbPtr.p->m_subscriberRef = signal->getSendersBlockRef(); subbPtr.p->m_senderData = req->senderData; subbPtr.p->m_subscriberData = subbPtr.i; UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend(); utilReq->senderData = subbPtr.p->m_subscriberData; utilReq->sequenceId = SUMA_SEQUENCE; utilReq->requestType = UtilSequenceReq::NextVal; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB);}voidSuma::execUTIL_SEQUENCE_CONF(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); CRASH_INSERTION(13002); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); if(conf->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, conf, NULL); DBUG_VOID_RETURN; } Uint64 subId; memcpy(&subId,conf->sequenceValue,8); Uint32 subData = conf->senderData; SubscriberPtr subbPtr; c_subscriberPool.getPtr(subbPtr,subData); CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf; subconf->subscriptionId = (Uint32)subId; subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF); subconf->subscriberData = subbPtr.p->m_senderData; sendSignal(subbPtr.p->m_subscriberRef, GSN_CREATE_SUBID_CONF, signal, CreateSubscriptionIdConf::SignalLength, JBB); c_subscriberPool.release(subbPtr); DBUG_VOID_RETURN;}voidSuma::execUTIL_SEQUENCE_REF(Signal* signal){ jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); if(ref->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, NULL, ref); DBUG_VOID_RETURN; } Uint32 subData = ref->senderData; SubscriberPtr subbPtr; c_subscriberPool.getPtr(subbPtr,subData); sendSubIdRef(signal, GrepError::SEQUENCE_ERROR); c_subscriberPool.release(subbPtr); DBUG_VOID_RETURN;}//execUTIL_SEQUENCE_REF()voidSumaParticipant::sendSubIdRef(Signal* signal, Uint32 errCode){ jam(); CreateSubscriptionIdRef * ref = (CreateSubscriptionIdRef *)signal->getDataPtrSend(); ref->err = errCode; sendSignal(signal->getSendersBlockRef(), GSN_CREATE_SUBID_REF, signal, CreateSubscriptionIdRef::SignalLength, JBB); releaseSections(signal); return;}/********************************************************** * Suma participant interface * * Creation of subscriptions */voidSumaParticipant::execSUB_CREATE_REQ(Signal* signal) {#ifdef NODEFAIL_DEBUG ndbout_c("SumaParticipant::execSUB_CREATE_REQ");#endif jamEntry(); CRASH_INSERTION(13003); const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr(); const Uint32 subId = req.subscriptionId; const Uint32 subKey = req.subscriptionKey; const Uint32 subRef = req.subscriberRef; const Uint32 subData = req.subscriberData; const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags; const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags; const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0; const bool restartFlag = (flags & SubCreateReq::RestartFlag) != 0; const Uint32 sender = signal->getSendersBlockRef(); Subscription key; key.m_subscriptionId = subId; key.m_subscriptionKey = subKey; SubscriptionPtr subPtr; Ptr<SyncRecord> syncPtr; if (addTableFlag) { ndbrequire(restartFlag); //TODO remove this if(!c_subscriptions.find(subPtr, key)) { jam(); sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_NOT_FOUND); return; } jam(); c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); } else { // Check that id/key is unique if(c_subscriptions.find(subPtr, key)) { jam(); sendSubCreateRef(signal, req, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE); return; } if(!c_subscriptions.seize(subPtr)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -