📄 tcp_transporter.hpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#ifndef TCP_TRANSPORTER_HPP#define TCP_TRANSPORTER_HPP#include "Transporter.hpp"#include "SendBuffer.hpp"#include <NdbTCP.h>struct ReceiveBuffer { Uint32 *startOfBuffer; // Pointer to start of the receive buffer Uint32 *readPtr; // Pointer to start reading data char *insertPtr; // Pointer to first position in the receiveBuffer // in which to insert received data. Earlier // received incomplete messages (slack) are // copied into the first part of the receiveBuffer Uint32 sizeOfData; // In bytes Uint32 sizeOfBuffer; bool init(int bytes); void destroy(); void clear(); void incompleteMessage();};class TCP_Transporter : public Transporter { friend class TransporterRegistry;private: // Initialize member variables TCP_Transporter(TransporterRegistry&, int sendBufferSize, int maxReceiveSize, const char *lHostName, const char *rHostName, int r_port, bool isMgmConnection, NodeId lHostId, NodeId rHostId, NodeId serverNodeId, bool checksum, bool signalId, Uint32 reportFreq = 4096); // Disconnect, delete send buffers and receive buffer virtual ~TCP_Transporter(); /** * Allocate buffers for sending and receiving */ bool initTransporter(); Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio); void updateWritePtr(Uint32 lenBytes, Uint32 prio); bool hasDataToSend() const ; /** * Retrieves the contents of the send buffers and writes it on * the external TCP/IP interface until the send buffers are empty * and as long as write is possible. */ bool doSend(); /** * It reads the external TCP/IP interface once * and puts the data in the receiveBuffer */ int doReceive(); /** * Returns socket (used for select) */ NDB_SOCKET_TYPE getSocket() const; /** * Get Receive Data * * Returns - no of bytes to read * and set ptr */ virtual Uint32 getReceiveData(Uint32 ** ptr); /** * Update receive data ptr */ virtual void updateReceiveDataPtr(Uint32 bytesRead); virtual Uint32 get_free_buffer() const;protected: /** * Setup client/server and perform connect/accept * Is used both by clients and servers * A client connects to the remote server * A server accepts any new connections */ virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd); virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd); bool connect_common(NDB_SOCKET_TYPE sockfd); /** * Disconnects a TCP/IP node. Empty send and receivebuffer. */ virtual void disconnectImpl(); private: /** * Send buffers */ SendBuffer m_sendBuffer; // Sending/Receiving socket used by both client and server NDB_SOCKET_TYPE theSocket; Uint32 maxReceiveSize; /** * Socket options */ int sockOptRcvBufSize; int sockOptSndBufSize; int sockOptNodelay; int sockOptTcpMaxSeg; void setSocketOptions(); static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket); bool sendIsPossible(struct timeval * timeout); /** * Statistics */ Uint32 reportFreq; Uint32 receiveCount; Uint64 receiveSize; Uint32 sendCount; Uint64 sendSize; ReceiveBuffer receiveBuffer;#if defined NDB_OSE || defined NDB_SOFTOSE PROCESS theReceiverPid;#endif};inlineNDB_SOCKET_TYPETCP_Transporter::getSocket() const { return theSocket;}inlineUint32TCP_Transporter::getReceiveData(Uint32 ** ptr){ (* ptr) = receiveBuffer.readPtr; return receiveBuffer.sizeOfData;}inlinevoidTCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){ char * ptr = (char *)receiveBuffer.readPtr; ptr += bytesRead; receiveBuffer.readPtr = (Uint32*)ptr; receiveBuffer.sizeOfData -= bytesRead; receiveBuffer.incompleteMessage();}inlineboolTCP_Transporter::hasDataToSend() const { return m_sendBuffer.dataSize > 0;}inlineboolReceiveBuffer::init(int bytes){#ifdef DEBUG_TRANSPORTER ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;#endif startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1]; sizeOfBuffer = bytes + sizeof(Uint32); clear(); return true;}inlinevoidReceiveBuffer::destroy(){ delete[] startOfBuffer; sizeOfBuffer = 0; startOfBuffer = 0; clear();}inlinevoidReceiveBuffer::clear(){ readPtr = startOfBuffer; insertPtr = (char *)startOfBuffer; sizeOfData = 0;}inlinevoidReceiveBuffer::incompleteMessage() { if(startOfBuffer != readPtr){ if(sizeOfData != 0) memmove(startOfBuffer, readPtr, sizeOfData); readPtr = startOfBuffer; insertPtr = ((char *)startOfBuffer) + sizeOfData; }}#endif // Define of TCP_Transporter_H
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -