launchmanycore.py
来自「这是一个嵌入式linux系统下的命令工具包」· Python 代码 · 共 382 行
PY
382 行
#!/usr/bin/env python# Written by John Hoffman# see LICENSE.txt for license informationfrom BitTornado import PSYCOif PSYCO.psyco: try: import psyco assert psyco.__version__ >= 0x010100f0 psyco.full() except: passfrom download_bt1 import BT1Downloadfrom RawServer import RawServer, UPnP_ERRORfrom RateLimiter import RateLimiterfrom ServerPortHandler import MultiHandlerfrom parsedir import parsedirfrom natpunch import UPnP_testfrom random import seedfrom socket import error as socketerrorfrom threading import Eventfrom sys import argv, exitimport sys, osfrom clock import clockfrom __init__ import createPeerID, mapbase64, versionfrom cStringIO import StringIOfrom traceback import print_exctry: Trueexcept: True = 1 False = 0def fmttime(n): try: n = int(n) # n may be None or too large assert n < 5184000 # 60 days except: return 'downloading' m, s = divmod(n, 60) h, m = divmod(m, 60) return '%d:%02d:%02d' % (h, m, s)class SingleDownload: def __init__(self, controller, hash, response, config, myid): self.controller = controller self.hash = hash self.response = response self.config = config self.doneflag = Event() self.waiting = True self.checking = False self.working = False self.seed = False self.closed = False self.status_msg = '' self.status_err = [''] self.status_errtime = 0 self.status_done = 0.0 self.rawserver = controller.handler.newRawServer(hash, self.doneflag) d = BT1Download(self.display, self.finished, self.error, controller.exchandler, self.doneflag, config, response, hash, myid, self.rawserver, controller.listen_port) self.d = d def start(self): if not self.d.saveAs(self.saveAs): self._shutdown() return self._hashcheckfunc = self.d.initFiles() if not self._hashcheckfunc: self._shutdown() return self.controller.hashchecksched(self.hash) def saveAs(self, name, length, saveas, isdir): return self.controller.saveAs(self.hash, name, saveas, isdir) def hashcheck_start(self, donefunc): if self.is_dead(): self._shutdown() return self.waiting = False self.checking = True self._hashcheckfunc(donefunc) def hashcheck_callback(self): self.checking = False if self.is_dead(): self._shutdown() return if not self.d.startEngine(ratelimiter = self.controller.ratelimiter): self._shutdown() return self.d.startRerequester() self.statsfunc = self.d.startStats() self.rawserver.start_listening(self.d.getPortHandler()) self.working = True def is_dead(self): return self.doneflag.isSet() def _shutdown(self): self.shutdown(False) def shutdown(self, quiet=True): if self.closed: return self.doneflag.set() self.rawserver.shutdown() if self.checking or self.working: self.d.shutdown() self.waiting = False self.checking = False self.working = False self.closed = True self.controller.was_stopped(self.hash) if not quiet: self.controller.died(self.hash) def display(self, activity = None, fractionDone = None): # really only used by StorageWrapper now if activity: self.status_msg = activity if fractionDone is not None: self.status_done = float(fractionDone) def finished(self): self.seed = True def error(self, msg): if self.doneflag.isSet(): self._shutdown() self.status_err.append(msg) self.status_errtime = clock()class LaunchMany: def __init__(self, config, Output): try: self.config = config self.Output = Output self.torrent_dir = config['torrent_dir'] self.torrent_cache = {} self.file_cache = {} self.blocked_files = {} self.scan_period = config['parse_dir_interval'] self.stats_period = config['display_interval'] self.torrent_list = [] self.downloads = {} self.counter = 0 self.doneflag = Event() self.hashcheck_queue = [] self.hashcheck_current = None self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'], config['timeout'], ipv6_enable = config['ipv6_enabled'], failfunc = self.failed, errorfunc = self.exchandler) upnp_type = UPnP_test(config['upnp_nat_access']) while True: try: self.listen_port = self.rawserver.find_and_bind( config['minport'], config['maxport'], config['bind'], ipv6_socket_style = config['ipv6_binds_v4'], upnp = upnp_type, randomizer = config['random_port']) break except socketerror, e: if upnp_type and e == UPnP_ERROR: self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP') upnp_type = 0 continue self.failed("Couldn't listen - " + str(e)) return self.ratelimiter = RateLimiter(self.rawserver.add_task, config['upload_unit_size']) self.ratelimiter.set_upload_rate(config['max_upload_rate']) self.handler = MultiHandler(self.rawserver, self.doneflag) seed(createPeerID()) self.rawserver.add_task(self.scan, 0) self.rawserver.add_task(self.stats, 0) self.handler.listen_forever() self.Output.message('shutting down') self.hashcheck_queue = [] for hash in self.torrent_list: self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"') self.downloads[hash].shutdown() self.rawserver.shutdown() except: data = StringIO() print_exc(file = data) Output.exception(data.getvalue()) def scan(self): self.rawserver.add_task(self.scan, self.scan_period) r = parsedir(self.torrent_dir, self.torrent_cache, self.file_cache, self.blocked_files, return_metainfo = True, errfunc = self.Output.message) ( self.torrent_cache, self.file_cache, self.blocked_files, added, removed ) = r for hash, data in removed.items(): self.Output.message('dropped "'+data['path']+'"') self.remove(hash) for hash, data in added.items(): self.Output.message('added "'+data['path']+'"') self.add(hash, data) def stats(self): self.rawserver.add_task(self.stats, self.stats_period) data = [] for hash in self.torrent_list: cache = self.torrent_cache[hash] if self.config['display_path']: name = cache['path'] else: name = cache['name'] size = cache['length'] d = self.downloads[hash] progress = '0.0%' peers = 0 seeds = 0 seedsmsg = "S" dist = 0.0 uprate = 0.0 dnrate = 0.0 upamt = 0 dnamt = 0 t = 0 if d.is_dead(): status = 'stopped' elif d.waiting: status = 'waiting for hash check' elif d.checking: status = d.status_msg progress = '%.1f%%' % (d.status_done*100) else: stats = d.statsfunc() s = stats['stats'] if d.seed: status = 'seeding' progress = '100.0%' seeds = s.numOldSeeds seedsmsg = "s" dist = s.numCopies else: if s.numSeeds + s.numPeers: t = stats['time'] if t == 0: # unlikely t = 0.01 status = fmttime(t) else: t = -1 status = 'connecting to peers' progress = '%.1f%%' % (int(stats['frac']*1000)/10.0) seeds = s.numSeeds dist = s.numCopies2 dnrate = stats['down'] peers = s.numPeers uprate = stats['up'] upamt = s.upTotal dnamt = s.downTotal if d.is_dead() or d.status_errtime+300 > clock(): msg = d.status_err[-1] else: msg = '' data.append(( name, status, progress, peers, seeds, seedsmsg, dist, uprate, dnrate, upamt, dnamt, size, t, msg )) stop = self.Output.display(data) if stop: self.doneflag.set() def remove(self, hash): self.torrent_list.remove(hash) self.downloads[hash].shutdown() del self.downloads[hash] def add(self, hash, data): c = self.counter self.counter += 1 x = '' for i in xrange(3): x = mapbase64[c & 0x3F]+x c >>= 6 peer_id = createPeerID(x) d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id) self.torrent_list.append(hash) self.downloads[hash] = d d.start() def saveAs(self, hash, name, saveas, isdir): x = self.torrent_cache[hash] style = self.config['saveas_style'] if style == 1 or style == 3: if saveas: saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])]) else: saveas = x['path'][:-1-len(x['type'])] if style == 3: if not os.path.isdir(saveas): try: os.mkdir(saveas) except: raise OSError("couldn't create directory for "+x['path'] +" ("+saveas+")") if not isdir: saveas = os.path.join(saveas, name) else: if saveas: saveas = os.path.join(saveas, name) else: saveas = os.path.join(os.path.split(x['path'])[0], name) if isdir and not os.path.isdir(saveas): try: os.mkdir(saveas) except: raise OSError("couldn't create directory for "+x['path'] +" ("+saveas+")") return saveas def hashchecksched(self, hash = None): if hash: self.hashcheck_queue.append(hash) if not self.hashcheck_current: self._hashcheck_start() def _hashcheck_start(self): self.hashcheck_current = self.hashcheck_queue.pop(0) self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback) def hashcheck_callback(self): self.downloads[self.hashcheck_current].hashcheck_callback() if self.hashcheck_queue: self._hashcheck_start() else: self.hashcheck_current = None def died(self, hash): if self.torrent_cache.has_key(hash): self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"') def was_stopped(self, hash): try: self.hashcheck_queue.remove(hash) except: pass if self.hashcheck_current == hash: self.hashcheck_current = None if self.hashcheck_queue: self._hashcheck_start() def failed(self, s): self.Output.message('FAILURE: '+s) def exchandler(self, s): self.Output.exception(s)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?