⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xendxspolicyadmin.py

📁 xen虚拟机源代码安装包
💻 PY
字号:
#============================================================================# This library is free software; you can redistribute it and/or# modify it under the terms of version 2.1 of the GNU Lesser General Public# License as published by the Free Software Foundation.## This library is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU# Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public# License along with this library; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA#============================================================================# Copyright (C) 2006,2007 International Business Machines Corp.# Author: Stefan Berger <stefanb@us.ibm.com>#============================================================================import osimport shutilfrom xml.dom import minidom, Nodefrom xen.xend.XendLogging import logfrom xen.xend import uuidfrom xen.util import xsconstants, bootloaderimport xen.util.xsm.acm.acm as securityfrom xen.util.xspolicy import XSPolicyfrom xen.util.acmpolicy import ACMPolicy, initializefrom xen.xend.XendError import SecurityErrorclass XSPolicyAdmin:    """ The class that handles the managed policies in the system.        Handles adding and removing managed policies. All managed        policies are handled using a reference (UUID) which is        assigned to the policy by this class.    """    def __init__(self, maxpolicies):        """ Create a management class for managing the system's            policies.            @param maxpolicies: The max. number of policies allowed                                on the system (currently '1')        """        self.maxpolicies = maxpolicies        self.policies = {}        self.xsobjs = {}        bootloader.init()        if security.on() == xsconstants.XS_POLICY_ACM:            self.__acm_init()    def __acm_init(self):        act_pol_name = self.get_hv_loaded_policy_name()        initialize()        ref = uuid.createString()        try:            self.xsobjs[ref] = ACMPolicy(name=act_pol_name, ref=ref)            self.policies[ref] = (act_pol_name, xsconstants.ACM_POLICY_ID)            self.xsobjs[ref].validate_enforced_policy_hash()        except Exception, e:            log.error("Could not find XML representation of policy '%s': "                      "%s" % (act_pol_name,e))            rc, errors, acmpol_def = ACMPolicy.force_default_policy(ref)            if rc == xsconstants.XSERR_SUCCESS:                self.xsobjs[ref] = acmpol_def                self.policies[ref] = (acmpol_def.get_name(),                                      xsconstants.ACM_POLICY_ID)                log.info("Switched to DEFAULT policy.")        log.debug("XSPolicyAdmin: Known policies: %s" % self.policies)    def isXSEnabled(self):        """ Check whether 'security' is enabled on this system.            This currently only checks for ACM-enablement.        """        rc = 0        if security.on() == xsconstants.XS_POLICY_ACM:            rc |= xsconstants.XS_POLICY_ACM        return rc    def add_acmpolicy_to_system(self, xmltext, flags, overwrite):        """ Add an ACM policy's xml representation to the system. The            policy will automatically be compiled         flags:          XS_INST_BOOT : make policy the one to boot the system with                         by default; if there's a policy already installed,                         refuse to install this policy unless its one with                         the same name          XS_INST_LOAD : load the policy immediately; if this does not work                         refuse to install this policy         overwrite:          If any policy is installed and this is False, refuse to install          this policy          If flags is True, then any existing policy will be removed from          the system and the new one will be installed        """        from xen.xend import XendDomain        domains = XendDomain.instance()        try:            domains.domains_lock.acquire()            return self.__add_acmpolicy_to_system(xmltext, flags, overwrite)        finally:            domains.domains_lock.release()    def __add_acmpolicy_to_system(self, xmltext, flags, overwrite):        errors = ""        if security.on() != xsconstants.XS_POLICY_ACM:            raise SecurityError(-xsconstants.XSERR_POLICY_TYPE_UNSUPPORTED)        loadedpol = self.get_loaded_policy()        if loadedpol:            # This is meant as an update to a currently loaded policy            if flags & xsconstants.XS_INST_LOAD == 0:                raise SecurityError(-xsconstants.XSERR_POLICY_LOADED)            # Remember old flags, so they can be restored if update fails            old_flags = self.get_policy_flags(loadedpol)            # Remove policy from bootloader in case of new name of policy            self.rm_bootpolicy()            rc, errors = loadedpol.update(xmltext)            if rc == 0:                irc = self.activate_xspolicy(loadedpol, flags)                # policy is loaded; if setting the boot flag fails it's ok.            else:                old_flags = old_flags & xsconstants.XS_INST_BOOT                log.info("OLD FLAGS TO RESTORE: %s" % str(old_flags))                if old_flags != 0:                    self.activate_xspolicy(loadedpol, xsconstants.XS_INST_BOOT)            return (loadedpol, rc, errors)        try:            dom = minidom.parseString(xmltext.encode("utf-8"))        except:            raise SecurityError(-xsconstants.XSERR_BAD_XML)        ref = uuid.createString()        acmpol = ACMPolicy(dom=dom, ref=ref)        #First some basic tests that do not modify anything:        if flags & xsconstants.XS_INST_BOOT and not overwrite:            filename = acmpol.get_filename(".bin","",dotted=True)            if bootloader.get_default_policy != None and \               not bootloader.loads_default_policy(filename):                raise SecurityError(-xsconstants.XSERR_BOOTPOLICY_INSTALLED)        if not overwrite and len(self.policies) >= self.maxpolicies:            raise SecurityError(-xsconstants.XSERR_BOOTPOLICY_INSTALLED)        if overwrite:            #This should only give one key since only one policy is            #allowed.            keys = self.policies.keys()            for k in keys:                self.rm_bootpolicy()                rc = self.rm_policy_from_system(k, force=overwrite)                if rc != xsconstants.XSERR_SUCCESS:                    raise SecurityError(rc)        rc = acmpol.compile()        if rc != 0:            raise SecurityError(rc)        if flags & xsconstants.XS_INST_LOAD:            rc = acmpol.loadintohv()            if rc != 0:                raise SecurityError(rc)        if flags & xsconstants.XS_INST_BOOT:            rc = self.make_boot_policy(acmpol)            if rc != 0:                # If it cannot be installed due to unsupported                # bootloader, let it be ok.                pass        if dom:            new_entry = { ref : tuple([acmpol.get_name(),                                       xsconstants.ACM_POLICY_ID]) }            self.policies.update(new_entry)            self.xsobjs[ref]  = acmpol        return (acmpol, xsconstants.XSERR_SUCCESS, errors)    def reset_acmpolicy(self):        """           Attempt to reset the system's policy by udating it with           the DEFAULT policy.        """        from xen.xend import XendDomain        domains = XendDomain.instance()        try:            domains.domains_lock.acquire()            xml = ACMPolicy.get_reset_policy_xml()            flags = xsconstants.XS_INST_BOOT | xsconstants.XS_INST_LOAD            return self.__add_acmpolicy_to_system(xml, flags, True)        finally:            domains.domains_lock.release()    def make_boot_policy(self, acmpol):        if acmpol.is_default_policy():            return xsconstants.XSERR_SUCCESS        rc = acmpol.copy_policy_file(".bin","/boot")        if rc != xsconstants.XSERR_SUCCESS:            return rc        try:            filename = acmpol.get_filename(".bin","",dotted=True)            if bootloader.set_default_boot_policy(filename) != True:                return xsconstants.XSERR_BOOTPOLICY_INSTALL_ERROR        except:            return xsconstants.XSERR_FILE_ERROR        return xsconstants.XSERR_SUCCESS    def activate_xspolicy(self, xspol, flags):        from xen.xend import XendDomain        domains = XendDomain.instance()        try:            domains.domains_lock.acquire()            return self.__activate_xspolicy(xspol, flags)        finally:            domains.domains_lock.release()    def __activate_xspolicy(self, xspol, flags):        rc = xsconstants.XSERR_SUCCESS        if flags & xsconstants.XS_INST_LOAD:            rc = xspol.loadintohv()        if rc == xsconstants.XSERR_SUCCESS and \           flags & xsconstants.XS_INST_BOOT:            rc = self.make_boot_policy(xspol)        if rc == xsconstants.XSERR_SUCCESS:            rc = flags        return rc    def rm_policy_from_system(self, ref, force=False):        if self.policies.has_key(ref):            acmpol = self.xsobjs[ref]            rc = acmpol.destroy()            if rc == xsconstants.XSERR_SUCCESS or force:                del self.policies[ref]                del self.xsobjs[ref]                rc = xsconstants.XSERR_SUCCESS            return rc    def rm_bootpolicy(self):        """ Remove any (ACM) boot policy from the grub configuration file        """        rc = 0        title = bootloader.get_default_title()        if title != None:            polnames = []            for (k, v) in self.xsobjs.items():                polnames.append(v.get_filename(".bin","",dotted=True))            bootloader.rm_policy_from_boottitle(title, polnames)        else:            rc = -xsconstants.XSERR_NO_DEFAULT_BOOT_TITLE        return rc    def get_policy_flags(self, acmpol):        """ Get the currently active flags of a policy, i.e., whether the            system is using this policy as its boot policy for the default            boot title.        """        flags = 0        filename = acmpol.get_filename(".bin","", dotted=True)        if bootloader.loads_default_policy(filename) or \           acmpol.is_default_policy():            flags |= xsconstants.XS_INST_BOOT        if acmpol.isloaded():            flags |= xsconstants.XS_INST_LOAD        return flags    def get_policies(self):        """ Get all managed policies. """        return self.xsobjs.values()    def get_policies_refs(self):        """ Get all managed policies' references. """        return self.xsobjs.keys()    def has_ref(self, ref):        """ Check whether there is a policy with the given reference """        return self.xsobjs.has_key(ref)    def policy_from_ref(self, ref):        """ Get the policy's object given its reference """        if ref in self.xsobjs.keys():            return self.xsobjs[ref]        return None    def ref_from_polname(self, polname):        """ Get the reference of the policy given its name """        ref = None        for (k, v) in self.xsobjs.items():            if v.get_name() == polname:                ref = k                break        return ref    def lock_policy(self, ref):        """ get exclusive access to a policy """        self.xsobjs[ref].grab_lock()    def unlock_policy(self, ref):        """ release exclusive access to a policy """        self.xsobjs[ref].unlock()    def get_loaded_policy(self):        for pol in self.xsobjs.values():            if pol.isloaded():                return pol        return None    def get_hv_loaded_policy_name(self):        return security.get_active_policy_name()    def get_policy_by_name(self, name):        for pol in self.xsobjs.values():            if pol.get_name() == name:                return pol        return None    def get_domain0_bootlabel(self):        """ Get the domain0 bootlabel from the default boot title """        title = ""        def_title = bootloader.get_default_title()        line = bootloader.get_kernel_val(def_title, "ssidref")        if line:            parms = line.split(":",1)            if len(parms) > 1:                title = parms[1]        return title    def set_domain0_bootlabel(self, xspol, label):        """ Set the domain-0 bootlabel under the given policy. If the            current policy is the default policy, it will remove it. """        rm_entry = (xspol.get_name() == "DEFAULT")        return xspol.set_vm_bootlabel(label, rm_entry)    def rm_domain0_bootlabel(self):        """ Remove the domain-0 bootlabel from the default boot title """        def_title = bootloader.get_default_title()        return bootloader.set_kernel_attval(def_title, "ssidref", None)    def ssidref_to_vmlabel(self, ssidref):        """ Given an ssidref, return the vmlabel under the current policy """        vmlabel = ""        pol = self.get_loaded_policy()        if pol:            vmlabel = pol.policy_get_domain_label_by_ssidref_formatted(ssidref)        return vmlabel    def get_stes_of_vmlabel(self, vmlabel_xapi):        """ Get the list of STEs given a VM label in XenAPI format """        stes = []        loadedpol = self.get_loaded_policy()        if loadedpol:            tmp = vmlabel_xapi.split(":")            if len(tmp) != 3:                return []            stes = loadedpol.policy_get_stes_of_vmlabel(tmp[2])        return stes    def get_enforced_binary(self, xstype):        res = None        if xstype == xsconstants.XS_POLICY_ACM:            res = ACMPolicy.get_enforced_binary()        return respoladmin = Nonedef XSPolicyAdminInstance(maxpolicies=1):    global poladmin    if poladmin == None:        poladmin = XSPolicyAdmin(maxpolicies)    return poladmin

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -