⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bacnet.py

📁 BacNet Simulation Scripts(英文的)。用于数据采集。
💻 PY
📖 第 1 页 / 共 5 页
字号:
#!/usr/bin/python2.3"""BACnet Communications Library"""import sysimport typesimport exceptionsimport reimport socketimport structimport timeimport threadimport threadingimport randomimport CSThread# some debugging flags and functions_debug = 0_debugVLAN = _debug or 0_debugAdapterRefs = _debug or 0_debugSegmentation = _debug or 0_debugAPDUcodec = _debug or 0def StringToHex(x,sep=''):    return sep.join(["%02X" % (ord(c),) for c in x])def HexToString(x,sep=''):    return ''.join([chr(int(x[i:i+2],16)) for i in range(0,len(x),len(sep)+2)])##   Exceptions#class ConfigurationError(exceptions.ValueError):    def __init__(self,args=None):        self.args = argsclass EncodingError(exceptions.ValueError):    def __init__(self,args=None):        self.args = argsclass DecodingError(exceptions.ValueError):    def __init__(self,args=None):        self.args = args##   Address#IPAddrMaskPortRE = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$' )class Address:    nullAddr = 0    localBroadcastAddr = 1    localStationAddr = 2    remoteBroadcastAddr = 3    remoteStationAddr = 4    globalBroadcastAddr = 5    def __init__(self,*args):        self.addrType = Address.nullAddr        self.addrNet = None        self.addrLen = 0        self.addrAddr = ''        if len(args) == 1:            self.DecodeAddress(args[0])        elif len(args) == 2:            self.DecodeAddress(args[1])            if self.addrType == Address.localStationAddr:                self.addrType = Address.remoteStationAddr                self.addrNet = args[0]            elif self.addrType == Address.localBroadcastAddr:                self.addrType = Address.remoteBroadcastAddr                self.addrNet = args[0]            else:                raise ValueError, "unrecognized address ctor form"    def DecodeAddress(self,addr):        """Initialize the address from a string.  Lots of different forms are supported."""        if _debug:            print self, "DecodeAddress", addr        # start out assuming this is a local station        self.addrType = Address.localStationAddr        self.addrNet = None        if addr == "*":            self.addrType = Address.localBroadcastAddr            self.addrNet = None            self.addrAddr = None            self.addrLen = None        elif addr == "*:*":            self.addrType = Address.globalBroadcastAddr            self.addrNet = None            self.addrAddr = None            self.addrLen = None        elif isinstance(addr,types.IntType):            if (addr < 0) or (addr >= 256):                raise ValueError, "address out of range"            self.addrAddr = chr(addr)            self.addrLen = 1        elif isinstance(addr,types.StringType):            m = IPAddrMaskPortRE.match(addr)            if m:                net, addr, mask, port = m.groups()                if not mask: mask = '32'                if not port: port = '47808'                if net:                    net = int(net)                    if (net >= 65535):                        raise ValueError, "network out of range"                    self.addrType = Address.remoteStationAddr                    self.addrNet = net                self.addrPort = int(port)                self.addrTuple = (addr,self.addrPort)                addrstr = socket.inet_aton(addr)                self.addrIP = struct.unpack('!L',addrstr)[0]                self.addrMask = -1L << (32 - int(mask))                self.addrHost = (self.addrIP & ~self.addrMask)                self.addrSubnet = (self.addrIP & self.addrMask)                bcast = (self.addrSubnet | ~self.addrMask)                self.addrBroadcastTuple = (socket.inet_ntoa(struct.pack('!L',bcast)),self.addrPort)                self.addrAddr = addrstr + struct.pack('!H',self.addrPort)                self.addrLen = 6            elif re.match(r"^\d+$",addr):                addr = int(addr)                if (addr > 255):                    raise ValueError, "address out of range"                self.addrAddr = chr(addr)                self.addrLen = 1            elif re.match(r"^\d+:[*]$",addr):                addr = int(addr[:-2])                if (addr >= 65535):                    raise ValueError, "network out of range"                self.addrType = Address.remoteBroadcastAddr                self.addrNet = addr                self.addrAddr = None                self.addrLen = None            elif re.match(r"^\d+:\d+$",addr):                net, addr = addr.split(':')                net = int(net)                addr = int(addr)                if (net >= 65535):                    raise ValueError, "network out of range"                if (addr > 255):                    raise ValueError, "address out of range"                self.addrType = Address.remoteStationAddr                self.addrNet = net                self.addrAddr = chr(addr)                self.addrLen = 1            elif re.match(r"^0x([0-9A-Fa-f][0-9A-Fa-f])+$",addr):                self.addrAddr = HexToString(addr[2:])                self.addrLen = len(self.addrAddr)            elif re.match(r"^X'([0-9A-Fa-f][0-9A-Fa-f])+'$",addr):                self.addrAddr = HexToString(addr[2:-1])                self.addrLen = len(self.addrAddr)            elif re.match(r"^\d+:0x([0-9A-Fa-f][0-9A-Fa-f])+$",addr):                net, addr = addr.split(':')                net = int(net)                if (net >= 65535):                    raise ValueError, "network out of range"                self.addrType = Address.remoteStationAddr                self.addrNet = net                self.addrAddr = HexToString(addr[2:])                self.addrLen = len(self.addrAddr)            elif re.match(r"^\d+:X'([0-9A-Fa-f][0-9A-Fa-f])+'$",addr):                net, addr = addr.split(':')                net = int(net)                if (net >= 65535):                    raise ValueError, "network out of range"                self.addrType = Address.remoteStationAddr                self.addrNet = net                self.addrAddr = HexToString(addr[2:-1])                self.addrLen = len(self.addrAddr)            else:                raise ValueError, "unrecognized format"        elif isinstance(addr,types.TupleType):            addr, port = addr            self.addrPort = int(port)            if isinstance(addr,types.StringType):                addrstr = socket.inet_aton(addr)                self.addrTuple = (addr,self.addrPort)            elif isinstance(addr,types.LongType):                addrstr = struct.pack('!L',addr)                self.addrTuple = (socket.inet_ntoa(addrstr),self.addrPort)            else:                raise TypeError, "tuple must be (string,port) or (long,port)"            self.addrIP = struct.unpack('!L',addrstr)[0]            self.addrMask = -1L            self.addrHost = None            self.addrSubnet = None            self.addrBroadcastTuple = self.addrTuple            self.addrAddr = addrstr + struct.pack('!H',self.addrPort)            self.addrLen = 6        else:            raise TypeError, "integer, string or tuple required"    def __str__(self):        if self.addrType == Address.nullAddr:            return 'Null'        elif self.addrType == Address.localBroadcastAddr:            return '*'        elif self.addrType == Address.localStationAddr:            rslt = ''            if self.addrLen == 1:                rslt += str(ord(self.addrAddr[0]))            else:                port = ord(self.addrAddr[-2]) * 256 + ord(self.addrAddr[-1])                if (len(self.addrAddr) == 6) and (port >= 47808) and (port <= 47823):                    rslt += '.'.join(["%d" % ord(x) for x in self.addrAddr[0:4]])                    if port != 47808:                        rslt += ':' + str(port)                else:                    rslt += '0x' + StringToHex(self.addrAddr)            return rslt        elif self.addrType == Address.remoteBroadcastAddr:            return '%d:*' % (self.addrNet,)        elif self.addrType == Address.remoteStationAddr:            rslt = '%d:' % (self.addrNet,)            if self.addrLen == 1:                rslt += str(ord(self.addrAddr[0]))            else:                port = ord(self.addrAddr[-2]) * 256 + ord(self.addrAddr[-1])                if (len(self.addrAddr) == 6) and (port >= 47808) and (port <= 47823):                    rslt += '.'.join(["%d" % ord(x) for x in self.addrAddr[0:4]])                    if port != 47808:                        rslt += ':' + str(port)                else:                    rslt += '0x' + StringToHex(self.addrAddr)            return rslt        elif self.addrType == Address.globalBroadcastAddr:            return '*:*'        else:            raise TypeError, "unknown address type %d" % self.addrType    def __repr__(self):        return "<%s %s>" % (self.__class__.__name__, self.__str__())    def __hash__(self):        return hash( (self.addrType, self.addrNet, self.addrAddr) )    def __eq__(self,arg):        # try an coerce it into an address        if not isinstance(arg,Address):            arg = Address(arg)        # all of the components must match        return (self.addrType == arg.addrType) and (self.addrNet == arg.addrNet) and (self.addrAddr == arg.addrAddr)    def __ne__(self,arg):        return not self.__eq__(arg)def IPAddrPack(addr):    return socket.inet_aton(addr[0]) + struct.pack('!H',addr[1])def IPAddrUnpack(addr):    return (socket.inet_ntoa(addr[0:4]), struct.unpack('!H',addr[4:6])[0] )#class LocalStation(Address):    def __init__(self,addr):        self.addrType = Address.localStationAddr        self.addrNet = None        if isinstance(addr,types.IntType):            if (addr < 0) or (addr >= 256):                raise ValueError, "address out of range"            self.addrAddr = chr(addr)            self.addrLen = 1        else:            self.addrAddr = addr            self.addrLen = len(addr)class RemoteStation(Address):    def __init__(self,net,addr):        if (net < 0) or (net >= 65535):            raise ValueError, "network out of range"        self.addrType = Address.remoteStationAddr        self.addrNet = net        if isinstance(addr,types.IntType):            if (addr < 0) or (addr >= 256):                raise ValueError, "address out of range"            self.addrAddr = chr(addr)            self.addrLen = 1        else:            self.addrAddr = addr            self.addrLen = len(addr)class LocalBroadcast(Address):    def __init__(self):        self.addrType = Address.localBroadcastAddr        self.addrNet = None        self.addrAddr = None        self.addrLen = Noneclass RemoteBroadcast(Address):    def __init__(self,net):        if (net < 0) or (net >= 65535):            raise ValueError, "network out of range"        self.addrType = Address.remoteBroadcastAddr        self.addrNet = net        self.addrAddr = None        self.addrLen = Noneclass GlobalBroadcast(Address):    def __init__(self):        self.addrType = Address.globalBroadcastAddr        self.addrNet = None        self.addrAddr = None        self.addrLen = None##   PDUData#class PDUData:    def __init__(self, data=''):        if isinstance(data,PDUData):            self.pduData = data.pduData        elif isinstance(data,types.StringType):            self.pduData = data        else:            raise ValueError, "PDUData ctor parameter must be PDUData or a string"    def Get(self):        if len(self.pduData) == 0:            raise DecodingError, "no more packet data"        ch = self.pduData[0]        self.pduData = self.pduData[1:]        return ord(ch)    def GetShort(self):        if len(self.pduData) < 2:            raise DecodingError, "no more packet data"        rslt = (ord(self.pduData[0]) << 8) + ord(self.pduData[1])        self.pduData = self.pduData[2:]        return rslt    def GetLong(self):        return struct.unpack('>L',self.GetData(4))[0]    def GetData(self, dlen):        if len(self.pduData) < dlen:            raise DecodingError, "no more packet data"        data = self.pduData[:dlen]        self.pduData = self.pduData[dlen:]        return data    def Put(self, ch):        self.pduData += chr(ch)    def PutShort(self, n):        self.pduData += chr((n >> 8) & 0xFF) + chr(n & 0xFF)    def PutLong(self, n):        self.pduData += struct.pack('>L',n)    def PutData(self, data):        self.pduData += data    def __str__(self):        """Useful for debugging."""        return "PDUData(" + StringToHex(self.pduData,'.') + ")"##   Tag#class Tag:    applicationTagClass     = 0    contextTagClass         = 1    openingTagClass         = 2    closingTagClass         = 3    nullAppTag              = 0    booleanAppTag           = 1    unsignedAppTag          = 2    integerAppTag           = 3    realAppTag              = 4    doubleAppTag            = 5    octetStringAppTag       = 6    characterStringAppTag   = 7    bitStringAppTag         = 8    enumeratedAppTag        = 9    dateAppTag              = 10    timeAppTag              = 11    objectIdentifierAppTag  = 12    reservedAppTag13        = 13    reservedAppTag14        = 14

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -