📄 client.py
字号:
"""Python-level client API of the netsnmp package.

Defines the ``Varbind`` / ``VarList`` containers, the ``Session`` class
that forwards requests to the ``client_intf`` extension module, and the
one-shot ``snmpget`` / ``snmpset`` / ``snmpgetnext`` / ``snmpgetbulk`` /
``snmpwalk`` convenience functions.
"""
import client_intf
import string
import re
import sys
import types

# control verbosity of error output
verbose = 1

# Maps the symbolic SNMPv3 security level names accepted in the 'SecLevel'
# session argument to the integer codes handed to client_intf.session_v3().
secLevelMap = {'noAuthNoPriv': 1, 'authNoPriv': 2, 'authPriv': 3}

# Splits "tag.iid" style input into (tag, iid).  Compiled once here instead
# of on every Varbind construction.
_TAG_IID_RE = re.compile(r'^((?:\.\d+)+|(?:\.?\w+(?:\-*\w+)+)+)\.?(.*)$')


def _parse_session_args(kargs):
    """Merge caller-supplied session keywords over the built-in defaults.

    kargs -- dict of keyword arguments given to Session().

    Returns a new dict containing every known session key.  Keys in
    *kargs* that are not known session keys are reported on stderr and
    otherwise ignored.
    """
    sessArgs = {
        'Version': 3,
        'DestHost': 'localhost',
        'Community': 'public',
        'Timeout': 1000000,  # NOTE(review): presumably microseconds - confirm
        'Retries': 3,
        # NOTE(review): RemotePort is accepted here but is not passed to
        # either client_intf session constructor in this module.
        'RemotePort': 161,
        'LocalPort': 0,
        'SecLevel': 'noAuthNoPriv',
        'SecName': 'initial',
        'PrivProto': 'DEFAULT',
        'PrivPass': '',
        'AuthProto': 'DEFAULT',
        'AuthPass': '',
        'ContextEngineId': '',
        'SecEngineId': '',
        'Context': '',
        'Engineboots': 0,
        'Enginetime': 0,
    }
    for key, value in kargs.items():
        if key in sessArgs:
            sessArgs[key] = value
        else:
            # The original referenced a bare, undefined name 'stderr' here,
            # which raised NameError instead of reporting the bad key.
            sys.stderr.write("ERROR: unknown key %s\n" % (key,))
    return sessArgs


def STR(obj):
    """Return str(obj), except that None is passed through unchanged."""
    if obj is not None:
        obj = str(obj)
    return obj


class Varbind(object):
    """One variable binding: tag, instance id (iid), value and type.

    Every attribute assigned on an instance is stored as a string (or
    None); see __setattr__.
    """

    def __init__(self, tag=None, iid=None, val=None, type=None):
        """Create a varbind.

        If *iid* is None and *tag* is given, the tag is matched against
        the tag/iid pattern and, on a match, split into tag and iid.
        """
        # __setattr__ stringifies each value, so no explicit STR() needed.
        self.tag = tag
        self.iid = iid
        self.val = val
        self.type = type
        # parse iid out of tag if needed
        if iid is None and tag is not None:
            # Match the stringified tag so a non-string tag does not raise
            # TypeError inside the regex engine.
            match = _TAG_IID_RE.match(self.tag)
            if match:
                (self.tag, self.iid) = match.group(1, 2)

    def __setattr__(self, name, val):
        """Store *val* under *name*, converted with STR()."""
        self.__dict__[name] = STR(val)

    def print_str(self):
        """Return the (tag, iid, val, type) tuple."""
        return self.tag, self.iid, self.val, self.type


class VarList(object):
    """List-like container whose elements are Varbind objects."""

    def __init__(self, *vs):
        """Build the list; items that are not Varbinds are wrapped in one."""
        self.varbinds = []
        for var in vs:
            if isinstance(var, Varbind):
                self.varbinds.append(var)
            else:
                self.varbinds.append(Varbind(var))

    def __len__(self):
        return len(self.varbinds)

    def __getitem__(self, index):
        return self.varbinds[index]

    def __setitem__(self, index, val):
        """Replace an element; raises TypeError unless *val* is a Varbind."""
        if not isinstance(val, Varbind):
            raise TypeError("VarList items must be Varbind instances")
        self.varbinds[index] = val

    def __iter__(self):
        return iter(self.varbinds)

    def __delitem__(self, index):
        del self.varbinds[index]

    def __repr__(self):
        return repr(self.varbinds)

    def __getslice__(self, i, j):
        # Only consulted by Python 2 for simple slices; kept so that
        # behaviour there is unchanged.
        return self.varbinds[i:j]

    def append(self, *vars):
        """Append one or more Varbinds.

        Raises TypeError at the first argument that is not a Varbind;
        arguments before it have already been appended.
        """
        for var in vars:
            if not isinstance(var, Varbind):
                raise TypeError("VarList.append() accepts only Varbind instances")
            self.varbinds.append(var)


class Session(object):
    """A session object whose requests are carried out by client_intf."""

    def __init__(self, **args):
        """Open a session.

        Keyword arguments are the keys known to _parse_session_args();
        each resulting key/value is also stored as an instance attribute.
        Version 3 sessions are created with client_intf.session_v3(),
        all other versions with client_intf.session().

        Raises KeyError if 'SecLevel' is not a key of secLevelMap
        (version 3 only).
        """
        self.sess_ptr = None

        self.UseLongNames = 0
        self.UseNumeric = 0
        self.UseSprintValue = 0
        self.UseEnums = 0
        self.BestGuess = 0
        self.RetryNoSuch = 0
        self.ErrorStr = ''
        self.ErrorNum = 0
        self.ErrorInd = 0

        sess_args = _parse_session_args(args)

        for k, v in sess_args.items():
            self.__dict__[k] = v

        if sess_args['Version'] == 3:
            self.sess_ptr = client_intf.session_v3(
                sess_args['Version'],
                sess_args['DestHost'],
                sess_args['LocalPort'],
                sess_args['Retries'],
                sess_args['Timeout'],
                sess_args['SecName'],
                secLevelMap[sess_args['SecLevel']],
                sess_args['SecEngineId'],
                sess_args['ContextEngineId'],
                sess_args['Context'],
                sess_args['AuthProto'],
                sess_args['AuthPass'],
                sess_args['PrivProto'],
                sess_args['PrivPass'],
                sess_args['Engineboots'],
                sess_args['Enginetime'])
        else:
            self.sess_ptr = client_intf.session(
                sess_args['Version'],
                sess_args['Community'],
                sess_args['DestHost'],
                sess_args['LocalPort'],
                sess_args['Retries'],
                sess_args['Timeout'])

    def get(self, varlist):
        """Forward *varlist* to client_intf.get() and return its result."""
        return client_intf.get(self, varlist)

    def set(self, varlist):
        """Forward *varlist* to client_intf.set() and return its result."""
        return client_intf.set(self, varlist)

    def getnext(self, varlist):
        """Forward *varlist* to client_intf.getnext() and return its result."""
        return client_intf.getnext(self, varlist)

    def getbulk(self, nonrepeaters, maxrepetitions, varlist):
        """Forward to client_intf.getbulk(); returns None for Version 1."""
        if self.Version == 1:
            return None
        return client_intf.getbulk(self, nonrepeaters, maxrepetitions, varlist)

    def walk(self, varlist):
        """Forward *varlist* to client_intf.walk() and return its result."""
        return client_intf.walk(self, varlist)

    def __del__(self):
        # sess_ptr is still None when __init__ raised before a client_intf
        # session constructor returned (e.g. KeyError on a bad SecLevel);
        # in that case there is nothing to hand to delete_session().
        if getattr(self, 'sess_ptr', None) is None:
            return None
        return client_intf.delete_session(self)


# Retained for backward compatibility.  The type checks in this module now
# use the local Varbind / VarList classes directly rather than looking them
# up through netsnmp.client.
import netsnmp


def snmpget(*args, **kargs):
    """One-shot get: open a Session(**kargs) and get the given variables.

    Each positional argument is a Varbind or a value from which a
    Varbind is constructed.  Returns the result of Session.get().
    """
    sess = Session(**kargs)
    return sess.get(VarList(*args))


def snmpset(*args, **kargs):
    """One-shot set; arguments as for snmpget().  Returns Session.set()'s result."""
    sess = Session(**kargs)
    return sess.set(VarList(*args))


def snmpgetnext(*args, **kargs):
    """One-shot getnext; arguments as for snmpget().

    Returns the result of Session.getnext().
    """
    sess = Session(**kargs)
    return sess.getnext(VarList(*args))


def snmpgetbulk(nonrepeaters, maxrepetitions, *args, **kargs):
    """One-shot getbulk; remaining arguments as for snmpget().

    Returns the result of Session.getbulk(), which is None when the
    session Version is 1.
    """
    sess = Session(**kargs)
    return sess.getbulk(nonrepeaters, maxrepetitions, VarList(*args))


def snmpwalk(*args, **kargs):
    """One-shot walk: open a Session(**kargs) and walk the given variables.

    If the first positional argument is a VarList it is passed to
    Session.walk() as-is; otherwise a VarList is built from all
    positional arguments as in snmpget().  Each varbind is printed to
    stdout before the walk starts.  Returns the result of Session.walk().
    """
    sess = Session(**kargs)
    # 'args and' guards the no-argument call, which used to raise IndexError.
    if args and isinstance(args[0], VarList):
        var_list = args[0]
    else:
        var_list = VarList(*args)
    for var in var_list:
        # Single-argument print() parses identically on Python 2 and 3 and
        # produces the same text as the original print statement.
        print("  %s %s = %s ( %s )" % (var.tag, var.iid, var.val, var.type))
    return sess.walk(var_list)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -