⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpnet.cpp

📁 用UDP写的可靠传输程序源代码,非常有借鉴意义,适合互连网通讯
💻 CPP
字号:
// CUDPReceivedData - Buffer for storing received data and information about it
// CUDPErrorInfo - Buffer for storing error information
// CUDPNet - Class, which implements UDP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////

#include "stdafx.h"

#include "NetBasic.h"
#include "UDPNet.h"

// Maximum datagram length, in bytes. It is the size of the receive buffer
// allocated in the CUDPNet constructor and the length passed to recvfrom()
#define MAX_DGLEN 65467

char* CUDPErrorInfo::GetTimeStamp(char* s)
{
    CHAR date[30]="";
    CHAR time[30]="";
    _timeb timebuffer;

    // Get date/time
    _strdate(date);
    _ftime(&timebuffer);
    _strtime(time);

    // Create string
    sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
    return s;
}

// Constructs the UDP transport object and starts its worker threads.
//   receiver   - stored as the default receiver
//   port       - stored in m_uPort
//   maxthreads - stored in m_uMaxDeliverers
// The object is created in the suspended state. The result of CreateSockets()
// is not examined here.
CUDPNet::CUDPNet(NetReceiver* receiver,unsigned short port,unsigned short maxthreads)
{
    // State flags: threads must not see a pending kill request, and the
    // object starts suspended
    m_bKill=false;
    m_bSuspended=true;

    // Caller-supplied settings
    m_uPort=port;
    m_DefReceiver=receiver;
    m_uMaxDeliverers=maxthreads;

    // Empty bookkeeping and the receive buffer
    m_uBusy=0;
    m_pDeliverTrds.clear();
    m_Deliveries.clear();
    m_pBuffer=new char[MAX_DGLEN];

    // Sockets have to exist before the threads are launched
    CreateSockets();

    // Launch the receiving thread and the delivery manager, keep their handles
    m_pServerTrd=AfxBeginThread(UDPServerFunction,this);
    m_pDelManTrd=AfxBeginThread(UDPDelManFunction,this);
}

// Requests termination of all worker threads, waits until every one of them
// has signed off, and then releases sockets, pending deliveries and the
// receive buffer.
CUDPNet::~CUDPNet()
{
    // Ask threads to terminate
    m_bKill=true;

    // The list of delivery threads is modified by worker threads, so it is
    // examined only while the critical section is held
    CSingleLock lock(&m_CriticalSection);
    for(;;) {
        lock.Lock();
        bool bDeliverersGone=m_pDeliverTrds.empty();
        lock.Unlock();

        // Server and manager threads clear their handles when they exit
        if (bDeliverersGone && !m_pServerTrd && !m_pDelManTrd) break;

        // Never sleep while holding the lock
        Sleep(50);
    }

    // Free resources
    closesocket(m_SendSocket);
    closesocket(m_RecvSocket);
    FreeDeliveries();
    delete[] m_pBuffer;
}

bool CUDPNet::CreateSockets()
{
    // Sockets creation
    m_SendSocket=socket(AF_INET, SOCK_DGRAM, 0);
    m_RecvSocket=socket(AF_INET, SOCK_DGRAM, 0);
    if (m_SendSocket==INVALID_SOCKET || m_RecvSocket==INVALID_SOCKET) {
        m_Deliveries.push_back(Delivery(new CUDPErrorInfo(0,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }
    // Sockets are to support broadcasting
    BOOL broadcast=TRUE;
    setsockopt(m_SendSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
    setsockopt(m_RecvSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));

    // Binding local address with receiving socket
    m_Local.sin_family=AF_INET;
    m_Local.sin_port=htons(m_uPort);
    m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
    if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
    {
        m_Deliveries.push_back(Delivery(new CUDPErrorInfo(1,WSAGetLastError(),IPAddr())));
        m_bSuspended=true;
        return false;
    }

    return true;
}

// Sends the contents of a buffer as a single datagram.
//   sb      - buffer to send; if sb.GetAutoDel() is true it is deleted after
//             the send attempt, whether the attempt succeeded or not
//   command - not used by this implementation
//   to      - recipient address; the destination port is m_uPort
// The two trailing parameters are unnamed and ignored.
// Returns false if the object is suspended or sendto() reported an error,
// true otherwise.
// NOTE(review): on the suspended early return an auto-delete buffer is NOT
// deleted, unlike on the sendto() failure path - confirm which behaviour
// callers expect.
bool CUDPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 /*options*/, bool /*storeiffail*/)
{
    // Nothing is sent while suspended
    if (m_bSuspended) return false;

    // Destination address
    SOCKADDR_IN dest;
    dest.sin_family=AF_INET;
    dest.sin_port=htons(m_uPort);
    dest.sin_addr.s_addr=to.Solid;

    const int sent=sendto(m_SendSocket,(const char*)sb.GetBufferBegin(),sb.GetBufferSize(),0,(SOCKADDR*)&dest,sizeof(dest));

    // The buffer is released only after the data has been handed to the socket
    if (sb.GetAutoDel()) delete &sb;

    return sent!=SOCKET_ERROR;
}

void CUDPNet::FreeDeliveries()
{
    for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
        if (it->data) {
            if (it->type==DeliveryType::ReceivedData) delete (CUDPReceivedData*)it->data; else
            if (it->type==DeliveryType::ErrorInfo) delete (CUDPErrorInfo*)it->data; else
            delete it->data;
        }
    }
    m_Deliveries.clear();
}


unsigned int UDPServerFunction(void* pNet)
{
    CUDPNet* net=(CUDPNet*)pNet;

    // Necessary variables
    timeval tv;
    tv.tv_sec=0;
    tv.tv_usec=1;
    fd_set fdread;
    SOCKADDR_IN sender;
    int sendersize=NULL;

    for(;;) {
        // Wait while suspended
        while (net->GetSuspended()) {
            // Does killing needed
            if (net->m_bKill) {
                net->m_pServerTrd=NULL;
                AfxEndThread(0,TRUE);
            }

            // Sleep a little bit
            Sleep(50);
        }

        // Check for received data
        FD_ZERO(&fdread);
        FD_SET(net->m_RecvSocket,&fdread);
        if (select(0,&fdread,NULL,NULL,&tv)>0) {
            // Receive data
            sendersize=sizeof(sender);
            int ret=recvfrom(net->m_RecvSocket,net->m_pBuffer,MAX_DGLEN,0,(SOCKADDR*)&sender,&sendersize);
            if (ret==SOCKET_ERROR) {
                // Error while receiving
                net->m_Deliveries.push_back(CUDPNet::Delivery(new CUDPErrorInfo(3,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
            } else {
                net->m_Deliveries.push_back(CUDPNet::Delivery(new CUDPReceivedData(ret,sender.sin_addr.S_un.S_addr,net->m_pBuffer)));
            }
        }

        // Does killing needed
        if (net->m_bKill) {
            net->m_pServerTrd=NULL;
            AfxEndThread(0,TRUE);
        }
    }
}

// Thread function of the delivery manager.
//   pNet - pointer to the CUDPNet object served by this thread
// Every 10 ms checks whether one more delivery thread has to be started:
// deliveries are waiting, the thread limit is not reached and all existing
// delivery threads are busy. When m_bKill is set the thread clears
// m_pDelManTrd and ends itself with the number of pending deliveries as the
// exit code, so the function never returns normally.
unsigned int UDPDelManFunction(void* pNet)
{
    CUDPNet* net=(CUDPNet*)pNet;

    // Lock for delivery threads based on critical section
    CSingleLock lock(&net->m_CriticalSection);

    for(;;) {
        lock.Lock();
        // Are additional delivery threads needed
        if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
            net->m_pDeliverTrds.push_back(AfxBeginThread(UDPDeliverFunction,pNet));
        }
        // Take the exit code while the queue is locked. It must not be read
        // after m_pDelManTrd is cleared: the destructor waits for that handle
        // to become NULL, so the object may be gone right afterwards
        unsigned int pending=(unsigned int)net->m_Deliveries.size();
        lock.Unlock();

        // Kill server
        if (net->m_bKill) {
            net->m_pDelManTrd=NULL;
            AfxEndThread(pending,TRUE);
        }

        Sleep(10);
    }
}

// Thread function of a delivery thread.
//   pNet - pointer to the CUDPNet object served by this thread
// Takes deliveries from the queue one by one and passes them to the default
// receiver: OnReceive() for received data, OnError() for error information.
// The payload is deleted after the call. Deliveries of any other type are
// just deleted. The thread removes itself from the list of delivery threads
// and ends when m_bKill is set or when more than 20 seconds have passed
// since it last started handing a delivery to the receiver; the exit code is
// the number of pending deliveries. The function never returns normally.
unsigned int UDPDeliverFunction(void* pNet)
{
    CUDPNet* net=(CUDPNet*)pNet;
    CUDPNet::Delivery del;
    bool bNothing=true; // Shows if no deliveries were planned
    DWORD lastdel=GetTickCount();

    // Lock for delivery threads based on critical section. It guards the
    // delivery queue, the busy counter and the list of delivery threads
    CSingleLock lock(&net->m_CriticalSection);

    for(;;) {
        bNothing=true;

        for(;;) {
            lock.Lock();
            if (net->m_Deliveries.empty()) {
                lock.Unlock();
                break;
            }

            // Get delivery
            del=net->m_Deliveries.front();
            net->m_Deliveries.pop_front();

            // Only deliveries handed to the receiver count as "busy". The
            // counter is read by the manager thread under this lock, so it is
            // changed under the lock as well
            bool bDispatch=(del.type==CUDPNet::DeliveryType::ReceivedData || del.type==CUDPNet::DeliveryType::ErrorInfo);
            if (bDispatch) net->m_uBusy++;
            lock.Unlock();

            // Deliver delivery (receiver is called without holding the lock)
            switch (del.type) {
            case CUDPNet::DeliveryType::ReceivedData:
                lastdel=GetTickCount();
                net->GetDefaultReceiver()->OnReceive(del.data);
                delete (CUDPReceivedData*)del.data;
            break;
            case CUDPNet::DeliveryType::ErrorInfo:
                lastdel=GetTickCount();
                net->GetDefaultReceiver()->OnError(del.data);
                delete (CUDPErrorInfo*)del.data;
            break;
            default:
                delete del.data;
            }

            if (bDispatch) {
                lock.Lock();
                net->m_uBusy--;
                lock.Unlock();
            }

            // Thread is working now
            bNothing=false;

            // If killing needed then do it after unlocking
            if (net->m_bKill) break;
        }

        // Is killing needed (because of request or because of sponging)
        if (net->m_bKill || GetTickCount()-lastdel>20000) {
            lock.Lock();
            // Exit code is taken before the thread unregisters itself: once
            // the list is empty the destructor may destroy the object
            unsigned int pending=(unsigned int)net->m_Deliveries.size();
            net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
            lock.Unlock();
            AfxEndThread(pending,TRUE);
        }

        // Sleep a little bit
        if (bNothing) {
            Sleep(50);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -