⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ctpnet.cpp

📁 用UDP写的可靠传输,非常有借鉴意义,适合互连网通讯
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// CCTPReceivedData - Buffer for storing received data and information about it
// CCTPErrorInfo - Buffer for storing error information
// CCTPNet - Class, which implements CTP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////

#include "stdafx.h"

#include "NetBasic.h"
#include "DebugLog.h"
#include "CTPNet.h"

// Macrodefinitions for handy log building

// Put message mess to output stream log, protecter by critical section cs
#define LOG(log,cs,mess) \
    if (log) { \
        char ts[22]; \
        CSingleLock lock(&cs,TRUE); \
        (((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
    }

// The same, but  string representation of ip-address ip can be referenced as
// "addr"
#define LOGA(log,cs,mess,ip) \
    if (log) { \
        char ts[22],saddr[16]; \
        ip.GetString(saddr); \
        CSingleLock lock(&cs,TRUE); \
        (((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
    }

// The same, but inserts description of header head between mess1 and mess2.
// Moreover, string representation of ip-address ip can be referenced as "addr"
#define LOGHA(log,cs,mess1,mess2,head,ip) \
    if (log) { \
        char ts[22],saddr[16]; \
        ip.GetString(saddr); \
        CSingleLock lock(&cs,TRUE); \
        ((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess1;\
        head->ToStream(*log); \
        (((ostream&)*(log))<<mess2<<"\n").flush();\
    }

CCTPReceivedData::CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf)
{
    this->command=command;
    this->size=size;
    this->from=IPAddr(from);
    pBuf=new char[(unsigned int)size];
    if (buf) memcpy(pBuf,buf,(unsigned int)size);
}

CCTPErrorInfo::CCTPErrorInfo(unsigned char type,unsigned __int16 command,int code,IPAddr addr)
{
    this->type=type;
    this->command=command;
    this->code=code;
    this->addr=addr;
    GetTimeStamp(timestamp);
}

char* CCTPErrorInfo::GetTimeStamp(char* s)
{
    CHAR date[30]="";
    CHAR time[30]="";
    _timeb timebuffer;

    // Get date/time
    _strdate(date);
    _ftime(&timebuffer);
    _strtime(time);

    // Create string
    sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
    return s;
}

void CCTPNet::Header::ToStream(ostream& out)
{
    if (command&CCTPNet::m_iConfirm) {
        out<<"{confirm: "<<(command^CCTPNet::m_iConfirm);
    } else {
        out<<"{command: "<<command;
    }
    out<<", id: "<<(unsigned int)id<<", size: "<<(unsigned int)size;
    if (amount>1) {
        out<<", num: "<<(unsigned int)number<<"("<<(unsigned int)amount<<")";
    }
    if (options) {
        out<<", opt: "<<options;
    }
    out<<"}";
}

bool CCTPNet::SntCommandInfo::Confirm(unsigned int i)
{
    CI[i].bConfirmed=true;
    for (unsigned int j=0; j<uCount; j++) {
        if (!CI[j].bConfirmed) return false;
    }

    return true;
}

bool CCTPNet::LargeCommandInfo::GotPart(unsigned int i)
{
    received[i]=true;
    for (unsigned int j=0; j<uCount; j++) {
        if (!received[j]) return false;
    }
    return true;
}

const unsigned __int8 CCTPNet::OptPing=CCTPNet::Options::DelAfterError|CCTPNet::Options::NoResend|CCTPNet::Options::UniqueCommand;
const unsigned __int16 CCTPNet::m_iConfirm=0x8000;

CCTPNet::CCTPNet(NetReceiver* receiver,unsigned short port,unsigned short servers,Times* times,ostream* log,unsigned __int16 packetdatasize,unsigned short maxdeliverers)
{
    // Tuning
    m_DefReceiver=receiver;
    m_uPort=port;
    if (times) m_Times=*times;
    m_uPacketDataSize=packetdatasize;
    m_pLog=log;

    // Initialize random generator
    srand((unsigned)time(NULL));

    // Initialization
    m_bSuspended=true;
    m_SntCommands.clear();
    m_Sessions.clear();
    m_LargeCommands.clear();
    m_Receivers.clear();
    m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];
    m_Deliveries.clear();
    m_pDeliverTrds.clear();
    m_uMaxDeliverers=maxdeliverers;
    m_uBusy=0;

    CreateSockets();

    m_bKill=false;

    // Start threads and store handles
    for (unsigned int i=0;i<servers;i++) {
        m_pServerTrds.push_back(AfxBeginThread(CTPServerFunction,this));
    }
    m_pDelManTrd=AfxBeginThread(CTPDelManFunction,this);

    LOG(m_pLog,m_csLog,"CTP started on port "<<port<<" with "<<servers<<" servers\n");
}

CCTPNet::~CCTPNet()
{
    LOG(m_pLog,m_csLog,"CTP is shuting down");

    // Terminate threads
    m_bKill=true;
    DWORD time=GetTickCount();
    while ((!m_pDeliverTrds.empty()) || !m_pServerTrds.empty() || m_pDelManTrd) {
        Sleep(m_Times.uSleepOnDestroy);
        // Kill servers, deliverers and delivery manager if they are busy too long
        if (GetTickCount()-time>m_Times.uPeriodDestroy) {
            CSingleLock locks(&m_csServerTrds);
            LOCK(locks);
            for (vector<CWinThread*>::iterator it=m_pServerTrds.begin(); it!=m_pServerTrds.end(); it++) {
                LOG(m_pLog,m_csLog,"Server thread with handle "<<(*it)->m_hThread<<" was stopped forcedly");
                TerminateThread((*it)->m_hThread,0);
            }
            m_pServerTrds.clear();
            UNLOCK(locks);

            CSingleLock lockd(&m_csDeliverTrds);
            LOCK(lockd);
            for (it=m_pDeliverTrds.begin(); it!=m_pDeliverTrds.end(); it++) {
                LOG(m_pLog,m_csLog,"Deliverer thread with handle "<<(*it)->m_hThread<<" was stopped forcedly");
                TerminateThread((*it)->m_hThread,0);
            }
            m_pDeliverTrds.clear();
            UNLOCK(lockd);

            if (m_pDelManTrd) {
                LOG(m_pLog,m_csLog,"Delivery manager thread was stopped forcedly");
                TerminateThread(m_pDelManTrd->m_hThread,0);
            }
        }
    }

    // Free resources
    closesocket(m_SendSocket);
    closesocket(m_RecvSocket);
    FreeSntCommands();
    FreeSessions();
    FreeLargeCommands();
    FreeDeliveries();
    m_Receivers.clear();
    delete[] m_pBuffer;

    LOG(m_pLog,m_csLog,"CTP stopped");
}

bool CCTPNet::CreateSockets()
{
    LOG(m_pLog,m_csLog,"Creating sockets");

    CSingleLock lock(&m_csDeliveries);

    // Sockets creation
    m_SendSocket=socket(AF_INET, SOCK_DGRAM, 0);
    m_RecvSocket=socket(AF_INET, SOCK_DGRAM, 0);
    if (m_SendSocket==INVALID_SOCKET || m_RecvSocket==INVALID_SOCKET) {
        LOCK(lock);
        m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(0,0,WSAGetLastError(),IPAddr())));
        UNLOCK(lock);
        LOG(m_pLog,m_csLog,"Failed to create sockets");
        m_bSuspended=true;
        return false;
    }
    // Sockets are to support broadcasting
    BOOL broadcast=TRUE;
    setsockopt(m_SendSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
    setsockopt(m_RecvSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));

    // Binding local address with receiving socket
    m_Local.sin_family=AF_INET;
    m_Local.sin_port=htons(m_uPort);
    m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
    if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
    {
        LOCK(lock);
        m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(1,0,WSAGetLastError(),IPAddr())));
        UNLOCK(lock);
        LOG(m_pLog,m_csLog,"Failed to bind receiving socket");
        m_bSuspended=true;
        return false;
    }

    return true;
}

void CCTPNet::CheckupOptions(Header& header)
{
    if (header.amount>1 && header.options&Options::UniqueCommand) header.options^=Options::UniqueCommand;
    if (header.options&Options::StartSession) header.options^=Options::StartSession;
}

bool CCTPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 options, bool storeiffail)
{
    // Arrange header
    Header head;
    head.amount=sb.GetPacketsCount();
    head.command=command;
    head.messize=sb.GetDataSize();
    head.options=options;
    CheckupOptions(head);

    // Provide data with headers
    for (unsigned int i=0;i<head.amount;i++) {
        // Fill rest header information
        GetNextID(head,to);
        head.size=(unsigned __int16)sb.GetPacketSize(i);
        head.number=i;

        // Put header in the packet
        sb.PutHead(&head,i);
    }

    // Store message to be sent and pointer to it
    SntCommandInfo ci(sb,GetTickCount(),to.Solid);
    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    m_SntCommands.push_back(ci);
    SntCommandInfoList::iterator curentry=--m_SntCommands.end();
    UNLOCK(lock);

    // Send packets
    for (i=0;i<head.amount;i++) {
        if (!SendPacket(sb.GetHeadPtr(i),to.Solid)) {
            // Store error information
            CSingleLock lock(&m_csDeliveries);
            LOCK(lock);
            m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(2,head.command,WSAGetLastError(),IPAddr(to.Solid))));
            UNLOCK(lock);

            // Delete from sent packets storage if there was an error and storeiffail equals false
            if (!storeiffail) {
                CSingleLock lock(&m_csSntCommands);
                LOCK(lock);
                m_SntCommands.erase(curentry);
                UNLOCK(lock);
            }

            // Exit sending function with return value, which shows error
            return false;
        }
    }

    // Exit sending function with return value, which shows success
    return true;
}

bool CCTPNet::SendPacket(char* buf, unsigned long to)
{
    SOCKADDR_IN recip;
    recip.sin_family=AF_INET;
    recip.sin_port=htons(m_uPort);
    recip.sin_addr.s_addr=to;
    Header* head=(Header*)buf;

    LOGHA(m_pLog,m_csLog,"Send packet "," to "<<saddr,head,IPAddr(to));

    return (sendto(m_SendSocket,(const char*)buf,head->size,0,(SOCKADDR*)&recip,sizeof(recip))!=SOCKET_ERROR);
}

void CCTPNet::FreeSntCommands()
{
    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();it++) {
        it->Free();
    }
    m_SntCommands.clear();
    UNLOCK(lock);

    LOG(m_pLog,m_csLog,"Sent commands storage freed");
}

void CCTPNet::FreeSessions()
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    for (SessionsInfo::iterator it=m_Sessions.begin();it!=m_Sessions.end();it++) {
        it->second.received.clear();
    }
    m_Sessions.clear();
    UNLOCK(lock);

    LOG(m_pLog,m_csLog,"Sessions information storage freed");
}

void CCTPNet::FreeLargeCommands()
{
    CSingleLock lock(&m_csLargeCommands);
    LOCK(lock);
    for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
        it->Free();
        if (it->pRD) delete it->pRD;
    }
    m_LargeCommands.clear();
    UNLOCK(lock);

    LOG(m_pLog,m_csLog,"Large commands storage freed");
}

void CCTPNet::FreeDeliveries()
{
    for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
        if (it->data) {
            if (it->type==DeliveryType::ReceivedData) delete (CCTPReceivedData*)it->data; else
            if (it->type==DeliveryType::ErrorInfo) delete (CCTPErrorInfo*)it->data; else
            delete it->data;
        }
    }
    m_Deliveries.clear();

    LOG(m_pLog,m_csLog,"Deliveries storage freed");
}

bool CCTPNet::SaveRcvPacket(unsigned long from,Header* head)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    SessionInfo& si=GetSessionInfo(IPAddr(from),head->options&Options::Broadcast?true:false);

    if (si.received.empty()) {
        // First message from corresponding workstation
        si.received.push_back(head->id);
    } else {
        // Find place among already received messages
        if (Less(si.received.back(),head->id)) {
            si.received.push_back(head->id);
        } else {
            for (SessionInfo::RcvList::iterator it=si.received.begin();it!=si.received.end();it++) {
                // Already has been recieved
                if ((*it)==head->id) {
                    LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
                    return false;
                }
                if (Less(head->id,*it)) {
                    // Already has been recieved
                    if (it==si.received.begin() && si.minwasset) {
                        LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
                        return false;
                    }
                    // New one has been recieved
                    si.received.insert(it,head->id);
                    break;
                }
            }
        }

        // Remove ambigous information
        if (si.minwasset) {
            while ((si.received.front()+1)==*(++si.received.begin())) {
                si.received.pop_front();
            }
        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -