xendapi.py
来自「xen虚拟机源代码安装包」· Python 代码 · 共 1,735 行 · 第 1/5 页
PY
1,735 行
#============================================================================# This library is free software; you can redistribute it and/or# modify it under the terms of version 2.1 of the GNU Lesser General Public# License as published by the Free Software Foundation.## This library is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU# Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public# License along with this library; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA#============================================================================# Copyright (C) 2006-2007 XenSource Ltd.#============================================================================import inspectimport osimport Queueimport setsimport stringimport sysimport tracebackimport threadingimport timeimport xmlrpclibimport XendDomain, XendDomainInfo, XendNode, XendDmesgimport XendLogging, XendTaskManager, XendAPIStorefrom XendAPIVersion import *from XendAuthSessions import instance as auth_managerfrom XendError import *from XendClient import ERROR_INVALID_DOMAINfrom XendLogging import logfrom XendNetwork import XendNetworkfrom XendTask import XendTaskfrom XendPIFMetrics import XendPIFMetricsfrom XendVMMetrics import XendVMMetricsfrom XendPIF import XendPIFfrom XendPBD import XendPBDfrom XendPPCI import XendPPCIfrom XendDPCI import XendDPCIfrom XendXSPolicy import XendXSPolicy, XendACMPolicyfrom XendAPIConstants import *from xen.util.xmlrpclib2 import stringifyfrom xen.util.blkif import blkdev_name_to_numberfrom xen.util import xsconstantsAUTH_NONE = 'none'AUTH_PAM = 'pam'argcounts = {}# ------------------------------------------# Utility Methods for Xen API Implementation# ------------------------------------------def xen_api_success(value): """Wraps a return value in XenAPI format.""" if value is None: s = '' else: s = stringify(value) return {"Status": "Success", "Value": s}def xen_api_success_void(): """Return success, but caller expects no return value.""" return xen_api_success("")def xen_api_error(error): """Wraps an error value in XenAPI format.""" if type(error) == tuple: error = list(error) if type(error) != list: error = [error] if len(error) == 0: error = ['INTERNAL_ERROR', 'Empty list given to xen_api_error'] return { "Status": "Failure", "ErrorDescription": [str(x) for x in error] }def xen_api_todo(): """Temporary method to make sure we track down all the TODOs""" return {"Status": "Error", "ErrorDescription": XEND_ERROR_TODO}def now(): return datetime()def datetime(when = None): """Marshall the given time as a Xen-API DateTime. @param when The time in question, given as seconds since the epoch, UTC. May be None, in which case the current time is used. """ if when is None: return xmlrpclib.DateTime(time.gmtime()) else: return xmlrpclib.DateTime(time.gmtime(when))# ---------------------------------------------------# Event dispatch# ---------------------------------------------------EVENT_QUEUE_LENGTH = 50event_registrations = {}def event_register(session, reg_classes): if session not in event_registrations: event_registrations[session] = { 'classes' : sets.Set(), 'queue' : Queue.Queue(EVENT_QUEUE_LENGTH), 'next-id' : 1 } if not reg_classes: reg_classes = classes event_registrations[session]['classes'].union_update(reg_classes)def event_unregister(session, unreg_classes): if session not in event_registrations: return if unreg_classes: event_registrations[session]['classes'].intersection_update( unreg_classes) if len(event_registrations[session]['classes']) == 0: del event_registrations[session] else: del event_registrations[session]def event_next(session): if session not in event_registrations: return xen_api_error(['SESSION_NOT_REGISTERED', session]) queue = event_registrations[session]['queue'] events = [queue.get()] try: while True: events.append(queue.get(False)) except Queue.Empty: pass return xen_api_success(events)def _ctor_event_dispatch(xenapi, ctor, api_cls, session, args): result = ctor(xenapi, session, *args) if result['Status'] == 'Success': ref = result['Value'] event_dispatch('add', api_cls, ref, '') return resultdef _dtor_event_dispatch(xenapi, dtor, api_cls, session, ref, args): result = dtor(xenapi, session, ref, *args) if result['Status'] == 'Success': event_dispatch('del', api_cls, ref, '') return resultdef _setter_event_dispatch(xenapi, setter, api_cls, attr_name, session, ref, args): result = setter(xenapi, session, ref, *args) if result['Status'] == 'Success': event_dispatch('mod', api_cls, ref, attr_name) return resultdef event_dispatch(operation, api_cls, ref, attr_name): assert operation in ['add', 'del', 'mod'] event = { 'timestamp' : now(), 'class' : api_cls, 'operation' : operation, 'ref' : ref, 'obj_uuid' : ref, 'field' : attr_name, } for reg in event_registrations.values(): if api_cls in reg['classes']: event['id'] = reg['next-id'] reg['next-id'] += 1 reg['queue'].put(event)# ---------------------------------------------------# Python Method Decorators for input value validation# ---------------------------------------------------def trace(func, api_name = ''): """Decorator to trace XMLRPC Xen API methods. @param func: function with any parameters @param api_name: name of the api call for debugging. """ if hasattr(func, 'api'): api_name = func.api def trace_func(self, *args, **kwargs): log.debug('%s: %s' % (api_name, args)) return func(self, *args, **kwargs) trace_func.api = api_name return trace_funcdef catch_typeerror(func): """Decorator to catch any TypeErrors and translate them into Xen-API errors. @param func: function with params: (self, ...) @rtype: callable object """ def f(self, *args, **kwargs): try: return func(self, *args, **kwargs) except TypeError, exn: #log.exception('catch_typeerror') if hasattr(func, 'api') and func.api in argcounts: # Assume that if the argument count was wrong and if the # exception was thrown inside this file, then it is due to an # invalid call from the client, otherwise it's an internal # error (which will be handled further up). expected = argcounts[func.api] actual = len(args) + len(kwargs) if expected != actual: tb = sys.exc_info()[2] try: sourcefile = traceback.extract_tb(tb)[-1][0] if sourcefile == inspect.getsourcefile(XendAPI): return xen_api_error( ['MESSAGE_PARAMETER_COUNT_MISMATCH', func.api, expected, actual]) finally: del tb raise except XendAPIError, exn: return xen_api_error(exn.get_api_error()) return fdef session_required(func): """Decorator to verify if session is valid before calling method. @param func: function with params: (self, session, ...) @rtype: callable object """ def check_session(self, session, *args, **kwargs): if auth_manager().is_session_valid(session): return func(self, session, *args, **kwargs) else: return xen_api_error(['SESSION_INVALID', session]) return check_sessiondef _is_valid_ref(ref, validator): return type(ref) == str and validator(ref)def _check_ref(validator, clas, func, api, session, ref, *args, **kwargs): if _is_valid_ref(ref, validator): return func(api, session, ref, *args, **kwargs) else: return xen_api_error(['HANDLE_INVALID', clas, ref])def valid_host(func): """Decorator to verify if host_ref is valid before calling method. @param func: function with params: (self, session, host_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(XendNode.instance().is_valid_host, 'host', func, *args, **kwargs)def valid_host_metrics(func): """Decorator to verify if host_metrics_ref is valid before calling method. @param func: function with params: (self, session, host_metrics_ref) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: r == XendNode.instance().host_metrics_uuid, 'host_metrics', func, *args, **kwargs)def valid_host_cpu(func): """Decorator to verify if host_cpu_ref is valid before calling method. @param func: function with params: (self, session, host_cpu_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(XendNode.instance().is_valid_cpu, 'host_cpu', func, *args, **kwargs)def valid_vm(func): """Decorator to verify if vm_ref is valid before calling method. @param func: function with params: (self, session, vm_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(XendDomain.instance().is_valid_vm, 'VM', func, *args, **kwargs)def valid_vbd(func): """Decorator to verify if vbd_ref is valid before calling method. @param func: function with params: (self, session, vbd_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: XendDomain.instance().is_valid_dev('vbd', r), 'VBD', func, *args, **kwargs)def valid_vbd_metrics(func): """Decorator to verify if ref is valid before calling method. @param func: function with params: (self, session, ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: XendDomain.instance().is_valid_dev('vbd', r), 'VBD_metrics', func, *args, **kwargs)def valid_vif(func): """Decorator to verify if vif_ref is valid before calling method. @param func: function with params: (self, session, vif_ref, ...) @rtype: callable object """ return lambda *args, **kwargs: \ _check_ref(lambda r: XendDomain.instance().is_valid_dev('vif', r), 'VIF', func, *args, **kwargs)def valid_vif_metrics(func): """Decorator to verify if ref is valid before calling method.
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?