📄 uploadbandwidththrottler.cpp
字号:
/**
 * Remove a file socket from the throttler's queues and from the upload slot list.
 *
 * The removal is performed while holding sendLocker, and only while doRun is
 * still set; once doRun has been cleared nothing is touched.
 *
 * @param socket the socket that should no longer be serviced by the throttler
 */
void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledFileSocket* socket) {
    // Enter critical section
    sendLocker.Lock();

    if(!doRun) {
        // The run flag has been cleared: leave the lists alone.
        sendLocker.Unlock();
        return;
    }

    // NOTE(review): the 'false' argument presumably tells the overload not to take
    // the lock again, since sendLocker is already held here - confirm against the overload.
    RemoveFromAllQueues(socket, false);

    // Also drop it from the upload slots
    RemoveFromStandardListNoLock(socket);

    // Leave critical section
    sendLocker.Unlock();
}
/**
* Make the thread exit. This method will not return until the thread has stopped
* looping. This guarantees that the thread will not access the CEMSockets after this
* call has exited.
*/
void UploadBandwidthThrottler::EndThread() {
sendLocker.Lock();
// signal the thread to stop looping and exit.
doRun = false;
sendLocker.Unlock();
Pause(false);
// wait for the thread to signal that it has stopped looping.
threadEndedEvent->Lock();
}
/**
 * Pause or resume the throttler loop.
 *
 * The loop in RunInternal() waits on pauseEvent at the top of every iteration,
 * so resetting the event holds the loop there and setting it lets the loop proceed.
 *
 * @param paused true to reset pauseEvent (pause), false to set it (resume)
 */
void UploadBandwidthThrottler::Pause(bool paused) {
    if(!paused) {
        pauseEvent->SetEvent();
        return;
    }

    pauseEvent->ResetEvent();
}
/**
 * Thread entry procedure. Names the thread, initialises its locale and then
 * hands control to RunInternal() of the throttler instance passed in.
 * (The original note on this function states that the thread is started from
 * the constructor of this class.)
 *
 * @param pParam pointer to the UploadBandwidthThrottler instance to run
 *
 * @return the value returned by RunInternal()
 */
UINT AFX_CDECL UploadBandwidthThrottler::RunProc(LPVOID pParam) {
    DbgSetThreadName("UploadBandwidthThrottler");
    InitThreadLocale();

    UploadBandwidthThrottler* const throttler = static_cast<UploadBandwidthThrottler*>(pParam);
    const UINT exitCode = throttler->RunInternal();
    return exitCode;
}
/**
 * The thread method that handles calling send for the individual sockets.
 *
 * Every iteration waits on pauseEvent, sleeps until there is budget to spend, converts the
 * elapsed time into a byte budget and then spends that budget in this order:
 *   1. queued control packets (the "first" control queue is drained before the normal one),
 *   2. a trickle to every upload slot whose last send call is older than SEC2MS(1),
 *   3. an equal round-robin share for the slots the allowed rate can sustain,
 *   4. whatever is left over, offered to the upload slots first to last.
 *
 * realBytesToSpend holds the budget scaled by 1000 (data rate multiplied by elapsed
 * milliseconds); bytesToSpend is that value divided by 1000. A data rate of 0 or _UI32_MAX
 * is treated as "unlimited".
 *
 * When the loop ends, the queues are emptied and only then is threadEndedEvent signalled,
 * so that EndThread() does not return while this thread still touches members.
 *
 * @return always returns 0.
 */
UINT UploadBandwidthThrottler::RunInternal() {
    DWORD lastLoopTick = ::GetTickCount();
    sint64 realBytesToSpend = 0;
    uint32 allowedDataRate = 0;
    uint32 rememberedSlotCounter = 0;
    DWORD lastTickReachedBandwidth = ::GetTickCount();

    while(doRun) {
        // Blocks here for as long as the throttler is paused (see Pause()).
        pauseEvent->Lock();

        DWORD timeSinceLastLoop = ::GetTickCount() - lastLoopTick;

        // Get current speed from UploadSpeedSense
        allowedDataRate = theApp.lastCommonRouteFinder->GetUpload();

        uint32 minFragSize = 1300;
        uint32 doubleSendSize = minFragSize*2; // send two packages at a time so they can share an ACK
        if(allowedDataRate < 6*1024) {
            minFragSize = 536;
            doubleSendSize = minFragSize; // don't send two packages at a time at very low speeds to give them a smoother load
        }

#define TIME_BETWEEN_UPLOAD_LOOPS 1
        uint32 sleepTime;
        if(allowedDataRate == 0 || allowedDataRate == _UI32_MAX || realBytesToSpend >= 1000) {
            // we could send at once, but sleep a while to not suck up all cpu
            sleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
        } else {
            // sleep for just as long as we need to get back to having one byte to send
            sleepTime = max((uint32)ceil((double)(-realBytesToSpend + 1000)/allowedDataRate), TIME_BETWEEN_UPLOAD_LOOPS);
        }

        if(timeSinceLastLoop < sleepTime) {
            Sleep(sleepTime-timeSinceLastLoop);
        }

        const DWORD thisLoopTick = ::GetTickCount();
        timeSinceLastLoop = thisLoopTick - lastLoopTick;

        // Calculate how many bytes we can spend
        sint64 bytesToSpend = 0;

        if(allowedDataRate != 0 && allowedDataRate != _UI32_MAX) {
            if(timeSinceLastLoop == 0) {
                // no time has passed, so don't add any bytes. Shouldn't happen.
                bytesToSpend = 0; //realBytesToSpend/1000;
            } else if(_I64_MAX/timeSinceLastLoop > allowedDataRate &&
                      _I64_MAX-(sint64)allowedDataRate*(sint64)timeSinceLastLoop > realBytesToSpend) {
                // Both operands are 32 bit wide, so the product is formed in 64 bit here and
                // below; a 32 bit multiplication wraps around for high rates or long stalls.
                // The first half of the condition guarantees the 64 bit product cannot overflow.
                if(timeSinceLastLoop > sleepTime + 2000) {
                    theApp.QueueDebugLogLine(false,_T("UploadBandwidthThrottler: Time since last loop too long. time: %ims wanted: %ims Max: %ims"), timeSinceLastLoop, sleepTime, sleepTime + 2000);

                    // Cap the credited time so a long stall does not turn into a huge burst.
                    timeSinceLastLoop = sleepTime + 2000;
                }

                realBytesToSpend += (sint64)allowedDataRate*(sint64)timeSinceLastLoop;

                bytesToSpend = realBytesToSpend/1000;
            } else {
                // Adding the new credit would overflow: saturate instead.
                realBytesToSpend = _I64_MAX;
                bytesToSpend = _I32_MAX;
            }
        } else {
            // Unlimited rate: nothing is carried over, and this round may send as much as it can.
            realBytesToSpend = 0; //_I64_MAX;
            bytesToSpend = _I32_MAX;
        }

        lastLoopTick = thisLoopTick;

        if(bytesToSpend >= 1) {
            uint64 spentBytes = 0;
            uint64 spentOverhead = 0;

            tempQueueLocker.Lock();

            // are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list;
            while(!m_TempControlQueueFirst_list.IsEmpty()) {
                ThrottledControlSocket* moveSocket = m_TempControlQueueFirst_list.RemoveHead();
                m_ControlQueueFirst_list.AddTail(moveSocket);
            }
            while(!m_TempControlQueue_list.IsEmpty()) {
                ThrottledControlSocket* moveSocket = m_TempControlQueue_list.RemoveHead();
                m_ControlQueue_list.AddTail(moveSocket);
            }

            tempQueueLocker.Unlock();

            // Everything below, up to the matching Unlock(), runs under sendLocker.
            sendLocker.Lock();

            // Send any queued up control packets first
            while(bytesToSpend > 0 && spentBytes < (uint64)bytesToSpend && (!m_ControlQueueFirst_list.IsEmpty() || !m_ControlQueue_list.IsEmpty())) {
                ThrottledControlSocket* socket = NULL;

                if(!m_ControlQueueFirst_list.IsEmpty()) {
                    socket = m_ControlQueueFirst_list.RemoveHead();
                } else if(!m_ControlQueue_list.IsEmpty()) {
                    socket = m_ControlQueue_list.RemoveHead();
                }

                if(socket != NULL) {
                    SocketSentBytes socketSentBytes = socket->SendControlData(bytesToSpend-spentBytes, minFragSize);
                    uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
                    spentBytes += lastSpentBytes;
                    spentOverhead += socketSentBytes.sentBytesControlPackets;
                }
            }

            // Check if any sockets haven't gotten data for a long time. Then trickle them a package.
            // Note that this pass is not bounded by bytesToSpend.
            for(uint32 slotCounter = 0; slotCounter < (uint32)m_StandardOrder_list.GetSize(); slotCounter++) {
                ThrottledFileSocket* socket = m_StandardOrder_list.GetAt(slotCounter);

                if(socket != NULL) {
                    if(thisLoopTick-socket->GetLastCalledSend() > SEC2MS(1)) {
                        // trickle
                        uint32 neededBytes = socket->GetNeededBytes();

                        if(neededBytes > 0) {
                            SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);
                            uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
                            spentBytes += lastSpentBytes;
                            spentOverhead += socketSentBytes.sentBytesControlPackets;

                            if(lastSpentBytes > 0 && slotCounter < m_highestNumberOfFullyActivatedSlots) {
                                m_highestNumberOfFullyActivatedSlots = slotCounter;
                            }
                        }
                    }
                } else {
                    theApp.QueueDebugLogLine(false,_T("There was a NULL socket in the UploadBandwidthThrottler Standard list (trickle)! Prevented usage. Index: %i Size: %i"), slotCounter, m_StandardOrder_list.GetSize());
                }
            }

            // Equal bandwidth for all slots
            uint32 maxSlot = (uint32)m_StandardOrder_list.GetSize();
            if(maxSlot > 0 && allowedDataRate/maxSlot < UPLOAD_CLIENT_DATARATE) {
                // Not enough rate for every slot: only serve as many as the rate can sustain.
                maxSlot = allowedDataRate/UPLOAD_CLIENT_DATARATE;
            }

            if(maxSlot > m_highestNumberOfFullyActivatedSlots) {
                m_highestNumberOfFullyActivatedSlots = maxSlot;
            }

            // rememberedSlotCounter survives across loop iterations, so the round-robin
            // continues where the previous iteration stopped.
            for(uint32 maxCounter = 0; maxCounter < min(maxSlot, (uint32)m_StandardOrder_list.GetSize()) && bytesToSpend > 0 && spentBytes < (uint64)bytesToSpend; maxCounter++) {
                if(rememberedSlotCounter >= (uint32)m_StandardOrder_list.GetSize() ||
                   rememberedSlotCounter >= maxSlot) {
                    rememberedSlotCounter = 0;
                }

                ThrottledFileSocket* socket = m_StandardOrder_list.GetAt(rememberedSlotCounter);

                if(socket != NULL) {
                    SocketSentBytes socketSentBytes = socket->SendFileAndControlData(min(doubleSendSize, bytesToSpend-spentBytes), doubleSendSize);
                    uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;

                    spentBytes += lastSpentBytes;
                    spentOverhead += socketSentBytes.sentBytesControlPackets;
                } else {
                    theApp.QueueDebugLogLine(false,_T("There was a NULL socket in the UploadBandwidthThrottler Standard list (equal-for-all)! Prevented usage. Index: %i Size: %i"), rememberedSlotCounter, m_StandardOrder_list.GetSize());
                }

                rememberedSlotCounter++;
            }

            // Any bandwidth that hasn't been used yet are used first to last.
            for(uint32 slotCounter = 0; slotCounter < (uint32)m_StandardOrder_list.GetSize() && bytesToSpend > 0 && spentBytes < (uint64)bytesToSpend; slotCounter++) {
                ThrottledFileSocket* socket = m_StandardOrder_list.GetAt(slotCounter);

                if(socket != NULL) {
                    uint32 bytesToSpendTemp = bytesToSpend-spentBytes;
                    SocketSentBytes socketSentBytes = socket->SendFileAndControlData(bytesToSpendTemp, doubleSendSize);
                    uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;

                    spentBytes += lastSpentBytes;
                    spentOverhead += socketSentBytes.sentBytesControlPackets;

                    if(slotCounter+1 > m_highestNumberOfFullyActivatedSlots && (lastSpentBytes < bytesToSpendTemp || lastSpentBytes >= doubleSendSize)) { // || lastSpentBytes > 0 && spentBytes == bytesToSpend /*|| slotCounter+1 == (uint32)m_StandardOrder_list.GetSize())*/)) {
                        m_highestNumberOfFullyActivatedSlots = slotCounter+1;
                    }
                } else {
                    theApp.QueueDebugLogLine(false,_T("There was a NULL socket in the UploadBandwidthThrottler Standard list (fully activated)! Prevented usage. Index: %i Size: %i"), slotCounter, m_StandardOrder_list.GetSize());
                }
            }

            realBytesToSpend -= spentBytes*1000;

            // The budget may go negative (trickling is not bounded by it), but only down to
            // (number of slots + 1) fragments; anything beyond that is forgiven.
            const sint64 lowestAllowedBudget = -(((sint64)m_StandardOrder_list.GetSize()+1)*minFragSize)*1000;

            if(realBytesToSpend < lowestAllowedBudget) {
                realBytesToSpend = lowestAllowedBudget;
                lastTickReachedBandwidth = thisLoopTick;
            } else {
                // Widen before multiplying so the tolerance is computed in 64 bit.
                uint64 bandwidthSavedTolerance = (uint64)m_StandardOrder_list.GetSize()*512*1000;

                if(realBytesToSpend > 0 && (uint64)realBytesToSpend > 999+bandwidthSavedTolerance) {
                    // Too much unused budget was saved up: limit what is carried over.
                    sint64 newRealBytesToSpend = 999+bandwidthSavedTolerance;
                    //theApp.QueueDebugLogLine(false,_T("UploadBandwidthThrottler::RunInternal(): Too high saved bytesToSpend. Limiting value. Old value: %I64i New value: %I64i"), realBytesToSpend, newRealBytesToSpend);
                    realBytesToSpend = newRealBytesToSpend;

                    if(thisLoopTick-lastTickReachedBandwidth > max(1000, timeSinceLastLoop*2)) {
                        // The allowed bandwidth has not been reached for a while: raise the
                        // counter above the current number of slots.
                        // NOTE(review): presumably this is how a new upload slot gets requested - confirm with the readers of this member.
                        m_highestNumberOfFullyActivatedSlots = m_StandardOrder_list.GetSize()+1;
                        lastTickReachedBandwidth = thisLoopTick;
                        //theApp.QueueDebugLogLine(false, _T("UploadBandwidthThrottler: Throttler requests new slot due to bw not reached. m_highestNumberOfFullyActivatedSlots: %i m_StandardOrder_list.GetSize(): %i tick: %i"), m_highestNumberOfFullyActivatedSlots, m_StandardOrder_list.GetSize(), thisLoopTick);
                    }
                } else {
                    lastTickReachedBandwidth = thisLoopTick;
                }
            }

            // Publish the statistics of this round while still holding sendLocker.
            m_SentBytesSinceLastCall += spentBytes;
            m_SentBytesSinceLastCallOverhead += spentOverhead;

            sendLocker.Unlock();
        }
    }

    // Empty all queues BEFORE announcing that the thread has ended. EndThread() waits on
    // threadEndedEvent and promises its caller that this thread no longer accesses the
    // sockets once it returns; signalling first would let the owner tear the object down
    // while the lists and lockers below are still in use.
    tempQueueLocker.Lock();
    m_TempControlQueue_list.RemoveAll();
    m_TempControlQueueFirst_list.RemoveAll();
    tempQueueLocker.Unlock();

    sendLocker.Lock();
    m_ControlQueueFirst_list.RemoveAll();
    m_ControlQueue_list.RemoveAll();
    m_StandardOrder_list.RemoveAll();
    sendLocker.Unlock();

    // Last access to any member: tell EndThread() that the thread is done.
    threadEndedEvent->SetEvent();

    return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -