""" HarvestManUrlThread.py - Url thread downloader module.
    Has two classes, one for downloading of urls and another
    for managing the url threads.

    This software is part of the HarvestMan(R) program.

    Author: Anand B Pillai (anandpillai at letterboxes dot org).
    
    Copyright (C) 2004-2005 Anand B Pillai.

    Modification History

    Jun 4-9 2004 Anand  1.4 development.

                        *Adopted a queue model for the sub
                        threads in this module similar to the
                        tracker threads. Instead of spawning new
                        threads for every download job, we reuse
                        the threads in the thread pool. The threads
                        are pre-spawned at the beginning of download.

                        Derived harvestManUrlThreadPool from Queue
                        and added method 'spawn_threads' to it.

    Jun 14 2004        1.3.9 release.                        

"""

import os, sys
import math
import time
import threading

from Queue import Queue
from common import *

# __metaclass__ = object

class HarvestManUrlThreadInterrupt(Exception):
    """ Interrupt raised to kill a harvestManUrlThread class's object """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return str(self.value)

class harvestManUrlThread(threading.Thread):
    """ Class to download a url in a separate thread """

    def __init__(self, name, timeout, threadpool):
        """ Constructor; takes a thread name, a timeout
        value, and the thread pool object pooling this
        thread """

        # url Object (This is an instance of urlPathParser class)
        self.__urlobject = None
        # thread queue object pooling this thread
        self.__pool = threadpool
        # max lifetime for the thread
        self.__timeout = timeout
        # start time of thread
        self.__starttime = 0
        # sleep time
        self.__sleepTime = 1.0
        # error dictionary
        self.__error = {}
        # download status flag
        self.__downloadstatus = 0
        # busy flag
        self.__busyflag = False
        # end flag
        self.__endflag = False
        # initialize threading
        threading.Thread.__init__(self, None, None, name)

    def get_error(self):
        """ Get error value of this thread """

        return self.__error

    def get_status(self):
        """ Get the download status of this thread """

        return self.__downloadstatus

    def is_busy(self):
        """ Get busy status for this thread """

        return self.__busyflag

    def join(self):
        """ The thread's join method to be called
        by other threads """

        threading.Thread.join(self, self.__timeout)

    def terminate(self):
        """ Kill this thread """

        self.stop()
        msg = 'Download thread, ' + self.getName() + ' killed!'
        raise HarvestManUrlThreadInterrupt, msg

    def stop(self):
        """ Stop this thread """

        self.__endflag = True

    def download(self, url_obj):
        """ Download this url """

        url = url_obj.get_full_url()
        filename = url_obj.get_full_filename()
        server = url_obj.get_domain()

        if url_obj.is_image():
            info('Downloading image ...', url)
        else:
            extrainfo('Downloading url ...', url)

        conn_factory = GetObject('connectorfactory')
        # This call will block if we exceed the number of connections
        # moreinfo("Creating connector for url ", urlobj.get_full_url())
        conn = conn_factory.create_connector( server )

        res = conn.save_url(url_obj)

        # Remove the connector from the factory
        conn_factory.remove_connector(conn, server)
        
        # Set this as download status
        self.__downloadstatus = res

        if res==0:
            # get error flag from connector
            self.__error=conn.get_error()

        # Notify thread pool
        self.__pool.notify(self)

        if res != 0:
            extrainfo('Finished download of ', url)

    def run(self):
        """ Run this thread """

        while not self.__endflag:
            if os.name=='nt' or sys.platform == 'win32':
                self.__starttime=time.clock()
            else:
                self.__starttime=time.time()

            url_obj = self.__pool.get_next_urltask()

            # set busy flag
            self.__busyflag = True

            # Save reference
            self.__urlobject = url_obj

            filename, url = url_obj.get_full_filename(), url_obj.get_full_url()
            # print 'Thread => ', self.getName(), url
            if not filename and not url:
                # reset the busy flag before exiting this thread
                self.__busyflag = False
                return

            self.download(url_obj)
            # reset busyflag
            self.__busyflag = False

    def get_url(self):
        """ Return the url of this thread's url object """

        if self.__urlobject:
            return self.__urlobject.get_full_url()

        return ""

    def get_filename(self):
        """ Return the filename of this thread's url object """

        if self.__urlobject:
            return self.__urlobject.get_full_filename()

        return ""

    def get_urlobject(self):
        """ Return this thread's url object """

        return self.__urlobject

    def get_elapsed_time(self):
        """ Get the time taken for this thread """

        now=0.0

        if os.name=='nt' or sys.platform=='win32':
            now=time.clock()
        else:
            now=time.time()

        fetchtime=float(math.ceil((now-self.__starttime)*100)/100)
        return fetchtime

    def long_running(self):
        """ Find out if this thread is running for a long time
        (more than given timeout) """

        # if this thread has been running for more than <timeout>
        # seconds, return True
        return (self.get_elapsed_time() > self.__timeout)

    def set_timeout(self, value):
        """ Set the timeout value for this thread """

        self.__timeout = value

class harvestManUrlThreadPool(Queue):
    """ Thread group/pool class to manage download threads """

    def __init__(self):
        """ Initialize this class """

        # list of spawned threads
        self.__threads = []
        # list of url tasks
        self.__tasks = []
        # Maximum number of threads spawned
        self.__numthreads=20
        # Last thread report time
        self._ltrt = 0.0
        Queue.__init__(self, 0)

    def spawn_threads(self):
        """ Start the download threads """

        cfg = GetObject('config')
        num_threads = cfg.threadpoolsize

        for x in range(num_threads):
            name = 'Fetcher-'+ str(x+1)
            fetcher = harvestManUrlThread(name, cfg.timeout, self)
            fetcher.setDaemon(True)
            # Append this thread to the list of threads
            self.__threads.append(fetcher)
            fetcher.start()

    def download_urls(self, listofurlobjects):
        """ Method to download a list of urls.
        Each member is an instance of a urlPathParser class """

        for urlinfo in listofurlobjects:
            self.push(urlinfo)

    def __get_num_blocked_threads(self):

        blocked = 0
        for t in self.__threads:
            if not t.is_busy(): blocked += 1

        return blocked

    def is_blocked(self):
        """ The queue is considered blocked if all threads
        are waiting for data, and no data is coming """

        return self.__get_num_blocked_threads() == len(self.__threads)

    def push(self, urlObj):
        """ Push the url object and start downloading the url """

        # unpack the tuple
        try:
            filename, url = urlObj.get_full_filename(), urlObj.get_full_url()
        except:
            return None

        # Find if any thread is already downloading the same
        # url. We dont want to run duplicate threads for the
        # same url :-)
        # atwork = self.locate_busy_threads(url)
        # if len(atwork):
        #    extrainfo('This job is delegated!')
        #    return

        # check whether this file was already downloaded
        if self.avoid_duplicates((filename, url)):
            extrainfo('Found a duplicate for url', url,'...')
            return

        # Queue the current url's info. Note that the queue is
        # created unbounded (maxsize 0 above), so this put call
        # does not block.
        self.put( urlObj )

    def get_next_urltask(self):
        """ Return the next url task from the queue,
        blocking until one is available """

        return self.get()

    def notify(self, thread):
        """ Method called by threads to notify that they
        have finished """

        # Mark the time stamp (last thread report time)
        self._ltrt = time.time()

        urlObj = thread.get_urlobject()
        # if the thread failed, update failure stats on the data manager
        dmgr = GetObject('datamanager')

        err = thread.get_error()

        if err.has_key('fatal') and not err['fatal']:
            dmgr.update_failed_files( urlObj )
        else:
            # thread succeeded, increment file count stats on the data manager
            dmgr.update_file_stats( urlObj, thread.get_status())

    def has_busy_threads(self):
        """ Return the number of busy threads (non-zero
        if any thread is busy) """

        val=0
        for thread in self.__threads:
            if thread.is_busy():
                val += 1

        return val

    def locate_thread(self, url):
        """ Find a thread which downloaded a certain url """

        for x in self.__threads:
            if not x.is_busy():
                if x.get_url() == url:
                    return x

        return None

    def locate_busy_threads(self, url):
        """ Find all threads which are downloading a certain url """

        threads=[]
        for x in self.__threads:
            if x.is_busy():
                if x.get_url() == url:
                    threads.append(x)

        return threads

    def avoid_duplicates(self, urlinfo):
        """ Avoid downloading the same url file twice.
        It can happen that the same url is linked from
        different web pages, so if the file was already
        downloaded, we kill any threads still busy
        downloading the same url """

        filename = urlinfo[0]
        # Since the filename and count statistics are maintained
        # on the data manager object, it makes sense to query it
        # for information on a downloaded file.

        # Get data manager object
        dmgr = GetObject('datamanager')

        if dmgr.is_file_downloaded(filename):
            url = urlinfo[1]

            threads = self.locate_busy_threads(url)

            for thread in threads:
                extrainfo('Killing download thread ...', thread)

                try:
                    thread.terminate()
                # we need to catch this exception here
                except HarvestManUrlThreadInterrupt:
                    pass

            return 1
        else:
            return 0

    def end_hanging_threads(self):
        """ If any download thread is running for too long,
        kill it, and remove it from the thread pool """

        pool=[]
        for thread in self.__threads:
            if thread.long_running(): pool.append(thread)

        for thread in pool:
            extrainfo('Killing hanging thread ', thread)
            # remove this thread from the thread list
            self.__threads.remove(thread)
            # kill it
            try:
                thread.terminate()
            except HarvestManUrlThreadInterrupt:
                pass

    def end_all_threads(self):
        """ Kill all running threads """

        for thread in self.__threads:
            try:
                thread.terminate()
            except HarvestManUrlThreadInterrupt, e:
                print e

        self.__threads = []

    def remove_finished_threads(self):
        """ Clean up all threads that have completed """

        # iterate over a copy, since we modify the list in place
        for thread in self.__threads[:]:
            if not thread.is_busy():
                self.__threads.remove(thread)

    def last_thread_report_time(self):
        """ Return the last thread reported time """

        return self._ltrt
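
if __name__ == '__main__':
    # Minimal smoke test (a sketch only; it assumes the HarvestMan
    # framework has already registered the 'config', 'connectorfactory'
    # and 'datamanager' objects via common.GetObject, which this module
    # does not set up by itself).
    pool = harvestManUrlThreadPool()
    pool.spawn_threads()
    print 'Fetcher pool spawned, blocked =', pool.is_blocked()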


