ose_transporter.cpp
来自「MySQL数据库开发源码 值得一看哦」· C++ 代码 · 共 488 行
CPP
488 行
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ose.h>#include "OSE_Transporter.hpp"#include "OSE_Signals.hpp"#include <TransporterCallback.hpp>#include "TransporterInternalDefinitions.hpp"#include <NdbMutex.h>#include <NdbHost.h>#include <NdbOut.hpp>#include <time.h>OSE_Transporter::OSE_Transporter(int _prioASignalSize, int _prioBSignalSize, NodeId localNodeId, const char * lHostName, NodeId remoteNodeId, NodeId serverNodeId, const char * rHostName, int byteorder, bool compression, bool checksum, bool signalId, Uint32 reportFreq) : Transporter(localNodeId, remoteNodeId, serverNodeId, byteorder, compression, checksum, signalId), isServer(localNodeId < remoteNodeId){ signalIdCounter = 0; prioBSignalSize = _prioBSignalSize; if (strcmp(lHostName, rHostName) == 0){ BaseString::snprintf(remoteNodeName, sizeof(remoteNodeName), "ndb_node%d", remoteNodeId); } else { BaseString::snprintf(remoteNodeName, sizeof(remoteNodeName), "%s/ndb_node%d", rHostName, remoteNodeId); } prioBSignal = NIL;}OSE_Transporter::~OSE_Transporter() {#if 0 /** * Don't free these buffers since they have already been freed * when the process allocating them died (wild pointers) */ if(prioBSignal != NIL) free_buf(&prioBSignal);#endif}boolOSE_Transporter::initTransporter() { struct OS_pcb * pcb = get_pcb(current_process()); if(pcb != NULL){ if(pcb->type != OS_ILLEGAL){ if(prioBSignalSize > pcb->max_sigsize){ DEBUG("prioBSignalSize(" << prioBSignalSize << ") > max_sigsize(" << pcb->max_sigsize << ") using max_sigsize"); prioBSignalSize = pcb->max_sigsize; } } free_buf((union SIGNAL **)&pcb); } maxPrioBDataSize = prioBSignalSize; maxPrioBDataSize -= (sizeof(NdbTransporterData) + MAX_MESSAGE_SIZE - 4); if(maxPrioBDataSize < 0){ #ifdef DEBUG_TRANSPORTER printf("maxPrioBDataSize < 0 %d\n", maxPrioBDataSize);#endif return false; } initSignals(); return true;}voidOSE_Transporter::initSignals(){ if(prioBSignal == NIL){ prioBSignal = alloc(prioBSignalSize, NDB_TRANSPORTER_DATA); prioBInsertPtr = &prioBSignal->dataSignal.data[0]; prioBSignal->dataSignal.length = 0; prioBSignal->dataSignal.senderNodeId = localNodeId; } dataToSend = 0;}NdbTransporterData *OSE_Transporter::allocPrioASignal(Uint32 messageLenBytes) const{ const Uint32 lenBytes = messageLenBytes + sizeof(NdbTransporterData) - 4; NdbTransporterData * sig = (NdbTransporterData*)alloc(lenBytes, NDB_TRANSPORTER_PRIO_A); sig->length = 0; sig->senderNodeId = localNodeId; return sig;}Uint32 *OSE_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){ if(prio >= 1){ prio = 1; insertPtr = prioBInsertPtr; signal = (NdbTransporterData*)prioBSignal; } else { signal = allocPrioASignal(lenBytes); insertPtr = &signal->data[0]; } return insertPtr;}voidOSE_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 bufferSize = signal->length; bufferSize += lenBytes; signal->length = bufferSize; if(prio >= 1){ prioBInsertPtr += (lenBytes / 4); if(bufferSize >= maxPrioBDataSize) doSend(); } else { /** * Prio A signal are sent directly */ signal->sigId = 0; ::send((union SIGNAL**)&signal, remoteNodePid); }}#if 0int getSeq(int _seq){ if (_seq > 0){ switch (_seq % 100){ case 10: return _seq - 1; case 9: return _seq + 1; default: return _seq; } }else{ return _seq; }}int getSeq(int _seq){ switch (_seq % 40){ case 10: return _seq-4; case 9: return _seq-2; case 8: return _seq; case 7: return _seq+2; case 6: return _seq+4; case 30: return _seq-9; case 29: return _seq-7; case 28: return _seq-5; case 27: return _seq-3; case 26: return _seq-1; case 25: return _seq+1; case 24: return _seq+3; case 23: return _seq+5; case 22: return _seq+7; case 21: return _seq+9; default: return _seq; }}#endifvoidOSE_Transporter::doSend() { /** * restore is always called to make sure the signal buffer is taken over * by a process that is alive, this will otherwise lead to that these buffers * are removed when the process that allocated them dies */ restore(prioBSignal); if(prioBSignal->dataSignal.length > 0){ prioBSignal->dataSignal.sigId = signalIdCounter; signalIdCounter++; ::send(&prioBSignal, remoteNodePid); } initSignals();}voidOSE_Transporter::doConnect() { NdbMutex_Lock(theMutexPtr); if(_connecting || _disconnecting || _connected){ NdbMutex_Unlock(theMutexPtr); return; } _connecting = true; signalIdCounter = 0; if(isServer){ DEBUG("Waiting for connect req: "); state = WAITING_FOR_CONNECT_REQ; } else { state = WAITING_FOR_HUNT; DEBUG("Hunting for: " << remoteNodeName); union SIGNAL* huntsig; huntsig = alloc(sizeof(NdbTransporterHunt), NDB_TRANSPORTER_HUNT); huntsig->ndbHunt.remoteNodeId = remoteNodeId; hunt(remoteNodeName, 0, NULL, &huntsig); } NdbMutex_Unlock(theMutexPtr);}voidOSE_Transporter::doDisconnect() { NdbMutex_Lock(theMutexPtr); switch(state){ case DISCONNECTED: case WAITING_FOR_HUNT: case WAITING_FOR_CONNECT_REQ: case WAITING_FOR_CONNECT_CONF: break; case CONNECTED: {#if 0 /** * There should not be anything in the buffer that needs to be sent here */ DEBUG("Doing send before disconnect"); doSend();#endif union SIGNAL * sig = alloc(sizeof(NdbTransporterDisconnectOrd), NDB_TRANSPORTER_DISCONNECT_ORD); sig->ndbDisconnect.senderNodeId = localNodeId; sig->ndbDisconnect.reason = NdbTransporterDisconnectOrd::NDB_DISCONNECT; ::send(&sig, remoteNodePid); detach(&remoteNodeRef); } break; } state = DISCONNECTED; _connected = false; _connecting = false; _disconnecting = false; NdbMutex_Unlock(theMutexPtr);}voidOSE_Transporter::huntReceived(struct NdbTransporterHunt * sig){ if(isServer){ WARNING("Hunt received for server: remoteNodeId: " << sig->remoteNodeId); return; } if(state != WAITING_FOR_HUNT){ WARNING("Hunt received while in state: " << state); return; } remoteNodePid = sender((union SIGNAL**)&sig); union SIGNAL * signal = alloc(sizeof(NdbTransporterConnectReq), NDB_TRANSPORTER_CONNECT_REQ); signal->ndbConnectReq.remoteNodeId = remoteNodeId; signal->ndbConnectReq.senderNodeId = localNodeId; DEBUG("Sending connect req to pid: " << hex << remoteNodePid); ::send(&signal, remoteNodePid); state = WAITING_FOR_CONNECT_CONF; return;}boolOSE_Transporter::connectReq(struct NdbTransporterConnectReq * sig){ if(!isServer){ WARNING("OSE Connect Req received for client: senderNodeId: " << sig->senderNodeId); return false; } if(state != WAITING_FOR_CONNECT_REQ){ PROCESS pid = sender((union SIGNAL**)&sig); union SIGNAL * signal = alloc(sizeof(NdbTransporterConnectRef), NDB_TRANSPORTER_CONNECT_REF); signal->ndbConnectRef.senderNodeId = localNodeId; signal->ndbConnectRef.reason = NdbTransporterConnectRef::INVALID_STATE; DEBUG("Sending connect ref to pid: " << hex << pid); ::send(&signal, pid); return false; } NdbMutex_Lock(theMutexPtr); if(prioBSignal != NIL){ restore(prioBSignal); free_buf(&prioBSignal); } initSignals(); remoteNodePid = sender((union SIGNAL**)&sig); union SIGNAL * signal = alloc(sizeof(NdbTransporterConnectRef), NDB_TRANSPORTER_CONNECT_CONF); signal->ndbConnectConf.senderNodeId = localNodeId; signal->ndbConnectConf.remoteNodeId = remoteNodeId; union SIGNAL * discon = alloc(sizeof(NdbTransporterDisconnectOrd), NDB_TRANSPORTER_DISCONNECT_ORD); discon->ndbDisconnect.senderNodeId = remoteNodeId; discon->ndbDisconnect.reason = NdbTransporterDisconnectOrd::PROCESS_DIED; DEBUG("Attaching to pid: " << hex << remoteNodePid); remoteNodeRef = attach(&discon, remoteNodePid); DEBUG("Sending connect conf to pid: " << hex << remoteNodePid); ::send(&signal, remoteNodePid); state = CONNECTED; _connected = true; _connecting = false; _disconnecting = false; NdbMutex_Unlock(theMutexPtr); return true;}boolOSE_Transporter::connectRef(struct NdbTransporterConnectRef * sig){ if(isServer){ WARNING("OSE Connect Ref received for server: senderNodeId: " << sig->senderNodeId); return false; } if(state != WAITING_FOR_CONNECT_CONF){ WARNING("OSE Connect Ref received for client while in state: " << state << " senderNodeId: " << sig->senderNodeId); return false; } doDisconnect();#if 0 /** * Don't call connect directly, wait until the next time * checkConnections is called which will trigger a new connect attempt */ doConnect();#endif return true;}boolOSE_Transporter::connectConf(struct NdbTransporterConnectConf * sig){ if(isServer){ WARNING("OSE Connect Conf received for server: senderNodeId: " << sig->senderNodeId); return false; } if(state != WAITING_FOR_CONNECT_CONF){ WARNING("OSE Connect Conf received while in state: " << state); return false; } NdbMutex_Lock(theMutexPtr); // Free the buffers to get rid of any "junk" that they might contain if(prioBSignal != NIL){ restore(prioBSignal); free_buf(&prioBSignal); } initSignals(); union SIGNAL * discon = alloc(sizeof(NdbTransporterDisconnectOrd), NDB_TRANSPORTER_DISCONNECT_ORD); discon->ndbDisconnect.senderNodeId = remoteNodeId; discon->ndbDisconnect.reason= NdbTransporterDisconnectOrd::PROCESS_DIED; remoteNodeRef = attach(&discon, remoteNodePid); state = CONNECTED; _connected = true; _connecting = false; _disconnecting = false; // Free the buffers to get rid of any "junk" that they might contain if(prioBSignal != NIL){ restore(prioBSignal); free_buf(&prioBSignal); } initSignals(); NdbMutex_Unlock(theMutexPtr); return true;}boolOSE_Transporter::disconnectOrd(struct NdbTransporterDisconnectOrd * sig){ if(state != CONNECTED){ WARNING("OSE Disconnect Ord received while in state: " << state << " reason: " << sig->reason); return false; } if(sig->reason == NdbTransporterDisconnectOrd::PROCESS_DIED){ state = DISCONNECTED; } doDisconnect(); reportDisconnect(callbackObj, remoteNodeId,0); return true;}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?