""" HarvestManUrlThread.py - Url thread downloader module.
Has two classes, one for downloading of urls and another
for managing the url threads.
This software is part of the HarvestMan(R) program.
Author: Anand B Pillai (anandpillai at letterboxes dot org).
Copyright (C) 2004-2005 Anand B Pillai.
Modification History
Jun 4-9 2004 Anand 1.4 development.
*Adopted a queue model for the sub
threads in this module similar to the
tracker threads. Instead of spawning new
threads for every download job, we reuse
the threads in the thread pool. The threads
are pre-spawned at the beginning of download.
Derived harvestManUrlThreadPool from Queue
and added method 'spawn_threads' to it.
Jun 14 2004 1.3.9 release.
"""
import os, sys
import math
import time
import threading
import cStringIO
from Queue import Queue
from common import *
# __metaclass__ = object
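# A typical driving sequence for this module, assuming the HarvestMan
# globals ('config', 'connectorfactory' and 'datamanager') have already
# been registered with the GetObject() registry used below:
#
#   pool = harvestManUrlThreadPool()
#   pool.spawn_threads()
#   pool.download_urls(list_of_url_objects)  # urlPathParser instances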
class HarvestManUrlThreadInterrupt(Exception):
    """ Interrupt raised to kill a harvestManUrlThread object """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return str(self.value)
class harvestManUrlThread(threading.Thread):
    """ Class to download a url in a separate thread """

    def __init__(self, name, timeout, threadpool):
        """ Constructor - takes a thread name, a timeout
        value, and the thread pool object pooling this
        thread """

        # url object (an instance of the urlPathParser class)
        self.__urlobject = None
        # thread pool object pooling this thread
        self.__pool = threadpool
        # max lifetime for the thread
        self.__timeout = timeout
        # start time of thread
        self.__starttime = 0
        # sleep time
        self.__sleepTime = 1.0
        # error dictionary
        self.__error = {}
        # download status flag
        self.__downloadstatus = 0
        # busy flag
        self.__busyflag = False
        # end flag
        self.__endflag = False
        # initialize threading
        threading.Thread.__init__(self, None, None, name)
    def get_error(self):
        """ Get error value of this thread """

        return self.__error

    def get_status(self):
        """ Get the download status of this thread """

        return self.__downloadstatus

    def is_busy(self):
        """ Get busy status for this thread """

        return self.__busyflag

    def join(self):
        """ The thread's join method to be called
        by other threads """

        threading.Thread.join(self, self.__timeout)

    def terminate(self):
        """ Kill this thread """

        self.stop()
        msg = 'Download thread, ' + self.getName() + ' killed!'
        raise HarvestManUrlThreadInterrupt, msg

    def stop(self):
        """ Stop this thread """

        self.__endflag = True
    def download(self, url_obj):
        """ Download this url """

        url = url_obj.get_full_url()
        filename = url_obj.get_full_filename()
        server = url_obj.get_domain()

        if url_obj.is_image():
            info('Downloading image ...', url)
        else:
            extrainfo('Downloading url ...', url)

        conn_factory = GetObject('connectorfactory')
        # This call will block if we exceed the number of connections
        conn = conn_factory.create_connector( server )
        res = conn.save_url(url_obj)

        # Remove the connector from the factory
        conn_factory.remove_connector(conn, server)

        # Set this as download status
        self.__downloadstatus = res

        if res == 0:
            # get error flag from connector
            self.__error = conn.get_error()
        else:
            extrainfo('Finished download of ', url)

        # Notify thread pool
        self.__pool.notify(self)
    def run(self):
        """ Run this thread """

        while not self.__endflag:
            if os.name == 'nt' or sys.platform == 'win32':
                self.__starttime = time.clock()
            else:
                self.__starttime = time.time()

            # This call blocks until a url task is available
            url_obj = self.__pool.get_next_urltask()
            # set busy flag
            self.__busyflag = True

            # Save reference
            self.__urlobject = url_obj

            filename, url = url_obj.get_full_filename(), url_obj.get_full_url()
            if not filename and not url:
                return

            self.download(url_obj)
            # reset busy flag
            self.__busyflag = False
    def get_url(self):
        """ Return the url downloaded by this thread """

        if self.__urlobject:
            return self.__urlobject.get_full_url()

        return ""

    def get_filename(self):
        """ Return the filename of this thread's download """

        if self.__urlobject:
            return self.__urlobject.get_full_filename()

        return ""

    def get_urlobject(self):
        """ Return this thread's url object """

        return self.__urlobject

    def get_elapsed_time(self):
        """ Get the time taken for this thread """

        now = 0.0
        if os.name == 'nt' or sys.platform == 'win32':
            now = time.clock()
        else:
            now = time.time()

        fetchtime = float(math.ceil((now - self.__starttime)*100)/100)
        return fetchtime

    def long_running(self):
        """ Find out if this thread has been running for a long
        time (more than the given timeout) """

        # if this thread has been running for more than <timeout>
        # seconds, return True
        return (self.get_elapsed_time() > self.__timeout)

    def set_timeout(self, value):
        """ Set the timeout value for this thread """

        self.__timeout = value
class harvestManUrlThreadPool(Queue):
    """ Thread group/pool class to manage download threads """

    def __init__(self):
        """ Initialize this class """

        # list of spawned threads
        self.__threads = []
        # list of url tasks
        self.__tasks = []
        # Maximum number of threads spawned
        self.__numthreads = 20
        # Last thread report time
        self._ltrt = 0.0
        Queue.__init__(self, 0)

    def spawn_threads(self):
        """ Start the download threads """

        cfg = GetObject('config')
        num_threads = cfg.threadpoolsize

        for x in range(num_threads):
            name = 'Fetcher-' + str(x+1)
            fetcher = harvestManUrlThread(name, cfg.timeout, self)
            fetcher.setDaemon(True)
            # Append this thread to the list of threads
            self.__threads.append(fetcher)
            fetcher.start()
    def download_urls(self, listofurlobjects):
        """ Method to download a list of urls.
        Each member is an instance of a urlPathParser class """

        for urlinfo in listofurlobjects:
            self.push(urlinfo)

    def __get_num_blocked_threads(self):
        """ Return the number of threads waiting for a url task """

        blocked = 0
        for t in self.__threads:
            if not t.is_busy(): blocked += 1

        return blocked

    def is_blocked(self):
        """ The queue is considered blocked if all threads
        are waiting for data, and no data is coming """

        blocked = self.__get_num_blocked_threads()
        return blocked == len(self.__threads)
    def push(self, urlObj):
        """ Push the url object and start downloading the url """

        # unpack the tuple
        try:
            filename, url = urlObj.get_full_filename(), urlObj.get_full_url()
        except:
            return None

        # Find if any thread is already downloading the same
        # url. We don't want to run duplicate threads for the
        # same url :-)
        # atwork = self.locate_busy_threads(url)
        # if len(atwork):
        #     extrainfo('This job is delegated!')
        #     return

        # check whether this file was already downloaded
        if self.avoid_duplicates((filename, url)):
            extrainfo('Found a duplicate for url', url, '...')
            return

        # Wait till we have a thread slot free, and push the
        # current url's info when we get one
        self.put( urlObj )

    def get_next_urltask(self):
        """ Return the next url task from the queue,
        blocking until one is available """

        return self.get()
    def notify(self, thread):
        """ Method called by threads to notify that they
        have finished """

        # Mark the time stamp (last thread report time)
        self._ltrt = time.time()

        urlObj = thread.get_urlobject()

        # if the thread failed, update failure stats on the data manager
        dmgr = GetObject('datamanager')

        err = thread.get_error()
        if err.has_key('fatal') and not err['fatal']:
            dmgr.update_failed_files( urlObj )
        else:
            # thread succeeded, increment file count stats on the data manager
            dmgr.update_file_stats( urlObj, thread.get_status() )

    def has_busy_threads(self):
        """ Return the number of busy threads; a non-zero
        value means the pool has busy threads """

        val = 0
        for thread in self.__threads:
            if thread.is_busy():
                val += 1

        return val
    def locate_thread(self, url):
        """ Find an idle thread which downloaded a certain url """

        for x in self.__threads:
            if not x.is_busy():
                if x.get_url() == url:
                    return x

        return None

    def locate_busy_threads(self, url):
        """ Find all threads which are downloading a certain url """

        threads = []
        for x in self.__threads:
            if x.is_busy():
                if x.get_url() == url:
                    threads.append(x)

        return threads
    def avoid_duplicates(self, urlinfo):
        """ Avoid downloading the same url file twice.
        It can happen that the same url is linked from
        different web pages. We query any thread which
        has downloaded this url, and copy the file to
        the file location of the new download request """

        filename = urlinfo[0]

        # Since the filename and count statistics are maintained
        # on the data manager object, it makes sense to query it
        # for the information on a downloaded file.
        dmgr = GetObject('datamanager')

        if dmgr.is_file_downloaded(filename):
            url = urlinfo[1]
            threads = self.locate_busy_threads(url)
            for thread in threads:
                extrainfo('Killing download thread ...', thread)
                try:
                    thread.terminate()
                # we need to catch this exception here
                except HarvestManUrlThreadInterrupt:
                    pass

            return 1
        else:
            return 0
    def end_hanging_threads(self):
        """ If any download thread is running for too long,
        kill it, and remove it from the thread pool """

        pool = []
        for thread in self.__threads:
            if thread.long_running(): pool.append(thread)

        for thread in pool:
            extrainfo('Killing hanging thread ', thread)
            # remove this thread from the thread list
            self.__threads.remove(thread)
            # kill it
            try:
                thread.terminate()
            except HarvestManUrlThreadInterrupt:
                pass

            del thread

    def end_all_threads(self):
        """ Kill all running threads """

        for thread in self.__threads:
            try:
                thread.terminate()
            except HarvestManUrlThreadInterrupt, e:
                print e

            del thread

    def remove_finished_threads(self):
        """ Clean up all threads that have completed """

        # iterate over a copy, since we modify the list inside the loop
        for thread in self.__threads[:]:
            if not thread.is_busy():
                self.__threads.remove(thread)
                del thread

    def last_thread_report_time(self):
        """ Return the last thread report time """

        return self._ltrt
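
if __name__ == '__main__':
    # A minimal smoke test of the pool's bookkeeping - a sketch that
    # exercises only the methods which do not need the network or the
    # GetObject() globals ('config', 'connectorfactory', 'datamanager')
    # required by spawn_threads() and download_urls().
    pool = harvestManUrlThreadPool()
    print 'Busy threads  :', pool.has_busy_threads()
    print 'Blocked       :', pool.is_blocked()
    print 'Last report at:', pool.last_thread_report_time()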