📄 simulatedblock.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

#include <ndb_global.h>

#include "SimulatedBlock.hpp"
#include <NdbOut.hpp>
#include <GlobalData.hpp>
#include <Emulator.hpp>
#include <ErrorHandlingMacros.hpp>
#include <TimeQueue.hpp>
#include <TransporterRegistry.hpp>
#include <SignalLoggerManager.hpp>
#include <FastScheduler.hpp>
#include "ndbd_malloc.hpp"
#include <signaldata/EventReport.hpp>
#include <signaldata/ContinueFragmented.hpp>
#include <signaldata/NodeStateSignalData.hpp>
#include <signaldata/FsRef.hpp>
#include <signaldata/SignalDroppedRep.hpp>
#include <DebuggerNames.hpp>
#include "LongSignal.hpp"

#include <Properties.hpp>
#include "Configuration.hpp"

#define ljamEntry() jamEntryLine(30000 + __LINE__)
#define ljam() jamLine(30000 + __LINE__)

//
// Constructor, Destructor
//

/**
 * Construct a block with the given block number on this node.
 *
 * Registers the block in globalData, sizes the fragment pools, the fragment
 * info hash and the mutex/counter managers, clears the whole signal dispatch
 * table (theExecArray) and then installs the handlers that every block gets.
 *
 * @param blockNumber  number of this block; also used to build theReference
 * @param conf         configuration object stored in theConfiguration
 */
SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
                               const class Configuration & conf)
  : theNodeId(globalData.ownId),
    theNumber(blockNumber),
    theReference(numberToRef(blockNumber, globalData.ownId)),
    theConfiguration(conf),
    c_fragmentInfoHash(c_fragmentInfoPool),
    c_linearFragmentSendList(c_fragmentSendPool),
    c_segmentedFragmentSendList(c_fragmentSendPool),
    c_mutexMgr(* this),
    c_counterMgr(* this),
    c_ptrMetaDataCommon(0)
{
  NewVarRef = 0;

  globalData.setBlock(blockNumber, this);
  c_fragmentIdCounter = 1;
  c_fragSenderRunning = false;

  // NOTE(review): tmp is a freshly constructed Properties and nothing is
  // inserted into it in this function, so the lookups below presumably leave
  // `count` at the default assigned just before each one - confirm against
  // Properties::get.
  Properties tmp;
  const Properties * p = &tmp;
  ndbrequire(p != 0);

  Uint32 count = 10;
  char buf[255];

  // Each size is looked up first under "<BlockName>.<Key>", then under "<Key>".
  count = 10;
  BaseString::snprintf(buf, 255, "%s.FragmentSendPool", getBlockName(blockNumber));
  if(!p->get(buf, &count))
    p->get("FragmentSendPool", &count);
  c_fragmentSendPool.setSize(count);

  count = 10;
  BaseString::snprintf(buf, 255, "%s.FragmentInfoPool", getBlockName(blockNumber));
  if(!p->get(buf, &count))
    p->get("FragmentInfoPool", &count);
  c_fragmentInfoPool.setSize(count);

  count = 10;
  BaseString::snprintf(buf, 255, "%s.FragmentInfoHash", getBlockName(blockNumber));
  if(!p->get(buf, &count))
    p->get("FragmentInfoHash", &count);
  c_fragmentInfoHash.setSize(count);

  count = 5;
  BaseString::snprintf(buf, 255, "%s.ActiveMutexes", getBlockName(blockNumber));
  if(!p->get(buf, &count))
    p->get("ActiveMutexes", &count);
  c_mutexMgr.setSize(count);

  c_counterMgr.setSize(5);

#ifdef VM_TRACE_TIME
  clearTimes();
#endif

  // Start with an empty dispatch table; index MAX_GSN itself is included.
  for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
    theExecArray[i] = 0;

  installSimulatedBlockFunctions();
  UpgradeStartup::installEXEC(this);

  CLEAR_ERROR_INSERT_VALUE;

#ifdef VM_TRACE
  // One-element array holding only a null entry.
  m_global_variables = new Ptr<void> * [1];
  m_global_variables[0] = 0;
#endif
}

/**
 * Destructor: calls freeBat() and, in VM_TRACE builds, frees the
 * m_global_variables array allocated by the constructor.
 */
SimulatedBlock::~SimulatedBlock()
{
  freeBat();
#ifdef VM_TRACE_TIME
  printTimes(stdout);
#endif

#ifdef VM_TRACE
  delete [] m_global_variables;
#endif
}

/**
 * Fill theExecArray with the SimulatedBlock member functions that handle the
 * signals listed below (node state, tamper, dropped signal, fragmented send
 * continuation, UTIL lock REF/CONF and file system REF signals).
 */
void
SimulatedBlock::installSimulatedBlockFunctions(){
  ExecFunction * a = theExecArray;
  a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
  a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
  a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
  a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
  a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
  a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
  a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
  // "DESTORY" is the spelling of the handler names referenced here.
  a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
  a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
  a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
  a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
  a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
  a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
  a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
  a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
  a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
  a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
  a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
  a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
  a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
}

/**
 * Register handler f for signal number gsn in theExecArray.
 *
 * @param gsn    signal number; must not exceed MAX_GSN
 * @param f      handler to install
 * @param force  when false, an already occupied slot is reported as an error
 *               instead of being overwritten
 *
 * NOTE(review): the assignment after ERROR_SET is unconditional, so this
 * relies on ERROR_SET not returning when gsn > MAX_GSN - confirm.
 */
void
SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
                                 ExecFunction f, bool force){
  if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
    char errorMsg[255];
    BaseString::snprintf(errorMsg, 255,
                         "GSN %d(%d))", gsn, MAX_GSN);
    ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
  }
  theExecArray[gsn] = f;
}

/**
 * Report an invalid send attempt through ErrorReporter::handleError with
 * code NDBD_EXIT_BLOCK_BNR_ZERO.
 *
 * @param gsn         signal number of the rejected signal
 * @param len         signal length that was requested
 * @param recBlockNo  receiving block number that was requested
 * @param filename    source file of the caller (formatted as "file:line")
 * @param lineno      source line of the caller
 */
void
SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
                             const char* filename, int lineno) const
{
  char objRef[255];
  BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
  char probData[255];
  BaseString::snprintf(probData, 255,
                       "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
                       gsn, len, recBlockNo);

  ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
                             probData,
                             objRef);
}

extern class SectionSegmentPool g_sectionSegmentPool;

/**
 * Send a signal to one block reference, using the sections already attached
 * to the signal (signal->header.m_noOfSections / signal->m_sectionPtr).
 *
 * The call is rejected via signal_error() when length is 0, when
 * length + number of sections exceeds 25, or when the receiver block is 0.
 * If the receiver node is this node (or 0) the signal is handed to
 * globalScheduler.execute(); otherwise a SignalHeader is built and passed to
 * globalTransporterRegistry.prepareSend(), after which the sections are
 * released.
 *
 * @param ref        receiver block reference (block + node)
 * @param gsn        signal number
 * @param signal     signal whose header is filled in and whose data is sent
 * @param length     value stored in the header's theLength
 * @param jobBuffer  job buffer level passed on to the scheduler/transporter
 */
void
SimulatedBlock::sendSignal(BlockReference ref,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer) const {

  BlockNumber sendBnr = number();
  BlockReference sendBRef = reference();

  Uint32 noOfSections = signal->header.m_noOfSections;
  Uint32 recBlock = refToBlock(ref);
  Uint32 recNode = refToNode(ref);
  Uint32 ourProcessor = globalData.ownId;

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;

  Uint32 tSignalId = signal->header.theSignalId;

  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if

#ifdef VM_TRACE
  if(globalData.testOn){
    // Node 0 is logged as the own node.
    Uint16 proc =
      (recNode == 0 ? globalData.ownId : recNode);
    signal->header.theSendersBlockRef = sendBRef;
    globalSignalLoggers.sendSignal(signal->header,
                                   jobBuffer,
                                   &signal->theData[0],
                                   proc,
                                   signal->m_sectionPtr,
                                   signal->header.m_noOfSections);
  }
#endif

  if(recNode == ourProcessor || recNode == 0) {
    // Local delivery.
    signal->header.theSendersSignalId = tSignalId;
    signal->header.theSendersBlockRef = sendBRef;
    signal->header.theLength = length;
    globalScheduler.execute(signal, jobBuffer, recBlock,
                            gsn);
    signal->header.m_noOfSections = 0;
    signal->header.m_fragmentInfo = 0;
    return;
  } else {
    // send distributed Signal
    SignalHeader sh;

    Uint32 tTrace = signal->getTrace();

    sh.theVerId_signalNumber = gsn;
    sh.theReceiversBlockNumber = recBlock;
    sh.theSendersBlockRef = sendBnr;
    sh.theLength = length;
    sh.theTrace = tTrace;
    sh.theSignalId = tSignalId;
    sh.m_noOfSections = noOfSections;
    // NOTE(review): fragment info is sent as 0 here and
    // signal->header.m_fragmentInfo is not reset on this path, unlike the
    // local branch above and the other overloads - confirm this is intended.
    sh.m_fragmentInfo = 0;

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif
    SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                                          &signal->theData[0],
                                                          recNode,
                                                          g_sectionSegmentPool,
                                                          signal->m_sectionPtr);

    ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
    ::releaseSections(noOfSections, signal->m_sectionPtr);
    signal->header.m_noOfSections = 0;
  }
  return;
}

/**
 * Send a signal to block rg.m_block on every node set in rg.m_nodes, using
 * the sections already attached to the signal.
 *
 * rg is taken by value, so clearing bits below only changes the local copy.
 * Validation is the same as in the single-receiver overload. If node 0 or the
 * own node is in the set the signal is executed locally first; every
 * remaining node gets the signal through prepareSend(). The sections are
 * released here only when no local execution took place.
 *
 * @param rg         receiver group: target block and set of target nodes
 * @param gsn        signal number
 * @param signal     signal whose header is filled in and whose data is sent
 * @param length     value stored in the header's theLength
 * @param jobBuffer  job buffer level passed on to the scheduler/transporter
 */
void
SimulatedBlock::sendSignal(NodeReceiverGroup rg,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer) const {

  Uint32 noOfSections = signal->header.m_noOfSections;
  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tTrace = signal->getTrace();
  Uint32 tFragInf = signal->header.m_fragmentInfo;

  Uint32 ourProcessor = globalData.ownId;
  Uint32 recBlock = rg.m_block;

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.theSendersSignalId = tSignalId;
  signal->header.theSendersBlockRef = reference();

  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if

  // Header used for all remote receivers.
  SignalHeader sh;

  sh.theVerId_signalNumber = gsn;
  sh.theReceiversBlockNumber = recBlock;
  sh.theSendersBlockRef = number();
  sh.theLength = length;
  sh.theTrace = tTrace;
  sh.theSignalId = tSignalId;
  sh.m_noOfSections = noOfSections;
  sh.m_fragmentInfo = tFragInf;

  /**
   * Check own node
   */
  bool release = true;
  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
#ifdef VM_TRACE
    if(globalData.testOn){
      globalSignalLoggers.sendSignal(signal->header,
                                     jobBuffer,
                                     &signal->theData[0],
                                     ourProcessor,
                                     signal->m_sectionPtr,
                                     signal->header.m_noOfSections);
    }
#endif
    globalScheduler.execute(signal, jobBuffer, recBlock, gsn);

    rg.m_nodes.clear((Uint32)0);
    rg.m_nodes.clear(ourProcessor);
    // After a local execute the sections are not released by this function.
    release = false;
  }

  /**
   * Do the big loop
   */
  Uint32 recNode = 0;
  while(!rg.m_nodes.isclear()){
    // Pick the next node after the previous one and remove it from the set.
    recNode = rg.m_nodes.find(recNode + 1);
    rg.m_nodes.clear(recNode);
#ifdef VM_TRACE
    if(globalData.testOn){
      globalSignalLoggers.sendSignal(signal->header,
                                     jobBuffer,
                                     &signal->theData[0],
                                     recNode,
                                     signal->m_sectionPtr,
                                     signal->header.m_noOfSections);
    }
#endif

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                                          &signal->theData[0],
                                                          recNode,
                                                          g_sectionSegmentPool,
                                                          signal->m_sectionPtr);
    ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
  }

  if(release){
    ::releaseSections(noOfSections, signal->m_sectionPtr);
  }

  signal->header.m_noOfSections = 0;
  signal->header.m_fragmentInfo = 0;

  return;
}

bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);

/**
 * Send a signal to one block reference, with its sections supplied as linear
 * buffers in ptr[0..noOfSections-1].
 *
 * Any sections currently attached to the signal are released first.
 * Validation is the same as in the overloads without ptr. For a local
 * receiver each linear section is copied with import() and the resulting
 * segment index is stored in signal->m_sectionPtr before the signal is
 * executed; for a remote receiver ptr is handed directly to prepareSend().
 *
 * NOTE(review): segptr and ptr both have 3 elements, and nothing in this
 * function bounds noOfSections to 3 - callers are presumably expected to
 * guarantee it.
 *
 * @param ref           receiver block reference (block + node)
 * @param gsn           signal number
 * @param signal        signal whose header is filled in and whose data is sent
 * @param length        value stored in the header's theLength
 * @param jobBuffer     job buffer level passed on to the scheduler/transporter
 * @param ptr           linear sections to send
 * @param noOfSections  number of entries of ptr that are used
 */
void
SimulatedBlock::sendSignal(BlockReference ref,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer,
                           LinearSectionPtr ptr[3],
                           Uint32 noOfSections) const {

  BlockNumber sendBnr = number();
  BlockReference sendBRef = reference();

  Uint32 recBlock = refToBlock(ref);
  Uint32 recNode = refToNode(ref);
  Uint32 ourProcessor = globalData.ownId;

  ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.m_noOfSections = noOfSections;

  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tFragInfo = signal->header.m_fragmentInfo;

  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if

#ifdef VM_TRACE
  if(globalData.testOn){
    // Node 0 is logged as the own node.
    Uint16 proc =
      (recNode == 0 ? globalData.ownId : recNode);
    signal->header.theSendersBlockRef = sendBRef;
    globalSignalLoggers.sendSignal(signal->header,
                                   jobBuffer,
                                   &signal->theData[0],
                                   proc,
                                   ptr, noOfSections);
  }
#endif

  if(recNode == ourProcessor || recNode == 0) {
    signal->header.theSendersSignalId = tSignalId;
    signal->header.theSendersBlockRef = sendBRef;

    /**
     * We have to copy the data
     */
    Ptr<SectionSegment> segptr[3];
    for(Uint32 i = 0; i<noOfSections; i++){
      ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz));
      signal->m_sectionPtr[i].i = segptr[i].i;
    }

    globalScheduler.execute(signal, jobBuffer, recBlock,
                            gsn);
    signal->header.m_noOfSections = 0;
    return;
  } else {
    // send distributed Signal
    SignalHeader sh;

    Uint32 tTrace = signal->getTrace();
    // The parameter noOfSections is used directly below: the header field
    // was assigned from it above, so a second local copy would only shadow it.

    sh.theVerId_signalNumber = gsn;
    sh.theReceiversBlockNumber = recBlock;
    sh.theSendersBlockRef = sendBnr;
    sh.theLength = length;
    sh.theTrace = tTrace;
    sh.theSignalId = tSignalId;
    sh.m_noOfSections = noOfSections;
    sh.m_fragmentInfo = tFragInfo;

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                                          &signal->theData[0],
                                                          recNode,
                                                          ptr);
    ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
  }

  signal->header.m_noOfSections = 0;
  signal->header.m_fragmentInfo = 0;
  return;
}

// NOTE(review): the text of the following overload breaks off after the last
// statement shown; it is reproduced unchanged.
void
SimulatedBlock::sendSignal(NodeReceiverGroup rg,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer,
                           LinearSectionPtr ptr[3],
                           Uint32 noOfSections) const {

  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tTrace = signal->getTrace();
  Uint32 tFragInfo = signal->header.m_fragmentInfo;

  Uint32 ourProcessor = globalData.ownId;
  Uint32 recBlock = rg.m_block;

  ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.theSendersSignalId = tSignalId;
  signal->header.theSendersBlockRef = reference();
  signal->header.m_noOfSections = noOfSections;

  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if

  SignalHeader sh;
  sh.theVerId_signalNumber = gsn;
  sh.theReceiversBlockNumber = recBlock;
  sh.theSendersBlockRef = number();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -