urltracker.py

Python source, category: web spider. Page 1 of 2.
        except SGMLParseError, e:
            debug(e)
        except IOError, e:
            print e
        except htmlparser.HTMLParseError, e:
            debug(e)

        return None         

class HarvestManUrlFetcher(HarvestManBaseUrlCrawler):
    """ This is the fetcher class, which downloads data for a url
    and writes its files. It also posts the data for web pages
    to a data queue """
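
    # A fetcher is the producer half of the pipeline: it pops url
    # objects off the url queue, downloads them, and pushes
    # (url object, compressed data) pairs onto the data queue for
    # crawler threads to parse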

    def __init__(self, index, url_obj = None, isThread=True):
        HarvestManBaseUrlCrawler.__init__(self, index, url_obj, isThread)

    def __str__(self):
        return repr(self)

    def _initialize(self):
        HarvestManBaseUrlCrawler._initialize(self)
        self._role = "fetcher"

    def get_role(self):
        return "fetcher"

    def set_url_object(self, obj):

        if obj is None: return None

        # Objects popped off the priority queue arrive as
        # (priority, url object) tuples; a bare url object
        # may also be passed in directly.
        try:
            prior, url_obj = obj
        except TypeError:
            url_obj = obj

        #if url_obj.is_image():
        #    info("Processing image url ->", url_obj.get_full_url())
        HarvestManBaseUrlCrawler.set_url_object(self, url_obj)
    
    def action(self):
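        # In fast (threaded) mode, keep pulling url objects off the
        # queue until the end flag is set; in non-threaded mode do a
        # single fetch-then-crawl pass in the calling thread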
        
        if isinstance(self, threading.Thread):
            self._loops = 0

            while not self._endflag:
                obj = self._crawlerqueue.get_url_data(self.get_role(), self.get_specific_role())
                
                if obj is None: continue
                self.set_url_object(obj)

                # Set status to busy 
                self._status = 1
                self.process_url()
                self._status = 0
                self._loops += 1
                
        else:
            self.process_url()
            self.crawl_url()

    def process_url(self):
        """ This function downloads the data for a url and writes its files.
        It also posts the data for web pages to a data queue """

        filename = self._urlobject.get_full_filename()

        # If html downloads are disabled in the config, skip web pages
        if self._urlobject.is_webpage() and not self._configobj.html:
            return None

        if GetObject('ruleschecker').violates_download_rules(self._urlobject):
            # Bug: Time taken to set download flag affects download of files
            # especially when a url exclusion filter is active. So return
            # right away
            return None
            
        mgr = GetObject('datamanager')            
        moreinfo('Downloading file for url ', self._url)
        self._data = mgr.download_url(self._urlobject)

        # Add webpage links in datamgr, if we managed to
        # download the url
        base_url_obj = self._urlobject.get_base_urlobject()

        if self._urlobject.get_download_status()==0:
            if base_url_obj and base_url_obj.is_webpage():
                mgr.update_links(base_url_obj.get_full_filename(), self._urlobject)
        
        if self._urlobject.is_webpage():
            self._status = 2
            # Compress the page data before queueing it, to keep
            # the data queue's memory footprint small
            import zlib
            self._crawlerqueue.push((self._urlobject, zlib.compress(self._data)), 'fetcher')
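            # A crawler thread popping this item is expected to
            # zlib.decompress() the payload before parsing it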
            
        return None

class PriorityQueue(Queue):
    """ Priority queue based on bisect module (courtesy: Effbot) """

    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        # Use a list, not the deque Queue._init creates, since
        # bisect.insort needs list.insert
        self.queue = []

    def _put(self, item):
        bisect.insort(self.queue, item)

    def _get(self):
        return self.queue.pop(0)
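
# Illustrative usage (not part of the original module): items are
# queued as (priority, payload) tuples, and tuple comparison keeps
# the lowest priority number at the head of the queue:
#
#     pq = PriorityQueue(0)
#     pq.put((2, 'later'))
#     pq.put((1, 'sooner'))
#     pq.get()    # -> (1, 'sooner')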
        

class HarvestManCrawlerQueue(object):
    """ This class functions as the thread safe queue
    for storing url data for tracker threads """

    def __init__(self):
        self._basetracker = None
        self._flag = 0
        self._pushes = 0
        self._lockedinst = 0
        self._lasttimestamp = time.time()
        self._trackers  = []
        self._requests = 0
        self._trackerindex = 0
        self._lastblockedtime = 0
        self._numfetchers = 0
        self._numcrawlers = 0
        self.__qsize = 0
        self._baseUrlObj = None
        self._configobj = GetObject('config')
        # Time to wait for a data operation on the queue
        # before stopping the project with a timeout.
        self._waittime = self._configobj.projtimeout
        self.lck = threading.Condition(threading.Lock())
        
        self.url_q = PriorityQueue(0)
        self.data_q = PriorityQueue(0)
        
    def __getattr__(self, name):
        try:
            return self.__dict__[name]
        except KeyError:
            return None

    def __setattr__(self, name, value):
        self.__dict__[name] = value
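
    # Note: because __getattr__ above returns None rather than raising
    # AttributeError, reading any undefined (or misspelled) attribute
    # of this class silently yields None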

    def increment_lock_instance(self, val=1):
        self._lockedinst += val

    def get_locked_instances(self):
        return self._lockedinst
    
    def configure(self):
        """ Configure this class with this config object """

        try:
            self._baseUrlObj = urlparser.HarvestManUrlParser(self._configobj.url, 'normal',
                                                             0, self._configobj.url,
                                                             self._configobj.projdir)
        except urlparser.HarvestManUrlParserError, e:
            return -1

        self._baseUrlObj.is_starting_url = True        
        self.push(self._baseUrlObj, 'crawler')
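        # The starting url goes onto the url queue (role 'crawler'),
        # so the base fetcher picks it up as soon as it starts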
        
        # In fast mode the base tracker runs as a real thread
        iamathread = self._configobj.fastmode

        self._basetracker = HarvestManUrlFetcher(0, self._baseUrlObj, iamathread)
        self._trackers.append(self._basetracker)

    def crawl(self):
        """ Starts crawling for this project """

        if os.name=='nt':
            t1=time.clock()
        else:
            t1=time.time()

        # Set start time on config object
        self._configobj.starttime = t1
        
        debug('Threads active = ', threading.activeCount())
        debug('Maximum threads allowed = ', self._configobj.maxtrackers)
        debug('Images = ', self._configobj.images)
        debug('Usethreads = ', self._configobj.usethreads)
        debug('Extdirs = ', self._configobj.epagelinks)

        if self._configobj.fastmode:
            # Pre-launch the number of tracker threads specified
            # in the config file.
            self._basetracker.setDaemon(True)
            self._basetracker.start()

            for x in range(self._configobj.maxtrackers - 1):
                
                # Designate every third thread as crawler
                if x % 3==0:
                    t = HarvestManUrlCrawler(x+1, None)
                else:
                    t = HarvestManUrlFetcher(x+1, None)
                    
                self.add_tracker(t)
                t.setDaemon(True)
                t.start()

            # Every third tracker is a crawler and the rest are
            # fetchers; count how many of each role were launched.
            for t in self._trackers:
                
                if t.get_role() == 'fetcher':
                    self._numfetchers += 1
                elif t.get_role() == 'crawler':
                    self._numcrawlers += 1

            # Workaround: give the threads some time to start,
            # otherwise the exit condition can trigger immediately.
            time.sleep(5.0)
            
            while not self.is_exit_condition():
                time.sleep(2.0)

            # Set flag to 1 to denote that downloading is finished.
            self._flag = 1

            self.stop_trackers(noexit = True)
        else:
            self._basetracker.action()

    def get_base_tracker(self):
        """ Get the base tracker object """

        return self._basetracker

    def get_base_urlobject(self):

        return self._baseUrlObj
    
    def get_url_data(self, role, sp_role):
        """ Pop url data from the queue """

        if self._flag: return None

        # moreinfo("Getting url object ...")
        obj = None
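        # Note: the underlying Queue.get() blocks until an item is
        # available, so a tracker thread waits here when its queue
        # is empty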
        
        if role == 'crawler':
            obj = self.data_q.get()
        elif role == 'fetcher' or role=='tracker':
            obj = self.url_q.get()
            
        self._lasttimestamp = time.time()        

        self._requests += 1
        return obj

    def __get_num_blocked_threads(self):

        blocked = 0
        for t in self._trackers:
            if not t.is_busy(): blocked += 1

        return blocked

    def get_num_alive_threads(self):

        live = 0
        for t in self._trackers:
            if t.isAlive(): live += 1

        return live
        
    def __get_num_locked_crawler_threads(self):

        locked = 0
        for t in self._trackers:
            if t.get_role() == 'crawler':
                if t.is_locked(): locked += 1

        return locked

    def __get_num_locked_fetcher_threads(self):
        
        locked = 0
        for t in self._trackers:
            if t.get_role() == 'fetcher':
                if t.is_locked(): locked += 1

        return locked
    
    def add_tracker(self, tracker):
        self._trackers.append( tracker )
        self._trackerindex += 1

    def remove_tracker(self, tracker):
        self._trackers.remove(tracker)
        
    def get_last_tracker_index(self):
        return self._trackerindex
    
    def print_busy_tracker_info(self):
        
        for t in self._trackers:
            if t.is_busy():
                print t,' =>', t.getUrl()

            
    def is_locked_up(self, role):
        """ The queue is considered locked up if all threads
        are waiting to push data, but none can since the queue
        is already full and no thread is popping data. This is
        a deadlock, as the program cannot make progress without
        spawning new threads that pop some of the data """

        locked = 0

        if role == 'fetcher':
            locked = self.__get_num_locked_fetcher_threads()
            if locked == self._numfetchers - 1:
                return True
        elif role == 'crawler':
            locked = self.__get_num_locked_crawler_threads()
            if locked == self._numcrawlers - 1:
                return True

        return False
     
    def is_exit_condition(self):
        """ Exit condition is when there are no download
        sub-threads running and all the tracker threads
        are blocked or if the project times out """

        dmgr = GetObject('datamanager')
            
        currtime = time.time()
        last_thread_time = dmgr.last_download_thread_report_time()

        if last_thread_time > self._lasttimestamp:
            self._lasttimestamp = last_thread_time
            
        timediff = currtime - self._lasttimestamp

        is_blocked = self.is_blocked()
        if is_blocked:
            self._lastblockedtime = time.time()
            
        has_running_threads = dmgr.has_download_threads()
        timed_out = False

        # If the trackers are blocked, but waiting for sub-threads
        # to finish, kill the sub-threads.
        if is_blocked and has_running_threads:
            # If the trackers have been blocked for over a minute
            # while download sub-threads are still running, assume
            # the sub-threads are hanging and kill them.
            timediff2 = currtime - self._lastblockedtime
            if timediff2 > 60.0:
                moreinfo("Killing download threads ...")
                dmgr.kill_download_threads()
            
        if is_blocked and not has_running_threads:
            return True
        
        if timediff > self._waittime:
            timed_out = True
        
        if timed_out:
            moreinfo("Project", self._configobj.project, "timed out.")
            moreinfo('(Time since last data operation was', timediff, 'seconds)')
            return True

        return False
        
    def is_blocked(self):
        """ The queue is considered blocked if all threads
        are waiting for data, and no data is coming """

        blocked = self.__get_num_blocked_threads()
        
        if blocked == len(self._trackers):
            return True
        else:
            return False

    def is_fetcher_queue_full(self):
        """ Check whether the fetcher queue is full """

        if self.__get_num_locked_fetcher_threads() == self._numfetchers - 1:
            return True
        
        return False

    def is_crawler_queue_full(self):
        """ Check whether the crawler queue is full """

        if self.__get_num_locked_crawler_threads() == self._numcrawlers - 1:
            return True
        
        return False        
        
    def push(self, obj, role):
        """ Push trackers to the queue """

        if self._flag: return
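        # Items are queued as (priority, payload) tuples; tuple
        # comparison makes the bisect-based PriorityQueue serve
        # lower priority numbers first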

        if role == 'crawler' or role=='tracker':
            self.url_q.put((obj.get_priority(), obj))

        elif role == 'fetcher':
            # moreinfo('Putting data obtained from url', (obj[0]).get_full_url())
            self.data_q.put((obj[0].get_priority(), obj))
            
        self._pushes += 1
        self._lasttimestamp = time.time()

    def stop_trackers(self, noexit=False):
        """ Stop all running trackers """

        moreinfo("Ending Project", self._configobj.project,'...')
        for t in self._trackers:
            # t.join(0)
            t.stop()

        # Exit the system
        if not noexit:
            sys.exit(2)
        
    def kill_trackers(self):
        """ This function kills running tracker threads """

        moreinfo('Killing tracker threads...')
        self._flag = 1

        count = 0

        debug('Current running threads => ', threading.activeCount())
        debug('Current tracker count => ', len(self._trackers))
        extrainfo('Waiting for threads to clean up ...')

        # Kill the individual download threads
        mgr = GetObject('datamanager')
        mgr.kill_download_threads()

        for tracker in self._trackers:
            count += 1
            sys.stdout.write('...')

            if count % 10 == 0: sys.stdout.write('\n')

            try:
                tracker.terminate()
                del tracker
            except HarvestManUrlCrawlerException, e:
                pass
            except AssertionError, e:
                print e, '=> ', tracker
            except ValueError, e:
                print e, '=> ', tracker