📄 emsocket.cpp
字号:
// Bugfix We still need to check for a valid protocol
// Remark: the default eMule v0.26b had removed this test......
switch (pendingPacket->prot){
case OP_EDONKEYPROT:
case OP_PACKEDPROT:
case OP_EMULEPROT:
break;
default:
EMTrace("CEMSocket::OnReceive ERROR Wrong header");
delete pendingPacket;
pendingPacket = NULL;
OnError(ERR_WRONGHEADER);
return;
}
// Security: Check for buffer overflow (2MB)
if(pendingPacket->size > sizeof(GlobalReadBuffer)) {
delete pendingPacket;
pendingPacket = NULL;
OnError(ERR_TOOBIG);
return;
}
// Init data buffer
pendingPacket->pBuffer = new char[pendingPacket->size + 1];
pendingPacketSize = 0;
}
// Bytes ready to be copied into packet's internal buffer
ASSERT(rptr <= rend);
uint32 toCopy = ((pendingPacket->size - pendingPacketSize) < (uint32)(rend - rptr)) ?
(pendingPacket->size - pendingPacketSize) : (uint32)(rend - rptr);
// Copy Bytes from Global buffer to packet's internal buffer
memcpy(&pendingPacket->pBuffer[pendingPacketSize], rptr, toCopy);
pendingPacketSize += toCopy;
rptr += toCopy;
// Check if packet is complet
ASSERT(pendingPacket->size >= pendingPacketSize);
if(pendingPacket->size == pendingPacketSize){
#ifdef EMSOCKET_DEBUG
EMTrace("CEMSocket::PacketReceived on %d, opcode=%X, realSize=%d",
(SOCKET)this, pendingPacket->opcode, pendingPacket->GetRealPacketSize());
#endif
// Process packet
bool bPacketResult = PacketReceived(pendingPacket);
delete pendingPacket;
pendingPacket = NULL;
pendingPacketSize = 0;
if (!bPacketResult)
return;
}
}
// Finally, if there is any data left over, save it for next time
ASSERT(rptr <= rend);
ASSERT(rend - rptr < PACKET_HEADER_SIZE);
if(rptr != rend) {
// Keep the partial head
pendingHeaderSize = rend - rptr;
memcpy(pendingHeader, rptr, pendingHeaderSize);
}
}
void CEMSocket::SetDownloadLimit(uint32 limit){
downloadLimit = limit;
downloadLimitEnable = true;
// CPU load improvement
if(limit > 0 && pendingOnReceive == true){
OnReceive(0);
}
}
void CEMSocket::DisableDownloadLimit(){
downloadLimitEnable = false;
// CPU load improvement
if(pendingOnReceive == true){
OnReceive(0);
}
}
/**
* Queues up the packet to be sent. Another thread will actually send the packet.
*
* If the packet is not a control packet, and if the socket decides that its queue is
* full and forceAdd is false, then the socket is allowed to refuse to add the packet
* to its queue. It will then return false and it is up to the calling thread to try
* to call SendPacket for that packet again at a later time.
*
* @param packet address to the packet that should be added to the queue
*
* @param delpacket if true, the responsibility for deleting the packet after it has been sent
* has been transfered to this object. If false, don't delete the packet after it
* has been sent.
*
* @param controlpacket the packet is a controlpacket
*
* @param forceAdd this packet must be added to the queue, even if it is full. If this flag is true
* then the method can not refuse to add the packet, and therefore not return false.
*
* @return true if the packet was added to the queue, false otherwise
*/
void CEMSocket::SendPacket(Packet* packet, bool delpacket, bool controlpacket, uint32 actualPayloadSize){
//EMTrace("CEMSocket::OnSenPacked1 linked: %i, controlcount %i, standartcount %i, isbusy: %i",m_bLinkedPackets, controlpacket_queue.GetCount(), standartpacket_queue.GetCount(), IsBusy());
sendLocker.Lock();
if (byConnected == ES_DISCONNECTED) {
sendLocker.Unlock();
if(delpacket) {
delete packet;
}
return;
} else {
if (!delpacket){
//ASSERT ( !packet->IsSplitted() );
Packet* copy = new Packet(packet->opcode,packet->size);
memcpy(copy->pBuffer,packet->pBuffer,packet->size);
packet = copy;
}
//if(m_startSendTick > 0) {
// m_lastSendLatency = ::GetTickCount() - m_startSendTick;
//}
if (controlpacket) {
controlpacket_queue.AddTail(packet);
// queue up for controlpacket
theApp.uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent());
} else {
bool first = !((sendbuffer && !m_currentPacket_is_controlpacket) || !standartpacket_queue.IsEmpty());
StandardPacketQueueEntry queueEntry = { actualPayloadSize, packet };
standartpacket_queue.AddTail(queueEntry);
// reset timeout for the first time
if (first) {
lastFinishedStandard = ::GetTickCount();
m_bAccelerateUpload = true; // Always accelerate first packet in a block
}
}
}
sendLocker.Unlock();
}
uint64 CEMSocket::GetSentBytesCompleteFileSinceLastCallAndReset() {
sendLocker.Lock();
uint64 sentBytes = m_numberOfSentBytesCompleteFile;
m_numberOfSentBytesCompleteFile = 0;
sendLocker.Unlock();
return sentBytes;
}
uint64 CEMSocket::GetSentBytesPartFileSinceLastCallAndReset() {
sendLocker.Lock();
uint64 sentBytes = m_numberOfSentBytesPartFile;
m_numberOfSentBytesPartFile = 0;
sendLocker.Unlock();
return sentBytes;
}
uint64 CEMSocket::GetSentBytesControlPacketSinceLastCallAndReset() {
sendLocker.Lock();
uint64 sentBytes = m_numberOfSentBytesControlPacket;
m_numberOfSentBytesControlPacket = 0;
sendLocker.Unlock();
return sentBytes;
}
uint64 CEMSocket::GetSentPayloadSinceLastCallAndReset() {
sendLocker.Lock();
uint64 sentBytes = m_actualPayloadSizeSent;
m_actualPayloadSizeSent = 0;
sendLocker.Unlock();
return sentBytes;
}
void CEMSocket::OnSend(int nErrorCode){
//onSendWillBeCalledOuter = false;
if (nErrorCode){
OnError(nErrorCode);
return;
}
//EMTrace("CEMSocket::OnSend linked: %i, controlcount %i, standartcount %i, isbusy: %i",m_bLinkedPackets, controlpacket_queue.GetCount(), standartpacket_queue.GetCount(), IsBusy());
sendLocker.Lock();
m_bBusy = false;
// stopped sending here.
//StoppedSendSoUpdateStats();
if (byConnected == ES_DISCONNECTED) {
sendLocker.Unlock();
return;
} else
byConnected = ES_CONNECTED;
if(m_currentPacket_is_controlpacket) {
// queue up for control packet
theApp.uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent());
}
sendLocker.Unlock();
}
//void CEMSocket::StoppedSendSoUpdateStats() {
// if(m_startSendTick > 0) {
// m_lastSendLatency = ::GetTickCount()-m_startSendTick;
//
// if(m_lastSendLatency > 0) {
// if(m_wasBlocked == true) {
// SocketTransferStats newLatencyStat = { m_lastSendLatency, ::GetTickCount() };
// m_Average_sendlatency_list.AddTail(newLatencyStat);
// m_latency_sum += m_lastSendLatency;
// }
//
// m_startSendTick = 0;
// m_wasBlocked = false;
//
// CleanSendLatencyList();
// }
// }
//}
//
//void CEMSocket::CleanSendLatencyList() {
// while(m_Average_sendlatency_list.GetCount() > 0 && ::GetTickCount() - m_Average_sendlatency_list.GetHead().timestamp > 3*1000) {
// SocketTransferStats removedLatencyStat = m_Average_sendlatency_list.RemoveHead();
// m_latency_sum -= removedLatencyStat.latency;
// }
//}
/**
* Try to put queued up data on the socket.
*
* Control packets have higher priority, and will be sent first, if possible.
* Standard packets can be split up in several package containers. In that case
* all the parts of a split package must be sent in a row, without any control packet
* in between.
*
* @param maxNumberOfBytesToSend This is the maximum number of bytes that is allowed to be put on the socket
* this call. The actual number of sent bytes will be returned from the method.
*
* @param onlyAllowedToSendControlPacket This call we only try to put control packets on the sockets.
* If there's a standard packet "in the way", and we think that this socket
* is no longer an upload slot, then it is ok to send the standard packet to
* get it out of the way. But it is not allowed to pick a new standard packet
* from the queue during this call. Several split packets are counted as one
* standard packet though, so it is ok to finish them all off if necessary.
*
* @return the actual number of bytes that were put on the socket.
*/
SocketSentBytes CEMSocket::Send(uint32 maxNumberOfBytesToSend, uint32 minFragSize, bool onlyAllowedToSendControlPacket) {
//EMTrace("CEMSocket::Send linked: %i, controlcount %i, standartcount %i, isbusy: %i",m_bLinkedPackets, controlpacket_queue.GetCount(), standartpacket_queue.GetCount(), IsBusy());
sendLocker.Lock();
if (byConnected == ES_DISCONNECTED) {
sendLocker.Unlock();
SocketSentBytes returnVal = { false, 0, 0 };
return returnVal;
} else if (m_bBusy && onlyAllowedToSendControlPacket /*&& ::GetTickCount() - lastSent < 50*/) {
sendLocker.Unlock();
SocketSentBytes returnVal = { true, 0, 0 };
return returnVal;
}
if(minFragSize < 1) {
minFragSize = 1;
}
maxNumberOfBytesToSend = GetNextFragSize(maxNumberOfBytesToSend, minFragSize);
bool bWasLongTimeSinceSend = (::GetTickCount() - lastSent) > 1000;
lastCalledSend = ::GetTickCount();
boolean anErrorHasOccured = false;
uint32 sentStandardPacketBytesThisCall = 0;
uint32 sentControlPacketBytesThisCall = 0;
while(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall < maxNumberOfBytesToSend && anErrorHasOccured == false && // don't send more than allowed. Also, there should have been no error in earlier loop
(!controlpacket_queue.IsEmpty() || !standartpacket_queue.IsEmpty() || sendbuffer != NULL) && // there must exist something to send
(onlyAllowedToSendControlPacket == false || // this means we are allowed to send both types of packets, so proceed
sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall > 0 && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) % minFragSize != 0 ||
sendbuffer == NULL && !controlpacket_queue.IsEmpty() || // There's a control packet in queue, and we are not currently sending anything, so we will handle the control packet next
sendbuffer != NULL && m_currentPacket_is_controlpacket == true || // We are in the progress of sending a control packet. We are always allowed to send those
sendbuffer != NULL && m_currentPacket_is_controlpacket == false && bWasLongTimeSinceSend && !controlpacket_queue.IsEmpty() && standartpacket_queue.IsEmpty() && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize // We have waited to long to clean the current packet (which may be a standard packet that is in the way). Proceed no matter what the value of onlyAllowedToSendControlPacket.
)
) {
// If we are currently not in the progress of sending a packet, we will need to find the next one to send
if(sendbuffer == NULL) {
Packet* curPacket = NULL;
if(!controlpacket_queue.IsEmpty()) {
// There's a control packet to send
m_currentPacket_is_controlpacket = true;
curPacket = controlpacket_queue.RemoveHead();
} else if(!standartpacket_queue.IsEmpty() /*&& onlyAllowedToSendControlPacket == false*/) {
// There's a standard packet to send
m_currentPacket_is_controlpacket = false;
StandardPacketQueueEntry queueEntry = standartpacket_queue.RemoveHead();
curPacket = queueEntry.packet;
m_actualPayloadSize = queueEntry.actualPayloadSize;
// remember this for statistics purposes.
m_currentPackageIsFromPartFile = curPacket->IsFromPF();
} else {
// Just to be safe. Shouldn't happen?
sendLocker.Unlock();
// if we reach this point, then there's something wrong with the while condition above!
ASSERT(0);
theApp.QueueDebugLogLine(true,_T("EMSocket: Couldn't get a new packet! There's an error in the first while condition in EMSocket::Send()"));
SocketSentBytes returnVal = { true, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall };
return returnVal;
}
// We found a package to send. Get the data to send from the
// package container and dispose of the container.
sendblen = curPacket->GetRealPacketSize();
sendbuffer = curPacket->DetachPacket();
sent = 0;
delete curPacket;
}
// At this point we've got a packet to send in sendbuffer. Try to send it. Loop until entire packet
// is sent, or until we reach maximum bytes to send for this call, or until we get an error.
// NOTE! If send would block (returns WSAEWOULDBLOCK), we will return from this method INSIDE this loop.
while (sent < sendblen &&
sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall < maxNumberOfBytesToSend &&
(
onlyAllowedToSendControlPacket == false || // this means we are allowed to send both types of packets, so proceed
m_currentPacket_is_controlpacket ||
bWasLongTimeSinceSend && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize ||
(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) % minFragSize != 0
) &&
anErrorHasOccured == false) {
uint32 tosend = sendblen-sent;
if(!onlyAllowedToSendControlPacket || m_currentPacket_is_controlpacket) {
if (maxNumberOfBytesToSend >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > maxNumberOfBytesToSend-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall))
tosend = maxNumberOfBytesToSend-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall);
} else if(bWasLongTimeSinceSend && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) {
if (minFragSize >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > minFragSize-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall))
tosend = minFragSize-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall);
} else {
uint32 nextFragMaxBytesToSent = GetNextFragSize(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall, minFragSize);
if (nextFragMaxBytesToSent >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > nextFragMaxBytesToSent-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall))
tosend = nextFragMaxBytesToSent-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall);
}
ASSERT (tosend != 0 && tosend <= sendblen-sent);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -