urlthread.py

来自「Harvestman-最新版本」· Python 代码 · 共 877 行 · 第 1/2 页

PY
877
字号
        """ The queue is considered blocked if all threads        are waiting for data, and no data is coming """        blocked = self._get_num_blocked_threads()        if blocked == len(self._threads):            return True        else:            return False    def push(self, urlObj):        """ Push the url object and start downloading the url """        # print 'Pushed',urlObj.get_full_url()        # unpack the tuple        try:            filename, url = urlObj.get_full_filename(), urlObj.get_full_url()        except:            return        # Wait till we have a thread slot free, and push the        # current url's info when we get one        try:            self.put( urlObj )            urlObj.qstatus = urlparser.URL_IN_QUEUE                        # If this URL was multipart, mark it as such            self._multipartstatus[url] = MULTIPART_DOWNLOAD_QUEUED        except Full:            self.buffer.append(urlObj)            def get_next_urltask(self):        # Insert a random sleep in range        # of 0 - 0.5 seconds        # time.sleep(random.random()*0.5)        try:            if len(self.buffer):                # Get last item from buffer                item = buffer.pop()                return item            else:                # print 'Waiting to get item',threading.currentThread()                item = self.get()                return item                    except Empty:            return None    def notify(self, thread):        """ Method called by threads to notify that they        have finished """        try:            self._cond.acquire()            # Mark the time stamp (last thread report time)            self._ltrt = time.time()            urlObj = thread.get_urlobject()                        # See if this was a multi-part download            if urlObj.trymultipart:                status = thread.get_status()                if status == CONNECT_YES_DOWNLOADED:                    extrainfo('Thread %s reported %s' % (thread, urlObj.get_full_url()))                    # For flush mode, get the filename                    # for memory mode, get the data                    datamode = self._cfg.datamode                    fname, data = '',''                    if datamode == CONNECTOR_DATA_MODE_FLUSH:                        fname = thread.get_tmpfname()                        datalen = os.path.getsize(fname)                    else:                        data = thread.get_data()                        datalen = len(data)                                            # See if the data was downloaded fully...,else reschedule this piece                    expected = (urlObj.range[-1] - urlObj.range[0]) + 1                    if datalen != expected:                        extrainfo("Expected: %d, Got: %d" % (expected, datalen))                        extrainfo("Thread %s did only a partial download, rescheduling this piece..." % thread)                        if self._monitor:                            # print 'Notifying failure',thread                            self._monitor.notify_failure(urlObj, thread)                            return                                            index = urlObj.mirror_url.index                    # print 'Index=>',index                                        if index in self._multipartdata:                        infolist = self._multipartdata[index]                        if data:                            infolist.append((urlObj.range[0],data))                        elif fname:                            infolist.append((urlObj.range[0],fname))                                            else:                        infolist = []                        if data:                            infolist.append((urlObj.range[0],data))                        elif fname:                            infolist.append((urlObj.range[0],fname))                        #else:                        #    self._parts -= 1 # AD-HOC                        self._multipartdata[index] = infolist                    # print 'Length of data list is',len(infolist),self._parts                    if len(infolist)==self._parts:                        # Sort the data list  according to byte-range                        infolist.sort()                        # Download of this URL is complete...                        logconsole('Download of %s is complete...' % urlObj.get_full_url())                        if datamode == CONNECTOR_DATA_MODE_INMEM:                            data = ''.join([item[1] for item in infolist])                            self._multipartdata['data:' + str(index)] = data                        else:                            pass                        self._multipartstatus[index] = MULTIPART_DOWNLOAD_COMPLETED                else:                    # Currently when a thread reports an error, we abort the download                    # In future, we can inspect whether the error is fatal or not                    # and resume download in another thread etc...                    extrainfo('Thread %s reported error => %s' % (str(thread), str(thread.get_error())))                    if self._monitor:                        # print 'Notifying failure',thread                        self._monitor.notify_failure(urlObj, thread)                        # print 'Notified failure',thread                                    # if the thread failed, update failure stats on the data manager            err = thread.get_error()            tstatus = thread.get_status()            # Either file was fetched or file was uptodate            if err.number in (0, 304):                # thread succeeded, increment file count stats on the data manager                objects.datamgr.update_file_stats( urlObj, tstatus)        finally:            self._cond.release()    def has_busy_threads(self):        """ Return whether I have any busy threads """        val=0        for thread in self._threads:            if thread.is_busy():                val += 1                break                    return val    def get_busy_threads(self):        """ Return a list of busy threads """        return [thread for thread in self._threads if thread.is_busy()]    def get_busy_count(self):        """ Return a count of busy threads """        return len(self.get_busy_threads())    def get_busy_figure(self):        s=''        for t in self._threads:            if t.is_busy():                s=s + t.getName().split('-')[-1] + ' '        return s    def wait(self, period, timeout):        # Wait for the pool to signal that there        # are no more busy threads        # Note: timeout must be > period        count = 0.0        while self.has_busy_threads():            self._endcond.acquire()                        try:                self._endcond.wait(period)                count += period                self.end_hanging_threads()            except IOError, e:                break            self._endcond.release()            if count>=timeout:                break        def locate_thread(self, url):        """ Find a thread which downloaded a certain url """        for x in self._threads:            if not x.is_busy():                if x.get_url() == url:                    return x        return None    def locate_busy_threads(self, url):        """ Find all threads which are downloading a certain url """        threads=[]        for x in self._threads:            if x.is_busy():                if x.get_url() == url:                    threads.append(x)        return threads    def check_duplicates(self, urlobj):        """ Avoid downloading same url file twice.        It can happen that same url is linked from        different web pages """        filename = urlobj.get_full_filename()        url = urlobj.get_full_url()        # First check if any thread is in the process        # of downloading this url.        if self.locate_thread(url):            debug('Another thread is downloading %s' % url)            return True        return False    def end_hanging_threads(self):        """ If any download thread is running for too long,        kill it, and remove it from the thread pool """        pool=[]        for thread in self._threads:            if thread.long_running(): pool.append(thread)        for thread in pool:            extrainfo('Killing hanging thread ', thread)            # remove this thread from the thread list            self._threads.remove(thread)            # kill it            try:                thread.terminate()            except HarvestManUrlThreadInterrupt:                pass            del thread    def end_all_threads(self):        """ Kill all running threads """        try:            self._cond.acquire()            for t in self._threads:                try:                    t.terminate()                    t.join()                except HarvestManUrlThreadInterrupt, e:                    debug(str(e))                    pass            self._threads = []        finally:            self._cond.release()    def stop_all_threads(self):        """ Stop all running threads """        # Same as end_all_threads but only that        # we don't print the killed message.        try:            self._cond.acquire()            for t in self._threads:                try:                    t.terminate()                    t.join()                except HarvestManUrlThreadInterrupt, e:                    pass            self._threads = []        finally:            self._cond.release()    def remove_finished_threads(self):        """ Clean up all threads that have completed """        for thread in self._threads:            if not thread.is_busy():                self._threads.remove(thread)                del thread    def last_thread_report_time(self):        """ Return the last thread reported time """        return self._ltrt        def get_multipart_download_status(self, url):        """ Get status of multipart downloads """        # If a thread has failed, signal exit        if self._multiparterror:            return MULTIPART_DOWNLOAD_ERROR        else:            return self._multipartstatus.get(url.index, MULTIPART_DOWNLOAD_STATUS_UNKNOWN)    def get_multipart_url_data(self, url):        """ Return data for multipart downloads """        return self._multipartdata.get('data:'+ str(url.index), '')    def get_multipart_url_info(self, url):        """ Return information for multipart downloads """        return self._multipartdata.get(url.index, '')    def dead_thread_callback(self, t):        """ Call back function called by a thread if it        dies with an exception. This class then creates        a fresh thread, migrates the data of the dead        thread to it """        try:            self._cond.acquire()            new_t = HarvestManUrlThread(t.getName(), self._timeout, self)            # Migrate data and start thread            if new_t:                new_t.set_urlobject(t.get_urlobject())                # Replace dead thread in the list                idx = self._threads.index(t)                self._threads[idx] = new_t                new_t.start()            else:                # Could not make new thread, remove                # current thread anyway                self._threads.remove(t)        finally:            self._cond.release()                                        def get_threads(self):        """ Return the list of thread objects """        return self._threads    def get_thread_urls(self):        """ Return a list of current URLs being downloaded """        # This returns a list of URL objects, not URL strings        urlobjs = []        for t in self._threads:            if t.is_busy():                urlobj = t.get_urlobject()                if urlobj: urlobjs.append(urlobj)        return urlobjs    def reset_multipart_data(self):        """ Reset multipart related state """        self._multiparterror = False        self._multipartdata.clear()        self._multipartdata = {}        self._multipartstatus.clear()        self._multipartstatus = {}                class HarvestManUrlThreadPoolMonitor(threading.Thread):    def __init__(self, threadpool):        self._pool = threadpool        self._pool._monitor = self        self.lock = threading.Lock()        self._failedurls = []        self._listfailed = []        self._flag = False        # Mirror manager        self.mirrormgr = HarvestManMirrorManager.getInstance()        # initialize threading        threading.Thread.__init__(self, None, None, "Monitor")            def run(self):        while not self._flag:            try:                self.lock.acquire()                items = []                self._failedurls = self._listfailed[:]                                for urlobj, urlerror in self._failedurls:                    # Reset URL to parent and try again...                    if urlobj.mirrored:                        # Try getting a new mirror URL                        new_urlobj = self.mirrormgr.get_different_mirror_url(urlobj, urlerror)                                                if new_urlobj:                            extrainfo("New mirror URL=>", new_urlobj.get_full_url())                            items.append((urlobj, urlerror))                            self._pool.push(new_urlobj)                        else:                            logconsole('Could not find new working mirror. Exiting...')                            self._pool._multiparterror = True                            self._listfailed = []                            break                    else:                        logconsole("URL is not mirrored, so no new mirrors to try. Exiting...")                        self._pool._multiparterror = True                        break                for item in items:                    self._listfailed.remove(item)                self.lock.release()                time.sleep(0.1)            finally:                pass                def notify_failure(self, urlobj, thread):        self.lock.acquire()        self._listfailed.append((urlobj, thread.get_error()))        self.lock.release()    def stop(self):        self._flag = True    def reset(self):        """ Reset the state """        self._listfailed = []        self._failedurls = []        

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?