📄 bacnet.py
字号:
objType, objInstance = self.GetTuple() # pack the components together return long((objType << 22) + objInstance) def Encode(self, tag): # encode the tag tag.SetAppData(Tag.objectIdentifierAppTag, struct.pack('>L',self.GetLong())) def Decode(self, tag): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.objectIdentifierAppTag): raise ValueError, "object identifier application tag required" # extract the data self.SetLong( struct.unpack('>L',tag.tagData)[0] ) def __str__(self): # rip it apart objType, objInstance = self.value if isinstance(objType, types.StringType): typestr = objType elif objType < 0: typestr = "Bad %d" % (objType,) elif self.objectTypeClass._xlateTable.has_key(objType): typestr = self.objectTypeClass._xlateTable[objType] elif (objType < 128): typestr = "Reserved %d" % (objType,) else: typestr = "Vendor %d" % (objType,) return "ObjectIdentifier(%s,%d)" % (typestr, objInstance) def __hash__(self): return hash(self.value) def __cmp__(self, other): """Special function to make sure comparisons are done in enumeration order, not alphabetic order.""" # hoop jump it if not isinstance(other, self.__class__): other = self.__class__(other) # get the numeric version a = self.GetLong() b = other.GetLong() # now compare the values if (a < b): return -1 elif (a > b): return 1 else: return 0## Application Tag Classes## This list is set in the Tag class so that the AppToObject# function can return one of the appliction datatypes. It# can't be provided in the Tag class definition because the# classes aren't defined yet.#Tag._applicationTagClass = \ [ Null, Boolean, Unsigned, Integer , Real, Double, OctetString, CharacterString , BitString, Enumerated, Date, Time , ObjectIdentifier, None, None, None ]## Element#class Element: def __init__(self, name, klass, context=None, optional=False): self.name = name self.klass = klass self.context = context self.optional = optional## Sequence#class Sequence: sequenceElements = [] def __init__(self, **kwargs): for element in self.sequenceElements: setattr(self, element.name, kwargs.get(element.name, None)) def Encode(self, taglist): if _debug: print "%s.Encode" % (self.__class__.__name__,), taglist global _SequenceOfClasses for element in self.sequenceElements: value = getattr(self, element.name, None) if element.optional and value is None: continue if not element.optional and value is None: raise AttributeError, "'%s' is a required element of %s" % (element.name,self.__class__.__name__) if _SequenceOfClasses.has_key(element.klass): # might need to encode an opening tag if element.context is not None: taglist.append(OpeningTag(element.context)) if _debug: print " - build sequence helper", element.klass, value helper = element.klass(value) # encode the value helper.Encode(taglist) # might need to encode a closing tag if element.context is not None: taglist.append(ClosingTag(element.context)) elif issubclass(element.klass, Atomic): # a helper cooperates between the atomic value and the tag if _debug: print " - build helper", element.klass, value helper = element.klass(value) # build a tag and encode the data into it tag = Tag() helper.Encode(tag) # convert it to context encoding iff necessary if element.context is not None: tag = tag.AppToCtx(element.context) # now append the tag taglist.append(tag) elif isinstance(value, element.klass): # might need to encode an opening tag if element.context is not None: taglist.append(OpeningTag(element.context)) # encode the value value.Encode(taglist) # might need to encode a closing tag if element.context is not None: taglist.append(ClosingTag(element.context)) else: raise TypeError, "'%s' must be a %s" % (element.name, element.klass.__name__) def Decode(self, taglist): if _debug: print "%s.Decode" % (self.__class__.__name__,), taglist for element in self.sequenceElements: tag = taglist.Peek() # no more elements if tag is None: if not element.optional: raise AttributeError, "'%s' is a required element of %s" % (element.name,self.__class__.__name__) # omitted optional element setattr(self, element.name, None) # we have been enclosed in a context elif tag.tagClass == Tag.closingTagClass: if not element.optional: raise AttributeError, "'%s' is a required element of %s" % (element.name,self.__class__.__name__) # omitted optional element setattr(self, element.name, None) # check for a sequence element elif _SequenceOfClasses.has_key(element.klass): # check for context encoding if element.context is not None: if tag.tagClass != Tag.openingTagClass or tag.tagNumber != element.context: if not element.optional: raise DecodingError, "'%s' expected opening tag %d" % (element.name, element.context) else: # omitted optional element setattr(self, element.name, []) continue taglist.Pop() # a helper cooperates between the atomic value and the tag helper = element.klass() helper.Decode(taglist) # now save the value setattr(self, element.name, helper.value) # check for context closing tag if element.context is not None: tag = taglist.Pop() if tag.tagClass != Tag.closingTagClass or tag.tagNumber != element.context: raise DecodingError, "'%s' expected closing tag %d" % (element.name, context) # check for an atomic element elif issubclass(element.klass, Atomic): # convert it to application encoding if element.context is not None: if tag.tagClass != Tag.contextTagClass or tag.tagNumber != element.context: if not element.optional: raise DecodingError, "'%s' expected context tag %d" % (element.name, element.context) else: setattr(self, element.name, None) continue tag = tag.CtxToApp(element.klass._appTag) else: if tag.tagClass != Tag.applicationTagClass or tag.tagNumber != element.klass._appTag: if not element.optional: raise DecodingError, "'%s' expected application tag %s" % (element.name, Tag._applicationTagName[element.klass._appTag]) else: setattr(self, element.name, None) continue # consume the tag taglist.Pop() # a helper cooperates between the atomic value and the tag helper = element.klass(tag) # now save the value setattr(self, element.name, helper.value) # some kind of structure else: if element.context is not None: if tag.tagClass != Tag.openingTagClass or tag.tagNumber != element.context: if not element.optional: raise DecodingError, "'%s' expected opening tag %d" % (element.name, element.context) else: setattr(self, element.name, None) continue taglist.Pop() try: # make a backup of the tag list in case the structure manages to # decode some content but not all of it. This is not supposed to # happen if the ASN.1 has been formed correctly. backup = taglist.tagList[:] # build a value and decode it value = element.klass() value.Decode(taglist) # save the result setattr(self, element.name, value) except DecodingError: # if the context tag was matched, the substructure has to be decoded # correctly. if element.context is None and element.optional: # omitted optional element setattr(self, element.name, None) # restore the backup taglist.tagList = backup else: raise if element.context is not None: tag = taglist.Pop() if (not tag) or tag.tagClass != Tag.closingTagClass or tag.tagNumber != element.context: raise DecodingError, "'%s' expected closing tag %d" % (element.name, element.context)## SequenceOf#_SequenceOfMap = {}_SequenceOfClasses = {}def SequenceOf(klass): """Function to return a class that can encode and decode a list of some other type.""" global _SequenceOfMap global _SequenceOfClasses # if this has already been built, return the cached one if _SequenceOfMap.has_key(klass): return _SequenceOfMap[klass] # no SequenceOf(SequenceOf(...)) allowed if _SequenceOfClasses.has_key(klass): raise TypeError, "nested sequences disallowed" # define a generic class for lists class SequenceOf: subtype = None def __init__(self,value=None): if value is None: self.value = [] elif isinstance(value,types.ListType): self.value = value else: raise TypeError, "invalid constructor datatype" def append(self, value): if issubclass(self.subtype, Atomic): pass elif not isinstance(value, self.subtype): raise TypeError, "%s value required" % (value, self.subtype.__name__,) self.value.append(value) def __getitem__(self, item): return self.value[item] def __len__(self): return len(self.value) def Encode(self, taglist): if _debug: print "%s.Encode" % (self.__class__.__name__,), taglist for value in self.value: if issubclass(self.subtype, Atomic): # a helper cooperates between the atomic value and the tag helper = self.subtype(value) # build a tag and encode the data into it tag = Tag() helper.Encode(tag) # now encode the tag taglist.append(tag) elif isinstance(value, self.subtype): # it must have its own encoder value.Encode(taglist) else: raise TypeError, "%s must be a %s" % (value, self.subtype.__name__) def Decode(self, taglist): if _debug: print "%s.Decode" % (self.__class__.__name__,), taglist while len(taglist) != 0: tag = taglist.Peek() if tag.tagClass == Tag.closingTagClass: return if issubclass(self.subtype, Atomic): if _debug: print " - building helper", self.subtype, tag taglist.Pop() # a helper cooperates between the atomic value and the tag helper = self.subtype(tag) # save the value self.value.append(helper.value) else: if _debug: print " - building value", self.subtype # build an element value = self.subtype() # let it decode itself value.Decode(taglist) # save what was built self.value.append(value) # constrain it to a list of a specific type of item setattr(SequenceOf, 'subtype', klass) SequenceOf.__name__ = 'SequenceOf(%s.%s)' % (klass.__module__, klass.__name__) # cache this type _SequenceOfMap[klass] = SequenceOf _SequenceOfClasses[SequenceOf] = 1 # return this new type return SequenceOfdef DebugSequenceOfClasses(): print "DebugSequenceOfClasses" for klass in _SequenceOfClasses: print " ", klass## Choice#class Choice: choiceElements = [] def __init__(self, **kwargs): for element in self.choiceElements: setattr(self, element.name, kwargs.get(element.name, None)) def Encode(self, taglist): if _debug: print "%s.Encode" % (self.__class__.__name__,), taglist for element in self.choiceElements: value = getattr(self, element.name, None) if value is None: continue if issubclass(element.klass,Atomic): # a helper cooperates between the atomic value and the tag helper = element.klass(value) # build a tag and encode the data into it tag = Tag() helper.Encode(tag) # convert it to context encoding if element.context is not None: tag = tag.AppToCtx(element.context) # now encode the tag taglist.append(tag) break elif isinstance(value, element.klass): # encode an opening tag if element.context is not None: taglist.append(OpeningTag(element.context)) # encode the value value.Encode(taglist) # encode a closing tag if element.context is not None:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -