⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transport.cpp

📁 sspi_workbench
💻 CPP
字号:
#include "precomp.h"
#include "transport.h"

#pragma comment(lib, "wsock32.lib")

///////////////////////////////////////////////////
// Transport

// Values stamped into the MessageHeader of every outgoing message
// (see Transport::_sendMessage).
const DWORD TRANSPORT_MAGIC_NUMBER = 0xFFEEEEFF;
const DWORD TRANSPORT_VERSION      = 0x00000001;

// Worker pool shared by all Transport objects. Created by Transport::Init()
// and released by Transport::Shutdown(); null when no pool exists.
SimpleThreadPool* Transport::s_pThreadPool = 0;

// One-time setup for the transport layer: starts Winsock (version 1.1 is
// requested) and creates the shared worker thread pool.
//
// errFcn - callback used to report a WSAStartup failure; it is also handed
//          to the thread pool.
void Transport::Init(ERR_FCN errFcn) {
    WSADATA wsaData;
    const int startupResult = WSAStartup(MAKEWORD(1,1), &wsaData);
    if (0 != startupResult) {
        errFcn(L"WSAStartup", false, startupResult);
    }

    // Publish the pool pointer first, then start the pool.
    SimpleThreadPool* const pPool = new SimpleThreadPool(errFcn, _workerThreadProc);
    s_pThreadPool = pPool;
    pPool->init();
}

// Tears down what Init() set up: closes and destroys the shared thread pool
// (if one exists) and then releases Winsock.
void Transport::Shutdown() {
    if (s_pThreadPool) {
        s_pThreadPool->close(1000); // presumably a timeout in ms - confirm against SimpleThreadPool
        delete s_pThreadPool;
        // Clear the pointer so a repeated Shutdown() (or any later check of
        // s_pThreadPool) does not touch the freed pool.
        s_pThreadPool = 0;
    }
    WSACleanup();
}

// Stores the error and dispatch callbacks. The transport starts out without
// a socket; derived classes are responsible for establishing m_socket.
Transport::Transport(ERR_FCN errFcn, WM_DISPATCH_FCN wmDispatchFcn)
    : m_errFcn(errFcn)
    , m_wmDispatchFcn(wmDispatchFcn)
    , m_socket(INVALID_SOCKET)
{
}

// Closes the socket if one was ever opened.
Transport::~Transport() {
    if (INVALID_SOCKET == m_socket) {
        return; // never connected - nothing to release
    }
    closesocket(m_socket);
}

// Queues a send on the shared thread pool; the actual socket write happens
// later in _sendMessage() via _handleJob().
//
// msgProtection - protection level recorded in the message header
// cbMsg         - value recorded in the header's cbMsg field
// cbTotal       - number of bytes at pMsg that will be transmitted
// pMsg          - payload; NOTE(review): the job appears to keep this pointer,
//                 so the buffer likely has to outlive the queued job - confirm
//                 against SendMessageJob.
//
// Returns whatever SimpleThreadPool::executeJob() reports.
bool Transport::sendMessage(MsgProtection msgProtection, DWORD cbMsg, DWORD cbTotal, const void* pMsg) {
    _ASSERT(INVALID_SOCKET != m_socket);

    SendMessageJob* const pSendJob =
        new SendMessageJob(msgProtection, cbMsg, cbTotal, pMsg);

    return s_pThreadPool->executeJob(
        this,
        jtSendMsg,
        pSendJob,
        m_wmDispatchFcn);
}

// Queues a receive on the shared thread pool; the actual socket read happens
// later in _recvMessage() via _handleJob(), which writes its results through
// the four output pointers supplied here.
//
// pmsgProtection - receives the protection level from the message header
//                  (written as a DWORD by the worker)
// pcbMsg         - receives the header's cbMsg field
// pcbTotal       - receives the number of payload bytes
// ppMsg          - receives a pointer to the payload buffer
//
// Returns whatever SimpleThreadPool::executeJob() reports.
bool Transport::recvMessage(MsgProtection* pmsgProtection, DWORD* pcbMsg, DWORD* pcbTotal, void** ppMsg) {
    _ASSERT(INVALID_SOCKET != m_socket);

    // The worker treats the protection level as a raw DWORD.
    DWORD* const pProtectionAsDword = reinterpret_cast<DWORD*>(pmsgProtection);
    RecvMessageJob* const pRecvJob =
        new RecvMessageJob(pProtectionAsDword, pcbMsg, pcbTotal, ppMsg);

    return s_pThreadPool->executeJob(
        this,
        jtRecvMsg,
        pRecvJob,
        m_wmDispatchFcn);
}

void Transport::_sendMessage(DWORD msgProtection, DWORD cbMsg, DWORD cbTotal, const void* pMsg) {
    _ASSERT(INVALID_SOCKET != m_socket);

    MessageHeader h;
    h.magic         = TRANSPORT_MAGIC_NUMBER;
    h.version       = TRANSPORT_VERSION;
    h.cbMsg         = cbMsg;
    h.cbTotal       = cbTotal;
    h.msgProtection = msgProtection;

    DWORD cbToSend = cbTotal + sizeof h;
    BYTE* p = new BYTE[cbToSend];
    BYTE* it = p;
    CopyMemory(it, &h, sizeof h);
    it += sizeof h;
    CopyMemory(it, pMsg, cbTotal);

    const DWORD n = send(m_socket, reinterpret_cast<const char*>(p), cbToSend, 0);
    if (SOCKET_ERROR == n)
        _err(L"send");
    _ASSERT(n == cbToSend); // blocking socket should send all

    delete [] p;
}

void Transport::_recvMessage(DWORD* pMsgProtection, DWORD* pcbMsg, DWORD* pcbTotal, void** ppMsg) {
    _ASSERT(INVALID_SOCKET != m_socket);

    MessageHeader h;
    {
        const DWORD cbReceived = recv(m_socket, reinterpret_cast<char*>(&h), sizeof h, 0);
        if (SOCKET_ERROR == cbReceived)
            _err(L"recv");
        _ASSERT(cbReceived == sizeof h); // blocking socket should recv all
    }
    BYTE* p = reinterpret_cast<BYTE*>(malloc(h.cbTotal));
    {
        const DWORD cbReceived = recv(m_socket, reinterpret_cast<char*>(p), h.cbTotal, 0);
        if (SOCKET_ERROR == cbReceived)
            _err(L"recv");
        _ASSERT(cbReceived == h.cbTotal); // blocking socket should recv all
    }
    *pMsgProtection = h.msgProtection;
    *pcbMsg         = h.cbMsg;
    *pcbTotal       = h.cbTotal;
    *ppMsg          = p;
}

void Transport::_workerThreadProc(void* pCtx, int nType, void* pv) {
    Transport* pTransport = reinterpret_cast<Transport*>(pCtx);
    pTransport->_handleJob(nType, pv);
}

void Transport::_handleJob(int nType, void* pv) {
    JobTypes jt = static_cast<JobTypes>(nType);
    switch (jt) {
        case jtSendMsg: {
            SendMessageJob* pJob = static_cast<SendMessageJob*>(pv);
            _sendMessage(pJob->msgProtection, pJob->cbMsg, pJob->cbTotal, pJob->pMsg);
            delete pJob;
            break;
        }
        case jtRecvMsg: {
            RecvMessageJob* pJob = static_cast<RecvMessageJob*>(pv);
            _recvMessage(pJob->pmsgProtection, pJob->pcbMsg, pJob->pcbTotal, pJob->ppMsg);
            delete pJob;
            break;
        }
    }
}

///////////////////////////////////////////////////
// ClientTransport
// Forwards both callbacks to the Transport base; the client adds no state.
ClientTransport::ClientTransport(ERR_FCN errFcn, WM_DISPATCH_FCN wmDispatchFcn)
    : Transport(errFcn, wmDispatchFcn)
{
}

// Nothing client-specific to release; ~Transport() closes the socket.
ClientTransport::~ClientTransport() {
}

bool ClientTransport::connect(const char* pszHostAddr, WORD port) {
    _ASSERT(INVALID_SOCKET == m_socket); // don't connect twice

    const DWORD addr = inet_addr(pszHostAddr);
    if (INADDR_NONE == addr)
        _err(L"inet_addr");

    m_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (INVALID_SOCKET == m_socket)
        _err(L"socket");

    sockaddr_in saddr; ZeroMemory(&saddr, sizeof saddr);
    saddr.sin_family      = AF_INET;
    saddr.sin_addr.s_addr = addr;
    saddr.sin_port        = htons(port);
    int err = ::connect(m_socket, reinterpret_cast<sockaddr*>(&saddr), sizeof saddr);
    if (SOCKET_ERROR == err)
        _err(L"connect");
    return true;
}

///////////////////////////////////////////////////
// ServerTransport
// Remembers the port to listen on and forwards both callbacks to the
// Transport base. No socket is created until a listen job runs.
ServerTransport::ServerTransport(WORD port, ERR_FCN errFcn, WM_DISPATCH_FCN wmDispatchFcn)
    : Transport(errFcn, wmDispatchFcn)
    , m_port(port)
{
}

// Nothing server-specific to release; ~Transport() closes the socket.
ServerTransport::~ServerTransport() {
}

// Queues a listen-and-accept job on the shared thread pool; the blocking
// socket work is done later by _listenAndAcceptSingleConnection().
// Must be called while no connection is established yet.
//
// Returns whatever SimpleThreadPool::executeJob() reports.
bool ServerTransport::listenAndAcceptSingleConnection() {
    _ASSERT(INVALID_SOCKET == m_socket);

    return s_pThreadPool->executeJob(
        this,
        jtListenAndAcceptSingleConnection,
        0,                  // this job type carries no job object
        m_wmDispatchFcn);
}

void ServerTransport::_listenAndAcceptSingleConnection() {
    SOCKET listener = socket(AF_INET, SOCK_STREAM, 0);
    if (INVALID_SOCKET == listener)
        _err(L"socket");
    sockaddr_in addr; ZeroMemory(&addr, sizeof addr);
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(m_port);
    addr.sin_family = AF_INET;
    int err = bind(listener, reinterpret_cast<sockaddr*>(&addr), sizeof addr);
    if (SOCKET_ERROR == err)
        _err(L"bind");
    err = listen(listener, SOMAXCONN);
    if (SOCKET_ERROR == err)
        _err(L"listen");
    m_socket = accept(listener, 0, 0);
    if (INVALID_SOCKET == m_socket)
        _err(L"accept");
}

void ServerTransport::_handleJob(int nType, void* pv) {
    JobTypes jt = static_cast<JobTypes>(nType);
    switch (jt) {
        case jtListenAndAcceptSingleConnection:
            _listenAndAcceptSingleConnection();
            break;
        default:
            Transport::_handleJob(nType, pv);
            break;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -