⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xendapi.py

📁 xen 3.2.2 源码
💻 PY
📖 第 1 页 / 共 5 页
字号:
            # wrap validators around readable class attributes            for attr_name in ro_attrs + rw_attrs:                doit('%s.get_%s' % (api_cls, attr_name), True,                     async_support = False)            # wrap validators around writable class attrributes            for attr_name in rw_attrs:                doit('%s.set_%s' % (api_cls, attr_name), True,                     async_support = False)                setter_event_wrapper(api_cls, attr_name)            # wrap validators around methods            for method_name, return_type in methods:                doit('%s.%s' % (api_cls, method_name), True,                     async_support = True)            # wrap validators around class functions            for func_name, return_type in funcs:                doit('%s.%s' % (api_cls, func_name), False,                     async_support = True,                     return_type = return_type)            ctor_event_wrapper(api_cls)            dtor_event_wrapper(api_cls)    _decorate = classmethod(_decorate)    def __init__(self, auth):        self.auth = auth    Base_attr_ro = ['uuid']    Base_attr_rw = []    Base_methods = [('get_record', 'Struct')]    Base_funcs   = [('get_all', 'Set'), ('get_by_uuid', None), ('get_all_records', 'Set')]    # Xen API: Class Session    # ----------------------------------------------------------------    # NOTE: Left unwrapped by __init__    session_attr_ro = ['this_host', 'this_user', 'last_active']    session_methods = [('logout', None)]    def session_get_all(self, session):        return xen_api_success([session])    def session_login_with_password(self, *args):        if len(args) != 2:            return xen_api_error(                ['MESSAGE_PARAMETER_COUNT_MISMATCH',                 'session.login_with_password', 2, len(args)])        username = args[0]        password = args[1]        try:            session = ((self.auth == AUTH_NONE and                        auth_manager().login_unconditionally(username)) or                       auth_manager().login_with_password(username, password))            return xen_api_success(session)        except XendError, e:            return xen_api_error(['SESSION_AUTHENTICATION_FAILED'])    session_login_with_password.api = 'session.login_with_password'    # object methods    def session_logout(self, session):        auth_manager().logout(session)        return xen_api_success_void()    def session_get_record(self, session, self_session):        if self_session != session:            return xen_api_error(['PERMISSION_DENIED'])        record = {'uuid'       : session,                  'this_host'  : XendNode.instance().uuid,                  'this_user'  : auth_manager().get_user(session),                  'last_active': now()}        return xen_api_success(record)    def session_get_uuid(self, session, self_session):        return xen_api_success(self_session)    def session_get_by_uuid(self, session, self_session):        return xen_api_success(self_session)    # attributes (ro)    def session_get_this_host(self, session, self_session):        if self_session != session:            return xen_api_error(['PERMISSION_DENIED'])        return xen_api_success(XendNode.instance().uuid)    def session_get_this_user(self, session, self_session):        if self_session != session:            return xen_api_error(['PERMISSION_DENIED'])        user = auth_manager().get_user(session)        if user is not None:            return xen_api_success(user)        return xen_api_error(['SESSION_INVALID', session])    def session_get_last_active(self, session, self_session):        if self_session != session:            return xen_api_error(['PERMISSION_DENIED'])        return xen_api_success(now())    # Xen API: Class User    # ----------------------------------------------------------------    # TODO: NOT IMPLEMENTED YET    # Xen API: Class Tasks    # ----------------------------------------------------------------    task_attr_ro = ['name_label',                    'name_description',                    'status',                    'progress',                    'type',                    'result',                    'error_info',                    'allowed_operations',                    'session'                    ]    task_attr_rw = []    task_funcs = [('get_by_name_label', 'Set(task)'),                  ('cancel', None)]    def task_get_name_label(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.name_label)    def task_get_name_description(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.name_description)    def task_get_status(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.get_status())    def task_get_progress(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.progress)    def task_get_type(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.type)    def task_get_result(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.result)    def task_get_error_info(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.error_info)    def task_get_allowed_operations(self, session, task_ref):        return xen_api_success({})    def task_get_session(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.session)    def task_get_all(self, session):        tasks = XendTaskManager.get_all_tasks()        return xen_api_success(tasks)    def task_get_record(self, session, task_ref):        task = XendTaskManager.get_task(task_ref)        return xen_api_success(task.get_record())    def task_cancel(self, session, task_ref):        return xen_api_error('OPERATION_NOT_ALLOWED')    def task_get_by_name_label(self, session, name):        return xen_api_success(XendTaskManager.get_task_by_name(name))    # Xen API: Class host    # ----------------------------------------------------------------        host_attr_ro = ['software_version',                    'resident_VMs',                    'PBDs',                    'PIFs',                    'host_CPUs',                    'cpu_configuration',                    'metrics',                    'capabilities',                    'supported_bootloaders',                    'sched_policy',                    'API_version_major',                    'API_version_minor',                    'API_version_vendor',                    'API_version_vendor_implementation',                    'enabled']        host_attr_rw = ['name_label',                    'name_description',                    'other_config',                    'logging']    host_methods = [('disable', None),                    ('enable', None),                    ('reboot', None),                    ('shutdown', None),                    ('add_to_other_config', None),                    ('remove_from_other_config', None),                    ('dmesg', 'String'),                    ('dmesg_clear', 'String'),                    ('get_log', 'String'),                    ('send_debug_keys', None)]        host_funcs = [('get_by_name_label', None),                  ('list_methods', None)]    # attributes    def host_get_name_label(self, session, host_ref):        return xen_api_success(XendNode.instance().name)    def host_set_name_label(self, session, host_ref, new_name):        XendNode.instance().set_name(new_name)        return xen_api_success_void()    def host_get_name_description(self, session, host_ref):        return xen_api_success(XendNode.instance().get_description())    def host_set_name_description(self, session, host_ref, new_desc):        XendNode.instance().set_description(new_desc)        return xen_api_success_void()    def host_get_other_config(self, session, host_ref):        return xen_api_success(XendNode.instance().other_config)    def host_set_other_config(self, session, host_ref, other_config):        node = XendNode.instance()        node.other_config = dict(other_config)        node.save()        return xen_api_success_void()    def host_add_to_other_config(self, session, host_ref, key, value):        node = XendNode.instance()        node.other_config[key] = value        node.save()        return xen_api_success_void()    def host_remove_from_other_config(self, session, host_ref, key):        node = XendNode.instance()        if key in node.other_config:            del node.other_config[key]            node.save()        return xen_api_success_void()    def host_get_API_version_major(self, _, ref):        return xen_api_success(XEN_API_VERSION_MAJOR)    def host_get_API_version_minor(self, _, ref):        return xen_api_success(XEN_API_VERSION_MINOR)    def host_get_API_version_vendor(self, _, ref):        return xen_api_success(XEN_API_VERSION_VENDOR)    def host_get_API_version_vendor_implementation(self, _, ref):        return xen_api_success(XEN_API_VERSION_VENDOR_IMPLEMENTATION)    def host_get_software_version(self, session, host_ref):        return xen_api_success(XendNode.instance().xen_version())    def host_get_enabled(self, _1, _2):        return xen_api_success(XendDomain.instance().allow_new_domains())    def host_get_resident_VMs(self, session, host_ref):        return xen_api_success(XendDomain.instance().get_domain_refs())    def host_get_PBDs(self, _, ref):        return xen_api_success(XendPBD.get_all())    def host_get_PIFs(self, session, ref):        return xen_api_success(XendNode.instance().get_PIF_refs())    def host_get_host_CPUs(self, session, host_ref):        return xen_api_success(XendNode.instance().get_host_cpu_refs())    def host_get_metrics(self, _, ref):        return xen_api_success(XendNode.instance().host_metrics_uuid)    def host_get_capabilities(self, session, host_ref):        return xen_api_success(XendNode.instance().get_capabilities())    def host_get_supported_bootloaders(self, session, host_ref):        return xen_api_success(['pygrub'])    def host_get_sched_policy(self, _, host_ref):        return xen_api_success(XendNode.instance().get_vcpus_policy())    def host_get_cpu_configuration(self, _, host_ref):        return xen_api_success(XendNode.instance().get_cpu_configuration())    def host_set_logging(self, _, host_ref, logging):        return xen_api_todo()    def host_get_logging(self, _, host_ref):        return xen_api_todo()    # object methods    def host_disable(self, session, host_ref):        XendDomain.instance().set_allow_new_domains(False)        return xen_api_success_void()    def host_enable(self, session, host_ref):        XendDomain.instance().set_allow_new_domains(True)        return xen_api_success_void()    def host_reboot(self, session, host_ref):        if not XendDomain.instance().allow_new_domains():            return xen_api_error(XEND_ERROR_HOST_RUNNING)        return xen_api_error(XEND_ERROR_UNSUPPORTED)    def host_shutdown(self, session, host_ref):        if not XendDomain.instance().allow_new_domains():            return xen_api_error(XEND_ERROR_HOST_RUNNING)        return xen_api_error(XEND_ERROR_UNSUPPORTED)            def host_dmesg(self, session, host_ref):        return xen_api_success(XendDmesg.instance().info())    def host_dmesg_clear(self, session, host_ref):        return xen_api_success(XendDmesg.instance().clear())    def host_get_log(self, session, host_ref):        log_file = open(XendLogging.getLogFilename())        log_buffer = log_file.read()        log_buffer = log_buffer.replace('\b', ' ')        log_buffer = log_buffer.replace('\f', '\n')        log_file.close()        return xen_api_success(log_buffer)    def host_send_debug_keys(self, _, host_ref, keys):        node = XendNode.instance()        node.send_debug_keys(keys)        return xen_api_success_void()    def host_get_record(self, session, host_ref):        node = XendNode.instance()        dom = XendDomain.instance()        record = {'uuid': node.uuid,                  'name_label': node.name,                  'name_description': '',                  'API_version_major': XEN_API_VERSION_MAJOR,                  'API_version_minor': XEN_API_VERSION_MINOR,                  'API_version_vendor': XEN_API_VERSION_VENDOR,                  'API_version_vendor_implementation':                  XEN_API_VERSION_VENDOR_IMPLEMENTATION,                  'software_version': node.xen_version(),                  'enabled': XendDomain.instance().allow_new_domains(),                  'other_config': node.other_config,                  'resident_VMs': dom.get_domain_refs(),                  'host_CPUs': node.get_host_cpu_refs(),                  'cpu_configuration': node.get_cpu_configuration(),                  'metrics': node.host_metrics_uuid,                  'capabilities': node.get_capabilities(),                  'supported_bootloaders': ['pygrub'],                  'sched_policy': node.get_vcpus_policy(),                  'logging': {},                  'PIFs': XendPIF.get_all(),                  'PBDs': XendPBD.get_all()}        return xen_api_success(record)    # class methods    def host_get_all(self, session):        return xen_api_success((XendNode.instance().uuid,))    def host_get_by_name_label(self, session, name):        if XendNode.instance().name == name:            return xen_api_success((XendNode.instance().uuid,))        return xen_api_success([])        def host_list_methods(self, _):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -