⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 newratelimiter.py

📁 bittorrent source by python. please enjoy
💻 PY
字号:
# This was built to be like SFQ but turned out like round-robin.# (Why didn't you just use Deficit Round Robin?) --Dave# (Because of unitsize) --Greg## I call it Heirarchical Round Robin Bucket Percentage Style## by Greg Hazelimport timeimport tracebackfrom BitTorrent.platform import bttimefrom BitTorrent.DictWithLists import DictWithLists, OrderedDictWithLists# these are for logging and suchclass GlobalRate(object):    def __init__(self):        self.total = 0.0            self.start_time = bttime()        self.last_time = self.start_time    def print_rate(self, size):        self.total += size        this_time = bttime()        start_delta = this_time - self.start_time        this_delta = this_time - self.last_time        if start_delta > 0 and this_delta > 0:            print "UPLOAD: This:", size / this_delta, "Total:", self.total / start_delta        self.last_time = this_timeglobal_rate = GlobalRate()# very simple.# every call gives you the duration since the last call in tokensclass DeltaTokens(object):    def __init__(self, rate):        self.set_rate(rate)    def set_rate(self, rate):                self.rate = rate        # clear the history since the rate has changed and it could be way off        self.last_time = bttime()    # return the number of tokens you can have since the last call    def __call__(self):        new_time = bttime()        delta_time = new_time - self.last_time        # if last time was more than a second ago, we can't give a clear        # approximation since rate is in tokens per second.        delta_time = min(delta_time, 1.0)        if delta_time <= 0:            return 0        tokens = self.rate * delta_time        self.last_time = new_time        return tokens    # allows you to subtract tokens from DeltaTokens to compensate    def remove_tokens(self, x):        if self.rate == 0:            # shit, I don't know.            self.last_time += x        else:            self.last_time += x / self.rate    # returns the time until you'll get tokens again    def get_remaining_time(self):        return max(0, self.last_time - bttime())class Classifer(object):    def __init__(self):        self.channels = DictWithLists()    def add_data(self, keyable, func):        # hmm, this should rotate every 10 seconds or so, but moving over the        # old data is hard (can't write out-of-order)        #key = sha.sha(id(o)).hexdigest()[0]        # this is technically round-robin        key = keyable        self.channels.push_to_row(key, func)    def rem_data(self, key):        try:            l = self.channels.poprow(key)            l.clear()        except KeyError:            pass    def rotate_data(self):        # the removes the top-most row from the ordereddict        k = self.channels.iterkeys().next()        l = self.channels.poprow(k)                data = l.popleft()        # this puts the whole row at the bottom of the ordereddict        self.channels.setrow(k, l)                return data            def __len__(self):        return len(self.channels)class Scheduler(object):    def __init__(self, rate, add_task):        """@param rate: rate at which 'tokens' are generated.            @param add_task: callback to schedule an event.        """        self.add_task = add_task        self.classifier = Classifer()        self.delta_tokens = DeltaTokens(rate)        self.task = None        self.children = {}    def set_rate(self, rate, cascade=True):        self.delta_tokens.set_rate(rate)        if cascade:                    for child, scale in self.children.iteritems():                child.set_rate(rate * scale)        # the rate changed, so it's possible the loop is        # running slower than it needs to        self.restart_loop(0)    def add_child(self, child, scale):        self.children[child] = scale        child.set_rate(self.delta_tokens.rate * scale)            def remove_child(self, child):        del self.children[child]    def add_data(self, keyable, func):        self.classifier.add_data(keyable, func)        # kick off a loop since we have data now        self.restart_loop(0)    def restart_loop(self, t):        # check for pending loop event        if self.task and not self.task.called:            ## look at when it's scheduled to occur            # we can special case events which have a delta of 0, since they            # should occur asap. no need to check the time.            if self.task.delta == 0:                return            # use time.time since twisted does anyway            s = self.task.getTime() - time.time()            if s > t:                # if it would occur after the time we want, reset it                self.task.reset(t)                self.task.delta = t        else:            if t == 0:                # don't spin the event loop needlessly                self.run()            else:                self.task = self.add_task(t, self.run)                self.task.delta = t    def _write(self, to_write):        amount = 0        each = min(self.delta_tokens.rate, self.unitsize)        if self.children:            for child, scale in self.children.iteritems():                child.set_rate(self.delta_tokens.rate * scale, cascade=False)            i = 0                            while amount < to_write and len(self.classifier) > 0:                (func, args) = self.classifier.rotate_data()                # ERROR: func can fill buffers, so use the on_flush technique                try:                    amount += func(each)                except:                    # don't stop the loop if we hit an error                    traceback.print_exc()                i += 1                if i == len(self.children):                    break                            for child, scale in self.children.iteritems():                # really max, but we happen to know it can't exceed amount                child.set_rate(amount, cascade=False)        while amount < to_write and len(self.classifier) > 0:            func = self.classifier.rotate_data()            # ERROR: func can fill buffers, so use the on_flush technique            try:                amount += func(each)            except:                # don't stop the loop if we hit an error                traceback.print_exc()                    return amount    def _run_once(self):        f_to_write = self.delta_tokens()        to_write = int(f_to_write)        if to_write == 0:            written = 0        else:            written = self._write(to_write)            # for debugging            #print "Ideal:", self.delta_tokens.rate, f_to_write            #global_rate.print_rate(written)        self.delta_tokens.remove_tokens(written - f_to_write)        return written            def run(self):        if len(self.classifier) == 0:            return        self._run_once()        t = self.delta_tokens.get_remaining_time()        self.restart_loop(t)    # made to look like the originalclass MultiRateLimiter(Scheduler):    # Since data is sent to peers in a round-robin fashion, max one    # full request at a time, setting this higher would send more data    # to peers that use request sizes larger than standard 16 KiB.    # 17000 instead of 16384 to allow room for metadata messages.    max_unitsize = 17000        def __init__(self, sched, parent=None):        Scheduler.__init__(self, rate = 0, add_task = sched)        if parent == None:                    self.run()    def set_parameters(self, rate, unitsize=2**500):        self.set_rate(rate)        unitsize = min(unitsize, self.max_unitsize)        self.unitsize = unitsize    def queue(self, conn):        keyable = conn        self.add_data(keyable, conn.send_partial)            def dequeue(self, keyable):        self.classifier.rem_data(keyable)    def increase_offset(self, bytes):        # hackity hack hack        self.delta_tokens.remove_tokens(0 - bytes)class FakeConnection(object):    def __init__(self, gr):        self.gr = gr    def _use_length_(self, length):        def do():            return length        return self.write(do)    def write(self, fn, *args):        size = fn(*args)        self.gr.print_rate(size)        return size if __name__ == '__main__':    profile = True    try:        import hotshot        import hotshot.stats        prof_file_name = 'NewRateLimiter.prof'    except ImportError, e:        print "profiling not available:", e        profile = False    import random    from BitTorrent.RawServer_twisted import RawServer    from twisted.internet import task    from BitTorrent.defer import DeferredEvent    config = {}    rawserver = RawServer(config)    doneflag = DeferredEvent()         s = Scheduler(4096, add_task = rawserver.add_task)    s.unitsize = 17000    a = []    for i in xrange(500):        keyable = FakeConnection(global_rate)        a.append(keyable)    freq = 0.01    def push():        if random.randint(0, 5 / freq) == 0:            rate = random.randint(1, 100) * 1000            print "new rate", rate            s.set_rate(rate)        for c in a:                s.add_data(c, c._use_length_)        t = task.LoopingCall(push)    t.start(freq)        rawserver.install_sigint_handler()    ##############################################################    if profile:        try:            os.unlink(prof_file_name)        except:            pass        prof = hotshot.Profile(prof_file_name)        prof.start()        rawserver.listen_forever(doneflag)        prof.stop()            prof.close()        stats = hotshot.stats.load(prof_file_name)        stats.strip_dirs()        stats.sort_stats('time', 'calls')        print "NewRateLimiter Profile:"        stats.print_stats(20)    else:    ##############################################################        rawserver.listen_forever(doneflag)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -