📄 transport.cpp
字号:
#include "precomp.h"
#include "transport.h"
#pragma comment(lib, "wsock32.lib")
///////////////////////////////////////////////////
// Transport
const DWORD TRANSPORT_MAGIC_NUMBER = 0xFFEEEEFF;
const DWORD TRANSPORT_VERSION = 0x000000001;
SimpleThreadPool* Transport::s_pThreadPool;
void Transport::Init(ERR_FCN errFcn) {
WSADATA wd;
int err = WSAStartup(MAKEWORD(1,1), &wd);
if (err)
errFcn(L"WSAStartup", false, err);
s_pThreadPool = new SimpleThreadPool(errFcn, _workerThreadProc);
s_pThreadPool->init();
}
void Transport::Shutdown() {
if (s_pThreadPool) {
s_pThreadPool->close(1000);
delete s_pThreadPool;
}
WSACleanup();
}
Transport::Transport(ERR_FCN errFcn, WM_DISPATCH_FCN wmDispatchFcn)
: m_errFcn(errFcn),
m_wmDispatchFcn(wmDispatchFcn),
m_socket(INVALID_SOCKET)
{}
Transport::~Transport() {
if (INVALID_SOCKET != m_socket)
closesocket(m_socket);
}
bool Transport::sendMessage(MsgProtection msgProtection, DWORD cbMsg, DWORD cbTotal, const void* pMsg) {
_ASSERT(INVALID_SOCKET != m_socket);
SendMessageJob* pJob = new SendMessageJob(msgProtection, cbMsg, cbTotal, pMsg);
return s_pThreadPool->executeJob(this, jtSendMsg, pJob, m_wmDispatchFcn);
}
bool Transport::recvMessage(MsgProtection* pmsgProtection, DWORD* pcbMsg, DWORD* pcbTotal, void** ppMsg) {
_ASSERT(INVALID_SOCKET != m_socket);
RecvMessageJob* pJob = new RecvMessageJob(reinterpret_cast<DWORD*>(pmsgProtection), pcbMsg, pcbTotal, ppMsg);
return s_pThreadPool->executeJob(this, jtRecvMsg, pJob, m_wmDispatchFcn);
}
void Transport::_sendMessage(DWORD msgProtection, DWORD cbMsg, DWORD cbTotal, const void* pMsg) {
_ASSERT(INVALID_SOCKET != m_socket);
MessageHeader h;
h.magic = TRANSPORT_MAGIC_NUMBER;
h.version = TRANSPORT_VERSION;
h.cbMsg = cbMsg;
h.cbTotal = cbTotal;
h.msgProtection = msgProtection;
DWORD cbToSend = cbTotal + sizeof h;
BYTE* p = new BYTE[cbToSend];
BYTE* it = p;
CopyMemory(it, &h, sizeof h);
it += sizeof h;
CopyMemory(it, pMsg, cbTotal);
const DWORD n = send(m_socket, reinterpret_cast<const char*>(p), cbToSend, 0);
if (SOCKET_ERROR == n)
_err(L"send");
_ASSERT(n == cbToSend); // blocking socket should send all
delete [] p;
}
void Transport::_recvMessage(DWORD* pMsgProtection, DWORD* pcbMsg, DWORD* pcbTotal, void** ppMsg) {
_ASSERT(INVALID_SOCKET != m_socket);
MessageHeader h;
{
const DWORD cbReceived = recv(m_socket, reinterpret_cast<char*>(&h), sizeof h, 0);
if (SOCKET_ERROR == cbReceived)
_err(L"recv");
_ASSERT(cbReceived == sizeof h); // blocking socket should recv all
}
BYTE* p = reinterpret_cast<BYTE*>(malloc(h.cbTotal));
{
const DWORD cbReceived = recv(m_socket, reinterpret_cast<char*>(p), h.cbTotal, 0);
if (SOCKET_ERROR == cbReceived)
_err(L"recv");
_ASSERT(cbReceived == h.cbTotal); // blocking socket should recv all
}
*pMsgProtection = h.msgProtection;
*pcbMsg = h.cbMsg;
*pcbTotal = h.cbTotal;
*ppMsg = p;
}
void Transport::_workerThreadProc(void* pCtx, int nType, void* pv) {
Transport* pTransport = reinterpret_cast<Transport*>(pCtx);
pTransport->_handleJob(nType, pv);
}
void Transport::_handleJob(int nType, void* pv) {
JobTypes jt = static_cast<JobTypes>(nType);
switch (jt) {
case jtSendMsg: {
SendMessageJob* pJob = static_cast<SendMessageJob*>(pv);
_sendMessage(pJob->msgProtection, pJob->cbMsg, pJob->cbTotal, pJob->pMsg);
delete pJob;
break;
}
case jtRecvMsg: {
RecvMessageJob* pJob = static_cast<RecvMessageJob*>(pv);
_recvMessage(pJob->pmsgProtection, pJob->pcbMsg, pJob->pcbTotal, pJob->ppMsg);
delete pJob;
break;
}
}
}
///////////////////////////////////////////////////
// ClientTransport
ClientTransport::ClientTransport(ERR_FCN errFcn, WM_DISPATCH_FCN wmDispatchFcn)
: Transport(errFcn, wmDispatchFcn)
{}
ClientTransport::~ClientTransport()
{}
bool ClientTransport::connect(const char* pszHostAddr, WORD port) {
_ASSERT(INVALID_SOCKET == m_socket); // don't connect twice
const DWORD addr = inet_addr(pszHostAddr);
if (INADDR_NONE == addr)
_err(L"inet_addr");
m_socket = socket(AF_INET, SOCK_STREAM, 0);
if (INVALID_SOCKET == m_socket)
_err(L"socket");
sockaddr_in saddr; ZeroMemory(&saddr, sizeof saddr);
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = addr;
saddr.sin_port = htons(port);
int err = ::connect(m_socket, reinterpret_cast<sockaddr*>(&saddr), sizeof saddr);
if (SOCKET_ERROR == err)
_err(L"connect");
return true;
}
///////////////////////////////////////////////////
// ServerTransport
ServerTransport::ServerTransport(WORD port, ERR_FCN errFcn, WM_DISPATCH_FCN wmDispatchFcn)
: Transport(errFcn, wmDispatchFcn),
m_port(port)
{}
ServerTransport::~ServerTransport()
{}
bool ServerTransport::listenAndAcceptSingleConnection() {
_ASSERT(INVALID_SOCKET == m_socket);
return s_pThreadPool->executeJob(this, jtListenAndAcceptSingleConnection, 0, m_wmDispatchFcn);
}
void ServerTransport::_listenAndAcceptSingleConnection() {
SOCKET listener = socket(AF_INET, SOCK_STREAM, 0);
if (INVALID_SOCKET == listener)
_err(L"socket");
sockaddr_in addr; ZeroMemory(&addr, sizeof addr);
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(m_port);
addr.sin_family = AF_INET;
int err = bind(listener, reinterpret_cast<sockaddr*>(&addr), sizeof addr);
if (SOCKET_ERROR == err)
_err(L"bind");
err = listen(listener, SOMAXCONN);
if (SOCKET_ERROR == err)
_err(L"listen");
m_socket = accept(listener, 0, 0);
if (INVALID_SOCKET == m_socket)
_err(L"accept");
}
void ServerTransport::_handleJob(int nType, void* pv) {
JobTypes jt = static_cast<JobTypes>(nType);
switch (jt) {
case jtListenAndAcceptSingleConnection:
_listenAndAcceptSingleConnection();
break;
default:
Transport::_handleJob(nType, pv);
break;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -