⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lrucache.py

📁 Harvestman-最新版本
💻 PY
字号:
# -- coding: utf-8"""lrucache.py - Length-limited O(1) LRU cache implementationAuthor: Anand B Pillai <abpillai at gmail dot com>    Created Anand B Pillai Jun 26 2007 from ASPN Python Cookbook recipe #252524.{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/252524}Original code courtesy Josiah Carlson.Copyright (C) 2007, Anand B Pillai."""import copyimport cPickle, os, sysimport timeimport cStringIOfrom threading import Semaphorefrom dictcache import DictCacheclass Node(object):    # __slots__ = ['prev', 'next', 'me']    def __init__(self, prev, me):        self.prev = prev        self.me = me        self.next = None    def __copy__(self):        n = Node(self.prev, self.me)        n.next = self.next        return n    #def __getstate__(self):    #    return self    class LRU(object):    """    Implementation of a length-limited O(1) LRU queue.    Built for and used by PyPE:    http://pype.sourceforge.net    Copyright 2003 Josiah Carlson.    """    def __init__(self, count, pairs=[]):        self.count = max(count, 1)        self.d = {}        self.first = None        self.last = None        for key, value in pairs:            self[key] = value    def __copy__(self):        lrucopy = LRU(self.count)        lrucopy.first = copy.copy(self.first)        lrucopy.last = copy.copy(self.last)        lrucopy.d = self.d.copy()        for key,value in self.iteritems():            lrucopy[key] = value        return lrucopy            def __contains__(self, obj):        return obj in self.d    def __getitem__(self, obj):        a = self.d[obj].me        self[a[0]] = a[1]        return a[1]    def __setitem__(self, obj, val):        if obj in self.d:            del self[obj]        nobj = Node(self.last, (obj, val))        if self.first is None:            self.first = nobj        if self.last:            self.last.next = nobj        self.last = nobj        self.d[obj] = nobj        if len(self.d) > self.count:            if self.first == self.last:                self.first = None                self.last = None                return            a = self.first            if a:                if a.next:                    a.next.prev = None                    self.first = a.next                    a.next = None                    try:                       del self.d[a.me[0]]                    except KeyError:                       pass                del a    def __delitem__(self, obj):        nobj = self.d[obj]        if nobj.prev:            nobj.prev.next = nobj.next        else:            self.first = nobj.next        if nobj.next:            nobj.next.prev = nobj.prev        else:            self.last = nobj.prev        del self.d[obj]    def __iter__(self):        cur = self.first        while cur != None:            cur2 = cur.next            yield cur.me[1]            cur = cur2    def iteritems(self):        cur = self.first        while cur != None:            cur2 = cur.next            yield cur.me            cur = cur2    def iterkeys(self):        return iter(self.d)    def itervalues(self):        for i,j in self.iteritems():            yield j    def keys(self):        return self.d.keys()    def clear(self):        self.d.clear()    def __len__(self):        return len(self.d)class LRU2(object):    """    Implementation of a length-limited O(1) LRU queue    with disk caching.     """    # This LRU drops off items to a disk dictionary cache    # when older items are dropped.     def __init__(self, count, freq, cachedir='', pairs=[]):        self.count = max(count, 1)        self.d = {}        self.lastmutex = Semaphore(1)        self.first = None        self.last = None        for key, value in pairs:            self[key] = value        # Set the frequency to something like 1/100th of        # the expected dictionary final size to achieve        # best performance.        self.diskcache = DictCache(freq, cachedir)            def __copy__(self):        lrucopy = LRU(self.count)        lrucopy.first = copy.copy(self.first)        lrucopy.last = copy.copy(self.last)        lrucopy.d = self.d.copy()        for key,value in self.iteritems():            lrucopy[key] = value        return lrucopy            def __contains__(self, obj):        return obj in self.d    def __getitem__(self, obj):       try:           a = self.d[obj].me           self[a[0]] = a[1]           return a[1]       except (KeyError,AttributeError):           return self.diskcache[obj]            def __setitem__(self, obj, val):        if obj in self.d:            del self[obj]        nobj = Node(self.last, (obj, val))        if self.first is None:            self.first = nobj        self.lastmutex.acquire()        try:            if self.last:               self.last.next = nobj            self.last = nobj        except:            pass        self.lastmutex.release()        self.d[obj] = nobj        if len(self.d) > self.count:            self.lastmutex.acquire()            try:                if self.first == self.last:                    self.first = None                    self.last = None                    self.lastmutex.release()                    return            except:                pass            self.lastmutex.release()            a = self.first            if a:                if a.next:                    a.next.prev = None                self.first = a.next                a.next = None            try:                key, val = a.me[0], self.d[a.me[0]]                del self.d[a.me[0]]                del a                self.diskcache[key] = val.me[1]            except (KeyError,AttributeError):                pass    def __delitem__(self, obj):        nobj = self.d[obj]        if nobj.prev:            nobj.prev.next = nobj.next        else:            self.first = nobj.next        if nobj.next:            nobj.next.prev = nobj.prev        else:            self.last = nobj.prev        del self.d[obj]    def __iter__(self):        cur = self.first        while cur != None:            cur2 = cur.next            yield cur.me[1]            cur = cur2    def iteritems(self):        cur = self.first        while cur != None:            cur2 = cur.next            yield cur.me            cur = cur2    def iterkeys(self):        return iter(self.d)    def itervalues(self):        for i,j in self.iteritems():            yield j    def keys(self):        return self.d.keys()    def clear(self):        self.d.clear()        self.diskcache.clear()    def __len__(self):        return len(self.d)    def get_stats(self):        """ Return statistics as a dictionary """        return self.diskcache.get_stats()        def test(self, N):        # Test to see if the diskcache works. Pass        # the total number of items added to this        # function...                flag = True                for x in range(N):            if self[x] == None:                flag = False                break        return flagdef test_lru2():    import random        n1, n2 = 10000, 5000        l=LRU2(n1, 100)    for x in range(n1):        l[x] = x + 1 #urlparser.HarvestManUrlParser('htt://www.python.org/doc/current/tut/tut.html')    # make use of first n2 random entries    for x in range(n2):        l[random.randint(0,n2)]            # Add another n2 entries    # This will cause the LRU to drop    # entries and cache old entries.    for x in range(n2):        l[n1+x] = x + 1 #urlparser.HarvestManUrlParser('htt://www.python.org/doc/current/tut/tut.html') #  x + 1    print l.test(n1+n2)    print 'Random access test...'    # Try to access random entries    for x in range(n1+n2):        # A random access will take more time since in-mem        # cache will be emptied more often        l[random.randint(0,n1+n2-1)]    print    print "Disk Hits",l.diskcache.dhits    print "Mem Hits",l.diskcache.mhits    print "Temp dict Hits",l.diskcache.thits        print "Time taken",l.diskcache.t    print 'Hit %=>',100*float(l.diskcache.dhits)/float(n1+n2)    print 'Time per disk cache hit=>',float(l.diskcache.t)/float(l.diskcache.dhits)    print 'Average disk access time=>',float(l.diskcache.t)/float(len(l.diskcache))        l.diskcache.clear_counters()    print 'Sequential access test...'        for x in range(n1+n2):        # A sequential access will be faster since in-mem cache        # will be hit sequentially...        l[x]            print        print "Disk Hits",l.diskcache.dhits    print "Mem Hits",l.diskcache.mhits    print "Temp dict Hits",l.diskcache.thits        print "Time taken",l.diskcache.t    print 'Hit %=>',100*float(l.diskcache.dhits)/float(n1+n2)    print 'Time per disk cache hit=>',float(l.diskcache.t)/float(l.diskcache.dhits)        print 'Average disk access time=>',float(l.diskcache.t)/float(len(l.diskcache))            l.clear()if __name__=="__main__":    test_lru2() ##    l = LRU2(10)##     for x in range(10):##         l[x] = x##     print l.keys()##     print l[3]##     print l[3]##     print l[9]##     print l[9]    ##     l[12]=11##     l[13]=12##     l[14]=15##     l[15]=16##     l[16]=17##     l[17]=18##     l[18]=19##     l[19]=20##     print l.keys()##     print len(l)##     print l[0]##     print l[1]##     print l[2]    ##     print copy.copy(l).keys()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -