// File: uploadqueue.cpp
// (code-viewer page residue, not part of the source: "Font size:")
//this file is part of eMule
//Copyright (C)2002 Merkur ( devs@emule-project.net / http://www.emule-project.net )
//
//This program is free software; you can redistribute it and/or
//modify it under the terms of the GNU General Public License
//as published by the Free Software Foundation; either
//version 2 of the License, or (at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program; if not, write to the Free Software
//Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#include "stdafx.h"
#include "emule.h"
#include "UploadQueue.h"
#include "Packets.h"
#include "KnownFile.h"
#include "ListenSocket.h"
#include "Exceptions.h"
#include "Scheduler.h"
#include "PerfLog.h"
#include "UploadBandwidthThrottler.h"
#include "ClientList.h"
#include "LastCommonRouteFinder.h"
#include "DownloadQueue.h"
#include "FriendList.h"
#include "Statistics.h"
#include "MMServer.h"
#include "OtherFunctions.h"
#include "UpDownClient.h"
#include "SharedFileList.h"
#include "KnownFileList.h"
#include "Sockets.h"
#include "ClientCredits.h"
#include "Server.h"
#include "ServerList.h"
#include "WebServer.h"
#include "emuledlg.h"
#include "ServerWnd.h"
#include "TransferWnd.h"
#include "SearchDlg.h"
#include "StatisticsDlg.h"
#include "Kademlia/Kademlia/Kademlia.h"
#include "Kademlia/Kademlia/Prefs.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
// File-local state. Statics without an explicit initializer start at zero;
// the CUploadQueue constructor additionally resets counter, statsave and
// iupdateconnstats to 0.
// NOTE(review): these look like tick/interval counters for the upload timer
// callback defined elsewhere in this file -- confirm against UploadTimer.
static uint32 counter;
static uint32 sec;
static uint32 statsave;
static UINT _uSaveStatistics = 0;
// -khaos--+++> Added iupdateconnstats...
static uint32 igraph;
static uint32 istats;
static uint32 iupdateconnstats;
// <-----khaos-
//TODO rewrite the whole networkcode, use overlapped sockets.. sure....
/**
 * Constructs the upload queue.
 *
 * Starts a 100 ms timer whose callback is UploadTimer, then resets the
 * counters, rates and bookkeeping fields assigned below.
 */
CUploadQueue::CUploadQueue()
{
	// Create the timer first. VERIFY still evaluates its expression in release
	// builds, so keeping the assignment outside of it changes nothing.
	h_timer = SetTimer(0, 0, 100, UploadTimer);
	VERIFY( h_timer != NULL );
	if (thePrefs.GetVerbose() && h_timer == NULL)
		AddDebugLogLine(true,_T("Failed to create 'upload queue' timer - %s"),GetErrorMessage(GetLastError()));

	// Upload result counters.
	successfullupcount = 0;
	failedupcount = 0;
	totaluploadtime = 0;

	// Data rate bookkeeping.
	datarate = 0;
	friendDatarate = 0;
	m_avarage_dr_sum = 0;
	m_lastCalculatedDataRateTick = 0;

	// Slot bookkeeping.
	m_nLastStartUpload = 0;
	m_dwLastResortedUploadSlots = 0;
	m_dwRemovedClientByScore = ::GetTickCount();
	m_iHighestNumberOfFullyActivatedSlotsSinceLastCall = 0;
	m_MaxActiveClients = 0;
	m_MaxActiveClientsShortTime = 0;

	// File-local statics.
	counter = 0;
	statsave = 0;
	// -khaos--+++>
	iupdateconnstats = 0;
	// <-----khaos-
}
/**
 * Scan the waiting queue and return the highest scoring client that can be
 * served right now.
 *
 * A HighID client is always considered ready; a LowID client is ready only
 * while its socket is connected. A LowID client that is not connected is never
 * returned, but if it outscores the best ready client it gets m_bAddNextConnect
 * set, so it can be given a slot as soon as it connects.
 *
 * As a side effect, entries whose last upload request is older than
 * MAX_PURGEQUEUETIME, or whose requested file is no longer found in the shared
 * file list, are removed from the waiting queue during the scan.
 *
 * @param allowLowIdAddNextConnectToBeSet when false, no client's
 *        m_bAddNextConnect flag is touched.
 * @return address of the highest ranking ready client, or NULL if there is none.
 */
CUpDownClient* CUploadQueue::FindBestClientInQueue(bool allowLowIdAddNextConnectToBeSet) {
	POSITION bestReadyPos = NULL;
	uint32 bestReadyScore = 0;
	CUpDownClient* bestPendingLowId = NULL;
	uint32 bestPendingLowIdScore = 0;

	POSITION nextPos = waitinglist.GetHeadPosition();
	while (nextPos != NULL)
	{
		// Step the iterator before touching the entry, so the entry itself may
		// be removed from the list below without invalidating the walk.
		POSITION curPos = nextPos;
		CUpDownClient* cur_client = waitinglist.GetNext(nextPos);

		ASSERT ( cur_client->GetLastUpRequest() );

		// Purge clients that appear to have left the network, or that want a
		// file we no longer share.
		const bool timedOut = (::GetTickCount() - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME);
		if (timedOut || !theApp.sharedfiles->GetFileByID(cur_client->GetUploadFileID()))
		{
			cur_client->ClearWaitStartTime();
			RemoveFromWaitingQueue(curPos,true);
			continue;
		}

		// Only clients that beat the best ready client so far are interesting.
		const uint32 cur_score = cur_client->GetScore(false);
		if (cur_score <= bestReadyScore)
			continue;

		const bool readyToGo = !cur_client->HasLowID() || (cur_client->socket && cur_client->socket->IsConnected());
		if (readyToGo)
		{
			// HighID, or a connected LowID: new best ready client.
			bestReadyScore = cur_score;
			bestReadyPos = curPos;
			continue;
		}

		// LowID and not connected: track the best of these separately, skipping
		// clients that already carry the flag.
		if (allowLowIdAddNextConnectToBeSet && !cur_client->m_bAddNextConnect && cur_score > bestPendingLowIdScore)
		{
			bestPendingLowIdScore = cur_score;
			bestPendingLowId = cur_client;
		}
	}

	// The best unconnected LowID client outranks everything that is ready:
	// reserve the next slot for it.
	if (bestPendingLowIdScore > bestReadyScore && bestPendingLowId && allowLowIdAddNextConnectToBeSet)
		bestPendingLowId->m_bAddNextConnect = true;

	if (bestReadyPos)
		return waitinglist.GetAt(bestReadyPos);
	return NULL;
}
/**
 * Appends a client to the end of the uploading list and registers its upload
 * socket with the upload bandwidth throttler.
 *
 * @param newclient the client that receives the new slot; dereferenced without
 *        a NULL check.
 */
void CUploadQueue::InsertInUploadingList(CUpDownClient* newclient) {
// Add it last
// The throttler is told first: the list length before the append is the
// index handed to AddToStandardList for the new socket.
theApp.uploadBandwidthThrottler->AddToStandardList(uploadinglist.GetCount(), newclient->GetFileUploadSocket());
uploadinglist.AddTail(newclient);
// The slot number is the list length after the append, i.e. one more than
// the index given to the throttler above.
newclient->SetSlotNumber(uploadinglist.GetCount());
}
/**
 * Gives an upload slot to one client.
 *
 * @param directadd client to put into upload directly. When NULL, the best
 *        client is taken from the waiting queue via FindBestClientInQueue(true)
 *        and removed from that queue.
 * @return true if the client was added to the uploading list; false if there
 *         was no candidate, the client is already uploading (IsDownloading),
 *         or TryToConnect failed for a client that is not connected.
 */
bool CUploadQueue::AddUpNextClient(CUpDownClient* directadd){
	// select next client or use given client
	CUpDownClient* newclient = directadd;
	if (newclient == NULL)
	{
		newclient = FindBestClientInQueue(true);
		if (newclient == NULL)
			return false;

		//Set this true so a tagged lowID can get the next reserved slot
		lastupslotHighID = true;
		RemoveFromWaitingQueue(newclient, true);
		theApp.emuledlg->transferwnd->ShowQueueCount(waitinglist.GetCount());
	}

	// refresh score caching, now that the highest score is removed
	if (!thePrefs.TransferFullChunks())
		UpdateMaxClientScore();

	// Nothing to do for a client that already holds a slot.
	if (IsDownloading(newclient))
		return false;

	// tell the client that we are now ready to upload
	const bool isConnected = newclient->socket && newclient->socket->IsConnected();
	if (isConnected)
	{
		if (thePrefs.GetDebugClientTCPLevel() > 0)
			DebugSend("OP__AcceptUploadReq", newclient);
		Packet* packet = new Packet(OP_ACCEPTUPLOADREQ,0);
		theStats.AddUpDataOverheadFileRequest(packet->size);
		newclient->socket->SendPacket(packet,true);
		newclient->SetUploadState(US_UPLOADING);
	}
	else
	{
		// Not connected yet: mark the client as connecting and try to reach it.
		newclient->SetUploadState(US_CONNECTING);
		if (!newclient->TryToConnect(true))
			return false;
	}

	newclient->SetUpStartTime();
	newclient->ResetSessionUp();
	InsertInUploadingList(newclient);
	m_nLastStartUpload = ::GetTickCount();

	// statistic
	CKnownFile* reqfile = theApp.sharedfiles->GetFileByID((uchar*)newclient->GetUploadFileID());
	if (reqfile)
		reqfile->statistic.AddAccepted();

	theApp.emuledlg->transferwnd->uploadlistctrl.AddClient(newclient);
	return true;
}
/**
 * Records how many upload slots were fully activated since the previous call
 * and refreshes the two derived maxima:
 *  - m_MaxActiveClients: highest sample in the retained history (20 seconds),
 *  - m_MaxActiveClientsShortTime: highest sample of the last 10 seconds.
 *
 * @param curTick current tick count in milliseconds, used to timestamp the new
 *        sample and to age out old ones.
 */
void CUploadQueue::UpdateActiveClientsInfo(DWORD curTick) {
	// Save number of active clients for statistics
	uint32 tempHighest = theApp.uploadBandwidthThrottler->GetHighestNumberOfFullyActivatedSlotsSinceLastCallAndReset();

	// If the throttler reports more slots than the upload list holds, clamp the
	// sample to the list size.
	// NOTE(review): this clamp only runs when GetLogUlDlEvents() is enabled --
	// confirm that tying it to a logging preference is intended.
	if(thePrefs.GetLogUlDlEvents() && theApp.uploadBandwidthThrottler->GetStandardListSize() > (uint32)uploadinglist.GetSize()) {
		// debug info, will remove this when I'm done.
		//AddDebugLogLine(false, _T("UploadQueue: Error! Throttler has more slots than UploadQueue! Throttler: %i UploadQueue: %i Tick: %i"), theApp.uploadBandwidthThrottler->GetStandardListSize(), uploadinglist.GetSize(), ::GetTickCount());
		if(tempHighest > (uint32)uploadinglist.GetSize()) {
			tempHighest = uploadinglist.GetSize();
		}
	}
	m_iHighestNumberOfFullyActivatedSlotsSinceLastCall = tempHighest;

	// Keep 20 seconds of samples. Remember the largest sample that expires, so
	// we can tell below whether the current maximum just left the window.
	uint32 tempMaxRemoved = 0;
	while(!activeClients_tick_list.IsEmpty() && !activeClients_list.IsEmpty() && curTick-activeClients_tick_list.GetHead() > 20*1000) {
		activeClients_tick_list.RemoveHead();
		uint32 removed = activeClients_list.RemoveHead();
		if(removed > tempMaxRemoved) {
			tempMaxRemoved = removed;
		}
	}

	// The two lists are parallel: one value and one timestamp per sample.
	activeClients_list.AddTail(m_iHighestNumberOfFullyActivatedSlotsSinceLastCall);
	activeClients_tick_list.AddTail(curTick);

	if(activeClients_tick_list.GetSize() > 1) {
		uint32 tempMaxActiveClients = m_iHighestNumberOfFullyActivatedSlotsSinceLastCall;
		uint32 tempMaxActiveClientsShortTime = m_iHighestNumberOfFullyActivatedSlotsSinceLastCall;
		POSITION activeClientsTickPos = activeClients_tick_list.GetTailPosition();
		POSITION activeClientsListPos = activeClients_list.GetTailPosition();

		// Walk from the newest sample backwards. The last 10 seconds are always
		// visited; the walk continues through the older samples only while an
		// expired sample that was the maximum has not been matched yet.
		while(activeClientsListPos != NULL && activeClientsTickPos != NULL &&
			  ((tempMaxRemoved > tempMaxActiveClients && tempMaxRemoved >= m_MaxActiveClients) ||
			   curTick - activeClients_tick_list.GetAt(activeClientsTickPos) < 10 * 1000)) {
			DWORD activeClientsTickSnapshot = activeClients_tick_list.GetAt(activeClientsTickPos);
			uint32 activeClientsSnapshot = activeClients_list.GetAt(activeClientsListPos);

			if(activeClientsSnapshot > tempMaxActiveClients) {
				tempMaxActiveClients = activeClientsSnapshot;
			}
			if(activeClientsSnapshot > tempMaxActiveClientsShortTime && curTick - activeClientsTickSnapshot < 10 * 1000) {
				tempMaxActiveClientsShortTime = activeClientsSnapshot;
			}

			activeClients_tick_list.GetPrev(activeClientsTickPos);
			activeClients_list.GetPrev(activeClientsListPos);
		}

		// Raise the maximum when a larger sample was seen. Also accept the
		// rescanned value when the old maximum has just expired; without that
		// second case the maximum could only ever grow and would never follow
		// a drop in the number of active slots.
		if(tempMaxRemoved >= m_MaxActiveClients || tempMaxActiveClients > m_MaxActiveClients) {
			m_MaxActiveClients = tempMaxActiveClients;
		}
		m_MaxActiveClientsShortTime = tempMaxActiveClientsShortTime;
	} else {
		// Only the sample just added exists: it is both maxima.
		m_MaxActiveClients = m_iHighestNumberOfFullyActivatedSlotsSinceLastCall;
		m_MaxActiveClientsShortTime = m_iHighestNumberOfFullyActivatedSlotsSinceLastCall;
	}
}
/**
* Maintenance method for the uploading slots. It adds and removes clients to the
* uploading list. It also makes sure that all the uploading slots' Sockets always have
* enough packets in their queues, etc.
*
* This method is called aproximately once every 100 milliseconds.
*/
void CUploadQueue::Process() {
theApp.sharedfiles->Publish();
DWORD curTick = ::GetTickCount();
UpdateActiveClientsInfo(curTick);
if (ForceNewClient()){
// There's not enough open uploads. Open another one.
AddUpNextClient();
}
// The loop that feeds the upload slots with data.
POSITION pos = uploadinglist.GetHeadPosition();
while(pos != NULL){
// Get the client. Note! Also updates pos as a side effect.
CUpDownClient* cur_client = uploadinglist.GetNext(pos);
if (thePrefs.m_iDbgHeap >= 2)
ASSERT_VALID(cur_client);
//It seems chatting or friend slots can get stuck at times in upload.. This needs looked into..
if (!cur_client->socket)
{
RemoveFromUploadQueue(cur_client, _T("Uploading to client without socket? (CUploadQueue::Process)"));
if(cur_client->Disconnected(_T("CUploadQueue::Process"))){
delete cur_client;
}
} else {
cur_client->SendBlockData();
}
}
// Save used bandwidth for speed calculations
uint64 sentBytes = theApp.uploadBandwidthThrottler->GetNumberOfSentBytesSinceLastCallAndReset();
avarage_dr_list.AddTail(sentBytes);
m_avarage_dr_sum += sentBytes;
uint64 sentBytesOverhead = theApp.uploadBandwidthThrottler->GetNumberOfSentBytesOverheadSinceLastCallAndReset();
avarage_friend_dr_list.AddTail(theStats.sessionSentBytesToFriend);
// Save time beetween each speed snapshot
avarage_tick_list.AddTail(curTick);
// don't save more than 30 secs of data
while(avarage_tick_list.GetCount() > 3 && !avarage_friend_dr_list.IsEmpty() && ::GetTickCount()-avarage_tick_list.GetHead() > 30*1000) {
m_avarage_dr_sum -= avarage_dr_list.RemoveHead();
avarage_friend_dr_list.RemoveHead();
avarage_tick_list.RemoveHead();
}
};
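// --- Code-viewer page residue (not part of the original source file) ---
// Keyboard shortcuts:
//   Copy code:           Ctrl + C
//   Search code:         Ctrl + F
//   Full-screen mode:    F11
//   Toggle theme:        Ctrl + Shift + D
//   Show shortcuts:      ?
//   Increase font size:  Ctrl + =
//   Decrease font size:  Ctrl + -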