📄 xendapi.py
字号:
# wrap validators around readable class attributes for attr_name in ro_attrs + rw_attrs: doit('%s.get_%s' % (api_cls, attr_name), True, async_support = False) # wrap validators around writable class attrributes for attr_name in rw_attrs: doit('%s.set_%s' % (api_cls, attr_name), True, async_support = False) setter_event_wrapper(api_cls, attr_name) # wrap validators around methods for method_name, return_type in methods: doit('%s.%s' % (api_cls, method_name), True, async_support = True) # wrap validators around class functions for func_name, return_type in funcs: doit('%s.%s' % (api_cls, func_name), False, async_support = True, return_type = return_type) ctor_event_wrapper(api_cls) dtor_event_wrapper(api_cls) _decorate = classmethod(_decorate) def __init__(self, auth): self.auth = auth Base_attr_ro = ['uuid'] Base_attr_rw = [] Base_methods = [('get_record', 'Struct')] Base_funcs = [('get_all', 'Set'), ('get_by_uuid', None), ('get_all_records', 'Set')] # Xen API: Class Session # ---------------------------------------------------------------- # NOTE: Left unwrapped by __init__ session_attr_ro = ['this_host', 'this_user', 'last_active'] session_methods = [('logout', None)] def session_get_all(self, session): return xen_api_success([session]) def session_login_with_password(self, *args): if len(args) != 2: return xen_api_error( ['MESSAGE_PARAMETER_COUNT_MISMATCH', 'session.login_with_password', 2, len(args)]) username = args[0] password = args[1] try: session = ((self.auth == AUTH_NONE and auth_manager().login_unconditionally(username)) or auth_manager().login_with_password(username, password)) return xen_api_success(session) except XendError, e: return xen_api_error(['SESSION_AUTHENTICATION_FAILED']) session_login_with_password.api = 'session.login_with_password' # object methods def session_logout(self, session): auth_manager().logout(session) return xen_api_success_void() def session_get_record(self, session, self_session): if self_session != session: return xen_api_error(['PERMISSION_DENIED']) record = {'uuid' : session, 'this_host' : XendNode.instance().uuid, 'this_user' : auth_manager().get_user(session), 'last_active': now()} return xen_api_success(record) def session_get_uuid(self, session, self_session): return xen_api_success(self_session) def session_get_by_uuid(self, session, self_session): return xen_api_success(self_session) # attributes (ro) def session_get_this_host(self, session, self_session): if self_session != session: return xen_api_error(['PERMISSION_DENIED']) return xen_api_success(XendNode.instance().uuid) def session_get_this_user(self, session, self_session): if self_session != session: return xen_api_error(['PERMISSION_DENIED']) user = auth_manager().get_user(session) if user is not None: return xen_api_success(user) return xen_api_error(['SESSION_INVALID', session]) def session_get_last_active(self, session, self_session): if self_session != session: return xen_api_error(['PERMISSION_DENIED']) return xen_api_success(now()) # Xen API: Class User # ---------------------------------------------------------------- # TODO: NOT IMPLEMENTED YET # Xen API: Class Tasks # ---------------------------------------------------------------- task_attr_ro = ['name_label', 'name_description', 'status', 'progress', 'type', 'result', 'error_info', 'allowed_operations', 'session' ] task_attr_rw = [] task_funcs = [('get_by_name_label', 'Set(task)'), ('cancel', None)] def task_get_name_label(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.name_label) def task_get_name_description(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.name_description) def task_get_status(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.get_status()) def task_get_progress(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.progress) def task_get_type(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.type) def task_get_result(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.result) def task_get_error_info(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.error_info) def task_get_allowed_operations(self, session, task_ref): return xen_api_success({}) def task_get_session(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.session) def task_get_all(self, session): tasks = XendTaskManager.get_all_tasks() return xen_api_success(tasks) def task_get_record(self, session, task_ref): task = XendTaskManager.get_task(task_ref) return xen_api_success(task.get_record()) def task_cancel(self, session, task_ref): return xen_api_error('OPERATION_NOT_ALLOWED') def task_get_by_name_label(self, session, name): return xen_api_success(XendTaskManager.get_task_by_name(name)) # Xen API: Class host # ---------------------------------------------------------------- host_attr_ro = ['software_version', 'resident_VMs', 'PBDs', 'PIFs', 'host_CPUs', 'cpu_configuration', 'metrics', 'capabilities', 'supported_bootloaders', 'sched_policy', 'API_version_major', 'API_version_minor', 'API_version_vendor', 'API_version_vendor_implementation', 'enabled'] host_attr_rw = ['name_label', 'name_description', 'other_config', 'logging'] host_methods = [('disable', None), ('enable', None), ('reboot', None), ('shutdown', None), ('add_to_other_config', None), ('remove_from_other_config', None), ('dmesg', 'String'), ('dmesg_clear', 'String'), ('get_log', 'String'), ('send_debug_keys', None)] host_funcs = [('get_by_name_label', None), ('list_methods', None)] # attributes def host_get_name_label(self, session, host_ref): return xen_api_success(XendNode.instance().name) def host_set_name_label(self, session, host_ref, new_name): XendNode.instance().set_name(new_name) return xen_api_success_void() def host_get_name_description(self, session, host_ref): return xen_api_success(XendNode.instance().get_description()) def host_set_name_description(self, session, host_ref, new_desc): XendNode.instance().set_description(new_desc) return xen_api_success_void() def host_get_other_config(self, session, host_ref): return xen_api_success(XendNode.instance().other_config) def host_set_other_config(self, session, host_ref, other_config): node = XendNode.instance() node.other_config = dict(other_config) node.save() return xen_api_success_void() def host_add_to_other_config(self, session, host_ref, key, value): node = XendNode.instance() node.other_config[key] = value node.save() return xen_api_success_void() def host_remove_from_other_config(self, session, host_ref, key): node = XendNode.instance() if key in node.other_config: del node.other_config[key] node.save() return xen_api_success_void() def host_get_API_version_major(self, _, ref): return xen_api_success(XEN_API_VERSION_MAJOR) def host_get_API_version_minor(self, _, ref): return xen_api_success(XEN_API_VERSION_MINOR) def host_get_API_version_vendor(self, _, ref): return xen_api_success(XEN_API_VERSION_VENDOR) def host_get_API_version_vendor_implementation(self, _, ref): return xen_api_success(XEN_API_VERSION_VENDOR_IMPLEMENTATION) def host_get_software_version(self, session, host_ref): return xen_api_success(XendNode.instance().xen_version()) def host_get_enabled(self, _1, _2): return xen_api_success(XendDomain.instance().allow_new_domains()) def host_get_resident_VMs(self, session, host_ref): return xen_api_success(XendDomain.instance().get_domain_refs()) def host_get_PBDs(self, _, ref): return xen_api_success(XendPBD.get_all()) def host_get_PIFs(self, session, ref): return xen_api_success(XendNode.instance().get_PIF_refs()) def host_get_host_CPUs(self, session, host_ref): return xen_api_success(XendNode.instance().get_host_cpu_refs()) def host_get_metrics(self, _, ref): return xen_api_success(XendNode.instance().host_metrics_uuid) def host_get_capabilities(self, session, host_ref): return xen_api_success(XendNode.instance().get_capabilities()) def host_get_supported_bootloaders(self, session, host_ref): return xen_api_success(['pygrub']) def host_get_sched_policy(self, _, host_ref): return xen_api_success(XendNode.instance().get_vcpus_policy()) def host_get_cpu_configuration(self, _, host_ref): return xen_api_success(XendNode.instance().get_cpu_configuration()) def host_set_logging(self, _, host_ref, logging): return xen_api_todo() def host_get_logging(self, _, host_ref): return xen_api_todo() # object methods def host_disable(self, session, host_ref): XendDomain.instance().set_allow_new_domains(False) return xen_api_success_void() def host_enable(self, session, host_ref): XendDomain.instance().set_allow_new_domains(True) return xen_api_success_void() def host_reboot(self, session, host_ref): if not XendDomain.instance().allow_new_domains(): return xen_api_error(XEND_ERROR_HOST_RUNNING) return xen_api_error(XEND_ERROR_UNSUPPORTED) def host_shutdown(self, session, host_ref): if not XendDomain.instance().allow_new_domains(): return xen_api_error(XEND_ERROR_HOST_RUNNING) return xen_api_error(XEND_ERROR_UNSUPPORTED) def host_dmesg(self, session, host_ref): return xen_api_success(XendDmesg.instance().info()) def host_dmesg_clear(self, session, host_ref): return xen_api_success(XendDmesg.instance().clear()) def host_get_log(self, session, host_ref): log_file = open(XendLogging.getLogFilename()) log_buffer = log_file.read() log_buffer = log_buffer.replace('\b', ' ') log_buffer = log_buffer.replace('\f', '\n') log_file.close() return xen_api_success(log_buffer) def host_send_debug_keys(self, _, host_ref, keys): node = XendNode.instance() node.send_debug_keys(keys) return xen_api_success_void() def host_get_record(self, session, host_ref): node = XendNode.instance() dom = XendDomain.instance() record = {'uuid': node.uuid, 'name_label': node.name, 'name_description': '', 'API_version_major': XEN_API_VERSION_MAJOR, 'API_version_minor': XEN_API_VERSION_MINOR, 'API_version_vendor': XEN_API_VERSION_VENDOR, 'API_version_vendor_implementation': XEN_API_VERSION_VENDOR_IMPLEMENTATION, 'software_version': node.xen_version(), 'enabled': XendDomain.instance().allow_new_domains(), 'other_config': node.other_config, 'resident_VMs': dom.get_domain_refs(), 'host_CPUs': node.get_host_cpu_refs(), 'cpu_configuration': node.get_cpu_configuration(), 'metrics': node.host_metrics_uuid, 'capabilities': node.get_capabilities(), 'supported_bootloaders': ['pygrub'], 'sched_policy': node.get_vcpus_policy(), 'logging': {}, 'PIFs': XendPIF.get_all(), 'PBDs': XendPBD.get_all()} return xen_api_success(record) # class methods def host_get_all(self, session): return xen_api_success((XendNode.instance().uuid,)) def host_get_by_name_label(self, session, name): if XendNode.instance().name == name: return xen_api_success((XendNode.instance().uuid,)) return xen_api_success([]) def host_list_methods(self, _):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -