⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transporterfacade.hpp

📁 mysql-5.0.22.tar.gz源码包
💻 HPP
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#ifndef TransporterFacade_H#define TransporterFacade_H#include <kernel_types.h>#include <ndb_limits.h>#include <NdbThread.h>#include <TransporterRegistry.hpp>#include <NdbMutex.h>#include "DictCache.hpp"#include <BlockNumbers.h>#include <mgmapi.h>class ClusterMgr;class ArbitMgr;class IPCConfig;struct ndb_mgm_configuration;class ConfigRetriever;class Ndb;class NdbApiSignal;typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);extern "C" {  void* runSendRequest_C(void*);  void* runReceiveResponse_C(void*);  void atexit_stop_instance();}class TransporterFacade{public:  TransporterFacade();  virtual ~TransporterFacade();  bool init(Uint32, const ndb_mgm_configuration *);  static TransporterFacade* instance();  int start_instance(int, const ndb_mgm_configuration*);  static void stop_instance();    /**   * Register this block for sending/receiving signals   * @return BlockNumber or -1 for failure   */  int open(void* objRef, ExecuteFunction, NodeStatusFunction);    // Close this block number  int close(BlockNumber blockNumber, Uint64 trans_id);  // Only sends to nodes which are alive  int sendSignal(NdbApiSignal * signal, NodeId nodeId);  int sendSignal(NdbApiSignal*, NodeId, 		 LinearSectionPtr ptr[3], Uint32 secs);  int sendFragmentedSignal(NdbApiSignal*, NodeId, 			   LinearSectionPtr ptr[3], Uint32 secs);  // Is node available for running transactions  bool   get_node_alive(NodeId nodeId) const;  bool   get_node_stopping(NodeId nodeId) const;  bool   getIsDbNode(NodeId nodeId) const;  bool   getIsNodeSendable(NodeId nodeId) const;  Uint32 getNodeGrp(NodeId nodeId) const;  Uint32 getNodeSequence(NodeId nodeId) const;  // Is there space in sendBuffer to send messages  bool   check_send_size(Uint32 node_id, Uint32 send_size);  // My own processor id  NodeId ownId() const;  void connected();  void doConnect(int NodeId);  void reportConnected(int NodeId);  void doDisconnect(int NodeId);  void reportDisconnected(int NodeId);  NodeId get_an_alive_node();  void ReportNodeAlive(NodeId nodeId);  void ReportNodeDead(NodeId nodeId);  void ReportNodeFailureComplete(NodeId nodeId);    void lock_mutex();  void unlock_mutex();  // Improving the API performance  void forceSend(Uint32 block_number);  void checkForceSend(Uint32 block_number);  // Close this block number  int close_local(BlockNumber blockNumber);  // Scan batch configuration parameters  Uint32 get_scan_batch_size();  Uint32 get_batch_byte_size();  Uint32 get_batch_size();  TransporterRegistry* get_registry() { return theTransporterRegistry;};  // heart beat received from a node (e.g. a signal came)  void hb_received(NodeId n);private:  /**   * Send a signal unconditional of node status (used by ClusterMgr)   */  friend class ClusterMgr;  friend class ArbitMgr;  friend class MgmtSrvr;  friend class SignalSender;  friend class GrepPS;  friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond  friend class GrepSS;  friend class Ndb;  friend class Ndb_cluster_connection_impl;  friend class NdbTransaction;    int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);  bool isConnected(NodeId aNodeId);  void doStop();  TransporterRegistry* theTransporterRegistry;  SocketServer m_socket_server;  int sendPerformedLastInterval;  int theOwnId;  NodeId theStartNodeId;  ClusterMgr* theClusterMgr;  ArbitMgr* theArbitMgr;    // Improving the API response time  int checkCounter;  Uint32 currentSendLimit;    void calculateSendLimit();  // Scan batch configuration parameters  Uint32 m_scan_batch_size;  Uint32 m_batch_byte_size;  Uint32 m_batch_size;  // Declarations for the receive and send thread  int  theStopReceive;  void threadMainSend(void);  NdbThread* theSendThread;  void threadMainReceive(void);  NdbThread* theReceiveThread;  friend void* runSendRequest_C(void*);  friend void* runReceiveResponse_C(void*);  friend void atexit_stop_instance();  /**   * Block number handling   */public:  /**   * Max number of Ndb objects.     * (Ndb objects should not be shared by different threads.)   */  STATIC_CONST( MAX_NO_THREADS = 4711 );  Uint32 m_waitfor_timeout; // in milli seconds...private:  struct ThreadData {    STATIC_CONST( ACTIVE = (1 << 16) | 1 );    STATIC_CONST( INACTIVE = (1 << 16) );    STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );        ThreadData(Uint32 initialSize = 32);        /**     * Split "object" into 3 list     *   This to improve locality     *   when iterating over lists     */    struct Object_Execute {      void * m_object;      ExecuteFunction m_executeFunction;    };    struct NodeStatus_NextFree {      NodeStatusFunction m_statusFunction;    };    Uint32 m_firstFree;    Vector<Uint32> m_statusNext;    Vector<Object_Execute> m_objectExecute;    Vector<NodeStatusFunction> m_statusFunction;        int open(void* objRef, ExecuteFunction, NodeStatusFunction);    int close(int number);    void expand(Uint32 size);    inline Object_Execute get(Uint16 blockNo) const {      blockNo -= MIN_API_BLOCK_NO;      if(blockNo < m_objectExecute.size()){	return m_objectExecute[blockNo];      }      Object_Execute oe = { 0, 0 };      return oe;    }    /**     * Is the block number used currently     */    inline bool getInUse(Uint16 index) const {      return (m_statusNext[index] & (1 << 16)) != 0;    }  } m_threads;    Uint32 m_max_trans_id;  Uint32 m_fragmented_signal_id;  /**   * execute function   */  friend void execute(void * callbackObj, SignalHeader * const header,                       Uint8 prio,                       Uint32 * const theData, LinearSectionPtr ptr[3]);  public:  NdbMutex* theMutexPtr;private:  static TransporterFacade* theFacadeInstance;public:  GlobalDictCache m_globalDictCache;};inlineTransporterFacade*TransporterFacade::instance(){  return theFacadeInstance;}inlinevoid TransporterFacade::lock_mutex(){  NdbMutex_Lock(theMutexPtr);}inlinevoid TransporterFacade::unlock_mutex(){  NdbMutex_Unlock(theMutexPtr);}#include "ClusterMgr.hpp"inlineboolTransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size){  return true;}inlineboolTransporterFacade::getIsDbNode(NodeId n) const {  return     theClusterMgr->getNodeInfo(n).defined &&     theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB;}inlineUint32TransporterFacade::getNodeGrp(NodeId n) const {  return theClusterMgr->getNodeInfo(n).m_state.nodeGroup;}inlineboolTransporterFacade::get_node_alive(NodeId n) const {  const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);  return node.m_alive;}inlinevoidTransporterFacade::hb_received(NodeId n) {  theClusterMgr->hb_received(n);}inlineboolTransporterFacade::get_node_stopping(NodeId n) const {  const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);  return ((node.m_state.startLevel == NodeState::SL_STOPPING_1) ||          (node.m_state.startLevel == NodeState::SL_STOPPING_2));}inlineboolTransporterFacade::getIsNodeSendable(NodeId n) const {  const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);  const Uint32 startLevel = node.m_state.startLevel;  if (node.m_info.m_type == NodeInfo::DB) {    if(node.m_state.singleUserMode &&        ownId() == node.m_state.singleUserApi) {      return (node.compatible &&               (node.m_state.startLevel == NodeState::SL_STOPPING_1 ||               node.m_state.startLevel == NodeState::SL_STARTED ||               node.m_state.startLevel == NodeState::SL_SINGLEUSER));      }      else        return node.compatible && (startLevel == NodeState::SL_STARTED ||                                 startLevel == NodeState::SL_STOPPING_1);  } else if (node.m_info.m_type == NodeInfo::REP) {    /**     * @todo Check that REP node actually has received API_REG_REQ     */    return node.compatible;  } else {    ndbout_c("TransporterFacade::getIsNodeSendable: Illegal node type: "             "%d of node: %d",              node.m_info.m_type, n);    abort();    return false; // to remove compiler warning  }}inlineUint32TransporterFacade::getNodeSequence(NodeId n) const {  return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;}inlineUint32TransporterFacade::get_scan_batch_size() {  return m_scan_batch_size;}inlineUint32TransporterFacade::get_batch_byte_size() {  return m_batch_byte_size;}inlineUint32TransporterFacade::get_batch_size() {  return m_batch_size;}#endif // TransporterFacade_H

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -