urlthread.py
From "HarvestMan — latest version" · Python code · 877 lines total · page 1 of 2
PY
877 行
# -- coding: utf-8""" urlthread.py - Url thread downloader module. Has two classes, one for downloading of urls and another for managing the url threads. This module is part of the HarvestMan program. Author: Anand B Pillai <abpillai at gmail dot com> Modification History Jan 10 2006 Anand Converted from dos to unix format (removed Ctrl-Ms). Jan 20 2006 Anand Small change in printing debug info in download method. Mar 05 2007 Anand Implemented http 304 handling in notify(...). Apr 09 2007 Anand Added check to make sure that threads are not re-started for the same recurring problem. Copyright (C) 2004 Anand B Pillai."""__version__ = '2.0 b1'__author__ = 'Anand B Pillai'import os, sysimport mathimport timeimport threadingimport copyimport randomimport shafrom collections import dequefrom Queue import Queue, Full, Emptyfrom harvestman.lib import urlparserfrom harvestman.lib.mirrors import HarvestManMirrorManagerfrom harvestman.lib.common.common import *from harvestman.lib.common.macros import *class HarvestManUrlThreadInterrupt(Exception): """ Interrupt raised to kill a harvestManUrlThread class's object """ def __init__(self, value): self.value = value def __str__(self): return str(self.value)class HarvestManUrlThread(threading.Thread): """ Class to download a url in a separate thread """ # The last error which caused a thread instance to die _lasterror = None def __init__(self, name, timeout, threadpool): """ Constructor, the constructor takes a url, a filename , a timeout value, and the thread pool object pooling this thread """ # url Object (This is an instance of urlPathParser class) self._urlobject = None # thread queue object pooling this thread self._pool = threadpool # max lifetime for the thread self._timeout = timeout # start time of thread self._starttime = 0 # start time of a download self._dstartime = 0 # sleep time self._sleepTime = 1.0 # error object self._error = None # download status self._downloadstatus = 0 # busy flag self._busyflag = False # end 
flag self._endflag = False # Url data, only used for CONNECTOR_DATA_MODE_INMEM self._data = '' # Url temp file, used for mode CONNECTOR_DATA_MODE_FLUSH self._urltmpfile = '' # Current connector self._conn = None # initialize threading threading.Thread.__init__(self, None, None, name) def __str__(self): return self.getName() def get_error(self): """ Get error object of this thread """ return self._error def get_status(self): """ Get the download status of this thread """ return self._downloadstatus def get_data(self): """ Return the data of this thread """ return self._data def get_tmpfname(self): """ Return the temp filename if any """ return self._urltmpfile def set_tmpfname(self, filename): """ Set the temporary filename """ # Typically called by connector objects self._urltmpfile = filename def set_status(self, status): """ Set the download status of this thread """ self._downloadstatus = status def is_busy(self): """ Get busy status for this thread """ return self._busyflag def set_busy_flag(self, flag): """ Set busy status for this thread """ self._busyflag = flag def join(self): """ The thread's join method to be called by other threads """ threading.Thread.join(self, self._timeout) def terminate(self): """ Kill this thread """ self.stop() msg = 'Download thread, ' + self.getName() + ' killed!' raise HarvestManUrlThreadInterrupt, msg def stop(self): """ Stop this thread """ # If download was not completed, push-back object # to the pool. 
if self._downloadstatus==0 and self._urlobject: self._pool.push(self._urlobject) self._endflag = True def download(self, url_obj): """ Download this url """ # Set download status self._downloadstatus = 0 self._dstartime = time.time() url = url_obj.get_full_url() if not url_obj.trymultipart: # print 'Gewt URL=>',url,self if url_obj.is_image(): extrainfo('Downloading image ...', url) else: extrainfo('Downloading url ...', url) else: startrange = url_obj.range[0] endrange = url_obj.range[-1] # print "Got URL",url,self extrainfo('%s: Downloading url %s, byte range(%d - %d)' % (str(self),url,startrange,endrange)) # This call will block if we exceed the number of connections self._conn = objects.connfactory.create_connector() mode = self._conn.get_data_mode() if not url_obj.trymultipart: res = self._conn.save_url(url_obj) else: # print 'Downloading URL',url,self res = self._conn.wrapper_connect(url_obj) # print 'Connector returned',self,url_obj.get_full_url() if mode == CONNECTOR_DATA_MODE_FLUSH: self._urltmpfile = self._conn.get_tmpfname() elif mode == CONNECTOR_DATA_MODE_INMEM: self._data = self._conn.get_data() # Add page hash to URL object data = self._conn.get_data() # Update pagehash on the URL object if data: url_obj.pagehash = sha.new(data).hexdigest() # Remove the connector from the factory objects.connfactory.remove_connector(self._conn) # Set this as download status self._downloadstatus = res # get error flag from connector self._error = self._conn.get_error() self._conn = None # Notify thread pool self._pool.notify(self) if SUCCESS(res): if not url_obj.trymultipart: extrainfo('Finished download of ', url) else: startrange = url_obj.range[0] endrange = url_obj.range[-1] debug('Finished download of byte range(%d - %d) of %s' % (startrange,endrange, url)) elif self._error.number != 304: error('Failed to download URL',url) objects.datamgr.update_url(url_obj) def run(self): """ Run this thread """ while not self._endflag: try: self._starttime=time.time() # print 
'Waiting for next URL task',self url_obj = self._pool.get_next_urltask() # Dont do duplicate checking for multipart... if not url_obj.trymultipart and self._pool.check_duplicates(url_obj): print 'Is duplicate',url_obj.get_full_url() continue if not url_obj: time.sleep(0.1) continue # set busy flag to 1 self._busyflag = True # Save reference self._urlobject = url_obj # print 'Got url=>',url_obj.get_full_url() filename, url = url_obj.get_full_filename(), url_obj.get_full_url() if not filename and not url: return # Perf fix: Check end flag # in case the program was terminated # between start of loop and now! if not self._endflag: self.download(url_obj) # reset busyflag # print 'Setting busyflag to False',self self._busyflag = False except Exception, e: raise error('Worker thread Exception',e) # Now I am dead - so I need to tell the pool # object to migrate my data and produce a new thread. # See class for last error. If it is same as # this error, don't do anything since this could # be a programming error and will send us into # a loop... 
# Set busyflag to False self._busyflag = False # Remove the connector from the factory if self._conn and (not self._conn.is_released()): objects.connfactory.remove_connector(self._conn) if str(self.__class__._lasterror) == str(e): debug('Looks like a repeating error, not trying to restart worker thread %s' % (str(self))) else: self.__class__._lasterror = e # self._pool.dead_thread_callback(self) error('Worker thread %s has died due to error: %s' % (str(self), str(e))) error('Worker thread was downloading URL %s' % url_obj.get_full_url()) def get_url(self): if self._urlobject: return self._urlobject.get_full_url() return "" def get_filename(self): if self._urlobject: return self._urlobject.get_full_filename() return "" def get_urlobject(self): """ Return this thread's url object """ return self._urlobject def get_connector(self): """ Return the connector object """ return self._conn def set_urlobject(self, urlobject): self._urlobject = urlobject def get_start_time(self): """ Return the start time of current download """ return self._starttime def set_start_time(self, starttime): """ Return the start time of current download """ self._starttime = starttime def get_elapsed_time(self): """ Get the time taken for this thread """ now=time.time() fetchtime=float(math.ceil((now-self._starttime)*100)/100) return fetchtime def get_elapsed_download_time(self): """ Return elapsed download time for this thread """ fetchtime=float(math.ceil((time.time()-self._dstartime)*100)/100) return fetchtime def long_running(self): """ Find out if this thread is running for a long time (more than given timeout) """ # if any thread is running for more than <timeout> # time, return TRUE return (self.get_elapsed_time() > self._timeout) def set_timeout(self, value): """ Set the timeout value for this thread """ self._timeout = value def close_file(self): """ Close temporary file objects of the connector """ # Currently used only by hget if self._conn: reader = self._conn.get_fileobj() if 
reader: reader.close() class HarvestManUrlThreadPool(Queue): """ Thread group/pool class to manage download threads """ def __init__(self): """ Initialize this class """ # list of spawned threads self._threads = [] # list of url tasks self._tasks = [] self._cfg = objects.config # Maximum number of threads spawned self._numthreads = self._cfg.threadpoolsize self._timeout = self._cfg.timeout # Last thread report time self._ltrt = 0.0 # Local buffer self.buffer = [] # Data dictionary for multi-part downloads # Keys are URLs and value is the data self._multipartdata = {} # Status of URLs being downloaded in # multipart. Keys are URLs self._multipartstatus = {} # Flag that is set when one of the threads # in a multipart download fails self._multiparterror = False # Number of parts self._parts = self._cfg.numparts # Condition object self._cond = threading.Condition(threading.Lock()) # Condition object for waiting on end condition self._endcond = threading.Condition(threading.Lock()) # Monitor object, used with hget self._monitor = None Queue.__init__(self, self._numthreads + 5) def start_threads(self): """ Start threads if they are not running """ for t in self._threads: try: t.start() except AssertionError, e: pass def spawn_threads(self): """ Start the download threads """ for x in range(self._numthreads): name = 'Worker-'+ str(x+1) fetcher = HarvestManUrlThread(name, self._timeout, self) fetcher.setDaemon(True) # Append this thread to the list of threads self._threads.append(fetcher) # print 'Starting thread',fetcher fetcher.start() def download_urls(self, listofurlobjects): """ Method to download a list of urls. Each member is an instance of a urlPathParser class """ for urlinfo in listofurlobjects: self.push(urlinfo) def _get_num_blocked_threads(self): blocked = 0 for t in self._threads: if not t.is_busy(): blocked += 1 return blocked def is_blocked(self):
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?