⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 suma.cpp

📁 mysql-5.0.27版本源码包
💻 CPP
📖 第 1 页 / 共 4 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "Suma.hpp"

#include <ndb_version.h>

#include <NdbTCP.h>
#include <Bitmask.hpp>
#include <SimpleProperties.hpp>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>

#include <signaldata/ListTables.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/GetTableId.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/SumaImpl.hpp>
#include <signaldata/ScanFrag.hpp>
#include <signaldata/TransIdAI.hpp>
#include <signaldata/CreateTrig.hpp>
#include <signaldata/AlterTrig.hpp>
#include <signaldata/DropTrig.hpp>
#include <signaldata/FireTrigOrd.hpp>
#include <signaldata/TrigAttrInfo.hpp>
#include <signaldata/CheckNodeGroups.hpp>
#include <signaldata/GCPSave.hpp>
#include <GrepError.hpp>

#include <DebuggerNames.hpp>

//#define HANDOVER_DEBUG
//#define NODEFAIL_DEBUG
//#define NODEFAIL_DEBUG2
//#define DEBUG_SUMA_SEQUENCE
//#define EVENT_DEBUG
//#define EVENT_PH3_DEBUG
//#define EVENT_DEBUG2

/*
 * Disabled alternative DBUG_* macros that print enter/leave traces through
 * ndbout instead of using the regular DBUG facility.
 */
#if 0
#undef DBUG_ENTER
#undef DBUG_PRINT
#undef DBUG_RETURN
#undef DBUG_VOID_RETURN

#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
#endif

/**
 * @todo:
 * SUMA crashes if an index is created at the same time as
 * global replication. Very easy to reproduce using testIndex.
 * Note: This only happens occasionally, but is quite easy to reprod.
 */

/* Index of the debug subscription used by DUMP 8000-8003; RNIL when unset. */
Uint32 g_subPtrI = RNIL;

/* Sequence id passed to DBUTIL in createSequence(). */
static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;

/**************************************************************
 *
 * Start of suma
 *
 */

#define PRINT_ONLY 0

/* Start type saved in start phase 3 and consulted again in start phase 7. */
static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;

/**
 * Handle READ_CONFIG_REQ: size the block's pools and tables, reset the
 * node-group bookkeeping and answer the sender with READ_CONFIG_CONF.
 *
 * @param signal  Incoming signal holding a ReadConfigReq; reused to send
 *                the ReadConfigConf reply.
 */
void
Suma::execREAD_CONFIG_REQ(Signal* signal)
{
  jamEntry();

  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();

  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;

  const ndb_mgm_configuration_iterator * p =
    theConfiguration.getOwnConfigIterator();
  ndbrequire(p != 0);

  // SumaParticipant
  // The lookup result used to be ignored, leaving noTables indeterminate
  // on failure; require success before sizing the pools from it.
  Uint32 noTables = 0;
  const int noTablesRc = ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES,
                                                   &noTables);
  ndbrequire(noTablesRc == 0);

  /**
   * @todo: fix pool sizes
   */
  c_tablePool_.setSize(noTables);
  c_tables.setSize(noTables);

  c_subscriptions.setSize(20); //10
  c_subscriberPool.setSize(64);

  c_subscriptionPool.setSize(64); //2
  c_syncPool.setSize(20); //2
  c_dataBufferPool.setSize(128);

  {
    // Seize every SyncRecord once so each can be constructed in place,
    // then hand them all back to the pool.
    SLList<SyncRecord> tmp(c_syncPool);
    Ptr<SyncRecord> ptr;
    while(tmp.seize(ptr))
      new (ptr.p) SyncRecord(* this, c_dataBufferPool);
    tmp.release();
  }

  // Suma
  c_nodePool.setSize(MAX_NDB_NODES);
  c_masterNodeId = getOwnNodeId();

  c_nodeGroup = c_noNodesInGroup = c_idInNodeGroup = 0;
  for (int i = 0; i < MAX_REPLICAS; i++) {
    c_nodesInGroup[i]   = 0;
  }

  c_subCoordinatorPool.setSize(10);

  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
  conf->senderData = senderData;
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}

/**
 * Handle STTOR (start phase order).
 *
 * Phase 3: remember the start type and ask NDBCNTR for the node list;
 *          STTORRY is sent later from execREAD_NODESCONF().
 * Phase 7: on an initial start where this node is the master, create the
 *          SUMA sequence; STTORRY is sent later from createSequenceReply().
 * Any other case: reply with STTORRY immediately.
 *
 * @param signal  theData[1] is the start phase, theData[7] the start type.
 */
void
Suma::execSTTOR(Signal* signal) {
  jamEntry();

  DBUG_ENTER("Suma::execSTTOR");
  const Uint32 startphase  = signal->theData[1];
  const Uint32 typeOfStart = signal->theData[7];

  DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));

  if(startphase == 3){
    jam();
    g_TypeOfStart = typeOfStart;
    signal->theData[0] = reference();
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);

#if 0
    /**
     * Debug
     */

    SubscriptionPtr subPtr;
    Ptr<SyncRecord> syncPtr;
    ndbrequire(c_subscriptions.seize(subPtr));
    ndbrequire(c_syncPool.seize(syncPtr));

    ndbout_c("Suma: subPtr.i = %d syncPtr.i = %d", subPtr.i, syncPtr.i);

    subPtr.p->m_syncPtrI = syncPtr.i;
    subPtr.p->m_subscriptionType = SubCreateReq::DatabaseSnapshot;
    syncPtr.p->m_subscriptionPtrI = subPtr.i;
    syncPtr.p->ptrI = syncPtr.i;
    g_subPtrI = subPtr.i;
    //    sendSTTORRY(signal);
#endif
    DBUG_VOID_RETURN;
  }

  if(startphase == 7) {
    if(g_TypeOfStart == NodeState::ST_INITIAL_START &&
       c_masterNodeId == getOwnNodeId()) {
      jam();
      createSequence(signal);
      DBUG_VOID_RETURN;
    }//if
  }//if

  sendSTTORRY(signal);

  DBUG_VOID_RETURN;
}

/**
 * Ask DBUTIL to create the SUMA_SEQUENCE sequence.
 *
 * @param signal  Signal buffer reused for the UTIL_SEQUENCE_REQ.
 */
void
Suma::createSequence(Signal* signal)
{
  jam();
  DBUG_ENTER("Suma::createSequence");

  UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();

  req->senderData  = RNIL;
  req->sequenceId  = SUMA_SEQUENCE;
  req->requestType = UtilSequenceReq::Create;
  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
             signal, UtilSequenceReq::SignalLength, JBB);
  // execUTIL_SEQUENCE_CONF will call createSequenceReply()
  DBUG_VOID_RETURN;
}

/**
 * Continue start-up after the sequence creation request was answered.
 *
 * @param signal  Signal buffer reused for the STTORRY reply.
 * @param conf    Confirmation payload (not inspected here).
 * @param ref     Refusal payload; a non-NULL value fails an ndbrequire.
 */
void
Suma::createSequenceReply(Signal* signal,
                          UtilSequenceConf * conf,
                          UtilSequenceRef * ref)
{
  jam();

  if (ref != NULL)
    ndbrequire(false);

  sendSTTORRY(signal);
}

/**
 * Handle READ_NODESCONF: seize one node record for every node listed in
 * allNodes, mark each alive unless it is listed in inactiveNodes, record
 * the master node id and reply with STTORRY.
 *
 * @param signal  Incoming signal holding a ReadNodesConf.
 */
void
Suma::execREAD_NODESCONF(Signal* signal){
  jamEntry();
  ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();

  c_aliveNodes.clear();
  c_preparingNodes.clear();

  Uint32 count = 0;
  for(Uint32 i = 0; i < MAX_NDB_NODES; i++){
    if(NodeBitmask::get(conf->allNodes, i)){
      jam();

      count++;

      NodePtr node;
      ndbrequire(c_nodes.seize(node));

      node.p->nodeId = i;
      if(NodeBitmask::get(conf->inactiveNodes, i)){
        jam();
        node.p->alive = 0;
      } else {
        jam();
        node.p->alive = 1;
        c_aliveNodes.set(i);
      }
    } else
      jam();
  }
  c_masterNodeId = conf->masterNodeId;
  // The number of nodes found in the bitmask must match the reported count.
  ndbrequire(count == conf->noOfNodes);

  sendSTTORRY(signal);
}

/**
 * Send STTORRY to NDBCNTR.
 *
 * Words 3..6 carry the values 1, 3, 5 and 7 and word 7 carries the
 * terminator 255 (presumably the list of start phases this block wants
 * to be called in -- TODO confirm against the NDBCNTR protocol).
 *
 * @param signal  Signal buffer reused for the reply.
 */
void
Suma::sendSTTORRY(Signal* signal){
  signal->theData[0] = 0;
  signal->theData[3] = 1;
  signal->theData[4] = 3;
  signal->theData[5] = 5;
  signal->theData[6] = 7;
  signal->theData[7] = 255; // No more start phases from missra
  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
}

/** Handle NDB_STTOR; nothing to do besides the jam entry. */
void
Suma::execNDB_STTOR(Signal* signal)
{
  jamEntry();
}

/** Handle CONTINUEB for Suma; nothing to do besides the jam entry. */
void
Suma::execCONTINUEB(Signal* signal){
  jamEntry();
}

/** Handle CONTINUEB for SumaParticipant; nothing to do besides the jam entry. */
void
SumaParticipant::execCONTINUEB(Signal* signal)
{
  jamEntry();
}

/*****************************************************************************
 *
 * Node state handling
 *
 *****************************************************************************/

/** Handle SIGNAL_DROPPED_REP: always fails an ndbrequire. */
void
Suma::execSIGNAL_DROPPED_REP(Signal* signal){
  jamEntry();
  ndbrequire(false);
}

/********************************************************************
 *
 * Dump state
 *
 */

/**
 * Count the entries of a subscriber list by walking it front to back.
 *
 * @param subs  List to count.
 * @return      Number of subscribers in the list.
 */
static unsigned
count_subscribers(const DLList<SumaParticipant::Subscriber> &subs)
{
  unsigned n= 0;
  SumaParticipant::SubscriberPtr i_subbPtr;
  subs.first(i_subbPtr);
  while(!i_subbPtr.isNull()){
    n++;
    subs.next(i_subbPtr);
  }
  return n;
}

/**
 * Handle DUMP_STATE_ORD.
 *
 * 8000-8003: debug actions on the subscription referenced by g_subPtrI
 *            (8000 starts the meta phase, 8001 starts the scan, 8003 turns
 *            the subscription into a single-table scan with a fixed table
 *            and attribute list; 8002 does nothing).
 * 8004:      report pool usage and subscriber-list lengths via infoEvent.
 *
 * @param signal  theData[0] holds the dump code.
 */
void
Suma::execDUMP_STATE_ORD(Signal* signal){
  jamEntry();

  Uint32 tCase = signal->theData[0];
  if(tCase >= 8000 && tCase <= 8003){
    // In the visible code g_subPtrI is only assigned inside disabled
    // (#if 0) debug code in execSTTOR, so it normally still holds RNIL.
    // Looking up RNIL in the subscription table is invalid, so ignore the
    // request instead of dereferencing a non-existent record.
    if(g_subPtrI == RNIL){
      jam();
      return;
    }

    SubscriptionPtr subPtr;
    c_subscriptions.getPtr(subPtr, g_subPtrI);

    Ptr<SyncRecord> syncPtr;
    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);

    if(tCase == 8000){
      syncPtr.p->startMeta(signal);
    }

    if(tCase == 8001){
      syncPtr.p->startScan(signal);
    }

    if(tCase == 8003){
      subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
      LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
      Uint32 tab = 0;
      Uint32 att[] = { 0, 1, 1 };
      syncPtr.p->m_tableList.append(&tab, 1);
      attrs.append(att, 3);
    }
  }

  if(tCase == 8004){
    infoEvent("Suma: c_subscriberPool  size: %d free: %d",
              c_subscriberPool.getSize(),
              c_subscriberPool.getNoOfFree());

    infoEvent("Suma: c_tablePool  size: %d free: %d",
              c_tablePool_.getSize(),
              c_tablePool_.getNoOfFree());

    infoEvent("Suma: c_subscriptionPool  size: %d free: %d",
              c_subscriptionPool.getSize(),
              c_subscriptionPool.getNoOfFree());

    infoEvent("Suma: c_syncPool  size: %d free: %d",
              c_syncPool.getSize(),
              c_syncPool.getNoOfFree());

    infoEvent("Suma: c_dataBufferPool  size: %d free: %d",
              c_dataBufferPool.getSize(),
              c_dataBufferPool.getNoOfFree());

    infoEvent("Suma: c_metaSubscribers count: %d",
              count_subscribers(c_metaSubscribers));
    infoEvent("Suma: c_dataSubscribers count: %d",
              count_subscribers(c_dataSubscribers));
    infoEvent("Suma: c_prepDataSubscribers count: %d",
              count_subscribers(c_prepDataSubscribers));
    infoEvent("Suma: c_removeDataSubscribers count: %d",
              count_subscribers(c_removeDataSubscribers));
  }
}

/********************************************************************
 *
 * Convert a table name (db+schema+tablename) to tableId
 *
 */

#if 0
void
SumaParticipant::convertNameToId(SubscriptionPtr subPtr, Signal * signal)
{
  jam();
  if(subPtr.p->m_currentTable < subPtr.p->m_maxTables) {
    jam();

    GetTableIdReq * req = (GetTableIdReq *)signal->getDataPtrSend();
    char * tableName = subPtr.p->m_tableNames[subPtr.p->m_currentTable];
    const Uint32 strLen = strlen(tableName) + 1; // NULL Terminated
    req->senderRef  = reference();
    req->senderData = subPtr.i;
    req->len        = strLen;

    LinearSectionPtr ptr[1];
    ptr[0].p  = (Uint32*)tableName;
    ptr[0].sz = strLen;

    sendSignal(DBDICT_REF,
               GSN_GET_TABLEID_REQ,
               signal,
               GetTableIdReq::SignalLength,
               JBB,
               ptr,
               1);
  } else {
    jam();
    sendSubCreateConf(signal, subPtr.p->m_subscriberRef, subPtr);
  }
}
#endif

/**
 * Register a table with a subscription: flag the table in the
 * subscription's m_tables array, advance m_currentTable and, when a sync
 * record is supplied, append the table id to its table list.
 *
 * @param tableId   Id of the table to add; used as index into m_tables.
 * @param subPtr    Subscription to update.
 * @param psyncRec  Optional sync record; may be NULL.
 */
void
SumaParticipant::addTableId(Uint32 tableId,
                            SubscriptionPtr subPtr, SyncRecord *psyncRec)
{
#ifdef NODEFAIL_DEBUG
  // psyncRec is a pointer, so it is printed with %p rather than %u.
  ndbout_c("SumaParticipant::addTableId(%u,%u,%p), current_table=%u",
           tableId, subPtr.i, (void*)psyncRec, subPtr.p->m_currentTable);
#endif
  subPtr.p->m_tables[tableId] = 1;
  subPtr.p->m_currentTable++;
  if(psyncRec != NULL)
    psyncRec->m_tableList.append(&tableId, 1);
}

// Disabled code. The rest of this definition lies beyond the end of this
// excerpt, so the visible part is kept exactly as found.
#if 0
void
SumaParticipant::execGET_TABLEID_CONF(Signal * signal)
{
  jamEntry();
  GetTableIdConf* conf = (GetTableIdConf *)signal->getDataPtr();
  Uint32 tableId = conf->tableId;
  //Uint32 schemaVersion = conf->schemaVersion;
  Uint32 senderData = conf->senderData;
  SubscriptionPtr subPtr;
  Ptr<SyncRecord> syncPtr;

  c_subscriptions.getPtr(subPtr, senderData);
  c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);

  /*
   * add to m_tableList
   */
  addTableId(tableId, subPtr, syncPtr.p);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -