📄 downloadclient.cpp
字号:
}
/* Barry - Originally this only wrote to disk when a full 180k block
had been received from a client, and only asked for data in
180k blocks.
This meant that on average 90k was lost for every connection
to a client data source. That is a lot of wasted data.
To reduce the lost data, packets are now written to a buffer
and flushed to disk regularly regardless of size downloaded.
This includes compressed packets.
Data is also requested only where gaps are, not in 180k blocks.
The requests will still not exceed 180k, but may be smaller to
fill a gap.
*/
void CUpDownClient::ProcessBlockPacket(char *packet, uint32 size, bool packed)
{
try {
// Ignore if no data required
if (!(GetDownloadState() == DS_DOWNLOADING || GetDownloadState() == DS_NONEEDEDPARTS))
return;
const int HEADER_SIZE = 24;
// Update stats
m_dwLastBlockReceived = ::GetTickCount();
// Read data from packet
CSafeMemFile *data = new CSafeMemFile((BYTE*)packet, size);
uchar fileID[16];
data->Read(fileID, 16);
// Check that this data is for the correct file
if ( (!reqfile) || memcmp(packet, reqfile->GetFileHash(), 16))
{
delete data;
throw CString(GetResString(IDS_ERR_WRONGFILEID) + " (ProcessBlockPacket)");
}
// Find the start & end positions, and size of this chunk of data
uint32 nStartPos;
uint32 nEndPos;
uint32 nBlockSize = 0;
data->Read(&nStartPos, 4);
if (packed)
{
data->Read(&nBlockSize, 4);
nEndPos = nStartPos + (size - HEADER_SIZE);
usedcompressiondown = true;
}
else
data->Read(&nEndPos,4);
delete data;
// Check that packet size matches the declared data size + header size (24)
if ( size != ((nEndPos - nStartPos) + HEADER_SIZE))
throw CString(GetResString(IDS_ERR_BADDATABLOCK)+" (ProcessBlockPacket)");
// Move end back one, should be inclusive
theApp.UpdateReceivedBytes(size - HEADER_SIZE);
m_nDownDataRateMS += size - HEADER_SIZE;
credits->AddDownloaded(size - HEADER_SIZE);
nEndPos--;
// Loop through to find the reserved block that this is within
Pending_Block_Struct *cur_block;
for (POSITION pos = m_PendingBlocks_list.GetHeadPosition(); pos != NULL; m_PendingBlocks_list.GetNext(pos))
{
cur_block = m_PendingBlocks_list.GetAt(pos);
if ((cur_block->block->StartOffset <= nStartPos) && (cur_block->block->EndOffset >= nStartPos))
{
// Found reserved block
// Remember this start pos, used to draw part downloading in list
m_nLastBlockOffset = nStartPos;
// Occasionally packets are duplicated, no point writing it twice
// This will be 0 in these cases, or the length written otherwise
uint32 lenWritten = 0;
// Handle differently depending on whether packed or not
if (!packed)
{
// Write to disk (will be buffered in part file class)
lenWritten = reqfile->WriteToBuffer(size - HEADER_SIZE,
(BYTE *) (packet + HEADER_SIZE),
nStartPos,
nEndPos,
cur_block->block );
}
else // Packed
{
// Create space to store unzipped data, the size is only an initial guess, will be resized in unzip() if not big enough
uint32 lenUnzipped = (size * 2);
// Don't get too big
if (lenUnzipped > (BLOCKSIZE + 300))
lenUnzipped = (BLOCKSIZE + 300);
BYTE *unzipped = new BYTE[lenUnzipped];
// Try to unzip the packet
int result = unzip(cur_block, (BYTE *)(packet + HEADER_SIZE), (size - HEADER_SIZE), &unzipped, &lenUnzipped);
if (result == Z_OK)
{
// Write any unzipped data to disk
if (lenUnzipped > 0)
{
// Use the current start and end positions for the uncompressed data
nStartPos = cur_block->block->StartOffset + cur_block->totalUnzipped - lenUnzipped;
nEndPos = cur_block->block->StartOffset + cur_block->totalUnzipped - 1;
if( nEndPos > cur_block->block->EndOffset){
theApp.emuledlg->AddDebugLogLine(false, GetResString(IDS_ERR_CORRUPTCOMPRPKG),reqfile->GetFileName(),666);
reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
}
else{
// Write uncompressed data to file
lenWritten = reqfile->WriteToBuffer(size - HEADER_SIZE,
unzipped,
nStartPos,
nEndPos,
cur_block->block );
}
}
}
else
{
theApp.emuledlg->AddDebugLogLine(false, GetResString(IDS_ERR_CORRUPTCOMPRPKG), reqfile->GetFileName(), result);
reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
}
delete [] unzipped;
}
// These checks only need to be done if any data was written
if (lenWritten > 0)
{
m_nTransferedDown += lenWritten;
// If finished reserved block
if (nEndPos == cur_block->block->EndOffset)
{
reqfile->RemoveBlockFromList(cur_block->block->StartOffset, cur_block->block->EndOffset);
delete cur_block->block;
// Not always allocated
if (cur_block->zStream) delete cur_block->zStream;
delete cur_block;
m_PendingBlocks_list.RemoveAt(pos);
// Request next block
SendBlockRequests();
}
}
// Stop looping and exit method
return;
}
}
} catch (...) {}
}
int CUpDownClient::unzip(Pending_Block_Struct *block, BYTE *zipped, uint32 lenZipped, BYTE **unzipped, uint32 *lenUnzipped, bool recursive)
{
int err = Z_DATA_ERROR;
try
{
// Save some typing
z_stream *zS = block->zStream;
// Is this the first time this block has been unzipped
if (zS == NULL)
{
// Create stream
block->zStream = new z_stream;
zS = block->zStream;
// Initialise stream values
zS->zalloc = (alloc_func)0;
zS->zfree = (free_func)0;
zS->opaque = (voidpf)0;
// Set output data streams, do this here to avoid overwriting on recursive calls
zS->next_out = (*unzipped);
zS->avail_out = (*lenUnzipped);
// Initialise the z_stream
err = inflateInit(zS);
if (err != Z_OK)
return err;
}
// Use whatever input is provided
zS->next_in = zipped;
zS->avail_in = lenZipped;
// Only set the output if not being called recursively
if (!recursive)
{
zS->next_out = (*unzipped);
zS->avail_out = (*lenUnzipped);
}
// Try to unzip the data
err = inflate(zS, Z_SYNC_FLUSH);
// Is zip finished reading all currently available input and writing all generated output
if (err == Z_STREAM_END)
{
// Finish up
err = inflateEnd(zS);
if (err != Z_OK)
return err;
// Got a good result, set the size to the amount unzipped in this call (including all recursive calls)
(*lenUnzipped) = (zS->total_out - block->totalUnzipped);
block->totalUnzipped = zS->total_out;
}
else if ((err == Z_OK) && (zS->avail_out == 0))
{
// Output array was not big enough, call recursively until there is enough space
// What size should we try next
uint32 newLength = (*lenUnzipped) *= 2;
if (newLength == 0)
newLength = lenZipped * 2;
// Copy any data that was successfully unzipped to new array
BYTE *temp = new BYTE[newLength];
memcpy(temp, (*unzipped), (zS->total_out - block->totalUnzipped));
delete [] (*unzipped);
(*unzipped) = temp;
(*lenUnzipped) = newLength;
// Position stream output to correct place in new array
zS->next_out = (*unzipped) + (zS->total_out - block->totalUnzipped);
zS->avail_out = (*lenUnzipped) - (zS->total_out - block->totalUnzipped);
// Try again
err = unzip(block, zS->next_in, zS->avail_in, unzipped, lenUnzipped, true);
}
else if ((err == Z_OK) && (zS->avail_in == 0))
{
// All available input has been processed, everything ok.
// Set the size to the amount unzipped in this call (including all recursive calls)
(*lenUnzipped) = (zS->total_out - block->totalUnzipped);
block->totalUnzipped = zS->total_out;
}
else
{
// Should not get here unless input data is corrupt
theApp.emuledlg->AddDebugLogLine(false,"Unexpected zip error");
// DebugBreak(); I removed this so that we could let the client run for more the five minutes.. Barry needs to see if there is or isn't a preoblem here..
}
if (err != Z_OK)
(*lenUnzipped) = 0;
} catch (...) {}
return err;
}
uint32 CUpDownClient::CalculateDownloadRate(){
m_AvarageDDR_list.AddTail(m_nDownDataRateMS);
m_nSumForAvgDownDataRate += m_nDownDataRateMS;
if (m_AvarageDDR_list.GetCount() > 300)
m_nSumForAvgDownDataRate -= m_AvarageDDR_list.RemoveHead ();
m_nDownDataRateMS = 0;
if(m_AvarageDDR_list.GetCount() > 10)
m_nDownDatarate = 10 * m_nSumForAvgDownDataRate / m_AvarageDDR_list.GetCount();
else
m_nDownDatarate = 0;
m_cShowDR++;
if (m_cShowDR == 30){
m_cShowDR = 0;
//theApp.emuledlg->transferwnd.downloadlistctrl.UpdateItem(this);
UpdateDisplayedInfo();
}
if ((::GetTickCount() - m_dwLastBlockReceived) > DOWNLOADTIMEOUT){
Packet* packet = new Packet(OP_CANCELTRANSFER,0);
theApp.uploadqueue->AddUpDataOverheadFileRequest(packet->size);
socket->SendPacket(packet,true,true);
SetDownloadState(DS_ONQUEUE);
}
return m_nDownDatarate;
}
uint16 CUpDownClient::GetAvailablePartCount(){
uint16 result = 0;
for (int i = 0;i != m_nPartCount;i++){
if (IsPartAvailable(i))
result++;
}
return result;
}
void CUpDownClient::SetRemoteQueueRank(uint16 nr){
m_nRemoteQueueRank = nr;
//theApp.emuledlg->transferwnd.downloadlistctrl.UpdateItem(this);
UpdateDisplayedInfo();
}
bool CUpDownClient::SwapToAnotherFile(bool bIgnoreNoNeeded){
if (GetDownloadState() == DS_DOWNLOADING)
return false;
CPartFile* SwapTo = NULL;
if (!m_OtherRequests_list.IsEmpty()){
for (POSITION pos = m_OtherRequests_list.GetHeadPosition();pos != 0;m_OtherRequests_list.GetNext(pos)){
CPartFile* cur_file = m_OtherRequests_list.GetAt(pos);
if (cur_file != reqfile && theApp.downloadqueue->IsPartFile(cur_file) && !cur_file->IsStopped() && (cur_file->GetStatus(false) == PS_READY || cur_file->GetStatus(false) == PS_EMPTY) ){
SwapTo = cur_file;
m_OtherRequests_list.RemoveAt(pos);
break;
}
}
}
if (!SwapTo && bIgnoreNoNeeded){
for (POSITION pos = m_OtherNoNeeded_list.GetHeadPosition();pos != 0;m_OtherNoNeeded_list.GetNext(pos)){
CPartFile* cur_file = m_OtherNoNeeded_list.GetAt(pos);
if (cur_file != reqfile && theApp.downloadqueue->IsPartFile(cur_file) && !cur_file->IsStopped() && (cur_file->GetStatus(false) == PS_READY || cur_file->GetStatus(false) == PS_EMPTY) ){
SwapTo = cur_file;
m_OtherNoNeeded_list.RemoveAt(pos);
break;
}
}
}
if (SwapTo){
m_OtherNoNeeded_list.AddHead(reqfile);
theApp.downloadqueue->RemoveSource(this);
m_nRemoteQueueRank = 0;
if (m_abyPartStatus){
delete[] m_abyPartStatus;
m_abyPartStatus = 0;
}
m_dwLastAskedTime = 0;
m_iRate=0;
m_strComment="";
theApp.downloadqueue->CheckAndAddKnownSource(SwapTo,this);
return true;
}
else
return false;
}
void CUpDownClient::UDPReaskACK(uint16 nNewQR){
m_bUDPPending = false;
SetRemoteQueueRank(nNewQR);
m_dwLastAskedTime = ::GetTickCount();
}
void CUpDownClient::UDPReaskFNF(){
m_bUDPPending = false;
theApp.emuledlg->AddDebugLogLine(false,CString("UDP ANSWER FNF : %s"),GetUserName());
theApp.downloadqueue->RemoveSource(this);
if (!socket)
Disconnected();
}
void CUpDownClient::UDPReaskForDownload(){
ASSERT ( reqfile );
if(!reqfile || m_bUDPPending)
return;
//the line "m_bUDPPending = true;" use to be here
if(m_nUDPPort != 0 && theApp.glob_prefs->GetUDPPort() != 0 &&
!theApp.serverconnect->IsLowID() && !HasLowID() && !(socket && socket->IsConnected())) {
//don't use udp to ask for sources
if(IsSourceRequestAllowed())
return;
m_bUDPPending = true;
Packet* response = new Packet(OP_REASKFILEPING,16,OP_EMULEPROT);
memcpy(response->pBuffer,reqfile->GetFileHash(),16);
theApp.uploadqueue->AddUpDataOverheadFileRequest(response->size);
theApp.clientudp->SendPacket(response,GetIP(),GetUDPPort());
}
}
// Barry - Sets string to show parts downloading, eg NNNYNNNNYYNYN
void CUpDownClient::ShowDownloadingParts(CString *partsYN)
{
Requested_Block_Struct *cur_block;
int x;
// Initialise to all N's
char *n = new char[m_nPartCount+1];
_strnset(n, 'N', m_nPartCount);
n[m_nPartCount] = 0;
partsYN->SetString(n, m_nPartCount);
delete [] n;
for (POSITION pos = m_PendingBlocks_list.GetHeadPosition(); pos != 0; m_PendingBlocks_list.GetNext(pos))
{
cur_block = m_PendingBlocks_list.GetAt(pos)->block;
x = (cur_block->StartOffset / PARTSIZE);
partsYN->SetAt(x, 'Y');
}
}
void CUpDownClient::UpdateDisplayedInfo(boolean force) {
DWORD curTick = ::GetTickCount();
if(force || curTick-m_lastRefreshedDLDisplay > MINWAIT_BEFORE_DLDISPLAY_WINDOWUPDATE+(uint32)(rand()/(RAND_MAX/1000))) {
theApp.emuledlg->transferwnd.downloadlistctrl.UpdateItem(this);
theApp.emuledlg->transferwnd.clientlistctrl.RefreshClient(this);
m_lastRefreshedDLDisplay = curTick;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -