⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ctpnet.cpp

📁 用UDP写的可靠传输,非常有借鉴意义,适合互连网通讯
💻 CPP
📖 第 1 页 / 共 2 页
字号:
    }

    // Session was started with this packet
    if (head->options&Options::StartSession) {
        si.minwasset=true;
        LOGA(m_pLog,m_csLog,"Session starting packet with ID:"<<(unsigned int)head->id<<" have been received from "<<saddr,IPAddr(from));
    }

    UNLOCK(lock);
    return true;
}

void CCTPNet::SendConfirmation(unsigned long to,Header header)
{
    // Change header
    header.command|=m_iConfirm;
    header.messize=0;
    header.size=GetHeaderSize();

    // Send confirmation command
    SendPacket((char*)&header,to);
}

void CCTPNet::ConfirmSntPacket(unsigned long to,Header* header)
{
    Header* head;
    bool erased=false;
    // Is command unique
    bool unique=(header->options&Options::UniqueCommand)&&(header->amount<=1);

    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);

    for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
        erased=false;
        head=(Header*)it->sbBody.GetBufferBegin();

        // Find appropriate recipient or accept any if message was broadcasted
        if (it->ipTo==to || head->options&Options::Broadcast) {
            if (!unique) {
                // Not unique command
                if (head->id==header->id-header->number) {
                    if (it->Confirm(header->number)) {
                        SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
                        LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
                        it->Free();
                        m_SntCommands.erase(it);
                    }
                    UNLOCK(lock);
                    return;
                }
            } else {
                // Unique command
                if ((head->command^header->command)==m_iConfirm) {
                    if (it->Confirm(0)) {
                        SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
                        LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
                        it->Free();
                        it=m_SntCommands.erase(it);
                        erased=true;
                        if (it==m_SntCommands.end()) break;
                    }
                }
            }
        }

        // Go to next sent packet
        if (!erased) it++;
    }

    UNLOCK(lock);
}

bool CCTPNet::ArrangeLargeCommand(unsigned long from,Header* head)
{
    LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" is a part of large command",head,IPAddr(from));

    char* mem=NULL;
    CSingleLock lock(&m_csLargeCommands);
    LOCK(lock);

    // Try to find packets, wich belongs to the same message in received parts
    for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
        if (it->id==head->id-head->number && it->pRD->from==from && !it->received[head->number]) {
            mem=it->pRD->pBuf;
            mem+=head->number*m_uPacketDataSize;
            memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
            if (it->GotPart(head->number)) {
                LOG(m_pLog,m_csLog,"Large command arranged");
                CSingleLock lock(&m_csDeliveries);
                LOCK(lock);
                m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ReceivedData),it->pRD));
                UNLOCK(lock);
                it->Free();
                m_LargeCommands.erase(it);
                return true;
            } else return false;
        }
    }

    // Part of the new message received
    LargeCommandInfo lpi(head->command,head->messize,from,head->id,head->amount);
    mem=lpi.pRD->pBuf;
    mem+=head->number*m_uPacketDataSize;
    memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
    m_LargeCommands.push_front(lpi);
    m_LargeCommands.front().GotPart(head->number);

    return false;
}

void CCTPNet::ResendNotConfirmedData()
{
    Header* head;

    DWORD time=GetTickCount();
    bool erased=false;
    unsigned int i=0;
    // This fleag is used to keep from showing error for all parts of the large
    // message
    bool wasdead=false;

    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
        erased=false;
        head=(Header*)it->sbBody.GetBufferBegin();
        for (i=0; i<it->uCount; i++) if (!it->CI[i].bConfirmed) {
            // If timeout have expired
            if ((unsigned int)(time-it->CI[i].dwLTime)>it->CI[i].uResend*GetTimeout(it->ipTo,head->options&Options::Broadcast?true:false)) {
                if (!(head->options&Options::NoResend)) {
                    LOGHA(m_pLog,m_csLog,"Packet "," is to be resent to "<<saddr,((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
                    SendPacket(it->sbBody.GetHeadPtr(i),it->ipTo);
                }
                it->CI[i].dwLTime=time;
                it->CI[i].IncResend();

                // Produce error if dead timeout occuired
                if (it->CI[i].IsDeadTimeout() && !wasdead){
                    CSingleLock lock(&m_csDeliveries);
                    LOCK(lock);
                    m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ErrorInfo),new CCTPErrorInfo(4,head->command,WSAGetLastError(),IPAddr(it->ipTo))));
                    UNLOCK(lock);
                    wasdead=true;

                    // Erase if needed
                    if (head->options&Options::DelAfterError) {
                        LOGHA(m_pLog,m_csLog,"Command, sent to "<<saddr<<", which includes packet ",", is delete from sent commands storage after generating error delivery",((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
                        m_SntCommands.erase(it);
                        erased=true;
                        break;
                    }
                }
            }
        }

        // Go to next element
        if (!erased) it++;
    }
    UNLOCK(lock);
}

void CCTPNet::AddSpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type)
{
    if (command==0) return;
    for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();) {
        if (it->command==command && it->type==type) {
            it=m_Receivers.erase(it);
            if (it==m_Receivers.end()) break;
        } else it++;
    }
    m_Receivers.push_front(SpecialReceiver(command,receiver,type));
}

void CCTPNet::DeleteSpecialReceiver(NetReceiver* receiver)
{
    for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();) {
        if (it->receiver==receiver) {
            it=m_Receivers.erase(it);
            if (it==m_Receivers.end()) break;
        } else it++;
    }
}

NetReceiver* CCTPNet::GetReceiver(unsigned __int16 command, DeliveryType type)
{
    if (command==0) return m_DefReceiver;
    for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();it++) {
        if (it->command==command && it->type==type) return it->receiver;
    }
    return m_DefReceiver;
}

CCTPNet::SessionInfo& CCTPNet::GetSessionInfo(IPAddr addr, bool bcast)
{
    if (bcast) addr.SetBroadcast();
    return m_Sessions.insert(SessionsInfo::value_type(addr.Solid,SessionInfo())).first->second;
}

void CCTPNet::GetNextID(Header& head,IPAddr addr)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    if (m_Sessions.find(addr.Solid)==m_Sessions.end()) head.options|=Options::StartSession;
    SessionInfo& si=GetSessionInfo(addr,head.options&Options::Broadcast?true:false);
    head.id=++si.id;
}

unsigned int CCTPNet::GetTimeout(IPAddr addr, bool bcast)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    SessionInfo& si=GetSessionInfo(addr,bcast);
    if (!(si.timeout)) return m_Times.uDefTimeout; else return si.timeout;
}

void CCTPNet::SetTimeout(IPAddr addr, bool bcast, unsigned int timeout)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    SessionInfo& si=GetSessionInfo(addr,bcast);
    if (!(si.timeout)) {
        LOGA(m_pLog,m_csLog,"Timeout for session with "<<saddr<<" is considered to be "<<timeout<<" microseconds",addr);
        si.timeout=timeout;
    }
}

unsigned int CTPServerFunction(void* pNet)
{
    CCTPNet* net=(CCTPNet*)pNet;

    // Necessary variables
    timeval tv;
    tv.tv_sec=0;
    tv.tv_usec=1;
    fd_set fdread;
    SOCKADDR_IN sender;
    int sendersize=NULL;
    CCTPNet::Header* head;
    DWORD time=NULL;
    DWORD checktime=GetTickCount();
    CSingleLock lock(&net->m_csDeliveries);
    CSingleLock lockn(&net->m_csNetwork);


    for(;;) {
        // Wait while suspended
        while (net->GetSuspended()) {
            // Does killing needed
            if (net->m_bKill) {
                CSingleLock lock(&net->m_csServerTrds);
                LOCK(lock);
                net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
                UNLOCK(lock);
                LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
                return (unsigned int)AfxGetThread();
            }

            // Sleep a little bit
            Sleep(net->GetTimes().uSleepSuspended);
        }

        // Check for received data
        time=GetTickCount();
        FD_ZERO(&fdread);
        FD_SET(net->m_RecvSocket,&fdread);
        LOCK(lockn);
        if (select(0,&fdread,NULL,NULL,&tv)>0) {
            // Receive data
            sendersize=sizeof(sender);
            int ret=recvfrom(net->m_RecvSocket,net->m_pBuffer,net->GetPacketDataSize()+net->GetHeaderSize(),0,(SOCKADDR*)&sender,&sendersize);
            UNLOCK(lockn);
            if (ret==SOCKET_ERROR) {
                // Error while receiving
                LOCK(lock);
                net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetDefaultReceiver(),new CCTPErrorInfo(3,0,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
                UNLOCK(lock);
                LOG(net->m_pLog,net->m_csLog,"Network receiving error");
            } else {
                if (ret>=net->GetHeaderSize()) {
                    head=(CCTPNet::Header*)net->m_pBuffer;
                    LOGHA(net->m_pLog,net->m_csLog,"Packet "," have been received from "<<saddr,head,IPAddr(sender.sin_addr.S_un.S_addr));

                    if (net->IsConfirmation(head->command)) {
                        net->ConfirmSntPacket(sender.sin_addr.S_un.S_addr,head);
                    } else {
                        if (net->SaveRcvPacket(sender.sin_addr.S_un.S_addr,head)) {
                            // New packet was got
                            if (head->amount>1) {
                                net->ArrangeLargeCommand(sender.sin_addr.S_un.S_addr,head);
                            } else {
                                LOCK(lock);
                                net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetReceiver(head->command,CCTPNet::DeliveryType::ReceivedData),new CCTPReceivedData(head->command,head->messize,sender.sin_addr.S_un.S_addr,net->m_pBuffer+net->GetHeaderSize())));
                                UNLOCK(lock);
                            }
                        }
                        // Send confimation
                        net->SendConfirmation(sender.sin_addr.S_un.S_addr,*head);
                    }
                }
            }
        } else UNLOCK(lockn);

        // Does killing needed
        if (net->m_bKill) {
            CSingleLock lock(&net->m_csServerTrds);
            LOCK(lock);
            net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
            UNLOCK(lock);
            LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
            return (unsigned int)AfxGetThread();
        }

        // Sent packets were not checked long enough
        if ((time-checktime)>net->GetTimes().uPeriodCheckResend) {
            // Check if resending needed
            checktime=GetTickCount();
            net->ResendNotConfirmedData();
        }
    }
}

unsigned int CTPDelManFunction(void* pNet)
{
    CCTPNet* net=(CCTPNet*)pNet;
    CCTPNet::Delivery del;
    CSingleLock lock(&net->m_csDeliverTrds);

    for(;;) {
        // Does additional delivery threads needed
        if (!net->GetSuspended()) {
            LOCK(lock);
            if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
                net->m_pDeliverTrds.push_back(AfxBeginThread(CTPDeliverFunction,pNet));
                LOG(net->m_pLog,net->m_csLog,"Delivery manager have created deliverer with handle "<<net->m_pDeliverTrds.back()->m_hThread);
            }
            UNLOCK(lock);
        }

        // Kill server
        if (net->m_bKill) {
            net->m_pDelManTrd=NULL;
            LOG(net->m_pLog,net->m_csLog,"Delivery manager thread stopped");
            return (unsigned int)AfxGetThread();
        }

        Sleep(net->GetTimes().uSleepDelMan);
    }
}

unsigned int CTPDeliverFunction(void* pNet)
{
    CCTPNet* net=(CCTPNet*)pNet;
    CCTPNet::Delivery del;
    // If zero then something was done. If greater then nothing was done
    //(greater means doing nothing longer)
    bool bNothing=true;
    DWORD lastdel=GetTickCount();
    CSingleLock lock(&net->m_csDeliveries);

    for(;;) {
        bNothing=true;

        for (;;) {
            // Get delivery if exist
            LOCK(lock);
            if (net->m_Deliveries.empty()) break;
            del=net->m_Deliveries.front();
            net->m_Deliveries.pop_front();
            UNLOCK(lock);

            LOG(net->m_pLog,net->m_csLog,"Delivery is beeing processed");

            // Deliver delivery
            net->m_uBusy++;
            switch (del.type) {
            case CCTPNet::DeliveryType::ReceivedData:
                del.target->OnReceive(del.data);
                delete (CCTPReceivedData*)del.data;
            break;
            case CCTPNet::DeliveryType::ErrorInfo:
                del.target->OnError(del.data);
                delete (CCTPErrorInfo*)del.data;
            break;
            default:
                if (VALID(del.data)) delete del.data;
            }
            net->m_uBusy--;
            lastdel=GetTickCount();

            // Thread is working now
            bNothing=false;

            // If killing needed then do it after unlocking
            if (net->m_bKill) break;
        }
        if (lock.IsLocked()) UNLOCK(lock);

        // Does killing needed (because of request or because of sponging)
        if (net->m_bKill || (net->m_Deliveries.size()==0 && GetTickCount()-lastdel>net->GetTimes().uPeriodAutoDest)) {
            CSingleLock lock(&net->m_csDeliverTrds);
            LOCK(lock);
            net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
            UNLOCK(lock);
            LOG(net->m_pLog,net->m_csLog,"Deliverer thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
            return (unsigned int)AfxGetThread();
        }
        
        // Sleep a little bit
        if (bNothing) {
            Sleep(net->GetTimes().uSleepNothing);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -