urlthread.py

来自「Harvestman-最新版本」· Python 代码 · 共 877 行 · 第 1/2 页

PY
877
字号
# -*- coding: utf-8 -*-
""" urlthread.py - Url thread downloader module.
    Has two classes, one for downloading of urls and another
    for managing the url threads.

    This module is part of the HarvestMan program.

    Author: Anand B Pillai <abpillai at gmail dot com>

    Modification History

    Jan 10 2006  Anand  Converted from dos to unix format (removed Ctrl-Ms).
    Jan 20 2006  Anand  Small change in printing debug info in download
                        method.
    Mar 05 2007  Anand  Implemented http 304 handling in notify(...).
    Apr 09 2007  Anand  Added check to make sure that threads are not
                        re-started for the same recurring problem.

    Copyright (C) 2004 Anand B Pillai.
"""

__version__ = '2.0 b1'
__author__ = 'Anand B Pillai'

# NOTE(review): Python 2 code throughout — `Queue` module, `except X, e`
# syntax, `print` statements, and the `sha` module (deprecated since
# Python 2.5 in favour of hashlib).
import os, sys
import math
import time
import threading
import copy
import random
import sha

from collections import deque
from Queue import Queue, Full, Empty

from harvestman.lib import urlparser
from harvestman.lib.mirrors import HarvestManMirrorManager
from harvestman.lib.common.common import *
from harvestman.lib.common.macros import *


class HarvestManUrlThreadInterrupt(Exception):
    """ Interrupt raised to kill a harvestManUrlThread class's object """

    def __init__(self, value):
        # value: message (or arbitrary object) describing why the thread was killed
        self.value = value

    def __str__(self):
        return str(self.value)


class HarvestManUrlThread(threading.Thread):
    """ Class to download a url in a separate thread """

    # The last error which caused a thread instance to die.
    # Class-level on purpose: shared across all worker threads so that a
    # recurring error does not restart workers in a loop (see run()).
    _lasterror = None

    def __init__(self, name, timeout, threadpool):
        """ Constructor, the constructor takes a url, a filename
        , a timeout value, and the thread pool object pooling this
        thread """

        # url Object (This is an instance of urlPathParser class)
        self._urlobject = None
        # thread queue object pooling this thread
        self._pool = threadpool
        # max lifetime for the thread
        self._timeout = timeout
        # start time of thread
        self._starttime = 0
        # start time of a download
        # NOTE(review): attribute name is misspelled ("_dstartime", one 't');
        # kept as-is since other methods reference the same spelling.
        self._dstartime = 0
        # sleep time
        self._sleepTime = 1.0
        # error object
        self._error = None
        # download status
        self._downloadstatus = 0
        # busy flag
        self._busyflag = False
        # end flag
        self._endflag = False
        # Url data, only used for CONNECTOR_DATA_MODE_INMEM
        self._data = ''
        # Url temp file, used for mode CONNECTOR_DATA_MODE_FLUSH
        self._urltmpfile = ''
        # Current connector
        self._conn = None
        # initialize threading
        threading.Thread.__init__(self, None, None, name)

    def __str__(self):
        return self.getName()

    def get_error(self):
        """ Get error object of this thread """

        return self._error

    def get_status(self):
        """ Get the download status of this thread """

        return self._downloadstatus

    def get_data(self):
        """ Return the data of this thread """

        return self._data

    def get_tmpfname(self):
        """ Return the temp filename if any """

        return self._urltmpfile

    def set_tmpfname(self, filename):
        """ Set the temporary filename """

        # Typically called by connector objects
        self._urltmpfile = filename

    def set_status(self, status):
        """ Set the download status of this thread """

        self._downloadstatus = status

    def is_busy(self):
        """ Get busy status for this thread """

        return self._busyflag

    def set_busy_flag(self, flag):
        """ Set busy status for this thread """

        self._busyflag = flag

    def join(self):
        """ The thread's join method to be called
        by other threads """

        # Overrides Thread.join to always bound the wait by this
        # thread's timeout value.
        threading.Thread.join(self, self._timeout)

    def terminate(self):
        """ Kill this thread """

        self.stop()
        msg = 'Download thread, ' + self.getName() + ' killed!'
        raise HarvestManUrlThreadInterrupt, msg

    def stop(self):
        """ Stop this thread """

        # If download was not completed, push-back object
        # to the pool.
        if self._downloadstatus==0 and self._urlobject:
            self._pool.push(self._urlobject)

        self._endflag = True

    def download(self, url_obj):
        """ Download this url """

        # Set download status
        self._downloadstatus = 0
        self._dstartime = time.time()

        url = url_obj.get_full_url()

        if not url_obj.trymultipart:
            # print 'Gewt URL=>',url,self
            if url_obj.is_image():
                extrainfo('Downloading image ...', url)
            else:
                extrainfo('Downloading url ...', url)
        else:
            # Multipart (byte-range) download: this worker fetches only
            # a slice of the resource.
            startrange = url_obj.range[0]
            endrange = url_obj.range[-1]
            # print "Got URL",url,self
            extrainfo('%s: Downloading url %s, byte range(%d - %d)' % (str(self),url,startrange,endrange))

        # This call will block if we exceed the number of connections
        self._conn = objects.connfactory.create_connector()
        mode = self._conn.get_data_mode()

        if not url_obj.trymultipart:
            res = self._conn.save_url(url_obj)
        else:
            # print 'Downloading URL',url,self
            res = self._conn.wrapper_connect(url_obj)
            # print 'Connector returned',self,url_obj.get_full_url()

            if mode == CONNECTOR_DATA_MODE_FLUSH:
                self._urltmpfile = self._conn.get_tmpfname()
            elif mode == CONNECTOR_DATA_MODE_INMEM:
                self._data = self._conn.get_data()

        # Add page hash to URL object
        # NOTE(review): in CONNECTOR_DATA_MODE_FLUSH the connector
        # presumably holds no in-memory data, so get_data() would be empty
        # and pagehash is never set in that mode — confirm intended.
        data = self._conn.get_data()
        # Update pagehash on the URL object
        if data:
            url_obj.pagehash = sha.new(data).hexdigest()

        # Remove the connector from the factory
        objects.connfactory.remove_connector(self._conn)

        # Set this as download status
        self._downloadstatus = res

        # get error flag from connector
        self._error = self._conn.get_error()
        self._conn = None

        # Notify thread pool
        self._pool.notify(self)

        if SUCCESS(res):
            if not url_obj.trymultipart:
                extrainfo('Finished download of ', url)
            else:
                startrange = url_obj.range[0]
                endrange = url_obj.range[-1]
                debug('Finished download of byte range(%d - %d) of %s' % (startrange,endrange, url))
        elif self._error.number != 304:
            # 304 (not modified) is not treated as a failure.
            error('Failed to download URL',url)

        objects.datamgr.update_url(url_obj)

    def run(self):
        """ Run this thread """

        while not self._endflag:
            try:
                self._starttime=time.time()
                # print 'Waiting for next URL task',self
                url_obj = self._pool.get_next_urltask()

                # Dont do duplicate checking for multipart...
                # NOTE(review): url_obj is dereferenced here BEFORE the
                # 'if not url_obj' guard below — if get_next_urltask() can
                # return None this raises AttributeError; the None check
                # should come first. Confirm against the pool implementation.
                if not url_obj.trymultipart and self._pool.check_duplicates(url_obj):
                    print 'Is duplicate',url_obj.get_full_url()
                    continue

                if not url_obj:
                    time.sleep(0.1)
                    continue

                # set busy flag to 1
                self._busyflag = True

                # Save reference
                self._urlobject = url_obj
                # print 'Got url=>',url_obj.get_full_url()

                filename, url = url_obj.get_full_filename(), url_obj.get_full_url()
                if not filename and not url:
                    return

                # Perf fix: Check end flag
                # in case the program was terminated
                # between start of loop and now!
                if not self._endflag: self.download(url_obj)

                # reset busyflag
                # print 'Setting busyflag to False',self
                self._busyflag = False
            except Exception, e:
                # NOTE(review): this bare 'raise' re-raises immediately and
                # makes ALL of the recovery code below unreachable — it looks
                # like a debugging leftover that was never removed.
                raise
                error('Worker thread Exception',e)
                # Now I am dead - so I need to tell the pool
                # object to migrate my data and produce a new thread.

                # See class for last error. If it is same as
                # this error, don't do anything since this could
                # be a programming error and will send us into
                # a loop...

                # Set busyflag to False
                self._busyflag = False

                # Remove the connector from the factory
                if self._conn and (not self._conn.is_released()):
                    objects.connfactory.remove_connector(self._conn)

                if str(self.__class__._lasterror) == str(e):
                    debug('Looks like a repeating error, not trying to restart worker thread %s' % (str(self)))
                else:
                    self.__class__._lasterror = e
                    # self._pool.dead_thread_callback(self)
                    error('Worker thread %s has died due to error: %s' % (str(self), str(e)))
                    error('Worker thread was downloading URL %s' % url_obj.get_full_url())

    def get_url(self):
        """ Return the full URL of the current url object, or '' """

        if self._urlobject:
            return self._urlobject.get_full_url()

        return ""

    def get_filename(self):
        """ Return the full filename of the current url object, or '' """

        if self._urlobject:
            return self._urlobject.get_full_filename()

        return ""

    def get_urlobject(self):
        """ Return this thread's url object """

        return self._urlobject

    def get_connector(self):
        """ Return the connector object """

        return self._conn

    def set_urlobject(self, urlobject):
        """ Set this thread's url object """

        self._urlobject = urlobject

    def get_start_time(self):
        """ Return the start time of current download """

        return self._starttime

    def set_start_time(self, starttime):
        """ Set the start time of current download """

        self._starttime = starttime

    def get_elapsed_time(self):
        """ Get the time taken for this thread """

        now=time.time()
        # Rounded up to two decimal places.
        fetchtime=float(math.ceil((now-self._starttime)*100)/100)
        return fetchtime

    def get_elapsed_download_time(self):
        """ Return elapsed download time for this thread """

        fetchtime=float(math.ceil((time.time()-self._dstartime)*100)/100)
        return fetchtime

    def long_running(self):
        """ Find out if this thread is running for a long time
        (more than given timeout) """

        # if any thread is running for more than <timeout>
        # time, return TRUE
        return (self.get_elapsed_time() > self._timeout)

    def set_timeout(self, value):
        """ Set the timeout value for this thread """

        self._timeout = value

    def close_file(self):
        """ Close temporary file objects of the connector """

        # Currently used only by hget
        if self._conn:
            reader = self._conn.get_fileobj()
            if reader: reader.close()


class HarvestManUrlThreadPool(Queue):
    """ Thread group/pool class to manage download threads """

    def __init__(self):
        """ Initialize this class """

        # list of spawned threads
        self._threads = []
        # list of url tasks
        self._tasks = []
        self._cfg = objects.config
        # Maximum number of threads spawned
        self._numthreads = self._cfg.threadpoolsize
        self._timeout = self._cfg.timeout

        # Last thread report time
        self._ltrt = 0.0
        # Local buffer
        self.buffer = []
        # Data dictionary for multi-part downloads
        # Keys are URLs and value is the data
        self._multipartdata = {}
        # Status of URLs being downloaded in
        # multipart. Keys are URLs
        self._multipartstatus = {}
        # Flag that is set when one of the threads
        # in a multipart download fails
        self._multiparterror = False
        # Number of parts
        self._parts = self._cfg.numparts
        # Condition object
        self._cond = threading.Condition(threading.Lock())
        # Condition object for waiting on end condition
        self._endcond = threading.Condition(threading.Lock())
        # Monitor object, used with hget
        self._monitor = None

        # Old-style class in Python 2's Queue module, so no super():
        # queue capacity is a little above the worker count.
        Queue.__init__(self, self._numthreads + 5)

    def start_threads(self):
        """ Start threads if they are not running """

        for t in self._threads:
            try:
                t.start()
            except AssertionError, e:
                # Thread was already started; ignore.
                pass

    def spawn_threads(self):
        """ Start the download threads """

        for x in range(self._numthreads):
            name = 'Worker-'+ str(x+1)
            fetcher = HarvestManUrlThread(name, self._timeout, self)
            fetcher.setDaemon(True)
            # Append this thread to the list of threads
            self._threads.append(fetcher)
            # print 'Starting thread',fetcher
            fetcher.start()

    def download_urls(self, listofurlobjects):
        """ Method to download a list of urls.
        Each member is an instance of a urlPathParser class """

        for urlinfo in listofurlobjects:
            self.push(urlinfo)

    def _get_num_blocked_threads(self):
        """ Return the number of threads in the pool that are not busy """

        blocked = 0
        for t in self._threads:
            if not t.is_busy(): blocked += 1

        return blocked

    def is_blocked(self):
        # NOTE(review): SOURCE is truncated here (page 1 of 2 of the
        # original file) — the body of is_blocked and the rest of this
        # class continue beyond this chunk.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?