⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 urlqueue.py

📁 Harvestman-最新版本
💻 PY
📖 第 1 页 / 共 2 页
字号:
# -- coding: utf-8
""" urlqueue.py - Module which controls queueing of urls
    created by crawler threads. This is part of the HarvestMan
    program.

    Author: Anand B Pillai <abpillai at gmail dot com>

    Modification History

     Anand Jan 12 2005 -   Created this module, by splitting urltracker.py
     Aug 11 2006  Anand    Checked in changes for reducing CPU utilization.
     Aug 22 2006  Anand    Changes for fixing single-thread mode.
     Oct 19 2007  Anand    Added a very basic state-machine for managing
                           crawler end condition.
     Oct 22 2007  Anand    Enhanced the state machine with additional states,
                           checks and a modified mainloop etc.
     April 04 2008 Anand   Fixes in state machine and mainloop.
     Jun   03 2008 Anand   Fixes in abnormal exit logic. Other fixes.

    Copyright (C) 2005 Anand B Pillai.

"""

__version__ = '2.0 b1'
__author__ = 'Anand B Pillai'

import bisect
import time
import threading
import sys, os
import copy

from collections import deque
from Queue import *

from harvestman.lib import crawler
from harvestman.lib import urlparser
from harvestman.lib import document
from harvestman.lib import datamgr
from harvestman.lib import urltypes

from harvestman.lib.common.common import *
from harvestman.lib.common.macros import *
from harvestman.lib.common.singleton import Singleton


class HarvestManCrawlerState(Singleton):
    """ State machine for signalling crawler end condition
    and for managing end-condition stalemates and other
    issues.

    Per-thread state is kept in the dictionary 'ts', keyed by
    thread name, with (state, role) tuples as values. Counters
    of pushes/gets/transitions are updated from state_callback
    while holding the condition 'cond'. """

    def __init__(self, queue):
        """ Initialize the state machine. 'queue' is stored as
        self.queue (the owning queue manager object) """

        self.reset()
        self.cond = threading.Condition(threading.Lock())
        self.queue = queue

    def reset(self):
        """ Reset all thread states, flags and counters to
        their initial values """

        # Maps thread name => (state, role)
        self.ts = {}
        # Flags
        # All threads blocked (waiting)
        self.blocked = False
        # All fetchers blocked (waiting)
        self.fblocked = False
        # All crawlers blocked (waiting)
        self.cblocked = False
        # Crawler thread transitions
        self.ctrans = 0
        # Fetcher thread transitions
        self.ftrans = 0
        # Pushes by fetcher
        self.fpush = 0
        # Pushes by crawler
        self.cpush = 0
        # Gets by fetcher
        self.fgets = 0
        # Gets by crawler
        self.cgets = 0
        # Number of crawlers
        self.numcrawlers = 0
        # Number of fetchers
        self.numfetchers = 0
        # Suspend time-stamp, initially
        # set to None. To suspend end-state
        # checking, set this to current time...
        self.st = None
        # End state message. If normal exit
        # this is not set, for abnormal exit
        # this could be set...
        self.abortmsg = ''
        self.blkcnt = 0
        self.lastcheck = time.time()

    def set(self, thread, state):
        """ Record 'state' for 'thread' (if it differs from the
        recorded one) and then run the state callback. The callback
        is invoked on every call, even if the state is unchanged """

        curr, role = None, None
        item = self.get(thread)
        if item:
            curr, role = item

        if curr != state:
            # print 'Thread %s changes from state %s to state %s' % (thread, curr, state)
            self.ts[thread.getName()] = state, thread._role

        self.state_callback(thread, state)

    def get(self, thread):
        """ Return the (state, role) tuple recorded for 'thread',
        or None if nothing is recorded for it """

        return self.ts.get(thread.getName())

    def zero_thread(self):
        """ Function which returns whether any of the
        thread counts (fetcher/crawler) have gone to zero """

        return (self.numfetchers==0) or (self.numcrawlers == 0)

    def suspend(self):
        """ Suspend checks on end state. This uses a timeout
        on the suspend flag which automatically ages the flag
        and resets it, if not set within the aging period """

        self.st = time.time()

    def unsuspend(self):
        """ Unsuspend checks on end-state. """

        self.st = None

    def state_callback(self, thread, state):
        """ Callbacks for taking action according to state transitions.

        The condition lock is held for the duration of the callback
        and is always released, even if handling the state raises an
        exception (otherwise every other thread calling in here
        would block forever) """

        self.cond.acquire()
        try:
            typ = thread._role

            if state == crawler.THREAD_STARTED:
                if thread.resuming:
                    # Resuming threads should call unsuspend..
                    self.unsuspend()

            # If the thread is killed, try to regenerate it...
            elif state == crawler.THREAD_DIED:
                # print 'THREAD DIED',thread
                # Don't try to regenerate threads if this is a local exception.
                e = thread.exception
                logconsole("Thread died due to exception => ", str(e))
                # See class for last error. If it is same as
                # this error, don't do anything since this could
                # be a programming error and will send us into
                # a loop...
                #if str(thread.__class__._lasterror) == str(e):
                #    debug('Looks like a repeating error, not trying to restart thread %s' % (str(thread)))
                # In this case the thread has died, so reduce local thread count
                if typ=='crawler':
                    self.numcrawlers -= 1
                elif typ == 'fetcher':
                    self.numfetchers -= 1

                del self.ts[thread.getName()]
                #else:
                #    thread.__class__._lasterror = e
                #    # Release the lock now!
                #    self.cond.release()
                #    # Set suspend flag
                #    self.suspend()
                #
                #    del self.ts[thread]
                #    extrainfo('Tracker thread %s has died due to error: %s' % (str(thread), str(e)))
                #    self.queue.dead_thread_callback(thread)
                #    return

            elif state == crawler.FETCHER_PUSHED_URL:
                # Push count for fetcher threads
                self.fpush += 1
            elif state == crawler.CRAWLER_PUSHED_URL:
                # Push count for crawler threads
                self.cpush += 1
            elif state == crawler.FETCHER_GOT_DATA:
                # Get count for fetcher threads
                self.fgets += 1
            elif state == crawler.CRAWLER_GOT_DATA:
                # Get count for crawler threads
                self.cgets += 1

            elif state == crawler.THREAD_SLEEPING:
                # A sleep state can be achieved only after a work state
                # so this indicates a cycle of transitions since
                # a cycle ends with a sleep...
                if typ == 'crawler':
                    # Transition count for crawler threads
                    self.ctrans += 1
                elif typ == 'fetcher':
                    # Transition count for fetcher threads
                    self.ftrans += 1

            elif state in (crawler.FETCHER_WAITING, crawler.CRAWLER_WAITING):
                if self.end_state():
                    # This is useful only if the waiter is waiting
                    # using wait1(...) method. If he is waiting
                    # using wait2(...) method, he needs to devise
                    # his own wake-up logic.
                    self.cond.notify()
        finally:
            self.cond.release()

    def all_are_waiting(self):
        """ This method returns whether the threads are all starved for
        data during regular crawl, which signals an end condition for the
        program """

        # Time stamp of calling this function
        currt = time.time()
        # Check suspend time-stamp
        if self.st:
            # Calculate difference, do not allow suspending
            # for more than 5 seconds.
            if (currt - self.st)>5.0:
                self.st = None
            # Note: returns False on this call even when the
            # suspend flag has just been aged out above.
            return False

        if self.zero_thread():
            self.abortmsg = "Fatal thread reduction, stopping program"
            return True

        for status, role in self.ts.values():
            if status.__name__ not in ('PERM_EXCEPT','FETCHER_WAITING','CRAWLER_WAITING','THREAD_DIED', 'THREAD_STOPPED'):
                return False

        #if self.queue.url_q.qsize() or self.queue.data_q.qsize():
        #    return False

        # Everything pushed by one side must have been
        # picked up by the other side.
        return ((self.fpush == self.cgets) and
                (self.cpush == self.fgets))

    def all_have_stopped(self):
        """ This method returns whether the threads are all stopped
        (or sleeping) after an abnormal exit of the program either
        by an explicit interrupt or after a program exception """

        if self.zero_thread():
            self.abortmsg = "Fatal thread reduction, stopping program"
            return True

        for status, role in self.ts.values():
            if status.__name__ not in ('PERM_EXCEPT','THREAD_STOPPED','THREAD_SLEEPING'):
                return False

        return True

    def end_state(self):
        """ Check end state for the program. Returns True
        if the program is ready to end. Abnormal exits are
        not handled here """

        return self.all_are_waiting()

    def exit_state(self):
        """ Checks end of state for program for abnormal
        exits. Returns True if the program is ready to end """

        return self.all_have_stopped()

##         # Time stamp of calling this function
##         currt = time.time()
##         # Check suspend time-stamp
##         if self.st:
##             # Calculate difference, do not allow suspending
##             # for more than 5 seconds.
##             if (currt - self.st)>5.0:
##                 self.st = None
##             return False

##         if self.zero_thread():
##             self.abortmsg = "Fatal thread reduction, stopping program"
##             return True

##         flag = True
##         numthreads = 0

##         fcount, fnblock, ccount, cnblock = 0, 0, 0, 0
##         self.blocked, self.fblocked, self.cblocked = False, False, False

##         for state, role in self.ts.values():
##             numthreads += 1
##             print role,'=>',state

##             if role == 'fetcher':
##                 fcount += 1
##                 if state == crawler.FETCHER_WAITING:
##                     fnblock += 1
##                 else:
##                     flag = False
##                     break
##             else:
##                 ccount += 1
##                 if state == crawler.CRAWLER_WAITING:
##                     cnblock += 1
##                 else:
##                     flag = False
##                     break

##         # print 'flag=>',flag
##          # For exit based on thread waiting state alignment
##         if flag:
##             self.blocked = True
##             print 'Numthreads=>',numthreads

##         if ccount==cnblock:
##             self.cblocked = True

##         if fcount==fnblock:
##             self.fblocked = True

##         if self.blocked:
##             # print 'BLOCKED!'
##             # print 'Length1 => ',len(self.queue.data_q)
##             # print 'Length2 => ',len(self.queue.url_q)

##             #print "Pushes=>",self.queue._pushes
##             # print 'Transitions',self.ctrans, self.ftrans,self.fpush,self.cpush
##             # If we have one fpush event, we need to have at least
##             # one cpush associated to it...
##             # Error: this is a dangerous condition... we never know if the
##             # crawler has filtered out the children of the first URL itself
##             # so cpush could be == 0, commented this out!

##             #if self.fpush>0:
##             #    return (self.cpush>0)
##             self.blkcnt += 1
##             if self.blkcnt > 10:
##                 return True

##             #return False

##         return False

    def __str__(self):
        return str(self.ts)

    def wait1(self, timeout):
        """ Regular wait method. This should be typically
        called with a large timeout value.

        Loops until end_state() returns True, waiting on the
        condition for at most 'timeout' per iteration. An IOError
        raised by the wait ends the loop; the condition lock is
        released in every case """

        while not self.end_state():
            self.cond.acquire()
            try:
                try:
                    self.cond.wait(timeout)
                except IOError:
                    break
            finally:
                # Runs on normal completion and on 'break', so the
                # lock is never left held when we leave the loop.
                self.cond.release()

    def wait2(self, timeout):
        """ Secondary wait method. This should be typically
        called with a small timeout value. When calling
        this the caller should have additional logic to
        make sure he does not time out before the condition
        is met.

        An IOError raised by the wait is ignored; the condition
        lock is released in every case """

        self.cond.acquire()
        try:
            try:
                self.cond.wait(timeout)
            except IOError:
                pass
        finally:
            self.cond.release()


class PriorityQueue(Queue):
    """ Priority queue based on bisect module (courtesy: Effbot).

    Items are kept in a sorted list; put inserts in sorted
    position and get removes the first (smallest) item """

    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []

    def _put(self, item):
        # Insert keeping the list sorted
        bisect.insort(self.queue, item)

    def __len__(self):
        return self.qsize()

    def _qsize(self):
        return len(self.queue)

    def _empty(self):
        return not self.queue

    def _full(self):
        return self.maxsize>0 and len(self.queue) == self.maxsize

    def _get(self):
        # Smallest item is at the front of the sorted list
        return self.queue.pop(0)

    def clear(self):
        """ Remove all items from the queue. The underlying list
        is emptied in place; no queue lock is taken here """

        del self.queue[:]


class HarvestManCrawlerQueue(object):
    """ This class functions as the thread safe queue
    for storing url data for tracker threads """

    alias = 'queuemgr'

    def __init__(self):
        self.reset()

    def reset(self):

        self.basetracker = None
        self.controller = None
        self.flag = 0

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -