📄 warsocketio.cpp
字号:
#include "StdAfx.h"#include "WarSocketIo.h" // class implemented#ifndef WAR_ASSERT_H_INCLUDED# define WAR_ASSERT_H_INCLUDED# include <assert.h>#endif#ifndef WAR_VECTOR_INCLUDED# define WAR_VECTOR_INCLUDED# include <vector>#endif#ifndef WAR_AUTO_LOCK_H# include "WarAutoLock.h"#endif#ifndef WAR_SOCKET_H# include "WarSocket.h"#endif#ifndef WAR_STDLIB_H_INCLUDED# define WAR_STDLIB_H_INCLUDED# include <stdlib.h>#endif #ifndef WAR_STIO_H_INCLUDED# define WAR_STIO_H_INCLUDED# include <stdio.h>#endif#ifndef WAR_TIME_H_INCLUDED# define WAR_TIME_H_INCLUDED# include <time.h>#endif#ifndef WAR_LOG_H# include "WarLog.h"#endif#ifndef WAR_PERFMON_DEF_H# include "WarPerfmonDef.h"#endif#if defined(HAVE_UNISTD_H) && !defined(WAR_UNISTD_H_INCLUDED)# define WAR_UNISTD_H_INCLUDED# include <unistd.h>#endif#if defined(HAVE_FCNTL_H) && !defined(WAR_FCNTL_H_INCLUDED)# define WAR_FCNTL_H_INCLUDED# include <fcntl.h>#endif#define AUTO_LOCK WarAutoLock MyLock(mLock);using namespace std;/////////////////////////////// PUBLIC /////////////////////////////////////////============================= LIFECYCLE ====================================WarSocketIo::WarSocketIo() : WarPluginSupport<WarSocketIo>("WarSocketIo"),mpCompanion(NULL){ WAR_DB_PERFMON_INC(WAR_PRFDEBUG_NUM_SOCKET_IO); Initialize();}// WarSocketIoWarSocketIo::WarSocketIo(WarSocketIo& from)throw(WarException): WarPluginSupport<WarSocketIo>("WarSocketIo"),mpCompanion(NULL){ WAR_DB_PERFMON_INC(WAR_PRFDEBUG_NUM_SOCKET_IO); Initialize(); operator = (from);}// WarSocketIoWarSocketIo::~WarSocketIo(){ if (mpCompanion) { mpCompanion = NULL; } try { if (IsOpen()) Close(); } catch(WarException) { } WarSocketEngine::UnregisterSocket(this); WAR_DB_PERFMON_DEC(WAR_PRFDEBUG_NUM_SOCKET_IO);}// ~WarSocketIo//============================= OPERATORS ====================================WarSocketIo& WarSocketIo::operator = (WarSocketIo& from) throw(WarException){ AUTO_LOCK if (IsOpen()) Close(); mpCompanion = from.mpCompanion; if (from.IsOpen()) mSocket = Dup(from.mSocket); SetCurrentState(from.mCurrentState); mIsAsync = from.mIsAsync; return *this;}//============================= OPERATIONS ===================================void WarSocketIo::SetReuseAddr() throw(WarException){ int bool_val = 1; if (setsockopt(mSocket, SOL_SOCKET, SO_REUSEADDR, (war_ccstr_t)&bool_val, sizeof(bool_val))) { WarThrowSyserr("setsockopt(SO_REUSEADDR)"); }}void WarSocketIo::Connect(WarSocketTypesE commType, const WarNetAddress& rremoteHost, const WarNetAddress *plocalAddress) throw(WarException){ mIdleTimeSince.Reset(); if (IsOpen()) Close(); DoCreateSocket(rremoteHost.GetFamily(), commType, 0); SetCurrentState(WAR_SCKSTATE_CREATED); if (plocalAddress) { if (plocalAddress->GetPort() != 0) SetReuseAddr(); if ((bind(mSocket, &plocalAddress->GetSockAddr(), plocalAddress->GetSockAddrLength()) != 0)) WarThrowSyserr("bind()"); } SetAsyncMode(mIsAsync, true); try { DoConnect(&rremoteHost.GetSockAddr(), rremoteHost.GetSockAddrLength()); } catch(WarException& ex) { if (ex.SystemError() == EWOULDBLOCK) return; throw ex; } SetCurrentState(WAR_SCKSTATE_CONNECTED);}void WarSocketIo::Listen(WarNetAddress& rLocalAddress, int backlog, std::pair<war_port_t,war_port_t> *pRange) throw(WarException){ mIdleTimeSince.Reset(); if (IsOpen()) Close(); DoCreateSocket(rLocalAddress.GetFamily(), SOCK_STREAM, 0); SetCurrentState(WAR_SCKSTATE_CREATED); if (pRange) { if (pRange->second < pRange->first) { WarLog err_log(WARLOG_ERROR, "WarSocketIo::Listen()", this); err_log << "Invalid range. " << pRange->first << " - " << pRange->second << war_endl; WarThrow(WarError(WAR_ERR_INVALID_ARGUMENT), NULL); } const size_t num_ports = pRange->second - pRange->first; vector<char> ports_tried(num_ports); WarNetAddress my_address = rLocalAddress; srand((unsigned)time(NULL)); size_t i; for(i = 0; i < num_ports; i++) { war_port_t try_port = 0; do { try_port = (war_port_t) ((1.0 * (num_ports -1) * rand()) / (RAND_MAX * 1.0)); assert(try_port < num_ports); } while(ports_tried[try_port]); ports_tried[try_port] = 1; my_address.SetPort(try_port + pRange->first); if (bind(mSocket, &my_address.GetSockAddr(), rLocalAddress.GetSockAddrLength()) == 0) { rLocalAddress = my_address; break; // Success; } } if (i == num_ports) WarThrow(WarError(WAR_NETERR_NO_AVAIL_PORTS_IN_RANGE), NULL); } else { if (bind(mSocket, &rLocalAddress.GetSockAddr(), rLocalAddress.GetSockAddrLength()) != 0) WarThrowSyserr("bind()"); } SetAsyncMode(mIsAsync, true); DoListen(backlog); SetCurrentState(WAR_SCKSTATE_LISTENING);}void WarSocketIo::Close(){ mIdleTimeSince.Reset(); WarLog sck_log(WARLOG_SOCKET, "WarSocketIo::Close()", this); if (IsOpen()) DoClose(); mSocket = WAR_INVALID_SOCKET; SetCurrentState(WAR_SCKSTATE_CLOSED); mClearToSend = false; mClearToRecive = false; if (sck_log) { sck_log << "Closing socket " << GetSeqNumber() << ". " << GetBytesSent() << " bytes sent and " << GetBytesReceived() << " bytes received in " << (int)(GetConnectedTime() / 1000) << " seconds" << war_endl; }}void WarSocketIo::SetAsyncMode(bool makeAsync, bool doForce) throw(WarException){ if (!doForce && (makeAsync == mIsAsync)) return; mIsAsync = makeAsync; if (IsOpen()) {#if HAVE_WINSOCK unsigned long sck_mode = makeAsync; if (::ioctlsocket (mSocket, FIONBIO, &sck_mode)) WarThrowSyserr("ioctlsocket(FIONBIO)");#else int flags = 0; if ((flags = ::fcntl(mSocket, F_GETFL, 0)) != 0) WarThrowSyserr("fcntl(F_GETFL)"); if (makeAsync) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; if (fcntl(mSocket, F_SETFL, flags) != 0) WarThrowSyserr("fcntl(F_SETFL)");#endif }}int WarSocketIo::Send(war_ccstr_t buf, size_t len, int flags) throw(WarException){ AUTO_LOCK mIdleTimeSince.Reset(); int bytes = DoSend(buf, len, flags); if (bytes < 0) WarThrowSyserr("recv"); mBytesSent += bytes; return bytes;}void WarSocketIo::SendWithCallback(war_transfer_buffer_ptr_t& outBuffer, size_t numSegments) throw(WarException){ WarThrow(WarError(WAR_ERR_NOT_IMPLEMENTED), NULL);}int WarSocketIo::Recv(war_cstr_t buf, size_t len, int flags) throw(WarException){ AUTO_LOCK mIdleTimeSince.Reset(); int bytes = DoRecv(buf, len, flags); if (bytes < 0) WarThrowSyserr("recv"); mBytesReceived += bytes; return bytes;}void WarSocketIo::RecvWithCallback(war_transfer_buffer_ptr_t& inBuffer, size_t numSegments) throw(WarException){ WarThrow(WarError(WAR_ERR_NOT_IMPLEMENTED), NULL);}war_socket_t WarSocketIo::Dup(const war_socket_t from) throw(WarException){ if (from == WAR_INVALID_SOCKET) WarThrow(WarError(WAR_ERR_NOT_OPEN), NULL); war_socket_t duplicated_socket = WAR_INVALID_SOCKET;#ifdef WIN32 if (!::DuplicateHandle( ::GetCurrentProcess(), (const HANDLE)from, ::GetCurrentProcess(), (PHANDLE)&duplicated_socket, 0, true, DUPLICATE_SAME_ACCESS)) WarThrowSyserr(NULL);#else if ((duplicated_socket = dup((int)from)) == -1) WarThrowSyserr(NULL);#endif return duplicated_socket;}//============================= ACCESS ===================================void WarSocketIo::AssignSocket(war_socket_t sck) throw(WarException){ if (mSocket != WAR_INVALID_SOCKET) WarThrow(WarError(WAR_ERR_ALREADY_INITIALIZED), NULL); mSocket = sck; SetCurrentState(WAR_SCKSTATE_UNKNOWN);}//============================= INQUIRY ===================================bool WarSocketIo::IsOpen() const{ return mSocket != WAR_INVALID_SOCKET;}bool WarSocketIo::IsClearToSend() const{ return !mIsAsync || mClearToSend;}bool WarSocketIo::IsClearToReceive() const{ return !mIsAsync || mClearToRecive;}WarSocketStatesE WarSocketIo::GetCurrentState() const{ return mCurrentState;}bool WarSocketIo::IsAsync() const{ return mIsAsync;}war_uint64_t WarSocketIo::GetSeqNumber() const{ return mSeqNumber;}war_uint64_t WarSocketIo::GetBytesSent() const{ return mBytesSent;}war_uint64_t WarSocketIo::GetBytesReceived() const{ return mBytesReceived;}WarTime WarSocketIo::GetConnectTime() const{ return mConnectTime;}WarTime WarSocketIo::GetCloseTime() const{ return mCloseTime;}WarTime WarSocketIo::GetIdleTimeSince() const{ return mIdleTimeSince;}war_time_t WarSocketIo::GetConnectedTime() const{ return GetCloseTime().mTime - GetConnectTime().mTime;}const WarNetAddress& WarSocketIo::GetLocalAddress() const{ if (mLocalAddress.IsEmpty()) { struct sockaddr my_sockaddr; war_socklen_t my_sockaddr_len = sizeof(my_sockaddr); if (getsockname(mSocket, &my_sockaddr, &my_sockaddr_len) != 0) WarThrow(WarSystemError(), "getsockname()"); ((WarNetAddress&)(mLocalAddress)).AssignSockaddr(&my_sockaddr, my_sockaddr_len); } return mLocalAddress;}const WarNetAddress& WarSocketIo::GetRemoteAddress() const{ if (mRemoteAddress.IsEmpty()) { struct sockaddr my_sockaddr; int my_sockaddr_len = sizeof(my_sockaddr); if (getpeername(mSocket, &my_sockaddr, &my_sockaddr_len) != 0) WarThrow(WarSystemError(), "getpeername()"); ((WarNetAddress&)mRemoteAddress).AssignSockaddr(&my_sockaddr, my_sockaddr_len); } return mRemoteAddress;}//============================= CALLBACKS ===================================void WarSocketIo::OnConnect(const WarError& status){ SetCurrentState(WAR_SCKSTATE_CONNECTED); mIdleTimeSince.Reset(); if (mpCompanion) mpCompanion->PreOnConnect(status);}void WarSocketIo::OnAccept(const WarError& status, war_socket_t newSocket, const WarNetAddress& remoteAddress, const WarNetAddress& localAddress){ mIdleTimeSince.Reset(); if (mpCompanion) mpCompanion->PreOnAccept(status, newSocket, remoteAddress, localAddress);}void WarSocketIo::OnReceived(const WarError& status, war_transfer_buffer_ptr_t& buffer){ WarError my_status = status; mClearToRecive = (status == WAR_ERR_OK); mIdleTimeSince.Reset(); war_uint32_t bytes = 0; if (status == false) { mBytesReceived += (bytes= buffer->mBytesUsed); if (0 == bytes) my_status = WAR_FERR_END_OF_FILE; } WAR_RUN_PLUGINS(WarSocketIo, OnReceived, (status, buffer)); if (mpCompanion) { mpCompanion->PreOnReceived(my_status, buffer); }}void WarSocketIo::OnSent(const WarError& status, war_transfer_buffer_ptr_t& buffer){ mClearToSend = (status == WAR_ERR_OK); mIdleTimeSince.Reset(); if (status == false) mBytesSent += buffer->mBytesUsed; WAR_RUN_PLUGINS(WarSocketIo, OnSent, (status, buffer)); if (mpCompanion) { mpCompanion->PreOnSent(status, buffer); }}void WarSocketIo::OnClose(const WarError& status){ mIdleTimeSince.Reset(); mClearToRecive = false; mClearToSend = false; if (mpCompanion) mpCompanion->PreOnClose(status);}/////////////////////////////// PROTECTED ///////////////////////////////////void WarSocketIo::SetCurrentState(WarSocketStatesE newState){ if (newState == WAR_SCKSTATE_CONNECTED) { mConnectTime.Reset(); mCloseTime.Clear(); mBytesSent = 0; mBytesReceived = 0; } if (newState == WAR_SCKSTATE_CLOSED) mCloseTime.Reset(); mCurrentState = newState; if (mpCompanion) mpCompanion->OnStateChanged();}void WarSocketIo::DoListen(int backlog) throw(WarException){ if (listen(mSocket, backlog) == WAR_INVALID_SOCKET) WarThrowSyserr("listen()");}int WarSocketIo::DoRecv(char *buf, int len, int flags) throw(WarException){ return recv(mSocket, buf, len, flags);}int WarSocketIo::DoSend(const char *buf, int len, int flags) throw(WarException){ return send(mSocket, buf, len, flags);}void WarSocketIo::DoConnect(const struct sockaddr *name, int namelen) throw(WarException){ if (connect(mSocket, name, namelen) != 0) WarThrowSyserr("connect()");}int WarSocketIo::DoClose() throw(WarException){ war_socket_t sck = mSocket; mSocket = WAR_INVALID_SOCKET;#if HAVE_WINSOCK return closesocket(sck);#else return close(sck);#endif}void WarSocketIo::DoCreateSocket(int af, int type, int protocol) throw(WarException){ assert(mSocket == WAR_INVALID_SOCKET); mSocket = socket(af, type, protocol); if (mSocket == WAR_INVALID_SOCKET) WarThrow(WarSystemError(), "socket()");}/////////////////////////////// PRIVATE ///////////////////////////////////void WarSocketIo::Initialize(){ WarSocketEngine::RegisterSocket(this); mpCompanion = NULL; mSocket = WAR_INVALID_SOCKET; mIsAsync = false; mCurrentState = WAR_SCKSTATE_VOID; mClearToSend = false; mClearToRecive = false; mBytesSent = 0; mBytesReceived = 0; mConnectTime.Clear(); mCloseTime.Clear(); WarCollector<char> identifier; identifier << "SocketIo(" << GetSeqNumber() << ")"; AddLogIdentifierTag(identifier.GetValue().c_str()); WarLog sck_log(WARLOG_SOCKET, "WarSocketIo::Initialize()", this); if (sck_log) { sck_log << "Socket " << GetSeqNumber() << " created." << war_endl; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -