📄 suma.hpp
字号:
struct Bucket { bool active; bool handover; bool handover_started; Uint32 handoverGCI; };#define NO_OF_BUCKETS 24 struct Bucket c_buckets[NO_OF_BUCKETS]; bool c_handoverToDo; Uint32 c_lastCompleteGCI; /** * */ DLList<Subscriber> c_metaSubscribers; DLList<Subscriber> c_dataSubscribers; DLList<Subscriber> c_prepDataSubscribers; DLList<Subscriber> c_removeDataSubscribers; /** * Lists */ KeyTable<Table> c_tables; DLHashTable<Subscription> c_subscriptions; /** * Pools */ ArrayPool<Subscriber> c_subscriberPool; ArrayPool<Table> c_tablePool_; ArrayPool<Subscription> c_subscriptionPool; ArrayPool<SyncRecord> c_syncPool; DataBuffer<15>::DataBufferPool c_dataBufferPool; /** * for restarting Suma not to start sending data too early */ bool c_restartLock; /** * for flagging that a GCI containg inconsistent data * typically due to node failiure */ Uint32 c_lastInconsistentGCI; Uint32 c_nodeFailGCI; NodeBitmask c_failedApiNodes; /** * Functions */ bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId); bool parseTable(Signal* signal, class GetTabInfoConf* conf, Uint32 tableId, SyncRecord* syncPtr_p); bool checkTableTriggers(SegmentedSectionPtr ptr); void addTableId(Uint32 TableId, SubscriptionPtr subPtr, SyncRecord *psyncRec); void sendSubIdRef(Signal* signal, Uint32 errorCode); void sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr); void sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errorCode); void sendSubStartRef(SubscriptionPtr subPtr, Signal* signal, Uint32 errorCode, bool temporary = false); void sendSubStartRef(Signal* signal, Uint32 errorCode, bool temporary = false); void sendSubStopRef(Signal* signal, Uint32 errorCode, bool temporary = false); void sendSubSyncRef(Signal* signal, Uint32 errorCode); void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref, Uint32 errorCode, bool temporary = false); void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, SubscriptionData::Part); void sendSubStopComplete(Signal*, SubscriberPtr); void sendSubStopReq(Signal* signal, bool unlock= false); void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr); Uint32 getFirstGCI(Signal* signal); Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci); virtual Uint32 getStoreBucket(Uint32 v) = 0; virtual Uint32 getResponsibleSumaNodeId(Uint32 D) = 0; virtual Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true) = 0; struct FailoverBuffer { // FailoverBuffer(DataBuffer<15>::DataBufferPool & p); FailoverBuffer(); bool subTableData(Uint32 gci, Uint32 *src, int sz); bool subGcpCompleteRep(Uint32 gci); bool nodeFailRep(); // typedef DataBuffer<15> GCIDataBuffer; // GCIDataBuffer m_GCIDataBuffer; // GCIDataBuffer::DataBufferIterator m_GCIDataBuffer_it; Uint32 *c_gcis; int c_sz; // Uint32 *c_buf; // int c_buf_sz; int c_first; int c_next; bool c_full; } c_failoverBuffer; /** * Table admin */ void convertNameToId( SubscriptionPtr subPtr, Signal * signal);};class Suma : public SumaParticipant { BLOCK_DEFINES(Suma);public: Suma(const Configuration & conf); virtual ~Suma();private: /** * Public interface */ void execCREATE_SUBSCRIPTION_REQ(Signal* signal); void execDROP_SUBSCRIPTION_REQ(Signal* signal); void execSTART_SUBSCRIPTION_REQ(Signal* signal); void execSTOP_SUBSCRIPTION_REQ(Signal* signal); void execSYNC_SUBSCRIPTION_REQ(Signal* signal); void execABORT_SYNC_REQ(Signal* signal); /** * Framework signals */ void getNodeGroupMembers(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal); void execSTTOR(Signal* signal); void sendSTTORRY(Signal*); void execNDB_STTOR(Signal* signal); void execDUMP_STATE_ORD(Signal* signal); void execREAD_NODESCONF(Signal* signal); void execNODE_FAILREP(Signal* signal); void execINCL_NODEREQ(Signal* signal); void execCONTINUEB(Signal* signal); void execSIGNAL_DROPPED_REP(Signal* signal); void execAPI_FAILREQ(Signal* signal) ; void execSUB_GCP_COMPLETE_ACC(Signal* signal); /** * Controller interface */ void execSUB_CREATE_REF(Signal* signal); void execSUB_CREATE_CONF(Signal* signal); void execSUB_DROP_REF(Signal* signal); void execSUB_DROP_CONF(Signal* signal); void execSUB_START_REF(Signal* signal); void execSUB_START_CONF(Signal* signal); void execSUB_STOP_REF(Signal* signal); void execSUB_STOP_CONF(Signal* signal); void execSUB_SYNC_REF(Signal* signal); void execSUB_SYNC_CONF(Signal* signal); void execSUB_ABORT_SYNC_REF(Signal* signal); void execSUB_ABORT_SYNC_CONF(Signal* signal); void execSUMA_START_ME(Signal* signal); void execSUMA_HANDOVER_REQ(Signal* signal); void execSUMA_HANDOVER_CONF(Signal* signal); /** * Subscription generation interface */ void createSequence(Signal* signal); void createSequenceReply(Signal* signal, UtilSequenceConf* conf, UtilSequenceRef* ref); void execUTIL_SEQUENCE_CONF(Signal* signal); void execUTIL_SEQUENCE_REF(Signal* signal); void execCREATE_SUBID_REQ(Signal* signal); Uint32 getStoreBucket(Uint32 v); Uint32 getResponsibleSumaNodeId(Uint32 D); /** * for Suma that is restarting another */ struct Restart { Restart(Suma& s); Suma & suma; bool c_okToStart[MAX_REPLICAS]; bool c_waitingToStart[MAX_REPLICAS]; DLHashTable<SumaParticipant::Subscription>::Iterator c_subPtr; // TODO [MAX_REPLICAS] SubscriberPtr c_subbPtr; // TODO [MAX_REPLICAS] void progError(int line, int cause, const char * extra) { suma.progError(line, cause, extra); } void resetNode(Uint32 sumaRef); void runSUMA_START_ME(Signal*, Uint32 sumaRef); void startNode(Signal*, Uint32 sumaRef); void createSubscription(Signal* signal, Uint32 sumaRef); void nextSubscription(Signal* signal, Uint32 sumaRef); void completeSubscription(Signal* signal, Uint32 sumaRef); void startSync(Signal* signal, Uint32 sumaRef); void nextSync(Signal* signal, Uint32 sumaRef); void completeSync(Signal* signal, Uint32 sumaRef); void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr, Signal* signal, Uint32 sumaRef); void startSubscriber(Signal* signal, Uint32 sumaRef); void nextSubscriber(Signal* signal, Uint32 sumaRef); void completeSubscriber(Signal* signal, Uint32 sumaRef); void completeRestartingNode(Signal* signal, Uint32 sumaRef); } Restart;private: friend class Restart; struct SubCoordinator { Uint32 m_subscriberRef; Uint32 m_subscriberData; Uint32 m_subscriptionId; Uint32 m_subscriptionKey; NdbNodeBitmask m_participants; Uint32 m_outstandingGsn; SignalCounter m_outstandingRequests; Uint32 nextList; union { Uint32 prevList; Uint32 nextPool; }; }; Ptr<SubCoordinator> SubCoordinatorPtr; struct Node { Uint32 nodeId; Uint32 alive; Uint32 nextList; union { Uint32 prevList; Uint32 nextPool; }; }; typedef Ptr<Node> NodePtr; /** * Variables */ NodeId c_masterNodeId; SLList<Node> c_nodes; NdbNodeBitmask c_aliveNodes; NdbNodeBitmask c_preparingNodes; Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true); /** * for all Suma's to keep track of other Suma's in Node group */ Uint32 c_nodeGroup; Uint32 c_noNodesInGroup; Uint32 c_idInNodeGroup; NodeId c_nodesInGroup[MAX_REPLICAS]; /** * don't seem to be used */ ArrayPool<Node> c_nodePool; ArrayPool<SubCoordinator> c_subCoordinatorPool; DLList<SubCoordinator> c_runningSubscriptions;};inline Uint32Suma::RtoI(Uint32 sumaRef, bool dieOnNotFound) { for (Uint32 i = 0; i < c_noNodesInGroup; i++) { if (sumaRef == calcSumaBlockRef(c_nodesInGroup[i])) return i; } ndbrequire(!dieOnNotFound); return RNIL;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -