📄 physicalsocketserver.cc
字号:
#if defined(_MSC_VER) && _MSC_VER < 1300#pragma warning(disable:4786)#endif#include <cassert>#ifdef POSIXextern "C" {#include <string.h>#include <errno.h>#include <fcntl.h>#include <sys/time.h>#include <unistd.h>}#endif#ifdef WIN32#include <winsock2.h>#include <ws2tcpip.h>#define _WINSOCKAPI_#include <windows.h>#undef SetPort#endif#include <algorithm>#include <iostream>#include "basictypes.h"#include "byteorder.h"#include "common.h"#include "logging.h"#include "physicalsocketserver.h"#include "time.h"#include "winping.h"#ifdef __linux #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h#endif // __linux#ifdef WIN32class WinsockInitializer {public: WinsockInitializer() { WSADATA wsaData; WORD wVersionRequested = MAKEWORD(1, 0); err_ = WSAStartup(wVersionRequested, &wsaData); } ~WinsockInitializer() { WSACleanup(); } int error() { return err_; }private: int err_;};WinsockInitializer g_winsockinit;#endifnamespace utils_base {const int kfRead = 0x0001;const int kfWrite = 0x0002;const int kfConnect = 0x0004;const int kfClose = 0x0008;// Standard MTUsconst uint16 PACKET_MAXIMUMS[] = { 65535, // Theoretical maximum, Hyperchannel 32000, // Nothing 17914, // 16Mb IBM Token Ring 8166, // IEEE 802.4 //4464, // IEEE 802.5 (4Mb max) 4352, // FDDI //2048, // Wideband Network 2002, // IEEE 802.5 (4Mb recommended) //1536, // Expermental Ethernet Networks //1500, // Ethernet, Point-to-Point (default) 1492, // IEEE 802.3 1006, // SLIP, ARPANET //576, // X.25 Networks //544, // DEC IP Portal //512, // NETBIOS 508, // IEEE 802/Source-Rt Bridge, ARCNET 296, // Point-to-Point (low delay) 68, // Official minimum 0, // End of list marker};const uint32 IP_HEADER_SIZE = 20;const uint32 ICMP_HEADER_SIZE = 8;class PhysicalSocket : public AsyncSocket {public: PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET) : ss_(ss), s_(s), enabled_events_(0), error_(0), state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED) { if (s != INVALID_SOCKET) enabled_events_ = kfRead | kfWrite; } virtual ~PhysicalSocket() { Close(); } // Creates the underlying OS socket (same as the "socket" function). virtual bool Create(int type) { Close(); s_ = ::socket(AF_INET, type, 0); UpdateLastError(); if (type != SOCK_STREAM) enabled_events_ = kfRead | kfWrite; return s_ != INVALID_SOCKET; } SocketAddress GetLocalAddress() const { sockaddr_in addr; socklen_t addrlen = sizeof(addr); int result = ::getsockname(s_, (sockaddr*)&addr, &addrlen); ASSERT(addrlen == sizeof(addr)); utils_base::SocketAddress address; if (result >= 0) { address.FromSockAddr(addr); } else { ASSERT(result >= 0); } return address; } SocketAddress GetRemoteAddress() const { sockaddr_in addr; socklen_t addrlen = sizeof(addr); int result = ::getpeername(s_, (sockaddr*)&addr, &addrlen); ASSERT(addrlen == sizeof(addr)); utils_base::SocketAddress address; if (result >= 0) { address.FromSockAddr(addr); } else { ASSERT(errno == ENOTCONN); } return address; } int Bind(const SocketAddress& addr) { sockaddr_in saddr; addr.ToSockAddr(&saddr); int err = ::bind(s_, (sockaddr*)&saddr, sizeof(saddr)); UpdateLastError(); return err; } int Connect(const SocketAddress& addr) { // TODO: Implicit creation is required to reconnect... // ...but should we make it more explicit? if ((s_ == INVALID_SOCKET) && !Create(SOCK_STREAM)) return SOCKET_ERROR; SocketAddress addr2(addr); if (addr2.IsUnresolved()) { LOG(INFO) << "Resolving addr in PhysicalSocket::Connect"; addr2.Resolve(); // TODO: Do this async later? } sockaddr_in saddr; addr2.ToSockAddr(&saddr); int err = ::connect(s_, (sockaddr*)&saddr, sizeof(saddr)); UpdateLastError(); //LOG(INFO) << "SOCK[" << static_cast<int>(s_) << "] Connect(" << addr2.ToString() << ") Ret: " << err << " Error: " << error_; if (err == 0) { state_ = CS_CONNECTED; } else if (IsBlockingError(error_)) { state_ = CS_CONNECTING; enabled_events_ |= kfConnect; } enabled_events_ |= kfRead | kfWrite; return err; } int GetError() const { return error_; } void SetError(int error) { error_ = error; } ConnState GetState() const { return state_; } int SetOption(Option opt, int value) { assert(opt == OPT_DONTFRAGMENT);#ifdef WIN32 value = (value == 0) ? 0 : 1; return ::setsockopt( s_, IPPROTO_IP, IP_DONTFRAGMENT, reinterpret_cast<char*>(&value), sizeof(value));#endif#ifdef __linux value = (value == 0) ? IP_PMTUDISC_DONT : IP_PMTUDISC_DO; return ::setsockopt( s_, IPPROTO_IP, IP_MTU_DISCOVER, &value, sizeof(value));#endif#ifdef OSX // This is not possible on OSX. return -1;#endif } int Send(const void *pv, size_t cb) { int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb, 0); UpdateLastError(); //LOG(INFO) << "SOCK[" << static_cast<int>(s_) << "] Send(" << cb << ") Ret: " << sent << " Error: " << error_; ASSERT(sent <= static_cast<int>(cb)); // We have seen minidumps where this may be false if ((sent < 0) && IsBlockingError(error_)) { enabled_events_ |= kfWrite; } return sent; } int SendTo(const void *pv, size_t cb, const SocketAddress& addr) { sockaddr_in saddr; addr.ToSockAddr(&saddr); int sent = ::sendto( s_, (const char *)pv, (int)cb, 0, (sockaddr*)&saddr, sizeof(saddr)); UpdateLastError(); ASSERT(sent <= static_cast<int>(cb)); // We have seen minidumps where this may be false if ((sent < 0) && IsBlockingError(error_)) { enabled_events_ |= kfWrite; } return sent; } int Recv(void *pv, size_t cb) { int received = ::recv(s_, (char *)pv, (int)cb, 0); if ((received == 0) && (cb != 0)) { // Note: on graceful shutdown, recv can return 0. In this case, we // pretend it is blocking, and then signal close, so that simplifying // assumptions can be made about Recv. error_ = EWOULDBLOCK; return SOCKET_ERROR; } UpdateLastError(); if ((received >= 0) || IsBlockingError(error_)) { enabled_events_ |= kfRead; } return received; } int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) { sockaddr_in saddr; socklen_t cbAddr = sizeof(saddr); int received = ::recvfrom(s_, (char *)pv, (int)cb, 0, (sockaddr*)&saddr, &cbAddr); UpdateLastError(); if ((received >= 0) && (paddr != NULL)) paddr->FromSockAddr(saddr); if ((received >= 0) || IsBlockingError(error_)) { enabled_events_ |= kfRead; } return received; } int Listen(int backlog) { int err = ::listen(s_, backlog); UpdateLastError(); if (err == 0) state_ = CS_CONNECTING; enabled_events_ |= kfRead; return err; } Socket* Accept(SocketAddress *paddr) { sockaddr_in saddr; socklen_t cbAddr = sizeof(saddr); SOCKET s = ::accept(s_, (sockaddr*)&saddr, &cbAddr); UpdateLastError(); if (s == INVALID_SOCKET) return NULL; if (paddr != NULL) paddr->FromSockAddr(saddr); enabled_events_ |= kfRead | kfWrite; return ss_->WrapSocket(s); } int Close() { if (s_ == INVALID_SOCKET) return 0; int err = ::closesocket(s_); UpdateLastError(); //LOG(INFO) << "SOCK[" << static_cast<int>(s_) << "] Close() Ret: " << err << " Error: " << error_; s_ = INVALID_SOCKET; state_ = CS_CLOSED; enabled_events_ = 0; return err; } int EstimateMTU(uint16* mtu) { SocketAddress addr = GetRemoteAddress(); if (addr.IsAny()) { error_ = ENOTCONN; return -1; }#ifdef WIN32 WinPing ping; if (!ping.IsValid()) { error_ = EINVAL; // can't think of a better error ID return -1; } for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) { int32 size = PACKET_MAXIMUMS[level] - IP_HEADER_SIZE - ICMP_HEADER_SIZE; WinPing::PingResult result = ping.Ping(addr.ip(), size, 0, 1, false); if (result == WinPing::PING_FAIL) { error_ = EINVAL; // can't think of a better error ID return -1; } if (result != WinPing::PING_TOO_LARGE) { *mtu = PACKET_MAXIMUMS[level]; return 0; } } assert(false); return 0;#endif // WIN32#ifdef __linux int value; socklen_t vlen = sizeof(value); int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen); if (err < 0) { UpdateLastError(); return err; } assert((0 <= value) && (value <= 65536)); *mtu = uint16(value); return 0;#endif // __linux // TODO: OSX support } SocketServer* socketserver() { return ss_; } protected: PhysicalSocketServer* ss_; SOCKET s_; uint32 enabled_events_; int error_; ConnState state_; void UpdateLastError() {#ifdef WIN32 error_ = WSAGetLastError();#endif#ifdef POSIX error_ = errno;#endif }};#ifdef POSIXclass Dispatcher {public: virtual uint32 GetRequestedEvents() = 0; virtual void OnPreEvent(uint32 ff) = 0; virtual void OnEvent(uint32 ff, int err) = 0; virtual int GetDescriptor() = 0;};class EventDispatcher : public Dispatcher {public: EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { if (pipe(afd_) < 0) LOG(LERROR) << "pipe failed"; ss_->Add(this); } virtual ~EventDispatcher() { ss_->Remove(this); close(afd_[0]); close(afd_[1]); } virtual void Signal() { CritScope cs(&crit_); if (!fSignaled_) { uint8 b = 0; if (write(afd_[1], &b, sizeof(b)) < 0) LOG(LERROR) << "write failed"; fSignaled_ = true; } } virtual uint32 GetRequestedEvents() { return kfRead; } virtual void OnPreEvent(uint32 ff) { // It is not possible to perfectly emulate an auto-resetting event with // pipes. This simulates it by resetting before the event is handled. CritScope cs(&crit_); if (fSignaled_) { uint8 b; read(afd_[0], &b, sizeof(b)); fSignaled_ = false; } } virtual void OnEvent(uint32 ff, int err) { assert(false); } virtual int GetDescriptor() { return afd_[0]; }private: PhysicalSocketServer *ss_; int afd_[2]; bool fSignaled_; CriticalSection crit_;};class SocketDispatcher : public Dispatcher, public PhysicalSocket {public: SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) { ss_->Add(this); } SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) { ss_->Add(this); } virtual ~SocketDispatcher() { Close(); } bool Initialize() { ss_->Add(this); fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); return true; } virtual bool Create(int type) { // Change the socket to be non-blocking. if (!PhysicalSocket::Create(type)) return false; return Initialize(); } virtual int GetDescriptor() { return s_; } virtual uint32 GetRequestedEvents() { return enabled_events_; } virtual void OnPreEvent(uint32 ff) { if ((ff & kfConnect) != 0) state_ = CS_CONNECTED; } virtual void OnEvent(uint32 ff, int err) { if ((ff & kfRead) != 0) { enabled_events_ &= ~kfRead; SignalReadEvent(this); } if ((ff & kfWrite) != 0) { enabled_events_ &= ~kfWrite; SignalWriteEvent(this); } if ((ff & kfConnect) != 0) { enabled_events_ &= ~kfConnect; SignalConnectEvent(this); } if ((ff & kfClose) != 0) SignalCloseEvent(this, err); } virtual int Close() { if (s_ == INVALID_SOCKET) return 0; ss_->Remove(this); return PhysicalSocket::Close(); } };class FileDispatcher: public Dispatcher, public AsyncFile {public: FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { set_readable(true); ss_->Add(this); fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); } virtual ~FileDispatcher() { ss_->Remove(this); } SocketServer* socketserver() { return ss_; } virtual int GetDescriptor() { return fd_; } virtual uint32 GetRequestedEvents() { return flags_; } virtual void OnPreEvent(uint32 ff) { } virtual void OnEvent(uint32 ff, int err) { if ((ff & kfRead) != 0) SignalReadEvent(this); if ((ff & kfWrite) != 0) SignalWriteEvent(this); if ((ff & kfClose) != 0) SignalCloseEvent(this, err); } virtual bool readable() { return (flags_ & kfRead) != 0; } virtual void set_readable(bool value) { flags_ = value ? (flags_ | kfRead) : (flags_ & ~kfRead); } virtual bool writable() { return (flags_ & kfWrite) != 0; } virtual void set_writable(bool value) { flags_ = value ? (flags_ | kfWrite) : (flags_ & ~kfWrite); }private: PhysicalSocketServer* ss_;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -