urlqueue.py
        self.pushes = 0
        self.lasttimestamp = time.time()
        self.trackers = []
        self.savers = []
        self.requests = 0
        self.trackerindex = 0
        self.baseurl = None
        self.stateobj = HarvestManCrawlerState(self)
        self.configobj = objects.config
        self.url_q = PriorityQueue(self.configobj.queuesize)
        self.data_q = PriorityQueue(self.configobj.queuesize)
        # Local buffer
        self.buffer = []
        # Lock for creating new threads
        self.cond = threading.Lock()
        # Flag indicating a forceful exit
        self.forcedexit = False
        # Sleep event
        self.evnt = SleepEvent(self.configobj.queuetime)

    def get_controller(self):
        """ Return the controller thread object """

        return self.controller

    def configure(self):
        """ Configure this class """

        try:
            self.baseurl = urlparser.HarvestManUrl(self.configobj.url,
                                                   urltypes.URL_TYPE_ANY,
                                                   0, self.configobj.url,
                                                   self.configobj.projdir)
            # Put the original hash of the start url in the class
            urlparser.HarvestManUrl.hashes[self.baseurl.index] = 1
            # Reset index to zero
            self.baseurl.index = 0
            objects.datamgr.add_url(self.baseurl)
        except urlparser.HarvestManUrlError:
            return False

        self.baseurl.starturl = True

        #if self.configobj.fastmode:
        try:
            self.basetracker = crawler.HarvestManUrlFetcher( 0, self.baseurl, True )
        except Exception, e:
            print "Fatal Error:", e
            hexit(1)

        #else:
        #    # Not much point keeping this code...!
        #
        #    # Disable usethreads
        #    self.configobj.usethreads = False
        #    # Disable blocking
        #    self.configobj.blocking = False
        #    self.basetracker = crawler.HarvestManUrlDownloader( 0, self.baseurl, False )

        self.trackers.append(self.basetracker)
        # Reset state object
        self.stateobj.reset()

        return True

    def mainloop(self):
        """ Main program loop which waits for the threads to do their work """

        # print 'Waiting...'
        timeslot, tottime = 0.5, 0

        pool = objects.datamgr.get_url_threadpool()

        while not self.stateobj.end_state():
            self.stateobj.wait2(timeslot)
            tottime += timeslot

            if self.flag:
                break

        if pool:
            pool.wait(10.0, self.configobj.timeout)

        if self.stateobj.abortmsg:
            extrainfo(self.stateobj.abortmsg)

        if not self.forcedexit:
            self.end_threads()

    def endloop(self, forced=False):
        """ Exit the mainloop """

        # Set flag to 1 to denote that downloading is finished.
        self.flag = 1

        if forced:
            self.forcedexit = True
            # A forced exit happens when we exit because a
            # download limit is breached, so instruct connectors
            # to not save anything from here on...
            conndict = objects.connfactory.get_connector_dict()
            for conn in conndict.keys():
                if conndict.get(conn):
                    conn.blockwrite = True

    def restart(self):
        """ Alternate method to start from a previous restored state """

        # Start harvestman controller thread
        import datamgr

        if self.configobj.enable_controller():
            self.controller = datamgr.HarvestManController()
            self.controller.start()

        # Start base tracker
        self.basetracker.start()
        time.sleep(2.0)

        for t in self.trackers[1:]:
            try:
                t.start()
            except AssertionError, e:
                logconsole(e)
                pass

        self.mainloop()

    def crawl(self):
        """ Starts crawling for this project """

        # Reset flag
        self.flag = 0

        t1 = time.time()

        # Clear the queues...
        self.url_q.clear()
        self.data_q.clear()

        # Push the first URL directly to the url queue
        self.url_q.put((self.baseurl.priority, self.baseurl))
        # This is pushed to url queue, so increment crawler push...
        self.stateobj.cpush += 1

        #if self.configobj.fastmode:

        # Start harvestman controller thread
        if self.configobj.enable_controller():
            self.controller = datamgr.HarvestManController()
            self.controller.start()

        # Create the number of threads in the config file
        # Pre-launch the number of threads specified
        # in the config file.
        # Initialize thread dictionary
        self.stateobj.numfetchers = int(0.75*self.configobj.maxtrackers)
        self.stateobj.numcrawlers = self.configobj.maxtrackers - self.stateobj.numfetchers

        self.basetracker.setDaemon(True)
        self.basetracker.start()

        evt = SleepEvent(0.1)
        while self.stateobj.get(self.basetracker) == crawler.FETCHER_WAITING:
            evt.sleep()

        # Set start time on config object
        self.configobj.starttime = t1

        del evt

        for x in range(1, self.stateobj.numfetchers):
            t = crawler.HarvestManUrlFetcher(x, None)
            self.add_tracker(t)
            t.setDaemon(True)
            t.start()

        for x in range(self.stateobj.numcrawlers):
            t = crawler.HarvestManUrlCrawler(x, None)
            self.add_tracker(t)
            t.setDaemon(True)
            t.start()

        self.mainloop()

        #else:
        #    self.basetracker.action()

    def get_base_tracker(self):
        """ Get the base tracker object """

        return self.basetracker

    def get_base_url(self):
        """ Return the base (start) URL object """

        return self.baseurl

    def get_url_data(self, role):
        """ Pop url data from the queue """

        if self.flag:
            return None

        obj = None

        blk = self.configobj.blocking
        slptime = self.configobj.queuetime

        ct = threading.currentThread()

        if role == 'crawler':
            if blk:
                obj = self.data_q.get()
                self.stateobj.set(ct, crawler.CRAWLER_GOT_DATA)
            else:
                self.stateobj.set(ct, crawler.CRAWLER_WAITING)
                try:
                    obj = self.data_q.get(timeout=slptime)
                    self.stateobj.set(ct, crawler.CRAWLER_GOT_DATA)
                except (Empty, TypeError):
                    obj = None

        elif role == 'fetcher' or role == 'tracker':
            if blk:
                obj = self.url_q.get()
                self.stateobj.set(ct, crawler.FETCHER_GOT_DATA)
            else:
                self.stateobj.set(ct, crawler.FETCHER_WAITING)
                try:
                    obj = self.url_q.get(timeout=slptime)
                    self.stateobj.set(ct, crawler.FETCHER_GOT_DATA)
                except (Empty, TypeError):
                    obj = None

        self.lasttimestamp = time.time()
        self.requests += 1

        return obj

    def add_tracker(self, tracker):
        """ Add a tracker thread to the list of trackers """

        self.trackers.append(tracker)
        self.trackerindex += 1

    def remove_tracker(self, tracker):
        """ Remove a tracker thread from the list of trackers """

        self.trackers.remove(tracker)

    def dead_thread_callback(self, t):
        """ Callback function called by a thread if it dies with an exception.
        This class then creates a fresh thread and migrates the data of the
        dead thread to it """

        try:
            debug('Trying to regenerate thread...')
            self.cond.acquire()

            # First find out the type
            role = t._role
            new_t = None

            if role == 'fetcher':
                new_t = crawler.HarvestManUrlFetcher(t.get_index(), None)
            elif role == 'crawler':
                new_t = crawler.HarvestManUrlCrawler(t.get_index(), None)

            # Migrate data and start thread
            if new_t:
                new_t._url = t._url
                new_t._urlobject = t._urlobject
                new_t.buffer = copy.deepcopy(t.buffer)

                # If this is a crawler, get links also
                if role == 'crawler':
                    new_t.links = t.links[:]

                # Replace dead thread in the list
                idx = self.trackers.index(t)
                self.trackers[idx] = new_t

                new_t.resuming = True
                new_t.start()
                debug('Regenerated thread...')

                return THREAD_MIGRATION_OK
            else:
                # Could not make new thread, so decrement
                # count of threads.
                # Remove from tracker list
                self.trackers.remove(t)

                if role == 'fetcher':
                    self.stateobj.numfetchers -= 1
                elif role == 'crawler':
                    self.stateobj.numcrawlers -= 1

                return THREAD_MIGRATION_ERROR
        finally:
            self.cond.release()

    def push(self, obj, role):
        """ Push the given object to the URL or data queue according to role """

        if self.flag:
            return

        ntries, status = 0, 0

        ct = threading.currentThread()

        if role == 'crawler' or role == 'tracker' or role == 'downloader':
            # debug('Pushing stuff to buffer', ct)
            self.stateobj.set(ct, crawler.CRAWLER_PUSH_URL)

            while ntries < 5:
                try:
                    ntries += 1
                    self.url_q.put((obj.priority, obj))
                    self.pushes += 1
                    status = 1
                    self.stateobj.set(ct, crawler.CRAWLER_PUSHED_URL)
                    break
                except Full:
                    self.evnt.sleep()

        elif role == 'fetcher':
            # print 'Pushing stuff to buffer', ct
            self.stateobj.set(ct, crawler.FETCHER_PUSH_URL)
            # stuff = (obj[0].priority, (obj[0].index, obj[1]))

            while ntries < 5:
                try:
                    ntries += 1
                    self.data_q.put(obj)
                    self.pushes += 1
                    status = 1
                    self.stateobj.set(ct, crawler.FETCHER_PUSHED_URL)
                    break
                except Full:
                    self.evnt.sleep()

        self.lasttimestamp = time.time()

        return status

    def end_threads(self):
        """ Stop all running threads and clean up the program. This function
        is called for a normal/abnormal exit of HarvestMan """

        extrainfo("Ending threads...")

        if self.configobj.project:
            if self.forcedexit:
                info('Terminating project ', self.configobj.project, '...')
            else:
                info("Ending Project", self.configobj.project, '...')

        # Stop controller
        if self.controller:
            self.controller.stop()

        if self.forcedexit:
            self._kill_tracker_threads()
        else:
            # Do a regular stop and join
            for t in self.trackers:
                try:
                    t.stop()
                except Exception, e:
                    pass

            # Wait till all threads report
            # to the state machine, with a
            # timeout of 5 minutes.
            extrainfo("Waiting for threads to finish up...")

            timeslot, tottime = 0.5, 0
            while not self.stateobj.exit_state():
                # print 'Waiting...'
                self.stateobj.wait2(timeslot)
                tottime += timeslot

                if tottime >= 300.0:
                    break

            pool = objects.datamgr.get_url_threadpool()
            if pool:
                pool.wait(10.0, 120.0)

            extrainfo("Done.")

        # print 'Done.'
        self.trackers = []
        self.basetracker = None

    def _kill_tracker_threads(self):
        """ This function kills running tracker threads """

        count = 0

        for tracker in self.trackers:
            count += 1
            sys.stdout.write('...')
            if count % 10 == 0:
                sys.stdout.write('\n')

            try:
                tracker.stop()
            except AssertionError, e:
                logconsole(str(e))
            except ValueError, e:
                logconsole(str(e))
            except crawler.HarvestManUrlCrawlerException, e:
                pass
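
The PriorityQueue class used for url_q and data_q is defined elsewhere in HarvestMan and is not part of this excerpt. The listing above only relies on a bounded, thread-safe priority queue that orders (priority, url) tuples by their first element, raises Full when it cannot accept an item and Empty when get(timeout=...) times out, and supports clear() before a fresh crawl. The snippet below is a minimal, illustrative sketch of such a queue written against Python 2's standard Queue module; the name SimplePriorityQueue and all details are assumptions for illustration, not HarvestMan's actual implementation (whose put(), for instance, evidently raises Full without an explicit non-blocking flag).

# --- Illustrative sketch, separate from urlqueue.py ---
import heapq
import Queue

class SimplePriorityQueue(Queue.Queue):
    """ Minimal bounded priority queue with roughly the interface
    urlqueue.py expects.

    Items are (priority, object) tuples; lower priority values come out
    first. put(item, False) raises Queue.Full when the queue is at its
    maximum size, get(timeout=...) raises Queue.Empty on timeout, and
    clear() empties the queue before a fresh crawl. """

    def _init(self, maxsize):
        # Keep items in a list managed as a heap instead of a deque
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item):
        heapq.heappush(self.queue, item)

    def _get(self):
        return heapq.heappop(self.queue)

    def clear(self):
        # Drop all pending entries and wake any producer blocked on a full queue
        self.not_full.acquire()
        try:
            self.queue = []
            self.not_full.notify()
        finally:
            self.not_full.release()

if __name__ == '__main__':
    q = SimplePriorityQueue(10)
    q.put((1, 'low priority url'))
    q.put((0, 'high priority url'))
    print q.get()     # -> (0, 'high priority url')
    q.clear()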