⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpnet.cpp

📁 用UDP写的可靠传输程序源代码,非常有借鉴意义,适合互连网通讯
💻 CPP
字号:
// CTCPReceivedData - Buffer for storing received data and information about it
// CTCPErrorInfo - Buffer for storing error information
// CTCPNet - Class, which implements TCP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////

#include "stdafx.h"

#include "NetBasic.h"
#include "TCPNet.h"

// Default maximum segment length, in bytes (8388608 = 8 * 1024 * 1024).
// The CTCPNet constructor allocates its receive buffer with this size.
#define MAX_SEGLEN 8388608

char* CTCPErrorInfo::GetTimeStamp(char* s)
{
    CHAR date[30]="";
    CHAR time[30]="";
    _timeb timebuffer;

    // Get date/time
    _strdate(date);
    _ftime(&timebuffer);
    _strtime(time);

    // Create string
    sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
    return s;
}

// Sets up the object, prepares the listening socket and launches the
// server and delivery-manager threads.
// receiver   - object stored as the default receiver
// port       - port number stored for socket setup and for sending
// maxthreads - upper limit for the number of delivery threads
CTCPNet::CTCPNet(NetReceiver* receiver,unsigned short port,unsigned short maxthreads)
{
    // Remember the caller-supplied settings
    m_DefReceiver=receiver;
    m_uPort=port;
    m_uMaxDeliverers=maxthreads;

    // Bring the internal state to its initial values
    m_bSuspended=true;
    m_uBusy=0;
    m_Deliveries.clear();
    m_pDeliverTrds.clear();
    m_pBuffer=new char[MAX_SEGLEN];

    // Prepare the listening socket (a failure is queued as an error delivery)
    CreateSockets();

    // The kill flag must be cleared before any worker thread can look at it
    m_bKill=false;

    // Launch the worker threads and keep their objects
    m_pServerTrd=AfxBeginThread(TCPServerFunction,this);
    m_pDelManTrd=AfxBeginThread(TCPDelManFunction,this);
}

// Stops all worker threads and releases the sockets, the pending deliveries
// and the receive buffer.
CTCPNet::~CTCPNet()
{
    // Ask every worker thread to finish
    m_bKill=true;

    // Closing the listening socket makes a pending accept() in the server
    // thread return, so that thread can notice m_bKill and exit on its own
    closesocket(m_RecvSocket);

    // Wait until all threads have unregistered themselves
    while ((!m_pDeliverTrds.empty()) || m_pServerTrd || m_pDelManTrd) {
        Sleep(50);

        // Last resort: the server thread may still sit in a blocking call
        // (for example recv() on an accepted connection), so stop it forcibly
        if (m_pServerTrd) {
            TerminateThread(m_pServerTrd,0);
            m_pServerTrd=NULL;
        }
    }

    // Free resources.
    // m_SendSocket is deliberately not closed here: Send() closes it before
    // returning, so the member does not own an open socket at this point and
    // closing its leftover value could hit an unrelated handle.
    FreeDeliveries();
    delete[] m_pBuffer;
}

// Creates the listening socket, binds it to m_uPort on any local address and
// switches it to listening mode.
// Returns true on success. On failure an error delivery is queued
// (code 0 - socket creation, code 1 - bind or listen), the object is marked
// as suspended and false is returned.
// NOTE(review): listen() failures reuse code 1 of bind() - confirm whether a
// separate code was intended.
bool CTCPNet::CreateSockets()
{
    // The sending socket is opened by Send() for every transmission, so here
    // it is only marked as "not open" instead of being tested before it was
    // ever assigned
    m_SendSocket=INVALID_SOCKET;

    // Listening socket creation
    m_RecvSocket=socket(AF_INET, SOCK_STREAM, 0);
    if (m_RecvSocket==INVALID_SOCKET) {
        m_Deliveries.push_back(Delivery(new CTCPErrorInfo(0,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }

    // Binding local address with receiving socket
    m_Local.sin_family=AF_INET;
    m_Local.sin_port=htons(m_uPort);
    m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
    if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
    {
        m_Deliveries.push_back(Delivery(new CTCPErrorInfo(1,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }

    // Tuning receiving socket to listen (backlog of 50 pending connections)
    if (listen(m_RecvSocket,50)==SOCKET_ERROR)
    {
        m_Deliveries.push_back(Delivery(new CTCPErrorInfo(1,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }

    return true;
}

// Sends the contents of a buffer to a remote host over a fresh TCP
// connection, which is closed again before returning.
// sb      - buffer to transmit; deleted here when sb.GetAutoDel() is set
//           (not when the call is rejected because the object is suspended)
// command - not used by this implementation
// to      - recipient address; the connection goes to port m_uPort
// Returns true only if the connection was established and the whole buffer
// was handed to the socket; false if suspended or on any socket error.
bool CTCPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 /*options*/, bool /*storeiffail*/)
{
    if (m_bSuspended) return false;

    bool res=false;

    m_SendSocket=socket(AF_INET, SOCK_STREAM, 0);
    if (m_SendSocket!=INVALID_SOCKET) {
        SOCKADDR_IN recip;
        recip.sin_family=AF_INET;
        recip.sin_port=htons(m_uPort);
        recip.sin_addr.s_addr=to.Solid;

        res=(connect(m_SendSocket,(sockaddr*)&recip,sizeof(recip))!=SOCKET_ERROR);

        // send() may accept only a part of the data, so keep going until
        // everything is handed over; nothing is sent if connect() failed
        const char* data=(const char*)sb.GetBufferBegin();
        int left=(int)sb.GetBufferSize();
        while (res && left>0) {
            int sent=send(m_SendSocket,data,left,0);
            if (sent<=0) {
                // SOCKET_ERROR, or no progress at all
                res=false;
            } else {
                data+=sent;
                left-=sent;
            }
        }

        // Do not leave a stale handle value behind in the member
        closesocket(m_SendSocket);
        m_SendSocket=INVALID_SOCKET;
    }

    if (sb.GetAutoDel()) delete &sb;

    return (res);
}

void CTCPNet::FreeDeliveries()
{
    for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
        if (it->data) {
            if (it->type==DeliveryType::ReceivedData) delete (CTCPReceivedData*)it->data; else
            if (it->type==DeliveryType::ErrorInfo) delete (CTCPErrorInfo*)it->data; else
            delete it->data;
        }
    }
    m_Deliveries.clear();
}


unsigned int TCPServerFunction(void* pNet)
{
    CTCPNet* net=(CTCPNet*)pNet;

    // Necessary variables
    SOCKADDR_IN sender;
    SOCKET client;
    int sendersize=NULL;

    for(;;) {
        // Wait while suspended
        while (net->GetSuspended()) {
            // Does killing needed
            if (net->m_bKill) {
                net->m_pServerTrd=NULL;
                AfxEndThread(0,TRUE);
            }

            // Sleep a little bit
            Sleep(50);
        }

        sendersize=sizeof(sender);
        if ((client=accept(net->m_RecvSocket,(sockaddr*)&sender,&sendersize))!=INVALID_SOCKET) {
            // Receive data            
            int ret=recv(client,net->m_pBuffer,MAX_SEGLEN,0);
            if (ret==SOCKET_ERROR) {
                // Error while receiving
                net->m_Deliveries.push_back(CTCPNet::Delivery(new CTCPErrorInfo(3,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
            } else {
                net->m_Deliveries.push_back(CTCPNet::Delivery(new CTCPReceivedData(ret,sender.sin_addr.S_un.S_addr,net->m_pBuffer)));
            }
        }
        Sleep(1);

        // Does killing needed
        if (net->m_bKill) {
            net->m_pServerTrd=NULL;
            AfxEndThread(0,TRUE);
        }
    }
}

// Thread function of the delivery manager: starts an additional delivery
// thread whenever deliveries are waiting, all existing delivery threads are
// busy and the limit m_uMaxDeliverers is not reached yet.
// pNet - pointer to the owning CTCPNet object
// Never returns normally; the thread ends through AfxEndThread() once
// m_bKill is set, passing the number of queued deliveries as exit code.
unsigned int TCPDelManFunction(void* pNet)
{
    CTCPNet* net=(CTCPNet*)pNet;

    // Lock for delivery threads based on critical section
    CSingleLock lock(&net->m_CriticalSection);

    for(;;) {
        // Shutdown requested: leave before starting any further thread
        if (net->m_bKill) {
            net->m_pDelManTrd=NULL;
            AfxEndThread(net->m_Deliveries.size(),TRUE);
        }

        lock.Lock();
        // Are additional delivery threads needed
        if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
            net->m_pDeliverTrds.push_back(AfxBeginThread(TCPDeliverFunction,pNet));
        }
        lock.Unlock();

        Sleep(10);
    }
}

unsigned int TCPDeliverFunction(void* pNet)
{
    CTCPNet* net=(CTCPNet*)pNet;
    CTCPNet::Delivery del;
    bool bNothing=true; // Shows if no deliveries were planned
    DWORD lastdel=GetTickCount();
    
    // Lock for delivery threads based on critical section
    CSingleLock lock(&net->m_CriticalSection);

    for(;;) {
        bNothing=true;

        while (lock.Lock(), !net->m_Deliveries.empty()) {
            // Get delivery
            del=net->m_Deliveries.front();
            net->m_Deliveries.pop_front();
            lock.Unlock();

            // Deliver delivery
            switch (del.type) {
            case CTCPNet::DeliveryType::ReceivedData:
                net->m_uBusy++;
                lastdel=GetTickCount();
                net->GetDefaultReceiver()->OnReceive(del.data);
                delete (CTCPReceivedData*)del.data;
                net->m_uBusy--;
            break;
            case CTCPNet::DeliveryType::ErrorInfo:
                net->m_uBusy++;
                lastdel=GetTickCount();
                net->GetDefaultReceiver()->OnError(del.data);
                delete (CTCPErrorInfo*)del.data;
                net->m_uBusy--;
            break;
            default:
                delete del.data;
            }

            // Thread is working now
            bNothing=false;

            // If killing needed then do it after unlocking
            if (net->m_bKill) break;
        }
        lock.Unlock();

        // Does killing needed (because of request or because of sponging)
        if (net->m_bKill || GetTickCount()-lastdel>20000) {
            net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
            AfxEndThread(net->m_Deliveries.size(),TRUE);
        }
        
        // Sleep a little bit
        if (bNothing) {
            Sleep(50);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -