📄 transporterfacade.cpp
/**
 * Note that this function needs no locking since it is
 * only called from the destructor of Ndb (the NdbObject),
 * which is protected by a mutex.
 */
void
TransporterFacade::stop_instance(){
  DBUG_ENTER("TransporterFacade::stop_instance");
  if(theFacadeInstance)
    theFacadeInstance->doStop();
  DBUG_VOID_RETURN;
}

void
TransporterFacade::doStop(){
  DBUG_ENTER("TransporterFacade::doStop");
  /**
   * First stop the ClusterMgr because it needs to send one more signal
   * and also uses theFacadeInstance to lock/unlock theMutexPtr
   */
  if (theClusterMgr != NULL) theClusterMgr->doStop();
  if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);

  /**
   * Now stop the send and receive threads
   */
  void *status;
  theStopReceive = 1;
  if (theReceiveThread) {
    NdbThread_WaitFor(theReceiveThread, &status);
    NdbThread_Destroy(&theReceiveThread);
  }
  if (theSendThread) {
    NdbThread_WaitFor(theSendThread, &status);
    NdbThread_Destroy(&theSendThread);
  }
  DBUG_VOID_RETURN;
}

extern "C"
void*
runSendRequest_C(void * me)
{
  ((TransporterFacade*) me)->threadMainSend();
  return 0;
}

void TransporterFacade::threadMainSend(void)
{
  theTransporterRegistry->startSending();
  if (!theTransporterRegistry->start_clients()){
    ndbout_c("Unable to start theTransporterRegistry->start_clients");
    exit(0);
  }

  m_socket_server.startServer();

  while(!theStopReceive) {
    NdbSleep_MilliSleep(10);
    NdbMutex_Lock(theMutexPtr);
    if (sendPerformedLastInterval == 0) {
      theTransporterRegistry->performSend();
    }
    sendPerformedLastInterval = 0;
    NdbMutex_Unlock(theMutexPtr);
  }
  theTransporterRegistry->stopSending();

  m_socket_server.stopServer();
  m_socket_server.stopSessions(true);

  theTransporterRegistry->stop_clients();
}

extern "C"
void*
runReceiveResponse_C(void * me)
{
  ((TransporterFacade*) me)->threadMainReceive();
  return 0;
}

void TransporterFacade::threadMainReceive(void)
{
  theTransporterRegistry->startReceiving();
  NdbMutex_Lock(theMutexPtr);
  theTransporterRegistry->update_connections();
  NdbMutex_Unlock(theMutexPtr);
  while(!theStopReceive) {
    for(int i = 0; i<10; i++){
      const int res = theTransporterRegistry->pollReceive(10);
      if(res > 0){
        NdbMutex_Lock(theMutexPtr);
        theTransporterRegistry->performReceive();
        NdbMutex_Unlock(theMutexPtr);
      }
    }
    NdbMutex_Lock(theMutexPtr);
    theTransporterRegistry->update_connections();
    NdbMutex_Unlock(theMutexPtr);
  }//while
  theTransporterRegistry->stopReceiving();
}

TransporterFacade::TransporterFacade() :
  theTransporterRegistry(0),
  theStopReceive(0),
  theSendThread(NULL),
  theReceiveThread(NULL),
  m_fragmented_signal_id(0)
{
  DBUG_ENTER("TransporterFacade::TransporterFacade");
  theOwnId = 0;
  theMutexPtr = NdbMutex_Create();
  sendPerformedLastInterval = 0;
  checkCounter = 4;
  currentSendLimit = 1;
  theClusterMgr = NULL;
  theArbitMgr = NULL;
  theStartNodeId = 1;
  m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
  m_batch_byte_size= SCAN_BATCH_SIZE;
  m_batch_size= DEF_BATCH_SIZE;
  m_max_trans_id = 0;
  theClusterMgr = new ClusterMgr(* this);
  DBUG_VOID_RETURN;
}

bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
  DBUG_ENTER("TransporterFacade::init");

  theOwnId = nodeId;
  theTransporterRegistry = new TransporterRegistry(this);

  const int res = IPCConfig::configureTransporters(nodeId,
                                                   * props,
                                                   * theTransporterRegistry);
  if(res <= 0){
    TRP_DEBUG( "configureTransporters returned 0 or less" );
    DBUG_RETURN(false);
  }

  ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
  iter.first();
  theClusterMgr->init(iter);

  iter.first();
  if(iter.find(CFG_NODE_ID, nodeId)){
    TRP_DEBUG( "Node info missing from config." );
    DBUG_RETURN(false);
  }

  Uint32 rank = 0;
  if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
    theArbitMgr = new ArbitMgr(* this);
    theArbitMgr->setRank(rank);
    Uint32 delay = 0;
    iter.get(CFG_NODE_ARBIT_DELAY, &delay);
    theArbitMgr->setDelay(delay);
  }

  Uint32 scan_batch_size= 0;
  if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
    m_scan_batch_size= scan_batch_size;
  }
  Uint32 batch_byte_size= 0;
  if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
    m_batch_byte_size= batch_byte_size;
  }
  Uint32 batch_size= 0;
  if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
    m_batch_size= batch_size;
  }

  Uint32 timeout = 120000;
  iter.first();
  for (iter.first(); iter.valid(); iter.next())
  {
    Uint32 tmp1 = 0, tmp2 = 0;
    iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
    iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
    tmp1 += tmp2;
    if (tmp1 > timeout)
      timeout = tmp1;
  }
  m_waitfor_timeout = timeout;

  if (!theTransporterRegistry->start_service(m_socket_server)){
    ndbout_c("Unable to start theTransporterRegistry->start_service");
    DBUG_RETURN(false);
  }

  theReceiveThread = NdbThread_Create(runReceiveResponse_C,
                                      (void**)this,
                                      32768,
                                      "ndb_receive",
                                      NDB_THREAD_PRIO_LOW);

  theSendThread = NdbThread_Create(runSendRequest_C,
                                   (void**)this,
                                   32768,
                                   "ndb_send",
                                   NDB_THREAD_PRIO_LOW);

  theClusterMgr->startThread();

#ifdef API_TRACE
  signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
#endif

  DBUG_RETURN(true);
}

void
TransporterFacade::connected()
{
  DBUG_ENTER("TransporterFacade::connected");
  Uint32 sz = m_threads.m_statusNext.size();
  for (Uint32 i = 0; i < sz ; i ++) {
    if (m_threads.getInUse(i)){
      void * obj = m_threads.m_objectExecute[i].m_object;
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
      (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
    }
  }
  DBUG_VOID_RETURN;
}

void
TransporterFacade::ReportNodeDead(NodeId tNodeId)
{
  /**
   * When a node fails we must report this to each Ndb object.
   * The function that is used for communicating node failures is called.
   * This is to ensure that the Ndb objects do not think their connections
   * are correct after a failure followed by a restart.
   * After the restart the node is up again and the Ndb object
   * might not have noticed the failure.
   */
  Uint32 sz = m_threads.m_statusNext.size();
  for (Uint32 i = 0; i < sz ; i ++) {
    if (m_threads.getInUse(i)){
      void * obj = m_threads.m_objectExecute[i].m_object;
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
      (*RegPC) (obj, tNodeId, false, false);
    }
  }
}

void
TransporterFacade::ReportNodeFailureComplete(NodeId tNodeId)
{
  /**
   * When a node fails we must report this to each Ndb object.
   * The function that is used for communicating node failures is called.
   * This is to ensure that the Ndb objects do not think their connections
   * are correct after a failure followed by a restart.
   * After the restart the node is up again and the Ndb object
   * might not have noticed the failure.
   */
  DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
  DBUG_PRINT("enter",("nodeid= %d", tNodeId));
  Uint32 sz = m_threads.m_statusNext.size();
  for (Uint32 i = 0; i < sz ; i ++) {
    if (m_threads.getInUse(i)){
      void * obj = m_threads.m_objectExecute[i].m_object;
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
      (*RegPC) (obj, tNodeId, false, true);
    }
  }
  DBUG_VOID_RETURN;
}

void
TransporterFacade::ReportNodeAlive(NodeId tNodeId)
{
  /**
   * When a node fails we must report this to each Ndb object.
   * The function that is used for communicating node failures is called.
   * This is to ensure that the Ndb objects do not think their connections
   * are correct after a failure followed by a restart.
   * After the restart the node is up again and the Ndb object
   * might not have noticed the failure.
   */
  Uint32 sz = m_threads.m_statusNext.size();
  for (Uint32 i = 0; i < sz ; i ++) {
    if (m_threads.getInUse(i)){
      void * obj = m_threads.m_objectExecute[i].m_object;
      NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
      (*RegPC) (obj, tNodeId, true, false);
    }
  }
}

int
TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id)
{
  NdbMutex_Lock(theMutexPtr);
  Uint32 low_bits = (Uint32)trans_id;
  m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;
  close_local(blockNumber);
  NdbMutex_Unlock(theMutexPtr);
  return 0;
}

int
TransporterFacade::close_local(BlockNumber blockNumber){
  m_threads.close(blockNumber);
  return 0;
}

int
TransporterFacade::open(void* objRef,
                        ExecuteFunction fun,
                        NodeStatusFunction statusFun)
{
  DBUG_ENTER("TransporterFacade::open");
  int r= m_threads.open(objRef, fun, statusFun);
  if (r < 0)
    DBUG_RETURN(r);
#if 1
  if (theOwnId > 0) {
    (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
  }
#endif
  DBUG_RETURN(r);
}

TransporterFacade::~TransporterFacade()
{
  DBUG_ENTER("TransporterFacade::~TransporterFacade");

  NdbMutex_Lock(theMutexPtr);
  delete theClusterMgr;
  delete theArbitMgr;
  delete theTransporterRegistry;
  NdbMutex_Unlock(theMutexPtr);
  NdbMutex_Destroy(theMutexPtr);
#ifdef API_TRACE
  signalLogger.setOutputStream(0);
#endif
  DBUG_VOID_RETURN;
}

void
TransporterFacade::calculateSendLimit()
{
  Uint32 Ti;
  Uint32 TthreadCount = 0;

  Uint32 sz = m_threads.m_statusNext.size();
  for (Ti = 0; Ti < sz; Ti++) {
    if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
      TthreadCount++;
      m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
    }
  }
  currentSendLimit = TthreadCount;
  if (currentSendLimit == 0) {
    currentSendLimit = 1;
  }
  checkCounter = currentSendLimit << 2;
}

//-------------------------------------------------
// Force sending but still report the sending to the
// adaptive algorithm.
//-------------------------------------------------
void TransporterFacade::forceSend(Uint32 block_number) {
  checkCounter--;
  m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
  sendPerformedLastInterval = 1;
  if (checkCounter < 0) {
    calculateSendLimit();
  }
  theTransporterRegistry->forceSendCheck(0);
}
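
The send thread above implements a simple adaptive scheme: it wakes every 10 ms and calls performSend() only when sendPerformedLastInterval is still 0, i.e. when no caller flushed during the interval, while forceSend() flushes on behalf of the caller and sets the flag so the background thread stays idle. The following is a minimal, self-contained sketch of that pattern using standard C++ threads; it is not the NDB API, and AdaptiveSender, flushLocked and m_sentThisInterval are illustrative names only.

#include <atomic>
#include <chrono>
#include <cstdio>
#include <mutex>
#include <thread>

// Toy "adaptive send" loop mirroring threadMainSend()/forceSend():
// the background thread only flushes intervals that saw no caller flush.
class AdaptiveSender {
public:
  void start() { m_thread = std::thread([this] { sendLoop(); }); }

  void stop() {
    m_stop = true;
    m_thread.join();
  }

  // Analogue of forceSend(): flush now and mark the interval as served.
  void forceSend() {
    std::lock_guard<std::mutex> guard(m_mutex);
    m_sentThisInterval = true;
    flushLocked();
  }

private:
  void sendLoop() {
    while (!m_stop) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      std::lock_guard<std::mutex> guard(m_mutex);
      if (!m_sentThisInterval)   // nobody flushed recently: do it ourselves
        flushLocked();
      m_sentThisInterval = false; // start a fresh interval
    }
  }

  void flushLocked() {
    // Stand-in for theTransporterRegistry->performSend().
    std::printf("flush\n");
  }

  std::thread m_thread;
  std::mutex m_mutex;
  bool m_sentThisInterval = false;  // protected by m_mutex
  std::atomic<bool> m_stop{false};
};

int main() {
  AdaptiveSender sender;
  sender.start();
  sender.forceSend();  // application-driven send
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  sender.stop();       // idle intervals were flushed by the background thread
  return 0;
}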
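
Similarly, open()/close_local() and the ReportNode* functions form a small callback registry: each Ndb object registers an (object, ExecuteFunction, NodeStatusFunction) triple in m_threads, and node status changes are fanned out to every slot that is in use. Below is a stripped-down sketch of that registry pattern under the same caveat: CallbackRegistry, StatusCallback and printStatus are hypothetical names, not TransporterFacade types.

#include <cstdio>
#include <vector>

typedef void (*StatusCallback)(void* object, int nodeId, bool alive);

// Minimal slot table: register a callback, close the slot, fan out reports.
class CallbackRegistry {
public:
  // Analogue of TransporterFacade::open(): returns the slot index.
  int open(void* object, StatusCallback cb) {
    Slot s = { object, cb, true };
    m_slots.push_back(s);
    return (int)m_slots.size() - 1;
  }

  // Analogue of close_local(): mark the slot unused.
  void close(int index) { m_slots[index].inUse = false; }

  // Analogue of ReportNodeDead()/ReportNodeAlive(): notify every user.
  void report(int nodeId, bool alive) {
    for (size_t i = 0; i < m_slots.size(); i++)
      if (m_slots[i].inUse)
        m_slots[i].callback(m_slots[i].object, nodeId, alive);
  }

private:
  struct Slot { void* object; StatusCallback callback; bool inUse; };
  std::vector<Slot> m_slots;
};

static void printStatus(void*, int nodeId, bool alive) {
  std::printf("node %d is %s\n", nodeId, alive ? "alive" : "dead");
}

int main() {
  CallbackRegistry reg;
  int slot = reg.open(0, printStatus);
  reg.report(2, false);  // node 2 failed: every registered object hears it
  reg.close(slot);
  reg.report(2, true);   // no listeners left
  return 0;
}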