📄 suma.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "Suma.hpp"

#include <ndb_version.h>

#include <NdbTCP.h>
#include <Bitmask.hpp>
#include <SimpleProperties.hpp>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>

#include <signaldata/ListTables.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/GetTableId.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/SumaImpl.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/TransIdAI.hpp>
#include <signaldata/CreateTrig.hpp>
#include <signaldata/AlterTrig.hpp>
#include <signaldata/DropTrig.hpp>
#include <signaldata/FireTrigOrd.hpp>
#include <signaldata/TrigAttrInfo.hpp>
#include <signaldata/CheckNodeGroups.hpp>
#include <signaldata/GCPSave.hpp>
#include <GrepError.hpp>

#include <DebuggerNames.hpp>

//#define HANDOVER_DEBUG
//#define NODEFAIL_DEBUG
//#define NODEFAIL_DEBUG2
//#define DEBUG_SUMA_SEQUENCE
//#define EVENT_DEBUG
//#define EVENT_PH3_DEBUG
//#define EVENT_DEBUG2
#if 0
#undef DBUG_ENTER
#undef DBUG_PRINT
#undef DBUG_RETURN
#undef DBUG_VOID_RETURN

#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
#endif

/**
 * @todo:
 * SUMA crashes if an index is created at the same time as
 * global replication. Very easy to reproduce using testIndex.
 * Note: This only happens occasionally, but is quite easy to reproduce.
 */

Uint32 g_subPtrI = RNIL;
static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;

/**************************************************************
 *
 * Start of suma
 *
 */

#define PRINT_ONLY 0
// Start type received with STTOR phase 3; consulted again in phases 5 and 7.
static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;

/**
 * Query DIH (direct CHECKNODEGROUPSREQ) for the members of this node's
 * node group and cache the answer:
 *   c_nodeGroup       - value returned in CheckNodeGroups::output
 *   c_nodesInGroup[]  - node ids whose bit is set in the returned mask
 *   c_noNodesInGroup  - number of entries stored in c_nodesInGroup[]
 *   c_idInNodeGroup   - index of the own node inside c_nodesInGroup[]
 *
 * Requires that at least one node is reported.
 */
void
Suma::getNodeGroupMembers(Signal* signal) {
  jam();
  /**
   * Ask DIH for nodeGroupMembers
   */
  CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
  sd->blockRef = reference();
  sd->requestType =
    CheckNodeGroups::Direct |
    CheckNodeGroups::GetNodeGroupMembers;
  sd->nodeId = getOwnNodeId();
  EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal,
                 CheckNodeGroups::SignalLength);
  jamEntry();

  c_nodeGroup = sd->output;
  c_noNodesInGroup = 0;
  for (Uint32 i = 0; i < MAX_NDB_NODES; i++) {
    if (sd->mask.get(i)) {
      // Guard the array write. execREAD_CONFIG_REQ clears MAX_REPLICAS
      // entries of c_nodesInGroup[], so that is presumably its capacity
      // (TODO confirm against Suma.hpp).
      ndbrequire(c_noNodesInGroup < MAX_REPLICAS);
      if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
      c_nodesInGroup[c_noNodesInGroup] = i;
      c_noNodesInGroup++;
    }
  }

  //  ndbout_c("c_noNodesInGroup=%d", c_noNodesInGroup);
  ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup

#ifdef NODEFAIL_DEBUG
  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
    ndbout_c ("Suma: NodeGroup %u, me %u, me in group %u, member[%u] %u",
              c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
              i, c_nodesInGroup[i]);
  }
#endif
}

/**
 * READ_CONFIG_REQ: size all record pools, construct the SyncRecord objects
 * in place, reset the node-group bookkeeping and answer the sender with
 * READ_CONFIG_CONF.
 */
void
Suma::execREAD_CONFIG_REQ(Signal* signal)
{
  jamEntry();

  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();

  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;

  const ndb_mgm_configuration_iterator * p =
    theConfiguration.getOwnConfigIterator();
  ndbrequire(p != 0);

  // SumaParticipant
  // A failed lookup must not leave noTables indeterminate: it is used as a
  // pool size just below.
  Uint32 noTables = 0;
  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables));

  /**
   * @todo: fix pool sizes
   */
  c_tablePool_.setSize(noTables);
  c_tables.setSize(noTables);
  c_subscriptions.setSize(20); //10
  c_subscriberPool.setSize(64);

  c_subscriptionPool.setSize(64); //2
  c_syncPool.setSize(20); //2
  c_dataBufferPool.setSize(128);

  {
    // Seize every record once to run the SyncRecord constructor on it
    // (placement new), then hand all of them back to the pool.
    SLList<SyncRecord> tmp(c_syncPool);
    Ptr<SyncRecord> ptr;
    while(tmp.seize(ptr))
      new (ptr.p) SyncRecord(* this, c_dataBufferPool);
    tmp.release();
  }

  // Suma
  c_nodePool.setSize(MAX_NDB_NODES);
  c_masterNodeId = getOwnNodeId();

  c_nodeGroup = c_noNodesInGroup = c_idInNodeGroup = 0;
  for (int i = 0; i < MAX_REPLICAS; i++) {
    c_nodesInGroup[i] = 0;
  }

  c_subCoordinatorPool.setSize(10);

  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
  conf->senderData = senderData;
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}

/**
 * STTOR: start-phase dispatcher.
 *   phase 1 - set c_restartLock
 *   phase 3 - remember the start type and request the node list
 *             (READ_NODESREQ); STTORRY is sent from execREAD_NODESCONF
 *   phase 5 - look up the node group; on a node restart send SUMA_START_ME
 *             to every other SUMA in the group
 *   phase 7 - clear c_restartLock; unless this is a node restart, activate
 *             the buckets this node is responsible for; on an initial start
 *             the master creates the SUMA sequence and STTORRY is sent from
 *             createSequenceReply()
 * Every path not listed as deferring above ends with sendSTTORRY().
 */
void
Suma::execSTTOR(Signal* signal) {
  jamEntry();

  DBUG_ENTER("Suma::execSTTOR");

  const Uint32 startphase  = signal->theData[1];
  const Uint32 typeOfStart = signal->theData[7];

  DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));

  if(startphase == 1){
    jam();
    c_restartLock = true;
  }

  if(startphase == 3){
    jam();
    g_TypeOfStart = typeOfStart;
    signal->theData[0] = reference();
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);

#if 0

    /**
     * Debug
     */

    SubscriptionPtr subPtr;
    Ptr<SyncRecord> syncPtr;
    ndbrequire(c_subscriptions.seize(subPtr));
    ndbrequire(c_syncPool.seize(syncPtr));


    ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i);

    subPtr.p->m_syncPtrI = syncPtr.i;
    subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot;
    syncPtr.p->m_subscriptionPtrI = subPtr.i;
    syncPtr.p->ptrI = syncPtr.i;
    g_subPtrI = subPtr.i;
    //    sendSTTORRY(signal);
#endif
    DBUG_VOID_RETURN;
  }

  if(startphase == 5) {
    getNodeGroupMembers(signal);
    if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
      jam();
      for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
        Uint32 ref = calcSumaBlockRef(c_nodesInGroup[i]);
        if (ref != reference())
          sendSignal(ref, GSN_SUMA_START_ME, signal,
                     1 /*SumaStartMe::SignalLength*/, JBB);
      }
    }
  }

  if(startphase == 7) {
    c_restartLock = false; // may be set false earlier with HANDOVER_REQ

    if (g_TypeOfStart != NodeState::ST_NODE_RESTART) {
      for (Uint32 i = 0; i < NO_OF_BUCKETS; i++) {
        if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
          // I'm running this bucket
          DBUG_PRINT("info",("bucket %u set to true", i));
          c_buckets[i].active = true;
        }
      }
    }

    if(g_TypeOfStart == NodeState::ST_INITIAL_START &&
       c_masterNodeId == getOwnNodeId()) {
      jam();
      createSequence(signal);
      DBUG_VOID_RETURN;
    }//if
  }//if

  sendSTTORRY(signal);

  DBUG_VOID_RETURN;
}

/**
 * Ask DBUTIL to create the sequence identified by SUMA_SEQUENCE.
 * The reply is delivered to createSequenceReply().
 */
void
Suma::createSequence(Signal* signal)
{
  jam();
  DBUG_ENTER("Suma::createSequence");

  UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();

  req->senderData  = RNIL;
  req->sequenceId  = SUMA_SEQUENCE;
  req->requestType = UtilSequenceReq::Create;
  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
             signal, UtilSequenceReq::SignalLength, JBB);
  // execUTIL_SEQUENCE_CONF will call createSequenceReply()
  DBUG_VOID_RETURN;
}

/**
 * Completion of createSequence(): a REF reply (ref != NULL) is fatal,
 * otherwise the pending start phase is acknowledged with STTORRY.
 * conf is not used here.
 */
void
Suma::createSequenceReply(Signal* signal,
                          UtilSequenceConf * conf,
                          UtilSequenceRef * ref)
{
  jam();

  if (ref != NULL)
    ndbrequire(false);

  sendSTTORRY(signal);
}

/**
 * READ_NODESCONF: seize one node record per node in conf->allNodes, mark it
 * alive unless it is in conf->inactiveNodes, rebuild c_aliveNodes, record
 * the master node id and acknowledge the start phase with STTORRY.
 */
void
Suma::execREAD_NODESCONF(Signal* signal){
  jamEntry();
  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();

  c_aliveNodes.clear();
  c_preparingNodes.clear();

  Uint32 count = 0;
  for(Uint32 i = 0; i < MAX_NDB_NODES; i++){
    if(NodeBitmask::get(conf->allNodes, i)){
      jam();

      count++;

      NodePtr node;
      ndbrequire(c_nodes.seize(node));

      node.p->nodeId = i;
      if(NodeBitmask::get(conf->inactiveNodes, i)){
        jam();
        node.p->alive = 0;
      } else {
        jam();
        node.p->alive = 1;
        c_aliveNodes.set(i);
      }
    } else
      jam();
  }
  c_masterNodeId = conf->masterNodeId;
  // The bitmask and the announced node count must agree.
  ndbrequire(count == conf->noOfNodes);

  sendSTTORRY(signal);
}

/**
 * Send STTORRY to NDBCNTR. Words 3..6 carry 1, 3, 5 and 7 (the start phases
 * execSTTOR acts on); 255 in word 7 ends the list.
 */
void
Suma::sendSTTORRY(Signal* signal){
  signal->theData[0] = 0;
  signal->theData[3] = 1;
  signal->theData[4] = 3;
  signal->theData[5] = 5;
  signal->theData[6] = 7;
  signal->theData[7] = 255; // No more start phases from missra
  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
}

/** NDB_STTOR: nothing to do beyond the trace entry. */
void
Suma::execNDB_STTOR(Signal* signal)
{
  jamEntry();
}

/** CONTINUEB: nothing to do beyond the trace entry. */
void
Suma::execCONTINUEB(Signal* signal){
  jamEntry();
}

/** CONTINUEB: nothing to do beyond the trace entry. */
void
SumaParticipant::execCONTINUEB(Signal* signal)
{
  jamEntry();
}

/*****************************************************************************
 *
 * Node state handling
 *
 *****************************************************************************/

/**
 * API_FAILREQ: flag the API node in signal->theData[0] as failed and start
 * removing its subscribers. If it had none, the flag is cleared again
 * immediately.
 */
void Suma::execAPI_FAILREQ(Signal* signal)
{
  jamEntry();
  DBUG_ENTER("Suma::execAPI_FAILREQ");
  Uint32 failedApiNode = signal->theData[0];
  //BlockReference retRef = signal->theData[1];

  c_failedApiNodes.set(failedApiNode);
  bool found = removeSubscribersOnNode(signal, failedApiNode);

  if(!found){
    jam();
    c_failedApiNodes.clear(failedApiNode);
  }
  DBUG_VOID_RETURN;
}//execAPI_FAILREQ()

/**
 * Move every data subscriber whose subscriber reference belongs to nodeId
 * from c_dataSubscribers to c_removeDataSubscribers, and kick off
 * sendSubStopReq() if any was moved.
 *
 * @return true if at least one subscriber on nodeId was found.
 */
bool
SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
{
  DBUG_ENTER("SumaParticipant::removeSubscribersOnNode");
  bool found = false;

  SubscriberPtr i_subbPtr;
  c_dataSubscribers.first(i_subbPtr);
  while(!i_subbPtr.isNull()){
    // Step the iterator first: the current element may be unlinked below.
    SubscriberPtr subbPtr = i_subbPtr;
    c_dataSubscribers.next(i_subbPtr);
    jam();
    if (refToNode(subbPtr.p->m_subscriberRef) == nodeId) {
      jam();
      c_dataSubscribers.remove(subbPtr);
      c_removeDataSubscribers.add(subbPtr);
      found = true;
    }
  }
  if(found){
    jam();
    sendSubStopReq(signal);
  }
  DBUG_RETURN(found);
}

/**
 * Send SUB_STOP_REQ for the first subscriber on c_removeDataSubscribers.
 *
 * The function-local flag remove_lock is set while a request is being
 * processed; a call with unlock == false then returns without sending.
 * execSUB_STOP_CONF passes unlock == true to issue the next request.
 * When the list is empty, c_failedApiNodes is cleared and the flag reset.
 */
void
SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){
  DBUG_ENTER("SumaParticipant::sendSubStopReq");
  static bool remove_lock = false;
  jam();

  SubscriberPtr subbPtr;
  c_removeDataSubscribers.first(subbPtr);
  if (subbPtr.isNull()){
    jam();
#if 0
    signal->theData[0] = failedApiNode;
    signal->theData[1] = reference();
    sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
#endif
    c_failedApiNodes.clear();

    remove_lock = false;
    DBUG_VOID_RETURN;
  }

  if(remove_lock && !unlock) {
    jam();
    DBUG_VOID_RETURN;
  }
  remove_lock = true;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);

  SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
  req->senderRef       = reference();
  req->senderData      = subbPtr.i;
  req->subscriberRef   = subbPtr.p->m_subscriberRef;
  req->subscriberData  = subbPtr.p->m_subscriberData;
  req->subscriptionId  = subPtr.p->m_subscriptionId;
  req->subscriptionKey = subPtr.p->m_subscriptionKey;
  req->part = SubscriptionData::TableData;

  sendSignal(SUMA_REF, GSN_SUB_STOP_REQ, signal, SubStopReq::SignalLength, JBB);
  DBUG_VOID_RETURN;
}

/**
 * SUB_STOP_CONF: look up the subscription named in the signal, then
 * continue with the next queued stop request (unlock == true).
 */
void
SumaParticipant::execSUB_STOP_CONF(Signal* signal){
  jamEntry();
  DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF");

  SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();

  //  Uint32 subscriberData = conf->subscriberData;
  //  Uint32 subscriberRef = conf->subscriberRef;

  Subscription key;
  key.m_subscriptionId = conf->subscriptionId;
  key.m_subscriptionKey = conf->subscriptionKey;

  SubscriptionPtr subPtr;
  if(c_subscriptions.find(subPtr, key)) {
    jam();
    if (subPtr.p->m_markRemove) {
      jam();
      // NOTE(review): a subscription marked for removal is treated as fatal
      // here; the statements after ndbrequire(false) look unreachable --
      // confirm intent.
      ndbrequire(false);
      ndbrequire(subPtr.p->m_nSubscribers > 0);
      subPtr.p->m_nSubscribers--;
      if (subPtr.p->m_nSubscribers == 0){
        jam();
        completeSubRemoveReq(signal, subPtr);
      }
    }
  }

  sendSubStopReq(signal,true);
  DBUG_VOID_RETURN;
}

/**
 * SUB_STOP_REF: unpack the refused stop request; a non-temporary refusal
 * is fatal.
 */
void
SumaParticipant::execSUB_STOP_REF(Signal* signal){
  jamEntry();
  DBUG_ENTER("SumaParticipant::execSUB_STOP_REF");

  SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();

  Uint32 subscriptionId = ref->subscriptionId;
  Uint32 subscriptionKey = ref->subscriptionKey;
  Uint32 part = ref->part;
  Uint32 subscriberData = ref->subscriberData;
  Uint32 subscriberRef = ref->subscriberRef;
  //  Uint32 err = ref->err;

  if(!ref->isTemporary()){
    ndbrequire(false);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -