common.py
来自「Harvestman-最新版本」· Python 代码 · 共 604 行 · 第 1/2 页
PY
604 行
# -- coding: utf-8
""" common.py - Global functions for HarvestMan Program.
    This file is part of the HarvestMan software.
    For licensing information, see file LICENSE.TXT.

    Author: Anand B Pillai <abpillai at gmail dot com>

    Created: Jun 10 2003

    Aug 17 2006  Anand  Modifications for the new logging module.
    Feb 7 2007   Anand  Some changes. Added logconsole function.
                        Split Initialize() to InitConfig() and InitLogger().
    Feb 26 2007  Anand  Replaced urlmappings dictionary with a
                        WeakValueDictionary.

    Copyright (C) 2004 - Anand B Pillai.
"""

__version__ = '2.0 b1'
__author__ = 'Anand B Pillai'

import weakref
import os, sys
import socket
import binascii
import copy
import threading
import shelve
import cStringIO
import traceback
import threading
import collections
import random
import cStringIO
import tokenize

from types import *
from singleton import Singleton


class Alias(Singleton):
    """ Namespace whose missing attributes read as None.

    __getattr__ is only consulted after normal lookup fails; it delegates
    to the parent class and converts an AttributeError into None.
    """

    def __getattr__(self, name):
        try:
            return super(Alias, self).__getattr__(name)
        except AttributeError:
            return None


class AliasError(Exception):
    """ Raised by SetAlias() when an object carries no usable alias """
    pass


class GlobalData(Singleton):
    """ Namespace for global data; missing attributes read as None """

    def __getattr__(self, name):
        try:
            # Must name this class: super(Alias, self) raises TypeError
            # because a GlobalData instance is not an Alias instance.
            return super(GlobalData, self).__getattr__(name)
        except AttributeError:
            return None


# Namespace for global unique objects
# This variable holds each global object in HarvestMan
# If any module redefines an 'objects' variable locally, it
# is doing at its own peril!
objects = Alias()

# Namespace for global data
globaldata = GlobalData()
globaldata.userdebug = []


class SleepEvent(object):
    """ A class representing a timeout event. This can be used
    to passively wait for a given time-period instead of using
    time.sleep(...) """

    def __init__(self, sleeptime):
        # sleeptime is handed to threading.Event.wait() as the timeout
        self._sleeptime = sleeptime
        self.evt = threading.Event()
        self.evt.set()

    def sleep(self):
        """ Block until the timeout elapses or the event gets set """
        self.evt.clear()
        self.evt.wait(self._sleeptime)
        self.evt.set()


class RandomSleepEvent(SleepEvent):
    """ A SleepEvent that waits for a random fraction of the
    configured time-period, i.e. random.random()*sleeptime """

    def sleep(self):
        """ Block for at most a random fraction of the configured period """
        self.evt.clear()
        self.evt.wait(random.random()*self._sleeptime)
        self.evt.set()


class DummyStderr(object):
    """ A dummy class to imitate stderr """

    def write(self, msg):
        """ Discard the message """
        pass


class CaselessDict(dict):
    """ Dictionary which lower-cases string keys on item get and set.

    Only __setitem__ and __getitem__ are case-insensitive; other dict
    methods keep their standard behaviour.
    """

    def __init__(self, mapping=None):
        """ Populate from a dict or from a list/tuple of (key, value)
        pairs. Any other kind of argument is ignored. """
        if not mapping:
            return
        if isinstance(mapping, (list, tuple)):
            mapping = dict(mapping)
        if isinstance(mapping, dict):
            # Go through __setitem__ so that every key gets normalised
            for key, value in mapping.items():
                self[key] = value

    def __setitem__(self, name, value):
        if isinstance(name, StringTypes):
            name = name.lower()
        super(CaselessDict, self).__setitem__(name, value)

    def __getitem__(self, name):
        if isinstance(name, StringTypes):
            name = name.lower()
        return super(CaselessDict, self).__getitem__(name)

    def __copy__(self):
        """ Return a shallow copy (used by copy.copy) """
        duplicate = self.__class__()
        # Keys are already normalised, so a plain dict update is enough
        dict.update(duplicate, self)
        return duplicate


class Ldeque(collections.deque):
    """ Length-limited deque """

    def __init__(self, count=10):
        # Maximum number of items kept in the deque
        self.max = count
        super(Ldeque, self).__init__()

    def append(self, item):
        """ Append on the right, dropping the leftmost item on overflow """
        super(Ldeque, self).append(item)
        if len(self) > self.max:
            # if size exceeds, pop from left
            self.popleft()

    def appendleft(self, item):
        """ Append on the left, dropping the rightmost item on overflow """
        super(Ldeque, self).appendleft(item)
        if len(self) > self.max:
            # if size exceeds, pop from right
            self.pop()

    def index(self, item):
        """ Return the index of an item from the deque.
        Raises ValueError if the item is not present. """
        return list(self).index(item)

    def remove(self, item):
        """ Remove an item from the deque.
        Raises ValueError if the item is not present. """
        del self[self.index(item)]


def SysExceptHook(typ, val, tracebak):
    """ Dummy function to replace sys.excepthook """
    pass


def SetAlias(obj):
    """ Set unique alias for the object.

    Registers obj on the global 'objects' namespace under the name
    given by obj.alias. Raises AliasError if obj has no 'alias'
    attribute or if it is None.
    """
    # Alias is another name for the object, it should be unique
    # The object's class should have a field name 'alias'
    if getattr(obj, 'alias', None) is None:
        raise AliasError("object does not define 'alias' attribute!")

    setattr(objects, obj.alias, obj)


def SetLogFile():
    """ Configure the global logger from the global config object.

    Nothing is done when no log file is configured.
    """
    logfile = objects.config.logfile
    if logfile:
        objects.logger.setLogSeverity(objects.config.verbosity)
        # If simulation is turned off, add file-handle
        if not objects.config.simulate:
            objects.logger.addLogHandler('FileHandler', logfile)


def SetUserDebug(message):
    """ Used to store error messages related to user settings in
    the config file/project file. These will be printed at the end
    of the program. Empty and duplicate messages are not stored. """
    if message and message not in globaldata.userdebug:
        globaldata.userdebug.append(message)


def SetLogSeverity():
    """ Apply the configured verbosity to the global logger """
    objects.logger.setLogSeverity(objects.config.verbosity)


def wasOrWere(val):
    """ Return 'were' if val is greater than 1, else 'was' """
    if val > 1:
        return 'were'
    else:
        return 'was'


def plural(pair):
    """ Return the plural form of a word when its count exceeds 1.

    pair is a 2-tuple (s, val) of the word and its count. A trailing
    'y' becomes 'ies', otherwise 's' is appended. The word is returned
    unchanged when val is not greater than 1.
    """
    # Unpacked here instead of in the signature; callers still pass
    # a single tuple argument.
    s, val = pair
    if val > 1:
        if s.endswith('y'):
            return s[:-1] + 'ies'
        return s + 's'
    return s


# file type identification functions
# this is the precursor of a more generic file identificator
# based on the '/etc/magic' file on unices.

signatures = { "gif" : [0, ("GIF87a", "GIF89a")],
               "jpeg" : [6, ("JFIF",)],
               "bmp" : [0, ("BM6",)]
             }

aliases = { "gif" : (),                       # common extension aliases
            "jpeg" : ("jpg", "jpe", "jfif"),
            "bmp" : ("dib",) }


def bin_crypt(data):
    """ Encryption using binascii and obfuscation.

    Returns the hex form of the obfuscated data. An empty string is
    returned as is. On TypeError/ValueError the error is reported and
    the input is returned unchanged.
    """
    if data == '':
        return ''

    try:
        return binascii.hexlify(obfuscate(data))
    except (TypeError, ValueError) as e:
        # NOTE(review): debug() is not defined in this part of the file -
        # presumably provided further down; verify.
        debug('Error in encrypting data: <',data,'>', e)
        return data


def bin_decrypt(data):
    """ Decryption using binascii and deobfuscation.

    Reverses bin_crypt(). An empty string is returned as is. On
    TypeError/ValueError the error is reported and the input is
    returned unchanged.
    """
    if data == '':
        return ''

    try:
        return unobfuscate(binascii.unhexlify(data))
    except (TypeError, ValueError) as e:
        # NOTE(review): logconsole() is not defined in this part of the
        # file - presumably provided further down; verify.
        logconsole('Error in decrypting data: <',data,'>', e)
        return data


def obfuscate(data):
    """ Obfuscate a string using repeated xor.

    The first output character is data[0] xor data[1]; every following
    one is data[x] xor the previous output character. The result has
    the same length as data.

    data needs at least two characters: data[1] is read
    unconditionally, so shorter input raises IndexError.
    """
    import operator

    eprev = chr(operator.xor(ord(data[0]), ord(data[1])))
    # Collect the pieces and join once at the end
    pieces = [eprev]

    for x in range(1, len(data)):
        ex = chr(operator.xor(ord(data[x]), ord(eprev)))
        pieces.append(ex)
        eprev = ex

    return "".join(pieces)


def unobfuscate(data):
    """ Unobfuscate a xor obfuscated string """

    out = ""
    x=len(data) - 1

    import operator

    while x>1:
        apos=data[x]
        aprevpos=data[x-1]
        epos=chr(operator.xor(ord(apos), ord(aprevpos)))
        out = "".join((out, epos))
        x -= 1

    out=str(reduce(lambda x, y: y + x, out))
    e2, a2 = data[1], data[0]
    a1=chr(operator.xor(ord(a2), ord(e2)))
    a1 = "".join((a1, out))
    out = a1
    e1,a1=out[0], data[0]
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?