📄 task.cpp
字号:
for (i = 0; i < ThreadCnt; i++) { if (ThreadQue[i]->Status == LOGINFAIL) Cnt++; } return Cnt;}int TTask :: JoinDownloads(){ const int FILE_CHUNK = 8024; FILE *fp; FILE *dl_file; char buffer[FILE_CHUNK]; int i, j; int Sum = 0; if ((fp = fopen(FilePath, "wb")) == NULL) return -1; for (i = 0; i < ThreadCnt; i++) { if (!(dl_file = fopen(ThreadQue[i]->LocalFile, "rb"))) return -1; while ((j = fread(buffer, sizeof(char), FILE_CHUNK, dl_file))) { Sum += j; if (fwrite(buffer, sizeof(char), j, fp) != j) return -1; #ifdef DEBUG ShowMsg("Building TargetFile has completed %.1f%%", (float)Sum / ThreadQue[0]->u->FileSize * 100);#endif } fclose(dl_file); } fclose(fp);}int TTask :: DeleteDownloads(void){ int i; for (i = 0; i < ThreadCnt; i++) { if (unlink(ThreadQue[i]->LocalFile) == -1) { if (errno == ENOENT) continue; else return -1; } } return 0;}void TTask :: InitThreads(TUrl * u, long FileSize, char * FileMode){ int i; long BytesPerConn; long BytesLeft; char buffer[MAXPATHLEN]; if(FileSize == -1) { BytesPerConn = -1; BytesLeft = -1; } else { BytesPerConn = FileSize / ThreadCnt; BytesLeft = FileSize % ThreadCnt; } for (i = 0; i < ThreadCnt; i++) { sprintf(ThreadQue[i]->Name, "%s%d", "Thread", i); ThreadQue[i]->u = u; ThreadQue[i]->Task = this; sprintf(buffer, "%s%s%d", u->HostFileName, DEFAULT_FILE_EXT, i); ThreadQue[i]->LocalFile = strdup(buffer); ThreadQue[i]->FileMode = strdup(FileMode); if(FileSize == -1) { ThreadQue[i]->BlkSize = -1; ThreadQue[i]->RemoteStartPos = 0; ThreadQue[i]->RemoteEndPos = -1; } else { ThreadQue[i]->BlkSize = BytesPerConn; ThreadQue[i]->RemoteStartPos = i * BytesPerConn; if(u->Proto == ptFTP) ThreadQue[i]->RemoteEndPos = (i + 1) * BytesPerConn; if(u->Proto == ptHTTP) ThreadQue[i]->RemoteEndPos = (i + 1) * BytesPerConn - 1; } ThreadQue[i]->LocalStartPos = 0; ThreadQue[i]->Status = IDLE; ThreadQue[i]->Retry = true; ThreadQue[i]->TryAttempts = 0; } if(FileSize != -1) { ThreadQue[--i]->BlkSize += BytesLeft; ThreadQue[--i]->RemoteEndPos += BytesLeft; } if(u->Proto == ptFTP && Mode == RESUME) { if (ResumeModifyFtpConns() != 0) Panic("A file error while accessing the DL'ed files %s", strerror(errno)); } if(u->Proto == ptHTTP && Mode == RESUME) { if (ResumeModifyHttpConns() != 0) Panic("A file error while accessing the DL'ed files %s", strerror(errno)); }}//typedef void * (*ThreadFunc)(void *)/* fn ----> FtpThread or HttpThread */int TTask :: ExecDownloads( ThreadFunc fn ){ int i; for (i = 0; i < ThreadCnt; i++) { if(CreateThread(ThreadQue[i], fn) != 0) { return -1; //Panic("Error: Not enough memory to create thread"); break; } } /* for (i = 0; i < ThreadCnt; i++) { JoinThread(ThreadQue[i]); }*/ // return ALLDONE;}void TTask :: SetMode(RunMode val){ Mode = val;}int TTask :: ResumeModifyFtpSingleConn(TThread * This){ struct stat buffer; if (stat(This->LocalFile, &buffer) == -1) { if (errno == ENOENT) return 0; else return -1; } if (buffer.st_size == (This->RemoteEndPos - This->RemoteStartPos)) This->Status = ALLDONE; This->RemoteStartPos += (buffer.st_size - This->LocalStartPos); This->RemoteBytesReceived = buffer.st_size; This->LocalStartPos = buffer.st_size; return 0;}int TTask :: ResumeModifyFtpConns(void) { int i; for (i = 0; i < ThreadCnt; i++) { if (ResumeModifyFtpSingleConn(ThreadQue[i]) != 0) return -1; } return 0;}int TTask :: ResumeModifyHttpSingleConn(TThread * This){ struct stat buffer; if (stat(This->LocalFile, &buffer) == -1) { if (errno == ENOENT) return 0; else return -1; } if (buffer.st_size == (This->RemoteEndPos - This->RemoteStartPos + 1)) This->Status = ALLDONE; This->RemoteStartPos += (buffer.st_size - This->LocalStartPos); This->RemoteBytesReceived = buffer.st_size; This->LocalStartPos = buffer.st_size; return 0;}int TTask :: ResumeModifyHttpConns(void){ int i; for (i = 0; i < ThreadCnt; i++) { if (ResumeModifyHttpSingleConn(ThreadQue[i]) != 0) return -1; } return 0;}/*//return value// -1 ==> Panic("Error: failed to create thread!");// -2 ==> MAXTRYS // -3 ==> LOGINFAIL// -4 ==> CONREJECT// -5 ==> LOCALFATAL*/ int TTask :: HandleThreads(ThreadFunc fn){ int i; static int MaxSimConn = 0; for (i = 0; i < ThreadCnt; i++) { switch (ThreadQue[i]->Status) { case MAXTRYS: //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; if(CreateThread(ThreadQue[i], fn) != 0) return -1; break; case LOGINFAIL: if(IsAllLoginFailed() == true) { //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; if(CreateThread(ThreadQue[i], fn) != 0) return -1; break; } else { int DCnt = QueryDownloadingCnt(); int ConnCnt = QueryConnectingCnt(); int LoggingCnt = QueryLoggingCnt(); if (DCnt == 0 && (ConnCnt == 0) && (LoggingCnt == 0)) { //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; //pthread_mutex_lock(&StatusMutex); LockThread(this); if(CreateThread(ThreadQue[i], fn) != 0) return -1; ThreadWaitCond(this); UnlockThread(this); //pthread_cond_wait(&ConnectingCond, &StatusMutex); //pthread_mutex_unlock(&StatusMutex); break; } else { if (DCnt > MaxSimConn) { MaxSimConn = DCnt; break; } if ((DCnt < MaxSimConn) && (ConnCnt == 0) && (LoggingCnt == 0)) { //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; //pthread_mutex_lock(&StatusMutex); LockThread(this); if(CreateThread(ThreadQue[i], fn) != 0) return -1; ThreadWaitCond(this); UnlockThread(this); //pthread_cond_wait(&ConnectingCond, &StatusMutex); //pthread_mutex_unlock(&StatusMutex); break; } } } break; case CONREJECT: if (IsAllConnRejected() == true) { //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; if(CreateThread(ThreadQue[i], fn) != 0) return -1; break; } else { int DCnt = QueryDownloadingCnt(); int ConnCnt = QueryConnectingCnt(); int LoggingCnt = QueryLoggingCnt(); if (DCnt == 0 && ConnCnt == 0 && LoggingCnt == 0) { //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; //pthread_mutex_lock(&StatusMutex); LockThread(this); if(CreateThread(ThreadQue[i], fn) != 0) return -1; ThreadWaitCond(this); UnlockThread(this); //pthread_cond_wait(&ConnectingCond, &StatusMutex); //pthread_mutex_unlock(&StatusMutex); break; } else { if (DCnt > MaxSimConn) { MaxSimConn = DCnt; break; } if (DCnt < MaxSimConn && ConnCnt == 0 && LoggingCnt == 0) { //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; //pthread_mutex_lock(&StatusMutex); LockThread(this); if(CreateThread(ThreadQue[i], fn) != 0) return -1; ThreadWaitCond(this); UnlockThread(this); //pthread_cond_wait(&ConnectingCond, &StatusMutex); //pthread_mutex_unlock(&StatusMutex); break; } } } break; case LOCALFATAL: return -5; break; case LOGGININ : { struct timeval CurTime; gettimeofday(&CurTime, NULL); int t = CurTime.tv_sec - ThreadQue[i]->BeginTime.tv_sec;/* * ShowMsg("%s\'s LOGGININ Status has cost %d second ...", * ThreadQue[i]->Name, t); */ if(t > 15) { CancelThread(ThreadQue[i]); //JoinThread(ThreadQue[i]); ThreadQue[i]->Status = IDLE; if(CreateThread(ThreadQue[i], fn) != 0) return -1; break; } } default: break; } }}void TTask :: TerminateThreads(void){ int i; for (i = 0; i < ThreadCnt; i++) CancelThread(ThreadQue[i]);/* for(i = 0; i < ThreadCnt; i++) JoinThread(ThreadQue[i]);*/}long TTask::GetEstTime(long BytesLeft, float CurSpeed){ long SecondsLeft; if(BytesLeft < 0 || CurSpeed <=0) return 0 ; else SecondsLeft = (long)(BytesLeft / CurSpeed); /* if(SecondsLeft >=3600) { printf("Time remaining %d hours %d minutes %d seconds", SecondsLeft / 3600, SecondsLeft % 3600 / 60, (SecondsLeft % 3600) % 60); } else if(SecondsLeft >=60) { printf("Time remaining %d minutes %d seconds", SecondsLeft % 3600 / 60, (SecondsLeft % 3600) % 60); } else printf("Time remaining %d seconds", SecondsLeft); */ return SecondsLeft; }#ifdef USE_QTvoid TTask :: RefreshDisplay(){// int i = 0;// long FileSize, BlkSize;// long AllRemoteBytesReceived; QString Status; float Received ; float AvgSpeed; float Percent; // struct timeval CurTime;// float TotalAvgSpeed; // long TimeOfCost; long nConn = GetThreadCnt(); long FileSize = u->FileSize; HandleThreads(fn); //////////////////////Refresh Task Infos /////////////////////////// /* AllRemoteBytesReceived = GetAllRemoteBytesReceived(); TotalAvgSpeed = GetTotalAvgSpeed(); if(FileSize == -1) { QString str6 = "UNKNOWN"; QString str7 = QString("%1K").arg((float)AllRemoteBytesReceived / 1024, 5, 'f', 2); msg->SetFileSize( str6 ); msg->SetAvgSpeed( str7 ); } else { QString str6 = QString("%1K").arg(FileSize / 1024, 5, 'f', 2); QString str7 = QString("%1K").arg((float)AllRemoteBytesReceived / 1024, 5, 'f', 2); msg->SetFileSize( str6 ); msg->SetAvgSpeed( str7 ); } int TimeLeft = GetEstTime(FileSize - AllRemoteBytesReceived, TotalAvgSpeed); QString str8 = QString("%1Sec").arg(TimeLeft); msg->SetTimeLeft(str8); QString str9 = QString("%1K").arg(AllRemoteBytesReceived / 1024); msg->SetReceived(str9); */ if (IsAllCompleted() == true) { if( this->ts == FINISHED ) return ; SetdlEndTime(); ShowMsg("Total cost time of download is : %d Sec", GetTotalCostTime()); ShowMsg("File Succesfully Retreived, Now joining the temp file"); if(JoinDownloads() == -1) { ShowMsg("Error: unable to open the file for \ writing-: %s", GetOutFilePath()); return; } ShowMsg("Deleting the temp files"); if ( DeleteDownloads() == -1 ) ShowMsg( "unable to delete the temp files" ); if ( plog->DeleteLogFile() == -1 ) ShowMsg( "logfile doesn't exist" ); ShowMsg( "All Done: Download Succesfull!" ); this->ts = FINISHED; } //////////////////////Refresh Thread Infos /////////////////////////// ThreadMsgs->clear(); for ( unsigned int i = 0; i < nConn; i++) { Status = GetCurThreadStatus(ThreadQue[i]); Received = GetCurThreadBytesReceived(ThreadQue[i]); AvgSpeed = GetCurThreadAvgSpeed(ThreadQue[i]); if(FileSize == -1) Percent = 0; else Percent = GetCurThreadPercent(ThreadQue[i]); QString str1 = QString("%1").arg(i + 1); QString str2 = Status; QString str3 = QString("%1K").arg( Received, 5, 'f', 2 ); QString str4 = QString("%1K").arg(AvgSpeed, 5, 'f', 2); QString str5 = QString("%1\%").arg(Percent, 5, 'f', 2); ThreadMsg * tm = new ThreadMsg( str1, str2, str3, str4, str5 ); msg->addThreadMsg( tm ); (void)new ThreadMsgListItem(ThreadMsgs, tm ); }}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -