# urltracker.py
except SGMLParseError, e:
debug(e)
except IOError,e:
print e
except htmlparser.HTMLParseError, e:
debug(e)
return None
class HarvestManUrlFetcher(HarvestManBaseUrlCrawler):
""" This is the fetcher class, which downloads data for a url
and writes its files. It also posts the data for web pages
to a data queue """
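# Fetchers pop url objects from the crawler queue's url_q, download them via
# the datamanager, and push (url object, compressed data) tuples back onto
# data_q for crawler threads to process (see process_url below).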
def __init__(self, index, url_obj = None, isThread=True):
HarvestManBaseUrlCrawler.__init__(self, index, url_obj, isThread)
def __str__(self):
return repr(self)
def _initialize(self):
HarvestManBaseUrlCrawler._initialize(self)
self._role = "fetcher"
def get_role(self):
return "fetcher"
def set_url_object(self, obj):
if obj is None: return None
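# The queue stores (priority, url object) tuples (see HarvestManCrawlerQueue.push),
# but a bare url object may also be passed in; handle both forms.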
try:
prior, url_obj = obj
except TypeError:
url_obj = obj
#if url_obj.is_image():
# info("Processing image url ->", url_obj.get_full_url())
HarvestManBaseUrlCrawler.set_url_object(self, url_obj)
def action(self):
if isinstance(self, threading.Thread):
self._loops = 0
while not self._endflag:
obj = self._crawlerqueue.get_url_data(self.get_role(), self.get_specific_role())
if obj is None: continue
self.set_url_object(obj)
# Set status to busy
self._status = 1
self.process_url()
self._status = 0
self._loops += 1
else:
self.process_url()
self.crawl_url()
def process_url(self):
""" This function downloads the data for a url and writes its files.
It also posts the data for web pages to a data queue """
filename = self._urlobject.get_full_filename()
# If html downloading is disabled in the config, skip web pages
if self._urlobject.is_webpage() and not self._configobj.html:
return None
if GetObject('ruleschecker').violates_download_rules(self._urlobject):
# Bug workaround: the time taken to set the download flag affects the
# download of files, especially when a url exclusion filter is active,
# so return right away
return None
mgr = GetObject('datamanager')
moreinfo('Downloading file for url ', self._url)
self._data = mgr.download_url(self._urlobject)
# Add webpage links in datamgr, if we managed to
# download the url
base_url_obj = self._urlobject.get_base_urlobject()
if self._urlobject.get_download_status()==0:
if base_url_obj and base_url_obj.is_webpage():
mgr.update_links(base_url_obj.get_full_filename(), self._urlobject)
if self._urlobject.is_webpage():
self._status = 2
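# Compress the page data before queueing it so the data queue holds less
# memory; the crawler side is expected to decompress it with zlib.decompress.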
import zlib
self._crawlerqueue.push((self._urlobject, zlib.compress(self._data)), 'fetcher')
return None
class PriorityQueue(Queue):
""" Priority queue based on bisect module (courtesy: Effbot) """
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
def _put(self, item):
bisect.insort(self.queue, item)
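# Illustrative note: items are pushed as (priority, object) tuples and kept
# in sorted order by bisect.insort, so the tuple with the lowest priority
# value is retrieved first (assuming the underlying Queue container supports
# insort).
#
#   pq = PriorityQueue(0)
#   pq.put((2, 'later'))
#   pq.put((0, 'sooner'))
#   pq.get()   # -> (0, 'sooner')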
class HarvestManCrawlerQueue(object):
""" This class functions as the thread safe queue
for storing url data for tracker threads """
def __init__(self):
self._basetracker = None
self._flag = 0
self._pushes = 0
self._lockedinst = 0
self._lasttimestamp = time.time()
self._trackers = []
self._requests = 0
self._trackerindex = 0
self._lastblockedtime = 0
self._numfetchers = 0
self._numcrawlers = 0
self.__qsize = 0
self._baseUrlObj = None
# Time to wait for a data operation on the queue
# before stopping the project with a timeout.
self._waittime = GetObject('config').projtimeout
self._configobj = GetObject('config')
self.lck = threading.Condition(threading.Lock())
self.url_q = PriorityQueue(0)
self.data_q = PriorityQueue(0)
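# url_q feeds fetcher/tracker threads with url objects to download, while
# data_q feeds crawler threads with (url object, compressed data) tuples to
# process (see get_url_data and push).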
def __getattr__(self, name):
try:
return self.__dict__[name]
except KeyError:
return None
def __setattr__(self, name, value):
self.__dict__[name] = value
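# Note: __getattr__ above returns None for missing attributes instead of
# raising AttributeError, so absent attributes silently read as None.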
def increment_lock_instance(self, val=1):
self._lockedinst += val
def get_locked_instances(self):
return self._lockedinst
def configure(self):
""" Configure this class with this config object """
try:
self._baseUrlObj = urlparser.HarvestManUrlParser(self._configobj.url, 'normal',
0, self._configobj.url,
self._configobj.projdir)
except urlparser.HarvestManUrlParserError, e:
return -1
self._baseUrlObj.is_starting_url = True
self.push(self._baseUrlObj, 'crawler')
if self._configobj.fastmode:
iamathread=True
else:
iamathread=False
self._basetracker = HarvestManUrlFetcher( 0, self._baseUrlObj, iamathread )
self._trackers.append(self._basetracker)
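# The base tracker is a fetcher seeded with the starting url; in fastmode it
# runs as a daemon thread (see crawl), otherwise its action() is called
# synchronously in the current thread.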
def crawl(self):
""" Starts crawling for this project """
if os.name=='nt':
t1=time.clock()
else:
t1=time.time()
# Set start time on config object
self._configobj.starttime = t1
debug('Threads active = ', threading.activeCount())
debug('Maximum threads allowed = ', self._configobj.maxtrackers)
debug('Images = ', self._configobj.images)
debug('Usethreads = ', self._configobj.usethreads)
debug('Extdirs = ', self._configobj.epagelinks)
if self._configobj.fastmode:
# Create the number of threads in the config file
# Pre-launch the number of threads specified
# in the config file.
# Initialize thread dictionary
self._basetracker.setDaemon(True)
self._basetracker.start()
dataavlbl=False
for x in range(self._configobj.maxtrackers - 1):
# Designate every third thread as crawler
if x % 3==0:
t = HarvestManUrlCrawler(x+1, None)
else:
t = HarvestManUrlFetcher(x+1, None)
self.add_tracker(t)
t.setDaemon(True)
t.start()
# Count the launched trackers by role (roughly one crawler for every two fetchers)
for t in self._trackers:
if t.get_role() == 'fetcher':
self._numfetchers += 1
elif t.get_role() == 'crawler':
self._numcrawlers += 1
# Bug workaround: give the threads some time to start,
# otherwise we sometimes exit immediately.
time.sleep(5.0)
while not self.is_exit_condition():
time.sleep(2.0)
# Set flag to 1 to denote that downloading is finished.
self._flag = 1
self.stop_trackers(noexit = True)
else:
self._basetracker.action()
def get_base_tracker(self):
""" Get the base tracker object """
return self._basetracker
def get_base_urlobject(self):
return self._baseUrlObj
def get_url_data(self, role, sp_role):
""" Pop url data from the queue """
if self._flag: return None
# moreinfo("Getting url object ...")
obj = None
if role == 'crawler':
obj = self.data_q.get()
elif role == 'fetcher' or role=='tracker':
obj = self.url_q.get()
self._lasttimestamp = time.time()
self._requests += 1
return obj
def __get_num_blocked_threads(self):
blocked = 0
for t in self._trackers:
if not t.is_busy(): blocked += 1
return blocked
def get_num_alive_threads(self):
live = 0
for t in self._trackers:
if t.isAlive(): live += 1
return live
def __get_num_locked_crawler_threads(self):
locked = 0
for t in self._trackers:
if t.get_role() == 'crawler':
if t.is_locked(): locked += 1
return locked
def __get_num_locked_fetcher_threads(self):
locked = 0
for t in self._trackers:
if t.get_role() == 'fetcher':
if t.is_locked(): locked += 1
return locked
def add_tracker(self, tracker):
self._trackers.append( tracker )
self._trackerindex += 1
def remove_tracker(self, tracker):
self._trackers.remove(tracker)
def get_last_tracker_index(self):
return self._trackerindex
def print_busy_tracker_info(self):
for t in self._trackers:
if t.is_busy():
print t,' =>', t.getUrl()
def is_locked_up(self, role):
""" The queue is considered locked up if all threads
are waiting to push data, but none can since queue
is already full, and no thread is popping data. This
is a deadlock condition as the program cannot go any
forward without creating new threads that will pop out
some of the data (We need to take care of it by spawning
new threads which can pop data) """
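# A role is treated as locked up when all but one of its threads are locked
# waiting on the queue (hence the '- 1' in the comparisons below).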
locked = 0
if role == 'fetcher':
locked = self.__get_num_locked_fetcher_threads()
if locked == self._numfetchers - 1:
return True
elif role == 'crawler':
locked = self.__get_num_locked_crawler_threads()
if locked == self._numcrawlers - 1:
return True
return False
def is_exit_condition(self):
""" Exit condition is when there are no download
sub-threads running and all the tracker threads
are blocked or if the project times out """
dmgr = GetObject('datamanager')
currtime = time.time()
last_thread_time = dmgr.last_download_thread_report_time()
if last_thread_time > self._lasttimestamp:
self._lasttimestamp = last_thread_time
timediff = currtime - self._lasttimestamp
is_blocked = self.is_blocked()
if is_blocked:
self._lastblockedtime = time.time()
has_running_threads = dmgr.has_download_threads()
timed_out = False
# If the trackers are blocked, but waiting for sub-threads
# to finish, kill the sub-threads.
if is_blocked and has_running_threads:
# Find out time difference between when trackers
# got blocked and curr time. If greater than 1 minute
# Kill hanging threads
timediff2 = currtime - self._lastblockedtime
if timediff2 > 60.0:
moreinfo("Killing download threads ...")
dmgr.kill_download_threads()
if is_blocked and not has_running_threads:
return True
if timediff > self._waittime:
timed_out = True
if timed_out:
moreinfo("Project", self._configobj.project, "timed out.")
moreinfo('(Time since last data operation was', timediff, 'seconds)')
return True
return False
def is_blocked(self):
""" The queue is considered blocked if all threads
are waiting for data, and no data is coming """
blocked = self.__get_num_blocked_threads()
if blocked == len(self._trackers):
return True
else:
return False
def is_fetcher_queue_full(self):
""" Check whether the fetcher queue is full """
if self.__get_num_locked_fetcher_threads() == self._numfetchers - 1:
return True
return False
def is_crawler_queue_full(self):
""" Check whether the crawler queue is full """
if self.__get_num_locked_crawler_threads() == self._numcrawlers - 1:
return True
return False
def push(self, obj, role):
""" Push trackers to the queue """
if self._flag: return
if role == 'crawler' or role=='tracker':
self.url_q.put((obj.get_priority(), obj))
elif role == 'fetcher':
# moreinfo('Putting data obtained from url', (obj[0]).get_full_url())
self.data_q.put((obj[0].get_priority(), obj))
self._pushes += 1
self._lasttimestamp = time.time()
def stop_trackers(self, noexit=False):
""" Stop all running trackers """
moreinfo("Ending Project", self._configobj.project,'...')
for t in self._trackers:
# t.join(0)
t.stop()
# Exit the system
if not noexit:
sys.exit(2)
def kill_trackers(self):
""" This function kills running tracker threads """
moreinfo('Killing tracker threads...')
self._flag=1
count = 0
debug('Current running threads => ', threading.activeCount())
debug('Current tracker count => ', len(self._trackers))
extrainfo('Waiting for threads to clean up ...')
# Kill the individual download threads
mgr = GetObject('datamanager')
mgr.kill_download_threads()
for tracker in self._trackers:
count += 1
sys.stdout.write('...')
if count % 10 == 0: sys.stdout.write('\n')
try:
tracker.terminate()
del tracker
except HarvestManUrlCrawlerException, e:
pass
except AssertionError, e:
print e, '=> ', tracker
except ValueError, e:
print e, '=> ', tracker