📄 bacnet.py
字号:
reservedAppTag15 = 15 _applicationTagName = \ [ 'null', 'boolean', 'unsigned', 'integer' , 'real', 'double', 'octetString', 'characterString' , 'bitString', 'enumerated', 'date', 'time' , 'objectIdentifier', 'reserved13', 'reserved14', 'reserved15' ] _applicationTagClass = [] # defined later def __init__(self, *args): self.tagClass = None self.tagNumber = None self.tagLVT = None self.tagData = None if args: if (len(args) == 1) and isinstance(args[0],PDUData): self.Decode(args[0]) elif (len(args) >= 2): self.Set(*args) else: raise ValueError, "invalid Tag ctor arguments" def Set(self, tclass, tnum, tlvt=0, tdata=''): """Set the values of the tag.""" self.tagClass = tclass self.tagNumber = tnum self.tagLVT = tlvt self.tagData = tdata def SetAppData(self, tnum, tdata): """Set the values of the tag.""" self.tagClass = Tag.applicationTagClass self.tagNumber = tnum self.tagLVT = len(tdata) self.tagData = tdata def Encode(self, pdu): # check for special encoding of open and close tags if (self.tagClass == Tag.openingTagClass): pdu.Put(((self.tagNumber & 0x0F) << 4) + 0x0E) return if (self.tagClass == Tag.closingTagClass): pdu.Put(((self.tagNumber & 0x0F) << 4) + 0x0F) return # check for context encoding if (self.tagClass == Tag.contextTagClass): data = 0x08 else: data = 0x00 # encode the tag number part if (self.tagNumber < 15): data += (self.tagNumber << 4) else: data += 0xF0 # encode the length/value/type part if (self.tagLVT < 5): data += self.tagLVT else: data += 0x05 # save this and the extended tag value pdu.Put( data ) if (self.tagNumber >= 15): pdu.Put(self.tagNumber) # really short lengths are already done if (self.tagLVT >= 5): if (self.tagLVT <= 253): pdu.Put( self.tagLVT ) elif (self.tagLVT <= 65535): enc.Put( 254 ) pdu.PutShort( self.tagLVT ) else: pdu.Put( 255 ) pdu.PutLong( self.tagLVT ) # now put the data pdu.PutData(self.tagData) def Decode(self, pdu): tag = pdu.Get() # extract the type self.tagClass = (tag >> 3) & 0x01 # extract the tag number self.tagNumber = (tag >> 4) if (self.tagNumber == 0x0F): self.tagNumber = dec.Get() # extract the length self.tagLVT = tag & 0x07 if (self.tagLVT == 5): self.tagLVT = pdu.Get() if (self.tagLVT == 254): self.tagLVT = pdu.GetShort() elif (self.tagLVT == 255): self.tagLVT = pdu.GetLong() elif (self.tagLVT == 6): self.tagClass = Tag.openingTagClass self.tagLVT = 0 elif (self.tagLVT == 7): self.tagClass = Tag.closingTagClass self.tagLVT = 0 # application tagged boolean has no more data if (self.tagClass == Tag.applicationTagClass) and (self.tagNumber == Tag.booleanAppTag): # tagLVT contains value self.tagData = '' else: # tagLVT contains length self.tagData = pdu.GetData(self.tagLVT) def AppToCtx(self, context): """Return a context encoded tag.""" if self.tagClass != Tag.applicationTagClass: raise ValueError, "application tag required" # application tagged boolean now has data if (self.tagNumber == Tag.booleanAppTag): return ContextTag(context, chr(self.tagLVT)) else: return ContextTag(context, self.tagData) def CtxToApp(self, dataType): """Return an application encoded tag.""" if self.tagClass != Tag.contextTagClass: raise ValueError, "context tag required" # context booleans have value in data if (dataType == Tag.booleanAppTag): return Tag(Tag.applicationTagClass, Tag.booleanAppTag, ord(self.tagData[0]), '') else: return ApplicationTag(dataType, self.tagData) def AppToObject(self): """Return the application object encoded by the tag.""" if self.tagClass != Tag.applicationTagClass: raise ValueError, "application tag required" # get the class to build klass = self._applicationTagClass[self.tagNumber] if not klass: return None # build an object, tell it to decode this tag, and return it return klass(self) def __repr__(self): xid = id(self) if (xid < 0): xid += (1L << 32) sname = self.__module__ + '.' + self.__class__.__name__ try: if self.tagClass == Tag.openingTagClass: desc = "(open(%d))" % (self.tagNumber,) elif self.tagClass == Tag.closingTagClass: desc = "(close(%d))" % (self.tagNumber,) elif self.tagClass == Tag.contextTagClass: desc = "(context(%d))" % (self.tagNumber,) elif self.tagClass == Tag.applicationTagClass: desc = "(%s)" % (self._applicationTagName[self.tagNumber],) else: raise ValueError, "invalid tag class" except: desc = "(?)" return '<' + sname + desc + ' instance at 0x%08x' % (xid,) + '>' def __eq__(self, tag): return (self.tagClass == tag.tagClass) \ and (self.tagNumber == tag.tagNumber) \ and (self.tagLVT == tag.tagLVT) \ and (self.tagData == tag.tagData) def __ne__(self,arg): return not self.__eq__(arg)## ApplicationTag#class ApplicationTag(Tag): def __init__(self, *args): if len(args) == 1 and isinstance(args[0], PDUData): Tag.__init__(self, args[0]) if self.tagClass != Tag.applicationTagClass: raise DecodingError, "application tag not decoded" elif len(args) == 2: tnum, tdata = args Tag.__init__(self, Tag.applicationTagClass, tnum, len(tdata), tdata) else: raise ValueError, "ApplicationTag ctor requires a type and data or PDUData"## ContextTag#class ContextTag(Tag): def __init__(self, *args): if len(args) == 1 and isinstance(args[0], PDUData): Tag.__init__(self, args[0]) if self.tagClass != Tag.contextTagClass: raise DecodingError, "context tag not decoded" elif len(args) == 2: tnum, tdata = args Tag.__init__(self, Tag.contextTagClass, tnum, len(tdata), tdata) else: raise ValueError, "ContextyTag ctor requires a type and data or PDUData"## OpeningTag#class OpeningTag(Tag): def __init__(self, context): if isinstance(context, PDUData): Tag.__init__(self, context) if self.tagClass != Tag.openingTagClass: raise DecodingError, "opening tag not decoded" elif isinstance(context, types.IntType): Tag.__init__(self, Tag.openingTagClass, context) else: raise TypeError, "OpeningTag ctor requires an integer or PDUData"## ClosingTag#class ClosingTag(Tag): def __init__(self, context): if isinstance(context, PDUData): Tag.__init__(self, context) if self.tagClass != Tag.closingTagClass: raise DecodingError, "closing tag not decoded" elif isinstance(context, types.IntType): Tag.__init__(self, Tag.closingTagClass, context) else: raise TypeError, "OpeningTag ctor requires an integer or PDUData"## DebugTag#def DebugTag(tag): print "DebugTag", tag print " tagClass =", tag.tagClass, if tag.tagClass == Tag.applicationTagClass: print 'application' elif tag.tagClass == Tag.contextTagClass: print 'context' elif tag.tagClass == Tag.openingTagClass: print 'opening' elif tag.tagClass == Tag.closingTagClass: print 'closing' else: print "?" print " tagNumber =", tag.tagNumber, if tag.tagClass == Tag.applicationTagClass: try: print tag._applicationTagName[tag.tagNumber] except: print "?" else: print print " tagLVT =", tag.tagLVT, if tag.tagLVT != len(tag.tagData): print "(length does not match data)" else: print "(length match)" print " tagData = '%s'" % (StringToHex(tag.tagData,'.'),)## TagList#class TagList: def __init__(self, arg=None): self.tagList = [] if isinstance(arg, types.ListType): self.tagList = arg elif isinstance(arg, TagList): self.tagList = arg.tagList[:] elif isinstance(arg, PDUData): self.Decode(arg) def append(self, tag): self.tagList.append(tag) def extend(self, taglist): self.tagList.extend(taglist) def __getitem__(self, item): return self.tagList[item] def __len__(self): return len(self.tagList) def Peek(self): """Return the tag at the front of the list.""" if self.tagList: tag = self.tagList[0] else: tag = None if _debug: print "(peek)", tag return tag def Push(self, tag): """Return a tag back to the front of the list.""" if _debug: print "(push)", tag self.tagList = [tag] + self.tagList def Pop(self): """Remove the tag from the front of the list and return it.""" if self.tagList: tag = self.tagList[0] del self.tagList[0] else: tag = None if _debug: print "(pop)", tag return tag def GetContext(self, context): """Return a tag or a list of tags context encoded.""" # forward pass i = 0 while i < len(self.tagList): tag = self.tagList[i] # skip application stuff if tag.tagClass == Tag.applicationTagClass: pass # check for context encoded atomic value elif tag.tagClass == Tag.contextTagClass: if tag.tagNumber == context: return tag # check for context encoded group elif tag.tagClass == Tag.openingTagClass: keeper = tag.tagNumber == context rslt = [] i += 1 lvl = 0 while i < len(self.tagList): tag = self.tagList[i] if tag.tagClass == Tag.openingTagClass: lvl += 1 elif tag.tagClass == Tag.closingTagClass: lvl -= 1 if lvl < 0: break rslt.append(tag) i += 1 # make sure everything balances if lvl >= 0: raise DecodingError, "mismatched open/close tags" # get everything we need? if keeper: return TagList(rslt) else: raise DecodingError, "unexpected tag" # try the next tag i += 1 # nothing found return None def Encode(self, pdu): """Encode the tag list into a PDU.""" for tag in self.tagList: tag.Encode(pdu) def Decode(self, pdu): """Decode the tags from a PDU.""" while pdu.pduData: self.tagList.append( Tag(pdu) )## DebugTagList#def DebugTagList(tags): print "DebugTagList", tags for tag in tags.tagList: print " ", tag## Atomic#class Atomic: _appTag = None def __cmp__(self, other): # hoop jump it if not isinstance(other, self.__class__): other = self.__class__(other) # now compare the values if (self.value < other.value): return -1 elif (self.value > other.value): return 1 else: return 0## Null#
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -