⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 client.py

📁 开发snmp的开发包有两个开放的SNMP开发库
💻 PY
字号:
# Client-side helpers for issuing SNMP requests through the ``client_intf``
# module: session-argument parsing, the Varbind/VarList containers, the
# Session class, and one-shot snmp* convenience functions.

import client_intf
import string
import re
import sys
import types

# control verbosity of error output
verbose = 1

# Maps the symbolic 'SecLevel' session option to the integer that is handed
# to client_intf.session_v3().
secLevelMap = {'noAuthNoPriv': 1, 'authNoPriv': 2, 'authPriv': 3}

# Splits a combined "tag[.iid]" string into (tag, iid).  Compiled once here
# instead of on every Varbind construction.
_TAG_IID_RE = re.compile(r'^((?:\.\d+)+|(?:\.?\w+(?:\-*\w+)+)+)\.?(.*)$')


def _parse_session_args(kargs):
    """Merge caller-supplied session options over the built-in defaults.

    Args:
        kargs: mapping of option name to value (the keyword arguments given
            to Session).

    Returns:
        A new dict containing every known option.  Recognised keys from
        ``kargs`` override the defaults; unrecognised keys are reported on
        stderr and otherwise ignored.
    """
    sessArgs = {
        'Version': 3,
        'DestHost': 'localhost',
        'Community': 'public',
        'Timeout': 1000000,
        'Retries': 3,
        'RemotePort': 161,
        'LocalPort': 0,
        'SecLevel': 'noAuthNoPriv',
        'SecName': 'initial',
        'PrivProto': 'DEFAULT',
        'PrivPass': '',
        'AuthProto': 'DEFAULT',
        'AuthPass': '',
        'ContextEngineId': '',
        'SecEngineId': '',
        'Context': '',
        'Engineboots': 0,
        'Enginetime': 0,
    }

    for key in kargs:
        if key in sessArgs:
            sessArgs[key] = kargs[key]
        else:
            # The previous `print stderr, ...` referenced an undefined name
            # and raised NameError; emit the intended diagnostic instead.
            sys.stderr.write("ERROR: unknown key %s\n" % (key,))
    return sessArgs


def STR(obj):
    """Return ``str(obj)``, passing ``None`` through unchanged."""
    if obj is not None:
        obj = str(obj)
    return obj


class Varbind(object):
    """One variable binding: ``tag``, ``iid``, ``val`` and ``type``.

    Every attribute assigned on an instance is stored as a string, or as
    None when None is assigned (see ``__setattr__``).
    """

    def __init__(self, tag=None, iid=None, val=None, type=None):
        """Store the four fields, splitting ``iid`` out of ``tag`` if needed.

        When ``iid`` is not given and ``tag`` is, ``tag`` is matched against
        the tag/iid pattern; on a match the two captured groups replace
        ``self.tag`` and ``self.iid``.
        """
        self.tag = STR(tag)
        self.iid = STR(iid)
        self.val = STR(val)
        self.type = STR(type)
        # parse iid out of tag if needed
        if iid is None and tag is not None:
            match = _TAG_IID_RE.match(tag)
            if match:
                (self.tag, self.iid) = match.group(1, 2)

    def __setattr__(self, name, val):
        """Coerce every assigned value through STR() before storing it."""
        self.__dict__[name] = STR(val)

    def print_str(self):
        """Return the ``(tag, iid, val, type)`` tuple."""
        return self.tag, self.iid, self.val, self.type


class VarList(object):
    """A list-like container of Varbind objects."""

    def __init__(self, *vs):
        """Build the list; non-Varbind arguments are wrapped in Varbind()."""
        self.varbinds = []

        for var in vs:
            if isinstance(var, netsnmp.client.Varbind):
                self.varbinds.append(var)
            else:
                self.varbinds.append(Varbind(var))

    def __len__(self):
        return len(self.varbinds)

    def __getitem__(self, index):
        return self.varbinds[index]

    def __setitem__(self, index, val):
        """Replace an entry; raises TypeError unless ``val`` is a Varbind."""
        if isinstance(val, netsnmp.client.Varbind):
            self.varbinds[index] = val
        else:
            raise TypeError("VarList items must be Varbind instances")

    def __iter__(self):
        return iter(self.varbinds)

    def __delitem__(self, index):
        del self.varbinds[index]

    def __repr__(self):
        return repr(self.varbinds)

    def __getslice__(self, i, j):
        # Python 2 slice hook; returns a plain list, not a VarList.
        return self.varbinds[i:j]

    def append(self, *vars):
        """Append one or more Varbinds.

        Unlike the constructor, non-Varbind arguments are NOT wrapped:
        the first one encountered raises TypeError (earlier arguments in
        the same call have already been appended).
        """
        for var in vars:
            if isinstance(var, netsnmp.client.Varbind):
                self.varbinds.append(var)
            else:
                raise TypeError("VarList.append() accepts only Varbind instances")


class Session(object):
    """An SNMP session whose requests are carried out by ``client_intf``."""

    def __init__(self, **args):
        """Create the underlying session from keyword options.

        Options are resolved by _parse_session_args() and each resulting
        key is also exposed as an instance attribute (e.g. ``self.Version``).

        Raises:
            KeyError: for Version 3 when 'SecLevel' is not a key of
                secLevelMap.

        NOTE(review): 'RemotePort' is parsed but is not forwarded to either
        client_intf constructor in this code, and 'Community' is not used
        for Version 3 - confirm whether that is intended.
        """
        self.sess_ptr = None
        # Option flags and error fields, all initialised to neutral values.
        # NOTE(review): presumably read/written by client_intf - confirm.
        self.UseLongNames = 0
        self.UseNumeric = 0
        self.UseSprintValue = 0
        self.UseEnums = 0
        self.BestGuess = 0
        self.RetryNoSuch = 0
        self.ErrorStr = ''
        self.ErrorNum = 0
        self.ErrorInd = 0

        sess_args = _parse_session_args(args)

        # Expose every resolved option as an attribute of the session.
        self.__dict__.update(sess_args)

        if sess_args['Version'] == 3:
            self.sess_ptr = client_intf.session_v3(
                sess_args['Version'],
                sess_args['DestHost'],
                sess_args['LocalPort'],
                sess_args['Retries'],
                sess_args['Timeout'],
                sess_args['SecName'],
                secLevelMap[sess_args['SecLevel']],
                sess_args['SecEngineId'],
                sess_args['ContextEngineId'],
                sess_args['Context'],
                sess_args['AuthProto'],
                sess_args['AuthPass'],
                sess_args['PrivProto'],
                sess_args['PrivPass'],
                sess_args['Engineboots'],
                sess_args['Enginetime'])
        else:
            self.sess_ptr = client_intf.session(
                sess_args['Version'],
                sess_args['Community'],
                sess_args['DestHost'],
                sess_args['LocalPort'],
                sess_args['Retries'],
                sess_args['Timeout'])

    def get(self, varlist):
        """Return the result of client_intf.get() for ``varlist``."""
        res = client_intf.get(self, varlist)
        return res

    def set(self, varlist):
        """Return the result of client_intf.set() for ``varlist``."""
        res = client_intf.set(self, varlist)
        return res

    def getnext(self, varlist):
        """Return the result of client_intf.getnext() for ``varlist``."""
        res = client_intf.getnext(self, varlist)
        return res

    def getbulk(self, nonrepeaters, maxrepetitions, varlist):
        """Return the result of client_intf.getbulk(), or None for Version 1."""
        if self.Version == 1:
            return None
        res = client_intf.getbulk(self, nonrepeaters, maxrepetitions, varlist)
        return res

    def walk(self, varlist):
        """Return the result of client_intf.walk() for ``varlist``."""
        res = client_intf.walk(self, varlist)
        return res

    def __del__(self):
        res = client_intf.delete_session(self)
        return res


# Imported after the class definitions; the isinstance() checks in this
# module resolve ``netsnmp.client.Varbind`` / ``VarList`` at call time.
# NOTE(review): presumably deferred to avoid a circular import while the
# netsnmp package is still initialising - confirm before moving it.
import netsnmp


def snmpget(*args, **kargs):
    """One-shot GET: open a Session from ``kargs`` and get ``args``.

    Positional arguments may be Varbind objects or values accepted by
    Varbind(); VarList's constructor performs the wrapping.
    """
    sess = Session(**kargs)
    var_list = VarList(*args)
    res = sess.get(var_list)
    return res


def snmpset(*args, **kargs):
    """One-shot SET: open a Session from ``kargs`` and set ``args``."""
    sess = Session(**kargs)
    var_list = VarList(*args)
    res = sess.set(var_list)
    return res


def snmpgetnext(*args, **kargs):
    """One-shot GETNEXT: open a Session from ``kargs`` and query ``args``."""
    sess = Session(**kargs)
    var_list = VarList(*args)
    res = sess.getnext(var_list)
    return res


def snmpgetbulk(nonrepeaters, maxrepetitions, *args, **kargs):
    """One-shot GETBULK; returns None when the session Version is 1."""
    sess = Session(**kargs)
    var_list = VarList(*args)
    res = sess.getbulk(nonrepeaters, maxrepetitions, var_list)
    return res


def snmpwalk(*args, **kargs):
    """One-shot walk: open a Session from ``kargs`` and walk ``args``.

    If the first positional argument is a VarList it is used as-is (and
    any further positional arguments are ignored); otherwise the arguments
    are wrapped into a new VarList.  Each varbind is echoed to stdout
    before the walk is issued.

    Raises:
        IndexError: when called without positional arguments.
    """
    sess = Session(**kargs)
    if isinstance(args[0], netsnmp.client.VarList):
        var_list = args[0]
    else:
        var_list = VarList(*args)

    for var in var_list:
        # Same text the former `print` statement produced, written in a
        # form that does not depend on print-statement syntax.
        sys.stdout.write("   %s %s = %s ( %s )\n"
                         % (var.tag, var.iid, var.val, var.type))

    res = sess.walk(var_list)
    return res

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -