📄 xendapi.py
字号:
#============================================================================# This library is free software; you can redistribute it and/or# modify it under the terms of version 2.1 of the GNU Lesser General Public# License as published by the Free Software Foundation.## This library is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU# Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public# License along with this library; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA#============================================================================# Copyright (C) 2006-2007 XenSource Ltd.#============================================================================import inspectimport osimport Queueimport setsimport stringimport sysimport tracebackimport threadingimport timeimport xmlrpclibimport XendDomain, XendDomainInfo, XendNode, XendDmesgimport XendLogging, XendTaskManager, XendAPIStorefrom XendAPIVersion import *from XendAuthSessions import instance as auth_managerfrom XendError import *from XendClient import ERROR_INVALID_DOMAINfrom XendLogging import logfrom XendNetwork import XendNetworkfrom XendTask import XendTaskfrom XendPIFMetrics import XendPIFMetricsfrom XendVMMetrics import XendVMMetricsfrom XendPIF import XendPIFfrom XendPBD import XendPBDfrom XendXSPolicy import XendXSPolicy, XendACMPolicyfrom XendAPIConstants import *from xen.util.xmlrpclib2 import stringifyfrom xen.util.blkif import blkdev_name_to_numberfrom xen.util import xsconstantsAUTH_NONE = 'none'AUTH_PAM = 'pam'argcounts = {}# ------------------------------------------# Utility Methods for Xen API Implementation# ------------------------------------------def xen_api_success(value): """Wraps a return value in XenAPI format.""" if value is None: s = '' else: s = stringify(value) return {"Status": "Success", "Value": s}def xen_api_success_void(): """Return success, but caller expects no return value.""" return xen_api_success("")def xen_api_error(error): """Wraps an error value in XenAPI format.""" if type(error) == tuple: error = list(error) if type(error) != list: error = [error] if len(error) == 0: error = ['INTERNAL_ERROR', 'Empty list given to xen_api_error'] return { "Status": "Failure", "ErrorDescription": [str(x) for x in error] }def xen_api_todo(): """Temporary method to make sure we track down all the TODOs""" return {"Status": "Error", "ErrorDescription": XEND_ERROR_TODO}def now(): return datetime()def datetime(when = None): """Marshall the given time as a Xen-API DateTime. @param when The time in question, given as seconds since the epoch, UTC. May be None, in which case the current time is used. """ if when is None: return xmlrpclib.DateTime(time.gmtime()) else: return xmlrpclib.DateTime(time.gmtime(when))# ---------------------------------------------------# Event dispatch# ---------------------------------------------------EVENT_QUEUE_LENGTH = 50event_registrations = {}def event_register(session, reg_classes): if session not in event_registrations: event_registrations[session] = { 'classes' : sets.Set(), 'queue' : Queue.Queue(EVENT_QUEUE_LENGTH), 'next-id' : 1 } if not reg_classes: reg_classes = classes event_registrations[session]['classes'].union_update(reg_classes)def event_unregister(session, unreg_classes): if session not in event_registrations: return if unreg_classes: event_registrations[session]['classes'].intersection_update( unreg_classes) if len(event_registrations[session]['classes']) == 0: del event_registrations[session] else: del event_registrations[session]def event_next(session): if session not in event_registrations: return xen_api_error(['SESSION_NOT_REGISTERED', session]) queue = event_registrations[session]['queue'] events = [queue.get()] try: while True: events.append(queue.get(False)) except Queue.Empty: pass return xen_api_success(events)def _ctor_event_dispatch(xenapi, ctor, api_cls, session, args): result = ctor(xenapi, session, *args) if result['Status'] == 'Success': ref = result['Value'] event_dispatch('add', api_cls, ref, '') return resultdef _dtor_event_dispatch(xenapi, dtor, api_cls, session, ref, args): result = dtor(xenapi, session, ref, *args) if result['Status'] == 'Success': event_dispatch('del', api_cls, ref, '') return resultdef _setter_event_dispatch(xenapi, setter, api_cls, attr_name, session, ref, args): result = setter(xenapi, session, ref, *args) if result['Status'] == 'Success': event_dispatch('mod', api_cls, ref, attr_name) return resultdef event_dispatch(operation, api_cls, ref, attr_name): assert operation in ['add', 'del', 'mod'] event = { 'timestamp' : now(), 'class' : api_cls, 'operation' : operation, 'ref' : ref, 'obj_uuid' : ref, 'field' : attr_name, } for reg in event_registrations.values(): if api_cls in reg['classes']: event['id'] = reg['next-id'] reg['next-id'] += 1 reg['queue'].put(event)# ---------------------------------------------------# Python Method Decorators for input value validation# ---------------------------------------------------def trace(func, api_name = ''): """Decorator to trace XMLRPC Xen API methods. @param func: function with any parameters @param api_name: name of the api call for debugging. """ if hasattr(func, 'api'): api_name = func.api def trace_func(self, *args, **kwargs): log.debug('%s: %s' % (api_name, args)) return func(self, *args, **kwargs) trace_func.api = api_name return trace_funcdef catch_typeerror(func): """Decorator to catch any TypeErrors and translate them into Xen-API errors. @param func: function with params: (self, ...) @rtype: callable object """ def f(self, *args, **kwargs): try: return func(self, *args, **kwargs) except TypeError, exn: #log.exception('catch_typeerror') if hasattr(func, 'api') and func.api in argcounts: # Assume that if the argument count was wrong and if the # exception was thrown inside this file, then it is due to an # invalid call from the client, otherwise it's an internal # error (which will be handled further up). expected = argcounts[func.api] actual = len(args) + len(kwargs) if expected != actual: tb = sys.exc_info()[2] try: sourcefile = traceback.extract_tb(tb)[-1][0] if sourcefile == inspect.getsourcefile(XendAPI): return xen_api_error( ['MESSAGE_PARAMETER_COUNT_MISMATCH', func.api, expected, actual]) finally: del tb raise except XendAPIError, exn: return xen_api_error(exn.get_api_error()) return fdef session_required(func): """Decorator to verify if session is valid before calling method. @param func: function with params: (self, session, ...) @rtype: callable object """ def check_session(self, session, *args, **kwargs): if auth_manager().is_session_valid(session): return func(self, session, *args, **kwargs) else: return xen_api_error(['SESSION_INVALID', session]) return check_sessiondef _is_valid_ref(ref, validator): return type(ref) == str and validator(ref)def _check_ref(validator, clas, func, api, session, ref, *args, **kwargs): if _is_valid_ref(ref, validator): return func(api, session, ref, *args, **kwargs) else: return xen_api_error(['HANDLE_INVALID', clas, ref])def valid_host(func): """Decorator to verify if host_ref is valid before calling method. @param func: function with params: (self, session, host_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(XendNode.instance().is_valid_host, 'host', func, *args, **kwargs)def valid_host_metrics(func): """Decorator to verify if host_metrics_ref is valid before calling method. @param func: function with params: (self, session, host_metrics_ref) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: r == XendNode.instance().host_metrics_uuid, 'host_metrics', func, *args, **kwargs)def valid_host_cpu(func): """Decorator to verify if host_cpu_ref is valid before calling method. @param func: function with params: (self, session, host_cpu_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(XendNode.instance().is_valid_cpu, 'host_cpu', func, *args, **kwargs)def valid_vm(func): """Decorator to verify if vm_ref is valid before calling method. @param func: function with params: (self, session, vm_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(XendDomain.instance().is_valid_vm, 'VM', func, *args, **kwargs)def valid_vbd(func): """Decorator to verify if vbd_ref is valid before calling method. @param func: function with params: (self, session, vbd_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: XendDomain.instance().is_valid_dev('vbd', r), 'VBD', func, *args, **kwargs)def valid_vbd_metrics(func): """Decorator to verify if ref is valid before calling method. @param func: function with params: (self, session, ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: XendDomain.instance().is_valid_dev('vbd', r), 'VBD_metrics', func, *args, **kwargs)def valid_vif(func): """Decorator to verify if vif_ref is valid before calling method. @param func: function with params: (self, session, vif_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: XendDomain.instance().is_valid_dev('vif', r), 'VIF', func, *args, **kwargs)def valid_vif_metrics(func): """Decorator to verify if ref is valid before calling method. @param func: function with params: (self, session, ref, ...)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -