⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 acmpolicy.py

📁 xen虚拟机源代码安装包
💻 PY
📖 第 1 页 / 共 4 页
字号:
        """        rc = -xsconstants.XSERR_XML_PROCESSING        errors = ""        acmpol_old = self        try:            acmpol_new = ACMPolicy(xml=xml_new)        except Exception:            return -xsconstants.XSERR_XML_PROCESSING, errors        vmlabel_map = acmpol_new.policy_get_vmlabel_translation_map()        # An update requires version information in the current        # and new policy. The version number of the current policy        # must be the same as what is in the FromPolicy/Version node        # in the new one and the current policy's name must be the        # same as in FromPolicy/PolicyName        # The default policy when it is set skips this step.        if not acmpol_new.is_default_policy() and \           not acmpol_old.is_default_policy():            irc = self.__do_update_version_check(acmpol_new)            if irc != xsconstants.XSERR_SUCCESS:                return irc, errors        if self.isloaded():            newvmnames = \                 acmpol_new.policy_get_virtualmachinelabel_names_sorted()            oldvmnames = \                 acmpol_old.policy_get_virtualmachinelabel_names_sorted()            del_array = ""            chg_array = ""            for o in oldvmnames:                if o not in newvmnames:                    old_idx = oldvmnames.index(o)                    if vmlabel_map.has_key(o):                        #not a deletion, but a renaming                        new = vmlabel_map[o]                        new_idx = newvmnames.index(new)                        chg_array += struct.pack("ii", old_idx, new_idx)                    else:                        del_array += struct.pack("i", old_idx)            for v in newvmnames:                if v in oldvmnames:                    old_idx = oldvmnames.index(v)                    new_idx = newvmnames.index(v)                    if old_idx != new_idx:                        chg_array += struct.pack("ii", old_idx, new_idx)            # VM labels indicated in the 'from' attribute of a VM or            # resource node but that did not exist in the old policy            # are considered bad labels.            bad_renamings = set(vmlabel_map.keys()) - set(oldvmnames)            if len(bad_renamings) > 0:                log.error("Bad VM label renamings: %s" %                          list(bad_renamings))                return -xsconstants.XSERR_BAD_LABEL, errors            reslabel_map = acmpol_new.policy_get_reslabel_translation_map()            oldresnames  = acmpol_old.policy_get_resourcelabel_names()            bad_renamings = set(reslabel_map.keys()) - set(oldresnames)            if len(bad_renamings) > 0:                log.error("Bad resource label renamings: %s" %                          list(bad_renamings))                return -xsconstants.XSERR_BAD_LABEL, errors            #Get binary and map from the new policy            rc, pol_map, bin_pol = acmpol_new.policy_create_map_and_bin()            if rc != xsconstants.XSERR_SUCCESS:                log.error("Could not build the map and binary policy.")                return rc, errors            #Need to do / check the following:            # - relabel all resources where there is a 'from' field in            #   the policy and mark those as unlabeled where the label            #   does not appear in the new policy anymore            # - relabel all VMs where there is a 'from' field in the            #   policy and mark those as unlabeled where the label            #   does not appear in the new policy anymore; no running            #   or paused VM may be unlabeled through this            # - check that under the new labeling conditions the VMs            #   still have access to their resources as before. Unlabeled            #   resources are inaccessible. If this check fails, the            #   update failed.            # - Attempt changes in the hypervisor; if this step fails,            #   roll back the relabeling of resources and VMs            # - Commit the relabeling of resources            rc, errors = security.change_acm_policy(bin_pol,                                        del_array, chg_array,                                        vmlabel_map, reslabel_map,                                        self, acmpol_new,                                        acmpol_new.is_default_policy())            if rc == 0:                # Replace the old DOM with the new one and save it                self.dom = acmpol_new.dom                self.compile()                log.info("ACM policy update was successful")        else:            #Not loaded in HV            self.dom = acmpol_new.dom            rc = self.compile()        return rc, errors    def force_default_policy(klass, policy_ref):        """           Force the installation of the DEFAULT policy if for           example no XML of the current policy is available and           the update path with comparisons of old and new policy           cannot be taken.           This only succeeds if only Domain-0 is running or           all guest have the same ssidref as Domain-0.        """        errors = ""        acmpol_new = ACMPolicy(xml = get_DEFAULT_policy(), ref=policy_ref)        from xen.lowlevel import acm        dom0_ssidref = acm.getssid(0)        del_array = ""        chg_array = struct.pack("ii",                                dom0_ssidref['ssidref'] & 0xffff,                                0x1)        rc, pol_map, bin_pol = acmpol_new.policy_create_map_and_bin()        if rc != xsconstants.XSERR_SUCCESS:            return rc, errors, acmpol_new        rc, errors = security.hv_chg_policy(bin_pol, del_array, chg_array)        return rc, errors, acmpol_new    force_default_policy = classmethod(force_default_policy)    def get_reset_policy_xml(klass):        dom0_label = security.get_ssid(0)[1]        return get_DEFAULT_policy(dom0_label)    get_reset_policy_xml = classmethod(get_reset_policy_xml)    def __do_update_version_check(self, acmpol_new):        acmpol_old = self        now_vers    = acmpol_old.policy_dom_get_hdr_item("Version")        now_name    = acmpol_old.policy_dom_get_hdr_item("PolicyName")        req_oldvers = acmpol_new.policy_dom_get_frompol_item("Version")        req_oldname = acmpol_new.policy_dom_get_frompol_item("PolicyName")        if now_vers == "" or \           now_vers != req_oldvers or \           now_name != req_oldname:            log.info("Policy rejected: %s != %s or %s != %s" % \                     (now_vers,req_oldvers,now_name,req_oldname))            return -xsconstants.XSERR_VERSION_PREVENTS_UPDATE        if not self.isVersionUpdate(acmpol_new):            log.info("Policy rejected since new version is not an update.")            return -xsconstants.XSERR_VERSION_PREVENTS_UPDATE        return xsconstants.XSERR_SUCCESS    def compareVersions(self, v1, v2):        """            Compare two policy versions given their tuples of major and            minor.            Return '0' if versions are equal, '>0' if v1 > v2 and            '<' if v1 < v2        """        rc = v1[0] - v2[0]        if rc == 0:            rc = v1[1] - v2[1]        return rc    def getVersionTuple(self, item="Version"):        v_str = self.policy_dom_get_hdr_item(item)        return self.__convVersionToTuple(v_str)    def get_version(self):        return self.policy_dom_get_hdr_item("Version")    def isVersionUpdate(self, polnew):        if self.compareVersions(polnew.getVersionTuple(),                                self.getVersionTuple()) > 0:            return True        return False    def __convVersionToTuple(self, v_str):        """ Convert a version string, formatted according to the scheme            "%d.%d" into a tuple of (major, minor). Return (0,0) if the            string is empty.        """        major = 0        minor = 0        if v_str != "":            tmp = v_str.split(".")            major = int(tmp[0])            if len(tmp) > 1:                minor = int(tmp[1])        return (major, minor)    def get_policies_path(self):        xoptions = XendOptions.instance()        basedir = xoptions.get_xend_security_path()        return basedir + "/policies/"    def policy_path(self, name):        prefix = self.get_policies_path()        path = prefix + name.replace('.','/')        _path = path.split("/")        del _path[-1]        mkdir.parents("/".join(_path), stat.S_IRWXU)        return path    def path_from_policy_name(self, name):        return self.policy_path(name) + "-security_policy.xml"    #    # Functions interacting with the bootloader    #    def vmlabel_to_ssidref(self, vm_label):        """ Convert a VMlabel into an ssidref given the current            policy            Return xsconstants.INVALID_SSIDREF if conversion failed.        """        ssidref = xsconstants.INVALID_SSIDREF        names = self.policy_get_virtualmachinelabel_names_sorted()        try:            vmidx = names.index(vm_label)            ssidref = (vmidx << 16) | vmidx        except:            pass        return ssidref    def set_vm_bootlabel(self, vm_label, remove=False):        parms="<>"        if vm_label != "":            ssidref = self.vmlabel_to_ssidref(vm_label)            if ssidref == xsconstants.INVALID_SSIDREF:                return -xsconstants.XSERR_BAD_LABEL            parms = "0x%08x:%s:%s:%s" % \                        (ssidref, xsconstants.ACM_POLICY_ID, \                         self.get_name(),vm_label)        else:            ssidref = 0 #Identifier for removal        if remove == True:            parms = "<>"        try:            def_title = bootloader.get_default_title()            bootloader.set_kernel_attval(def_title, "ssidref", parms)        except:            return -xsconstants.XSERR_GENERAL_FAILURE        return ssidref    #    # Utility functions related to the policy's files    #    def get_filename(self, postfix, prefix=None, dotted=False):        """           Create the filename for the policy. The prefix is prepended           to the path. If dotted is True, then a policy name like           'a.b.c' will remain as is, otherwise it will become 'a/b/c'        """        if prefix == None:            prefix = self.get_policies_path()        name = self.get_name()        if name:            p = name.split(".")            path = ""            if dotted:                sep = "."            else:                sep = "/"            if len(p) > 1:                path = sep.join(p[0:len(p)-1])            if prefix != "" or path != "":                allpath = prefix + path + sep + p[-1] + postfix            else:                allpath = p[-1] + postfix            return allpath        return None    def __readfile(self, name):        cont = ""        filename = self.get_filename(name)        f = open(filename, "r")        if f:            cont = f.read()            f.close()        return cont    def get_map(self):        return self.__readfile(".map")    def get_bin(self):        return self.__readfile(".bin")    def copy_policy_file(self, suffix, destdir):        spolfile = self.get_filename(suffix)        dpolfile = destdir + "/" + self.get_filename(suffix,"",dotted=True)        try:            shutil.copyfile(spolfile, dpolfile)        except Exception, e:            log.error("Could not copy policy file %s to %s: %s" %                      (spolfile, dpolfile, str(e)))            return -xsconstants.XSERR_FILE_ERROR        return xsconstants.XSERR_SUCCESS    #    # DOM-related functions    #    def policy_dom_get(self, parent, key, createit=False):        for node in parent.childNodes:            if node.nodeType == Node.ELEMENT_NODE:                if node.nodeName == key:                    return node        if createit:            self.dom_create_node(parent, key)            return self.policy_dom_get(parent, key)    def dom_create_node(self, parent, newname, value=" "):        xml = "<a><"+newname+">"+ value +"</"+newname+"></a>"        frag = minidom.parseString(xml)        frag.childNodes[0].nodeType = Node.DOCUMENT_FRAGMENT_NODE        parent.appendChild(frag.childNodes[0])        return frag.childNodes[0]    def dom_get_node(self, path, createit=False):        node = None        parts = path.split("/")        doc = self.get_dom()        if len(parts) > 0:            node = self.policy_dom_get(doc.documentElement, parts[0])            if node:                i = 1                while i < len(parts):                    _node = self.policy_dom_get(node, parts[i], createit)                    if not _node:                        if not createit:                            break                        else:                            self.dom_create_node(node, parts[i])                            _node = self.policy_dom_get(node, parts[i])                    node = _node                    i += 1        return node    #    # Header-related functions    #    def policy_dom_get_header_subnode(self, nodename):        node = self.dom_get_node("PolicyHeader/%s" % nodename)        return node    def policy_dom_get_hdr_item(self, name, default=""):        node = self.policy_dom_get_header_subnode(name)        if node and len(node.childNodes) > 0:            return node.childNodes[0].nodeValue        return default    def policy_dom_get_frompol_item(self, name, default="", createit=False):        node = self.dom_get_node("PolicyHeader/FromPolicy",createit)        if node:            node = self.policy_dom_get(node, name, createit)            if node and len(node.childNodes) > 0:                return node.childNodes[0].nodeValue        return default    def get_header_fields_map(self):        header = {          'policyname'   : self.policy_dom_get_hdr_item("PolicyName"),          'policyurl'    : self.policy_dom_get_hdr_item("PolicyUrl"),          'reference'    : self.policy_dom_get_hdr_item("Reference"),          'date'         : self.policy_dom_get_hdr_item("Date"),          'namespaceurl' : self.policy_dom_get_hdr_item("NameSpaceUrl"),          'version'      : self.policy_dom_get_hdr_item("Version")        }        return header    def set_frompolicy_name(self, name):        """ For tools to adapt the header of the policy """        node = self.dom_get_node("PolicyHeader/FromPolicy/PolicyName",                                 createit=True)        node.childNodes[0].nodeValue = name    def set_frompolicy_version(self, version):        """ For tools to adapt the header of the policy """        node = self.dom_get_node("PolicyHeader/FromPolicy/Version",                                 createit=True)        node.childNodes[0].nodeValue = version    def set_policy_name(self, name):        """ For tools to adapt the header of the policy """        node = self.dom_get_node("PolicyHeader/PolicyName")        node.childNodes[0].nodeValue = name    def set_policy_version(self, version):        """ For tools to adapt the header of the policy """        node = self.dom_get_node("PolicyHeader/Version")        node.childNodes[0].nodeValue = version    def update_frompolicy(self, curpol):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -