📄 bacnet.py
字号:
#!/usr/bin/python2.3"""BACnet Communications Library"""import sysimport typesimport exceptionsimport reimport socketimport structimport timeimport threadimport threadingimport randomimport CSThread# some debugging flags and functions_debug = 0_debugVLAN = _debug or 0_debugAdapterRefs = _debug or 0_debugSegmentation = _debug or 0_debugAPDUcodec = _debug or 0def StringToHex(x,sep=''): return sep.join(["%02X" % (ord(c),) for c in x])def HexToString(x,sep=''): return ''.join([chr(int(x[i:i+2],16)) for i in range(0,len(x),len(sep)+2)])## Exceptions#class ConfigurationError(exceptions.ValueError): def __init__(self,args=None): self.args = argsclass EncodingError(exceptions.ValueError): def __init__(self,args=None): self.args = argsclass DecodingError(exceptions.ValueError): def __init__(self,args=None): self.args = args## Address#IPAddrMaskPortRE = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$' )class Address: nullAddr = 0 localBroadcastAddr = 1 localStationAddr = 2 remoteBroadcastAddr = 3 remoteStationAddr = 4 globalBroadcastAddr = 5 def __init__(self,*args): self.addrType = Address.nullAddr self.addrNet = None self.addrLen = 0 self.addrAddr = '' if len(args) == 1: self.DecodeAddress(args[0]) elif len(args) == 2: self.DecodeAddress(args[1]) if self.addrType == Address.localStationAddr: self.addrType = Address.remoteStationAddr self.addrNet = args[0] elif self.addrType == Address.localBroadcastAddr: self.addrType = Address.remoteBroadcastAddr self.addrNet = args[0] else: raise ValueError, "unrecognized address ctor form" def DecodeAddress(self,addr): """Initialize the address from a string. Lots of different forms are supported.""" if _debug: print self, "DecodeAddress", addr # start out assuming this is a local station self.addrType = Address.localStationAddr self.addrNet = None if addr == "*": self.addrType = Address.localBroadcastAddr self.addrNet = None self.addrAddr = None self.addrLen = None elif addr == "*:*": self.addrType = Address.globalBroadcastAddr self.addrNet = None self.addrAddr = None self.addrLen = None elif isinstance(addr,types.IntType): if (addr < 0) or (addr >= 256): raise ValueError, "address out of range" self.addrAddr = chr(addr) self.addrLen = 1 elif isinstance(addr,types.StringType): m = IPAddrMaskPortRE.match(addr) if m: net, addr, mask, port = m.groups() if not mask: mask = '32' if not port: port = '47808' if net: net = int(net) if (net >= 65535): raise ValueError, "network out of range" self.addrType = Address.remoteStationAddr self.addrNet = net self.addrPort = int(port) self.addrTuple = (addr,self.addrPort) addrstr = socket.inet_aton(addr) self.addrIP = struct.unpack('!L',addrstr)[0] self.addrMask = -1L << (32 - int(mask)) self.addrHost = (self.addrIP & ~self.addrMask) self.addrSubnet = (self.addrIP & self.addrMask) bcast = (self.addrSubnet | ~self.addrMask) self.addrBroadcastTuple = (socket.inet_ntoa(struct.pack('!L',bcast)),self.addrPort) self.addrAddr = addrstr + struct.pack('!H',self.addrPort) self.addrLen = 6 elif re.match(r"^\d+$",addr): addr = int(addr) if (addr > 255): raise ValueError, "address out of range" self.addrAddr = chr(addr) self.addrLen = 1 elif re.match(r"^\d+:[*]$",addr): addr = int(addr[:-2]) if (addr >= 65535): raise ValueError, "network out of range" self.addrType = Address.remoteBroadcastAddr self.addrNet = addr self.addrAddr = None self.addrLen = None elif re.match(r"^\d+:\d+$",addr): net, addr = addr.split(':') net = int(net) addr = int(addr) if (net >= 65535): raise ValueError, "network out of range" if (addr > 255): raise ValueError, "address out of range" self.addrType = Address.remoteStationAddr self.addrNet = net self.addrAddr = chr(addr) self.addrLen = 1 elif re.match(r"^0x([0-9A-Fa-f][0-9A-Fa-f])+$",addr): self.addrAddr = HexToString(addr[2:]) self.addrLen = len(self.addrAddr) elif re.match(r"^X'([0-9A-Fa-f][0-9A-Fa-f])+'$",addr): self.addrAddr = HexToString(addr[2:-1]) self.addrLen = len(self.addrAddr) elif re.match(r"^\d+:0x([0-9A-Fa-f][0-9A-Fa-f])+$",addr): net, addr = addr.split(':') net = int(net) if (net >= 65535): raise ValueError, "network out of range" self.addrType = Address.remoteStationAddr self.addrNet = net self.addrAddr = HexToString(addr[2:]) self.addrLen = len(self.addrAddr) elif re.match(r"^\d+:X'([0-9A-Fa-f][0-9A-Fa-f])+'$",addr): net, addr = addr.split(':') net = int(net) if (net >= 65535): raise ValueError, "network out of range" self.addrType = Address.remoteStationAddr self.addrNet = net self.addrAddr = HexToString(addr[2:-1]) self.addrLen = len(self.addrAddr) else: raise ValueError, "unrecognized format" elif isinstance(addr,types.TupleType): addr, port = addr self.addrPort = int(port) if isinstance(addr,types.StringType): addrstr = socket.inet_aton(addr) self.addrTuple = (addr,self.addrPort) elif isinstance(addr,types.LongType): addrstr = struct.pack('!L',addr) self.addrTuple = (socket.inet_ntoa(addrstr),self.addrPort) else: raise TypeError, "tuple must be (string,port) or (long,port)" self.addrIP = struct.unpack('!L',addrstr)[0] self.addrMask = -1L self.addrHost = None self.addrSubnet = None self.addrBroadcastTuple = self.addrTuple self.addrAddr = addrstr + struct.pack('!H',self.addrPort) self.addrLen = 6 else: raise TypeError, "integer, string or tuple required" def __str__(self): if self.addrType == Address.nullAddr: return 'Null' elif self.addrType == Address.localBroadcastAddr: return '*' elif self.addrType == Address.localStationAddr: rslt = '' if self.addrLen == 1: rslt += str(ord(self.addrAddr[0])) else: port = ord(self.addrAddr[-2]) * 256 + ord(self.addrAddr[-1]) if (len(self.addrAddr) == 6) and (port >= 47808) and (port <= 47823): rslt += '.'.join(["%d" % ord(x) for x in self.addrAddr[0:4]]) if port != 47808: rslt += ':' + str(port) else: rslt += '0x' + StringToHex(self.addrAddr) return rslt elif self.addrType == Address.remoteBroadcastAddr: return '%d:*' % (self.addrNet,) elif self.addrType == Address.remoteStationAddr: rslt = '%d:' % (self.addrNet,) if self.addrLen == 1: rslt += str(ord(self.addrAddr[0])) else: port = ord(self.addrAddr[-2]) * 256 + ord(self.addrAddr[-1]) if (len(self.addrAddr) == 6) and (port >= 47808) and (port <= 47823): rslt += '.'.join(["%d" % ord(x) for x in self.addrAddr[0:4]]) if port != 47808: rslt += ':' + str(port) else: rslt += '0x' + StringToHex(self.addrAddr) return rslt elif self.addrType == Address.globalBroadcastAddr: return '*:*' else: raise TypeError, "unknown address type %d" % self.addrType def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.__str__()) def __hash__(self): return hash( (self.addrType, self.addrNet, self.addrAddr) ) def __eq__(self,arg): # try an coerce it into an address if not isinstance(arg,Address): arg = Address(arg) # all of the components must match return (self.addrType == arg.addrType) and (self.addrNet == arg.addrNet) and (self.addrAddr == arg.addrAddr) def __ne__(self,arg): return not self.__eq__(arg)def IPAddrPack(addr): return socket.inet_aton(addr[0]) + struct.pack('!H',addr[1])def IPAddrUnpack(addr): return (socket.inet_ntoa(addr[0:4]), struct.unpack('!H',addr[4:6])[0] )#class LocalStation(Address): def __init__(self,addr): self.addrType = Address.localStationAddr self.addrNet = None if isinstance(addr,types.IntType): if (addr < 0) or (addr >= 256): raise ValueError, "address out of range" self.addrAddr = chr(addr) self.addrLen = 1 else: self.addrAddr = addr self.addrLen = len(addr)class RemoteStation(Address): def __init__(self,net,addr): if (net < 0) or (net >= 65535): raise ValueError, "network out of range" self.addrType = Address.remoteStationAddr self.addrNet = net if isinstance(addr,types.IntType): if (addr < 0) or (addr >= 256): raise ValueError, "address out of range" self.addrAddr = chr(addr) self.addrLen = 1 else: self.addrAddr = addr self.addrLen = len(addr)class LocalBroadcast(Address): def __init__(self): self.addrType = Address.localBroadcastAddr self.addrNet = None self.addrAddr = None self.addrLen = Noneclass RemoteBroadcast(Address): def __init__(self,net): if (net < 0) or (net >= 65535): raise ValueError, "network out of range" self.addrType = Address.remoteBroadcastAddr self.addrNet = net self.addrAddr = None self.addrLen = Noneclass GlobalBroadcast(Address): def __init__(self): self.addrType = Address.globalBroadcastAddr self.addrNet = None self.addrAddr = None self.addrLen = None## PDUData#class PDUData: def __init__(self, data=''): if isinstance(data,PDUData): self.pduData = data.pduData elif isinstance(data,types.StringType): self.pduData = data else: raise ValueError, "PDUData ctor parameter must be PDUData or a string" def Get(self): if len(self.pduData) == 0: raise DecodingError, "no more packet data" ch = self.pduData[0] self.pduData = self.pduData[1:] return ord(ch) def GetShort(self): if len(self.pduData) < 2: raise DecodingError, "no more packet data" rslt = (ord(self.pduData[0]) << 8) + ord(self.pduData[1]) self.pduData = self.pduData[2:] return rslt def GetLong(self): return struct.unpack('>L',self.GetData(4))[0] def GetData(self, dlen): if len(self.pduData) < dlen: raise DecodingError, "no more packet data" data = self.pduData[:dlen] self.pduData = self.pduData[dlen:] return data def Put(self, ch): self.pduData += chr(ch) def PutShort(self, n): self.pduData += chr((n >> 8) & 0xFF) + chr(n & 0xFF) def PutLong(self, n): self.pduData += struct.pack('>L',n) def PutData(self, data): self.pduData += data def __str__(self): """Useful for debugging.""" return "PDUData(" + StringToHex(self.pduData,'.') + ")"## Tag#class Tag: applicationTagClass = 0 contextTagClass = 1 openingTagClass = 2 closingTagClass = 3 nullAppTag = 0 booleanAppTag = 1 unsignedAppTag = 2 integerAppTag = 3 realAppTag = 4 doubleAppTag = 5 octetStringAppTag = 6 characterStringAppTag = 7 bitStringAppTag = 8 enumeratedAppTag = 9 dateAppTag = 10 timeAppTag = 11 objectIdentifierAppTag = 12 reservedAppTag13 = 13 reservedAppTag14 = 14
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -