
📄 urlqueue.py

📁 Harvestman - latest version
💻 PY
📖 Page 1 of 2
        self.pushes = 0
        self.lasttimestamp = time.time()
        self.trackers = []
        self.savers = []
        self.requests = 0
        self.trackerindex = 0
        self.baseurl = None
        self.stateobj = HarvestManCrawlerState(self)
        self.configobj = objects.config
        self.url_q = PriorityQueue(self.configobj.queuesize)
        self.data_q = PriorityQueue(self.configobj.queuesize)
        # Local buffer
        self.buffer = []
        # Lock for creating new threads
        self.cond = threading.Lock()
        # Flag indicating a forceful exit
        self.forcedexit = False
        # Sleep event
        self.evnt = SleepEvent(self.configobj.queuetime)

    def get_controller(self):
        """ Return the controller thread object """

        return self.controller

    def configure(self):
        """ Configure this class """

        try:
            self.baseurl = urlparser.HarvestManUrl(self.configobj.url,
                                                   urltypes.URL_TYPE_ANY,
                                                   0, self.configobj.url,
                                                   self.configobj.projdir)
            # Put the original hash of the start url in the class
            urlparser.HarvestManUrl.hashes[self.baseurl.index] = 1
            # Reset index to zero
            self.baseurl.index = 0
            objects.datamgr.add_url(self.baseurl)
        except urlparser.HarvestManUrlError:
            return False

        self.baseurl.starturl = True

        #if self.configobj.fastmode:
        try:
            self.basetracker = crawler.HarvestManUrlFetcher(0, self.baseurl, True)
        except Exception, e:
            print "Fatal Error:", e
            hexit(1)

        #else:
        #    # Not much point keeping this code...!
        #
        #    # Disable usethreads
        #    self.configobj.usethreads = False
        #    # Disable blocking
        #    self.configobj.blocking = False
        #    self.basetracker = crawler.HarvestManUrlDownloader( 0, self.baseurl, False )

        self.trackers.append(self.basetracker)
        # Reset state object
        self.stateobj.reset()

        return True

    def mainloop(self):
        """ Main program loop which waits for
        the threads to do their work """

        timeslot, tottime = 0.5, 0

        pool = objects.datamgr.get_url_threadpool()

        while not self.stateobj.end_state():
            self.stateobj.wait2(timeslot)
            tottime += timeslot
            if self.flag:
                break

        if pool: pool.wait(10.0, self.configobj.timeout)

        if self.stateobj.abortmsg:
            extrainfo(self.stateobj.abortmsg)

        if not self.forcedexit:
            self.end_threads()

    def endloop(self, forced=False):
        """ Exit the mainloop """

        # Set flag to 1 to denote that downloading is finished.
        self.flag = 1

        if forced:
            self.forcedexit = True
            # A forced exit happens when we exit because a
            # download limit is breached, so instruct connectors
            # to not save anything from here on...
            conndict = objects.connfactory.get_connector_dict()
            for conn in conndict.keys():
                if conndict.get(conn):
                    conn.blockwrite = True

    def restart(self):
        """ Alternate method to start from a previously restored state """

        # Start HarvestMan controller thread
        import datamgr

        if self.configobj.enable_controller():
            self.controller = datamgr.HarvestManController()
            self.controller.start()

        # Start base tracker
        self.basetracker.start()
        time.sleep(2.0)

        for t in self.trackers[1:]:
            try:
                t.start()
            except AssertionError, e:
                logconsole(e)

        self.mainloop()

    def crawl(self):
        """ Starts crawling for this project """

        # Reset flag
        self.flag = 0

        t1 = time.time()

        # Clear the queues...
        self.url_q.clear()
        self.data_q.clear()

        # Push the first URL directly to the url queue
        self.url_q.put((self.baseurl.priority, self.baseurl))
        # This is pushed to url queue, so increment crawler push...
        self.stateobj.cpush += 1

        # Start HarvestMan controller thread
        if self.configobj.enable_controller():
            self.controller = datamgr.HarvestManController()
            self.controller.start()

        # Pre-launch the number of threads specified in the
        # config file: roughly 75% fetchers, the rest crawlers.
        self.stateobj.numfetchers = int(0.75*self.configobj.maxtrackers)
        self.stateobj.numcrawlers = self.configobj.maxtrackers - self.stateobj.numfetchers

        self.basetracker.setDaemon(True)
        self.basetracker.start()

        evt = SleepEvent(0.1)
        while self.stateobj.get(self.basetracker) == crawler.FETCHER_WAITING:
            evt.sleep()

        # Set start time on config object
        self.configobj.starttime = t1

        del evt

        for x in range(1, self.stateobj.numfetchers):
            t = crawler.HarvestManUrlFetcher(x, None)
            self.add_tracker(t)
            t.setDaemon(True)
            t.start()

        for x in range(self.stateobj.numcrawlers):
            t = crawler.HarvestManUrlCrawler(x, None)
            self.add_tracker(t)
            t.setDaemon(True)
            t.start()

        self.mainloop()

    def get_base_tracker(self):
        """ Get the base tracker object """

        return self.basetracker

    def get_base_url(self):

        return self.baseurl

    def get_url_data(self, role):
        """ Pop url data from the queue """

        if self.flag: return None

        obj = None

        blk = self.configobj.blocking
        slptime = self.configobj.queuetime
        ct = threading.currentThread()

        if role == 'crawler':
            if blk:
                obj = self.data_q.get()
                self.stateobj.set(ct, crawler.CRAWLER_GOT_DATA)
            else:
                self.stateobj.set(ct, crawler.CRAWLER_WAITING)
                try:
                    obj = self.data_q.get(timeout=slptime)
                    self.stateobj.set(ct, crawler.CRAWLER_GOT_DATA)
                except (Empty, TypeError):
                    obj = None

        elif role == 'fetcher' or role == 'tracker':
            if blk:
                obj = self.url_q.get()
                self.stateobj.set(ct, crawler.FETCHER_GOT_DATA)
            else:
                self.stateobj.set(ct, crawler.FETCHER_WAITING)
                try:
                    obj = self.url_q.get(timeout=slptime)
                    self.stateobj.set(ct, crawler.FETCHER_GOT_DATA)
                except (Empty, TypeError):
                    obj = None

        self.lasttimestamp = time.time()

        self.requests += 1
        return obj

    def add_tracker(self, tracker):

        self.trackers.append(tracker)
        self.trackerindex += 1

    def remove_tracker(self, tracker):

        self.trackers.remove(tracker)

    def dead_thread_callback(self, t):
        """ Callback function called by a thread if it
        dies with an exception. This class then creates
        a fresh thread and migrates the data of the dead
        thread to it """

        try:
            debug('Trying to regenerate thread...')
            self.cond.acquire()
            # First find out the type
            role = t._role
            new_t = None

            if role == 'fetcher':
                new_t = crawler.HarvestManUrlFetcher(t.get_index(), None)
            elif role == 'crawler':
                new_t = crawler.HarvestManUrlCrawler(t.get_index(), None)

            # Migrate data and start thread
            if new_t:
                new_t._url = t._url
                new_t._urlobject = t._urlobject
                new_t.buffer = copy.deepcopy(t.buffer)
                # If this is a crawler, get links also
                if role == 'crawler':
                    new_t.links = t.links[:]

                # Replace dead thread in the list
                idx = self.trackers.index(t)
                self.trackers[idx] = new_t

                new_t.resuming = True
                new_t.start()
                debug('Regenerated thread...')

                return THREAD_MIGRATION_OK
            else:
                # Could not make a new thread, so decrement
                # the count of threads.

                # Remove from tracker list
                self.trackers.remove(t)

                if role == 'fetcher':
                    self.stateobj.numfetchers -= 1
                elif role == 'crawler':
                    self.stateobj.numcrawlers -= 1

                return THREAD_MIGRATION_ERROR
        finally:
            self.cond.release()

    def push(self, obj, role):
        """ Push objects to the queues """

        if self.flag: return

        ntries, status = 0, 0
        ct = threading.currentThread()

        if role == 'crawler' or role == 'tracker' or role == 'downloader':
            self.stateobj.set(ct, crawler.CRAWLER_PUSH_URL)

            while ntries < 5:
                try:
                    ntries += 1
                    self.url_q.put((obj.priority, obj))
                    self.pushes += 1
                    status = 1
                    self.stateobj.set(ct, crawler.CRAWLER_PUSHED_URL)
                    break
                except Full:
                    self.evnt.sleep()

        elif role == 'fetcher':
            self.stateobj.set(ct, crawler.FETCHER_PUSH_URL)

            while ntries < 5:
                try:
                    ntries += 1
                    self.data_q.put(obj)
                    self.pushes += 1
                    status = 1
                    self.stateobj.set(ct, crawler.FETCHER_PUSHED_URL)
                    break
                except Full:
                    self.evnt.sleep()

        self.lasttimestamp = time.time()

        return status

    def end_threads(self):
        """ Stop all running threads and clean
        up the program. This function is called
        for a normal/abnormal exit of HarvestMan """

        extrainfo("Ending threads...")

        if self.configobj.project:
            if self.forcedexit:
                info('Terminating project', self.configobj.project, '...')
            else:
                info("Ending project", self.configobj.project, '...')

        # Stop controller
        if self.controller:
            self.controller.stop()

        if self.forcedexit:
            self._kill_tracker_threads()
        else:
            # Do a regular stop and join
            for t in self.trackers:
                try:
                    t.stop()
                except Exception, e:
                    pass

            # Wait till all threads report to the
            # state machine, with a timeout of 5 minutes.
            extrainfo("Waiting for threads to finish up...")

            timeslot, tottime = 0.5, 0
            while not self.stateobj.exit_state():
                self.stateobj.wait2(timeslot)
                tottime += timeslot
                if tottime >= 300.0:
                    break

            pool = objects.datamgr.get_url_threadpool()
            if pool: pool.wait(10.0, 120.0)

            extrainfo("Done.")

        self.trackers = []
        self.basetracker = None

    def _kill_tracker_threads(self):
        """ This function kills running tracker threads """

        count = 0

        for tracker in self.trackers:
            count += 1
            sys.stdout.write('...')
            if count % 10 == 0: sys.stdout.write('\n')

            try:
                tracker.stop()
            except AssertionError, e:
                logconsole(str(e))
            except ValueError, e:
                logconsole(str(e))
            except crawler.HarvestManUrlCrawlerException, e:
                pass
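Two details of the queue discipline above are easy to miss: url_q holds (priority, object) tuples, so the lowest-numbered URL is served first, and push() retries a bounded put up to five times, sleeping on a SleepEvent between attempts, rather than blocking forever. The sketch below shows the same pattern in plain Python 2, matching the listing; the standard library's Queue.PriorityQueue stands in for HarvestMan's own PriorityQueue class (not shown on this page, and which additionally supports clear()), and push_with_retry is an illustrative helper, not a HarvestMan API.

import time
import Queue

url_q = Queue.PriorityQueue(5)            # bounded, like configobj.queuesize

# Items are (priority, object) tuples, so the lowest number pops first
url_q.put((2, 'http://example.com/page.html'))
url_q.put((0, 'http://example.com/'))     # start URL, highest priority

def push_with_retry(q, item, max_tries=5, wait=0.5):
    """ Retry a non-blocking put a few times, as push() does above """
    for _ in range(max_tries):
        try:
            q.put(item, block=False)
            return 1                      # status flag, as in push()
        except Queue.Full:
            time.sleep(wait)              # the listing sleeps on a SleepEvent
    return 0

print url_q.get()                         # -> (0, 'http://example.com/')

Retrying a handful of times and returning a status, instead of blocking indefinitely, keeps a full queue from stalling the crawler when fetchers and crawlers feed each other's queues.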
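dead_thread_callback() above is a thread-regeneration scheme: a tracker that dies with an exception reports back to the queue object, which, under a lock, builds a fresh thread of the same role, deep-copies the dead thread's buffer into it, swaps it into the tracker list in place, and starts it in resume mode. Below is a self-contained sketch of that pattern under the same Python 2 assumptions; Worker, Manager, and the task functions are illustrative stand-ins, not HarvestMan classes.

import copy
import threading

class Worker(threading.Thread):
    """ A thread that reports its own death to a manager, like the trackers """

    def __init__(self, index, manager, task, buffer=None):
        threading.Thread.__init__(self)
        self.index = index
        self.manager = manager
        self.task = task
        self.buffer = buffer if buffer is not None else []

    def run(self):
        try:
            self.task(self)
        except Exception:
            # Equivalent of a tracker calling dead_thread_callback()
            self.manager.dead_thread_callback(self)

def crashing_task(w):
    raise ValueError('simulated crash in worker %d' % w.index)

def idle_task(w):
    pass

class Manager(object):

    def __init__(self):
        self.lock = threading.Lock()      # plays the role of self.cond
        self.workers = []

    def dead_thread_callback(self, t):
        """ Build a fresh thread, migrate its state, and swap it in """
        with self.lock:
            new_t = Worker(t.index, self, idle_task, copy.deepcopy(t.buffer))
            self.workers[self.workers.index(t)] = new_t
            new_t.start()

m = Manager()
w = Worker(0, m, crashing_task, buffer=['pending-url'])
m.workers.append(w)
w.start()
w.join()                                  # worker 0 is replaced in m.workers

The lock matters here: regeneration mutates the shared tracker list from whichever thread just died, so it has to be serialized, exactly as self.cond does in the listing.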
