urlthread.py
来自「Harvestman-最新版本」· Python 代码 · 共 877 行 · 第 1/2 页
PY
877 行
""" The queue is considered blocked if all threads are waiting for data, and no data is coming """ blocked = self._get_num_blocked_threads() if blocked == len(self._threads): return True else: return False def push(self, urlObj): """ Push the url object and start downloading the url """ # print 'Pushed',urlObj.get_full_url() # unpack the tuple try: filename, url = urlObj.get_full_filename(), urlObj.get_full_url() except: return # Wait till we have a thread slot free, and push the # current url's info when we get one try: self.put( urlObj ) urlObj.qstatus = urlparser.URL_IN_QUEUE # If this URL was multipart, mark it as such self._multipartstatus[url] = MULTIPART_DOWNLOAD_QUEUED except Full: self.buffer.append(urlObj) def get_next_urltask(self): # Insert a random sleep in range # of 0 - 0.5 seconds # time.sleep(random.random()*0.5) try: if len(self.buffer): # Get last item from buffer item = buffer.pop() return item else: # print 'Waiting to get item',threading.currentThread() item = self.get() return item except Empty: return None def notify(self, thread): """ Method called by threads to notify that they have finished """ try: self._cond.acquire() # Mark the time stamp (last thread report time) self._ltrt = time.time() urlObj = thread.get_urlobject() # See if this was a multi-part download if urlObj.trymultipart: status = thread.get_status() if status == CONNECT_YES_DOWNLOADED: extrainfo('Thread %s reported %s' % (thread, urlObj.get_full_url())) # For flush mode, get the filename # for memory mode, get the data datamode = self._cfg.datamode fname, data = '','' if datamode == CONNECTOR_DATA_MODE_FLUSH: fname = thread.get_tmpfname() datalen = os.path.getsize(fname) else: data = thread.get_data() datalen = len(data) # See if the data was downloaded fully...,else reschedule this piece expected = (urlObj.range[-1] - urlObj.range[0]) + 1 if datalen != expected: extrainfo("Expected: %d, Got: %d" % (expected, datalen)) extrainfo("Thread %s did only a partial download, rescheduling this piece..." % thread) if self._monitor: # print 'Notifying failure',thread self._monitor.notify_failure(urlObj, thread) return index = urlObj.mirror_url.index # print 'Index=>',index if index in self._multipartdata: infolist = self._multipartdata[index] if data: infolist.append((urlObj.range[0],data)) elif fname: infolist.append((urlObj.range[0],fname)) else: infolist = [] if data: infolist.append((urlObj.range[0],data)) elif fname: infolist.append((urlObj.range[0],fname)) #else: # self._parts -= 1 # AD-HOC self._multipartdata[index] = infolist # print 'Length of data list is',len(infolist),self._parts if len(infolist)==self._parts: # Sort the data list according to byte-range infolist.sort() # Download of this URL is complete... logconsole('Download of %s is complete...' % urlObj.get_full_url()) if datamode == CONNECTOR_DATA_MODE_INMEM: data = ''.join([item[1] for item in infolist]) self._multipartdata['data:' + str(index)] = data else: pass self._multipartstatus[index] = MULTIPART_DOWNLOAD_COMPLETED else: # Currently when a thread reports an error, we abort the download # In future, we can inspect whether the error is fatal or not # and resume download in another thread etc... extrainfo('Thread %s reported error => %s' % (str(thread), str(thread.get_error()))) if self._monitor: # print 'Notifying failure',thread self._monitor.notify_failure(urlObj, thread) # print 'Notified failure',thread # if the thread failed, update failure stats on the data manager err = thread.get_error() tstatus = thread.get_status() # Either file was fetched or file was uptodate if err.number in (0, 304): # thread succeeded, increment file count stats on the data manager objects.datamgr.update_file_stats( urlObj, tstatus) finally: self._cond.release() def has_busy_threads(self): """ Return whether I have any busy threads """ val=0 for thread in self._threads: if thread.is_busy(): val += 1 break return val def get_busy_threads(self): """ Return a list of busy threads """ return [thread for thread in self._threads if thread.is_busy()] def get_busy_count(self): """ Return a count of busy threads """ return len(self.get_busy_threads()) def get_busy_figure(self): s='' for t in self._threads: if t.is_busy(): s=s + t.getName().split('-')[-1] + ' ' return s def wait(self, period, timeout): # Wait for the pool to signal that there # are no more busy threads # Note: timeout must be > period count = 0.0 while self.has_busy_threads(): self._endcond.acquire() try: self._endcond.wait(period) count += period self.end_hanging_threads() except IOError, e: break self._endcond.release() if count>=timeout: break def locate_thread(self, url): """ Find a thread which downloaded a certain url """ for x in self._threads: if not x.is_busy(): if x.get_url() == url: return x return None def locate_busy_threads(self, url): """ Find all threads which are downloading a certain url """ threads=[] for x in self._threads: if x.is_busy(): if x.get_url() == url: threads.append(x) return threads def check_duplicates(self, urlobj): """ Avoid downloading same url file twice. It can happen that same url is linked from different web pages """ filename = urlobj.get_full_filename() url = urlobj.get_full_url() # First check if any thread is in the process # of downloading this url. if self.locate_thread(url): debug('Another thread is downloading %s' % url) return True return False def end_hanging_threads(self): """ If any download thread is running for too long, kill it, and remove it from the thread pool """ pool=[] for thread in self._threads: if thread.long_running(): pool.append(thread) for thread in pool: extrainfo('Killing hanging thread ', thread) # remove this thread from the thread list self._threads.remove(thread) # kill it try: thread.terminate() except HarvestManUrlThreadInterrupt: pass del thread def end_all_threads(self): """ Kill all running threads """ try: self._cond.acquire() for t in self._threads: try: t.terminate() t.join() except HarvestManUrlThreadInterrupt, e: debug(str(e)) pass self._threads = [] finally: self._cond.release() def stop_all_threads(self): """ Stop all running threads """ # Same as end_all_threads but only that # we don't print the killed message. try: self._cond.acquire() for t in self._threads: try: t.terminate() t.join() except HarvestManUrlThreadInterrupt, e: pass self._threads = [] finally: self._cond.release() def remove_finished_threads(self): """ Clean up all threads that have completed """ for thread in self._threads: if not thread.is_busy(): self._threads.remove(thread) del thread def last_thread_report_time(self): """ Return the last thread reported time """ return self._ltrt def get_multipart_download_status(self, url): """ Get status of multipart downloads """ # If a thread has failed, signal exit if self._multiparterror: return MULTIPART_DOWNLOAD_ERROR else: return self._multipartstatus.get(url.index, MULTIPART_DOWNLOAD_STATUS_UNKNOWN) def get_multipart_url_data(self, url): """ Return data for multipart downloads """ return self._multipartdata.get('data:'+ str(url.index), '') def get_multipart_url_info(self, url): """ Return information for multipart downloads """ return self._multipartdata.get(url.index, '') def dead_thread_callback(self, t): """ Call back function called by a thread if it dies with an exception. This class then creates a fresh thread, migrates the data of the dead thread to it """ try: self._cond.acquire() new_t = HarvestManUrlThread(t.getName(), self._timeout, self) # Migrate data and start thread if new_t: new_t.set_urlobject(t.get_urlobject()) # Replace dead thread in the list idx = self._threads.index(t) self._threads[idx] = new_t new_t.start() else: # Could not make new thread, remove # current thread anyway self._threads.remove(t) finally: self._cond.release() def get_threads(self): """ Return the list of thread objects """ return self._threads def get_thread_urls(self): """ Return a list of current URLs being downloaded """ # This returns a list of URL objects, not URL strings urlobjs = [] for t in self._threads: if t.is_busy(): urlobj = t.get_urlobject() if urlobj: urlobjs.append(urlobj) return urlobjs def reset_multipart_data(self): """ Reset multipart related state """ self._multiparterror = False self._multipartdata.clear() self._multipartdata = {} self._multipartstatus.clear() self._multipartstatus = {} class HarvestManUrlThreadPoolMonitor(threading.Thread): def __init__(self, threadpool): self._pool = threadpool self._pool._monitor = self self.lock = threading.Lock() self._failedurls = [] self._listfailed = [] self._flag = False # Mirror manager self.mirrormgr = HarvestManMirrorManager.getInstance() # initialize threading threading.Thread.__init__(self, None, None, "Monitor") def run(self): while not self._flag: try: self.lock.acquire() items = [] self._failedurls = self._listfailed[:] for urlobj, urlerror in self._failedurls: # Reset URL to parent and try again... if urlobj.mirrored: # Try getting a new mirror URL new_urlobj = self.mirrormgr.get_different_mirror_url(urlobj, urlerror) if new_urlobj: extrainfo("New mirror URL=>", new_urlobj.get_full_url()) items.append((urlobj, urlerror)) self._pool.push(new_urlobj) else: logconsole('Could not find new working mirror. Exiting...') self._pool._multiparterror = True self._listfailed = [] break else: logconsole("URL is not mirrored, so no new mirrors to try. Exiting...") self._pool._multiparterror = True break for item in items: self._listfailed.remove(item) self.lock.release() time.sleep(0.1) finally: pass def notify_failure(self, urlobj, thread): self.lock.acquire() self._listfailed.append((urlobj, thread.get_error())) self.lock.release() def stop(self): self._flag = True def reset(self): """ Reset the state """ self._listfailed = [] self._failedurls = []
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?