📄 suma.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include "Suma.hpp"#include <ndb_version.h>#include <NdbTCP.h>#include <Bitmask.hpp>#include <SimpleProperties.hpp>#include <signaldata/NodeFailRep.hpp>#include <signaldata/ReadNodesConf.hpp>#include <signaldata/ListTables.hpp>#include <signaldata/GetTabInfo.hpp>#include <signaldata/GetTableId.hpp>#include <signaldata/DictTabInfo.hpp>#include <signaldata/SumaImpl.hpp>#include <signaldata/ScanFrag.hpp>#include <signaldata/TransIdAI.hpp>#include <signaldata/CreateTrig.hpp>#include <signaldata/AlterTrig.hpp>#include <signaldata/DropTrig.hpp>#include <signaldata/FireTrigOrd.hpp>#include <signaldata/TrigAttrInfo.hpp>#include <signaldata/CheckNodeGroups.hpp>#include <signaldata/GCPSave.hpp>#include <GrepError.hpp>#include <DebuggerNames.hpp>//#define HANDOVER_DEBUG//#define NODEFAIL_DEBUG//#define NODEFAIL_DEBUG2//#define DEBUG_SUMA_SEQUENCE//#define EVENT_DEBUG//#define EVENT_PH3_DEBUG//#define EVENT_DEBUG2#if 0#undef DBUG_ENTER#undef DBUG_PRINT#undef DBUG_RETURN#undef DBUG_VOID_RETURN#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }#endif/** * @todo: * SUMA crashes if an index is created at the same time as * global replication. Very easy to reproduce using testIndex. * Note: This only happens occasionally, but is quite easy to reprod. */Uint32 g_subPtrI = RNIL;static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;/************************************************************** * * Start of suma * */#define PRINT_ONLY 0static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;void Suma::execREAD_CONFIG_REQ(Signal* signal){ jamEntry(); const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; const ndb_mgm_configuration_iterator * p = theConfiguration.getOwnConfigIterator(); ndbrequire(p != 0); // SumaParticipant Uint32 noTables; ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES, &noTables); /** * @todo: fix pool sizes */ c_tablePool_.setSize(noTables); c_tables.setSize(noTables); c_subscriptions.setSize(20); //10 c_subscriberPool.setSize(64); c_subscriptionPool.setSize(64); //2 c_syncPool.setSize(20); //2 c_dataBufferPool.setSize(128); { SLList<SyncRecord> tmp(c_syncPool); Ptr<SyncRecord> ptr; while(tmp.seize(ptr)) new (ptr.p) SyncRecord(* this, c_dataBufferPool); tmp.release(); } // Suma c_nodePool.setSize(MAX_NDB_NODES); c_masterNodeId = getOwnNodeId(); c_nodeGroup = c_noNodesInGroup = c_idInNodeGroup = 0; for (int i = 0; i < MAX_REPLICAS; i++) { c_nodesInGroup[i] = 0; } c_subCoordinatorPool.setSize(10); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(ref, GSN_READ_CONFIG_CONF, signal, ReadConfigConf::SignalLength, JBB);}voidSuma::execSTTOR(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSTTOR"); const Uint32 startphase = signal->theData[1]; const Uint32 typeOfStart = signal->theData[7]; DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart)); if(startphase == 3){ jam(); g_TypeOfStart = typeOfStart; signal->theData[0] = reference(); sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);#if 0 /** * Debug */ SubscriptionPtr subPtr; Ptr<SyncRecord> syncPtr; ndbrequire(c_subscriptions.seize(subPtr)); ndbrequire(c_syncPool.seize(syncPtr)); ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i); subPtr.p->m_syncPtrI = syncPtr.i; subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot; syncPtr.p->m_subscriptionPtrI = subPtr.i; syncPtr.p->ptrI = syncPtr.i; g_subPtrI = subPtr.i; // sendSTTORRY(signal);#endif DBUG_VOID_RETURN; } if(startphase == 7) { if(g_TypeOfStart == NodeState::ST_INITIAL_START && c_masterNodeId == getOwnNodeId()) { jam(); createSequence(signal); DBUG_VOID_RETURN; }//if }//if sendSTTORRY(signal); DBUG_VOID_RETURN;}voidSuma::createSequence(Signal* signal){ jam(); DBUG_ENTER("Suma::createSequence"); UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); req->senderData = RNIL; req->sequenceId = SUMA_SEQUENCE; req->requestType = UtilSequenceReq::Create; sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB); // execUTIL_SEQUENCE_CONF will call createSequenceReply() DBUG_VOID_RETURN;}voidSuma::createSequenceReply(Signal* signal, UtilSequenceConf * conf, UtilSequenceRef * ref){ jam(); if (ref != NULL) ndbrequire(false); sendSTTORRY(signal);}voidSuma::execREAD_NODESCONF(Signal* signal){ jamEntry(); ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr(); c_aliveNodes.clear(); c_preparingNodes.clear(); Uint32 count = 0; for(Uint32 i = 0; i < MAX_NDB_NODES; i++){ if(NodeBitmask::get(conf->allNodes, i)){ jam(); count++; NodePtr node; ndbrequire(c_nodes.seize(node)); node.p->nodeId = i; if(NodeBitmask::get(conf->inactiveNodes, i)){ jam(); node.p->alive = 0; } else { jam(); node.p->alive = 1; c_aliveNodes.set(i); } } else jam(); } c_masterNodeId = conf->masterNodeId; ndbrequire(count == conf->noOfNodes); sendSTTORRY(signal);}voidSuma::sendSTTORRY(Signal* signal){ signal->theData[0] = 0; signal->theData[3] = 1; signal->theData[4] = 3; signal->theData[5] = 5; signal->theData[6] = 7; signal->theData[7] = 255; // No more start phases from missra sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);}voidSuma::execNDB_STTOR(Signal* signal) { jamEntry(); }voidSuma::execCONTINUEB(Signal* signal){ jamEntry();}voidSumaParticipant::execCONTINUEB(Signal* signal) { jamEntry();}/***************************************************************************** * * Node state handling * *****************************************************************************/voidSuma::execSIGNAL_DROPPED_REP(Signal* signal){ jamEntry(); ndbrequire(false);}/******************************************************************** * * Dump state * */static unsignedcount_subscribers(const DLList<SumaParticipant::Subscriber> &subs){ unsigned n= 0; SumaParticipant::SubscriberPtr i_subbPtr; subs.first(i_subbPtr); while(!i_subbPtr.isNull()){ n++; subs.next(i_subbPtr); } return n;}voidSuma::execDUMP_STATE_ORD(Signal* signal){ jamEntry(); Uint32 tCase = signal->theData[0]; if(tCase >= 8000 && tCase <= 8003){ SubscriptionPtr subPtr; c_subscriptions.getPtr(subPtr, g_subPtrI); Ptr<SyncRecord> syncPtr; c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); if(tCase == 8000){ syncPtr.p->startMeta(signal); } if(tCase == 8001){ syncPtr.p->startScan(signal); } if(tCase == 8003){ subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan; LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList); Uint32 tab = 0; Uint32 att[] = { 0, 1, 1 }; syncPtr.p->m_tableList.append(&tab, 1); attrs.append(att, 3); } } if(tCase == 8004){ infoEvent("Suma: c_subscriberPool size: %d free: %d", c_subscriberPool.getSize(), c_subscriberPool.getNoOfFree()); infoEvent("Suma: c_tablePool size: %d free: %d", c_tablePool_.getSize(), c_tablePool_.getNoOfFree()); infoEvent("Suma: c_subscriptionPool size: %d free: %d", c_subscriptionPool.getSize(), c_subscriptionPool.getNoOfFree()); infoEvent("Suma: c_syncPool size: %d free: %d", c_syncPool.getSize(), c_syncPool.getNoOfFree()); infoEvent("Suma: c_dataBufferPool size: %d free: %d", c_dataBufferPool.getSize(), c_dataBufferPool.getNoOfFree()); infoEvent("Suma: c_metaSubscribers count: %d", count_subscribers(c_metaSubscribers)); infoEvent("Suma: c_dataSubscribers count: %d", count_subscribers(c_dataSubscribers)); infoEvent("Suma: c_prepDataSubscribers count: %d", count_subscribers(c_prepDataSubscribers)); infoEvent("Suma: c_removeDataSubscribers count: %d", count_subscribers(c_removeDataSubscribers)); }}/******************************************************************** * * Convert a table name (db+schema+tablename) to tableId * */#if 0voidSumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal){ jam(); if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) { jam(); GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend(); char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable]; const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated req->senderRef = reference(); req->senderData = subPtr.i; req->len = strLen; LinearSectionPtr ptr[1]; ptr[0].p = (Uint32*)tableName; ptr[0].sz = strLen; sendSignal(DBDICT_REF, GSN_GET_TABLEID_REQ, signal, GetTableIdReq::SignalLength, JBB, ptr, 1); } else { jam(); sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr); }}#endifvoid SumaParticipant::addTableId(Uint32 tableId, SubscriptionPtr subPtr, SyncRecord *psyncRec){#ifdef NODEFAIL_DEBUG ndbout_c("SumaParticipant::addTableId(%u,%u,%u), current_table=%u", tableId, subPtr.i, psyncRec, subPtr.p->m_currentTable);#endif subPtr.p->m_tables[tableId] = 1; subPtr.p->m_currentTable++; if(psyncRec != NULL) psyncRec->m_tableList.append(&tableId, 1); }#if 0void SumaParticipant::execGET_TABLEID_CONF(Signal * signal){ jamEntry(); GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr(); Uint32 tableId = conf->tableId; //Uint32 schemaVersion = conf->schemaVersion; Uint32 senderData = conf->senderData; SubscriptionPtr subPtr; Ptr<SyncRecord> syncPtr; c_subscriptions.getPtr(subPtr, senderData); c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI); /* * add to m_tableList */ addTableId(tableId, subPtr, syncPtr.p);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -