common.py

来自「Harvestman-最新版本」· Python 代码 · 共 604 行 · 第 1/2 页

PY
604
字号
# -- coding: utf-8""" common.py - Global functions for HarvestMan Program.    This file is part of the HarvestMan software.    For licensing information, see file LICENSE.TXT.    Author: Anand B Pillai <abpillai at gmail dot com>    Created: Jun 10 2003    Aug 17 2006          Anand          Modifications for the new logging                                        module.    Feb 7 2007           Anand          Some changes. Added logconsole                                        function. Split Initialize() to                                        InitConfig() and InitLogger().    Feb 26 2007          Anand          Replaced urlmappings dictionary                                        with a WeakValueDictionary.   Copyright (C) 2004 - Anand B Pillai."""__version__ = '2.0 b1'__author__ = 'Anand B Pillai'import weakrefimport os, sysimport socketimport binasciiimport copyimport threadingimport shelveimport cStringIOimport tracebackimport threadingimport collectionsimport randomimport cStringIOimport tokenizefrom types import *from singleton import Singletonclass Alias(Singleton):    def __getattr__(self, name):        try:            return super(Alias, self).__getattr__(name)        except AttributeError:            return None    passclass AliasError(Exception):    passclass GlobalData(Singleton):    def __getattr__(self, name):        try:            return super(Alias, self).__getattr__(name)        except AttributeError:            return None# Namespace for global unique objects# This varible holds each global object in HarvestMan# If any module redefines an 'objects' variable locally, it# is doing at its own peril!objects = Alias()# Namespace for global dataglobaldata = GlobalData()globaldata.userdebug = []class SleepEvent(object):    """ A class representing a timeout event. This can be    used to passively wait for a given time-period instead of    using time.sleep(...) """    def __init__(self, sleeptime):        self._sleeptime = sleeptime        self.evt = threading.Event()        self.evt.set()    def sleep(self):        self.evt.clear()        self.evt.wait(self._sleeptime)        self.evt.set()class RandomSleepEvent(SleepEvent):    """ A class representing a timeout event. This can be    used to passively wait for a given time-period instead of    using time.sleep(...) """    def sleep(self):        self.evt.clear()        self.evt.wait(random.random()*self._sleeptime)        self.evt.set()    class DummyStderr(object):    """ A dummy class to imitate stderr """        def write(self, msg):        passclass CaselessDict(dict):    def __init__(self, mapping=None):        if mapping:            if type(mapping) is dict:                for k,v in d.items():                    self.__setitem__(k, v)            elif type(mapping) in (list, tuple):                d = dict(mapping)                for k,v in d.items():                    self.__setitem__(k, v)                            # super(CaselessDict, self).__init__(d)            def __setitem__(self, name, value):        if type(name) in StringTypes:            super(CaselessDict, self).__setitem__(name.lower(), value)        else:            super(CaselessDict, self).__setitem__(name, value)    def __getitem__(self, name):        if type(name) in StringTypes:            return super(CaselessDict, self).__getitem__(name.lower())        else:            return super(CaselessDict, self).__getitem__(name)    def __copy__(self):        pass            class Ldeque(collections.deque):    """ Length-limited deque """        def __init__(self, count=10):        self.max = count        super(Ldeque, self).__init__()    def append(self, item):        super(Ldeque, self).append(item)        if len(self)>self.max:            # if size exceeds, pop from left            self.popleft()    def appendleft(self, item):        super(Ldeque, self).appendleft(item)        if len(self)>self.max:            # if size exceeds, pop from right            self.pop()                def index(self, item):        """ Return the index of an item from the deque """                return list(self).index(item)    def remove(self, item):        """ Remove an item from the deque """                idx = self.index(item)        self.__delitem__(idx)      def SysExceptHook(typ, val, tracebak):    """ Dummy function to replace sys.excepthook """    passdef SetAlias(obj):    """ Set unique alias for the object """    # Alias is another name for the object, it should be unique    # The object's class should have a field name 'alias'    if getattr(obj, 'alias') == None:        raise AliasError, "object does not define 'alias' attribute!"    setattr(objects, obj.alias, obj)def SetLogFile():    logfile = objects.config.logfile    if logfile:        objects.logger.setLogSeverity(objects.config.verbosity)        # If simulation is turned off, add file-handle        if not objects.config.simulate:            objects.logger.addLogHandler('FileHandler',logfile)def SetUserDebug(message):    """ Used to store error messages related    to user settings in the config file/project file.    These will be printed at the end of the program """    if message:        try:            globaldata.userdebug.index(message)        except:            globaldata.userdebug.append(message)def SetLogSeverity():    objects.logger.setLogSeverity(objects.config.verbosity)        def wasOrWere(val):    """ What it says """    if val > 1: return 'were'    else: return 'was'def plural((s, val)):    """ What it says """    if val>1:        if s[len(s)-1] == 'y':            return s[:len(s)-1]+'ies'        else: return s + 's'    else:        return s# file type identification functions# this is the precursor of a more generic file identificator# based on the '/etc/magic' file on unices.signatures = { "gif" : [0, ("GIF87a", "GIF89a")],               "jpeg" :[6, ("JFIF",)],               "bmp" : [0, ("BM6",)]             }aliases = { "gif" : (),                       # common extension aliases            "jpeg" : ("jpg", "jpe", "jfif"),            "bmp" : ("dib",) }def bin_crypt(data):    """ Encryption using binascii and obfuscation """    if data=='':        return ''    try:        return binascii.hexlify(obfuscate(data))    except TypeError, e:        debug('Error in encrypting data: <',data,'>', e)        return data    except ValueError, e:        debug('Error in encrypting data: <',data,'>', e)        return datadef bin_decrypt(data):    """ Decrypttion using binascii and deobfuscation """    if data=='':        return ''    try:        return unobfuscate(binascii.unhexlify(data))    except TypeError, e:        logconsole('Error in decrypting data: <',data,'>', e)        return data    except ValueError, e:        logconsole('Error in decrypting data: <',data,'>', e)        return datadef obfuscate(data):    """ Obfuscate a string using repeated xor """    out = ""    import operator    e0=chr(operator.xor(ord(data[0]), ord(data[1])))    out = "".join((out, e0))    x=1    eprev=e0    for x in range(1, len(data)):        ax=ord(data[x])        ex=chr(operator.xor(ax, ord(eprev)))        out = "".join((out,ex))        eprev = ex    return outdef unobfuscate(data):    """ Unobfuscate a xor obfuscated string """    out = ""    x=len(data) - 1    import operator    while x>1:        apos=data[x]        aprevpos=data[x-1]        epos=chr(operator.xor(ord(apos), ord(aprevpos)))        out = "".join((out, epos))        x -= 1    out=str(reduce(lambda x, y: y + x, out))    e2, a2 = data[1], data[0]    a1=chr(operator.xor(ord(a2), ord(e2)))    a1 = "".join((a1, out))    out = a1    e1,a1=out[0], data[0]

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?