⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 acmpolicy.py

📁 xen虚拟机源代码安装包
💻 PY
📖 第 1 页 / 共 4 页
字号:
        self.set_frompolicy_name(curpol.policy_dom_get_hdr_item("PolicyName"))        version = curpol.policy_dom_get_hdr_item("Version")        self.set_frompolicy_version(version)        (maj, minor) = self.__convVersionToTuple(version)        self.set_policy_version("%s.%s" % (maj, minor+1))    #    # Get all types that are part of a node    #    def policy_get_types(self, node):        strings = []        i = 0        while i < len(node.childNodes):            if node.childNodes[i].nodeName == "Type" and \               len(node.childNodes[i].childNodes) > 0:                strings.append(node.childNodes[i].childNodes[0].nodeValue)            i += 1        return strings    #    # Simple Type Enforcement-related functions    #    def policy_get_stetypes_node(self):        node = self.dom_get_node("SimpleTypeEnforcement/SimpleTypeEnforcementTypes")        return node    def policy_get_stetypes_types(self):        strings = []        node = self.policy_get_stetypes_node()        if node:            strings = self.policy_get_types(node)        return strings    #    # Chinese Wall Type-related functions    #    def policy_get_chwall_types(self):        strings = []        node = self.dom_get_node("ChineseWall/ChineseWallTypes")        if node:            strings = self.policy_get_types(node)        return strings    def policy_get_chwall_cfses(self):        cfs = []        node = self.dom_get_node("ChineseWall/ConflictSets")        if node:            i = 0            while i < len(node.childNodes):                _cfs = {}                if node.childNodes[i].nodeName == "Conflict":                    _cfs['name']  = node.childNodes[i].getAttribute('name')                    _cfs['chws'] = self.policy_get_types(node.childNodes[i])                    cfs.append(_cfs)                i += 1        return cfs    def policy_get_chwall_cfses_names_sorted(self):        """           Return the list of all conflict set names in alphabetical           order.        """        cfs_names = []        node = self.dom_get_node("ChineseWall/ConflictSets")        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == "Conflict":                    n  = node.childNodes[i].getAttribute('name')                    #it better have a name!                    if n:                        cfs_names.append(n)                i += 1        cfs_names.sort()        return cfs_names    #    # Subject Label-related functions    #    def policy_get_bootstrap_vmlabel(self):        node = self.dom_get_node("SecurityLabelTemplate/SubjectLabels")        if node:            vmlabel = node.getAttribute("bootstrap")        return vmlabel    # Get the names of all virtual machine labels; returns an array    def policy_get_virtualmachinelabel_names(self):        strings = []        node = self.dom_get_node("SecurityLabelTemplate/SubjectLabels")        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == "VirtualMachineLabel":                    name = self.policy_dom_get(node.childNodes[i], "Name")                    if len(name.childNodes) > 0:                        strings.append(name.childNodes[0].nodeValue)                i += 1        return strings    def policy_sort_virtualmachinelabel_names(self, vmnames):        bootstrap = self.policy_get_bootstrap_vmlabel()        if bootstrap not in vmnames:            raise SecurityError(-xsconstants.XSERR_POLICY_INCONSISTENT)        vmnames.remove(bootstrap)        vmnames.sort()        vmnames.insert(0, bootstrap)        if ACM_LABEL_UNLABELED in vmnames:            vmnames.remove(ACM_LABEL_UNLABELED)            vmnames.insert(0, ACM_LABEL_UNLABELED)        return vmnames    def policy_get_virtualmachinelabel_names_sorted(self):        """ Get a sorted list of VMlabel names. The bootstrap VM's            label will be the first one in that list, followed            by an alphabetically sorted list of VM label names """        vmnames = self.policy_get_virtualmachinelabel_names()        res = self.policy_sort_virtualmachinelabel_names(vmnames)        if res[0] != ACM_LABEL_UNLABELED:            res.insert(0, ACM_LABEL_UNLABELED)        return res    def policy_get_virtualmachinelabels(self):        """ Get a list of all virtual machine labels in this policy """        res = []        node = self.dom_get_node("SecurityLabelTemplate/SubjectLabels")        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == "VirtualMachineLabel":                    name = self.policy_dom_get(node.childNodes[i], "Name")                    if len(name.childNodes) > 0:                        _res = {}                        _res['type'] = xsconstants.ACM_LABEL_VM                        _res['name'] = name.childNodes[0].nodeValue                        stes = self.policy_dom_get(node.childNodes[i],                                                 "SimpleTypeEnforcementTypes")                        if stes:                           _res['stes'] = self.policy_get_types(stes)                        else:                            _res['stes'] = []                        chws = self.policy_dom_get(node.childNodes[i],                                                   "ChineseWallTypes")                        if chws:                            _res['chws'] = self.policy_get_types(chws)                        else:                            _res['chws'] = []                        res.append(_res)                i += 1        return res    def policy_get_stes_of_vmlabel(self, vmlabel):        """ Get a list of all STEs of a given VMlabel """        return self.__policy_get_stes_of_labeltype(vmlabel,                                        "/SubjectLabels", "VirtualMachineLabel")    def policy_get_stes_of_resource(self, reslabel):        """ Get a list of all resources of a given VMlabel """        return self.__policy_get_stes_of_labeltype(reslabel,                                        "/ObjectLabels", "ResourceLabel")    def __policy_get_stes_of_labeltype(self, label, path, labeltype):        node = self.dom_get_node("SecurityLabelTemplate" + path)        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == labeltype:                    name = self.policy_dom_get(node.childNodes[i], "Name")                    if len(name.childNodes) > 0 and \                       name.childNodes[0].nodeValue == label:                        stes = self.policy_dom_get(node.childNodes[i],                                            "SimpleTypeEnforcementTypes")                        if not stes:                            return []                        return self.policy_get_types(stes)                i += 1        return []    def policy_check_vmlabel_against_reslabels(self, vmlabel, resources):        """           Check whether the given vmlabel is compatible with the given           resource labels. Do this by getting all the STEs of the           vmlabel and the STEs of the resources. Any STE type of the           VM label must match an STE type of the resource.        """        vm_stes = self.policy_get_stes_of_vmlabel(vmlabel)        if len(vm_stes) == 0:            return False        for res in resources:            res_stes = self.policy_get_stes_of_resource(res)            if len(res_stes) == 0 or \               len( set(res_stes).intersection( set(vm_stes) ) ) == 0:                return False        return True    def __policy_get_label_translation_map(self, path, labeltype):        res = {}        node = self.dom_get_node("SecurityLabelTemplate/" + path)        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == labeltype:                    name = self.policy_dom_get(node.childNodes[i], "Name")                    from_name = name.getAttribute("from")                    if from_name and len(name.childNodes) > 0:                        res.update({from_name : name.childNodes[0].nodeValue})                i += 1        return res    def policy_get_vmlabel_translation_map(self):        """            Get a dictionary of virtual machine mappings from their            old VMlabel name to the new VMlabel name.        """        return self.__policy_get_label_translation_map("SubjectLabels",                                                       "VirtualMachineLabel")    def policy_get_reslabel_translation_map(self):        """            Get a dictionary of resource mappings from their            old resource label name to the new resource label name.        """        return self.__policy_get_label_translation_map("ObjectLabels",                                                       "ResourceLabel")    #    # Object Label-related functions    #    def policy_get_resourcelabel_names(self):        """            Get the names of all resource labels in an array but            only those that actually have types        """        strings = []        node = self.dom_get_node("SecurityLabelTemplate/ObjectLabels")        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == "ResourceLabel":                    name = self.policy_dom_get(node.childNodes[i], "Name")                    stes = self.policy_dom_get(node.childNodes[i],                                          "SimpleTypeEnforcementTypes")                    if stes and len(name.childNodes) > 0:                        strings.append(name.childNodes[0].nodeValue)                i += 1        return strings    def policy_get_resourcelabels(self):        """           Get all information about all resource labels of this policy.        """        res = []        node = self.dom_get_node("SecurityLabelTemplate/ObjectLabels")        if node:            i = 0            while i < len(node.childNodes):                if node.childNodes[i].nodeName == "ResourceLabel":                    name = self.policy_dom_get(node.childNodes[i], "Name")                    if len(name.childNodes) > 0:                        _res = {}                        _res['type'] = xsconstants.ACM_LABEL_RES                        _res['name'] = name.childNodes[0].nodeValue                        stes = self.policy_dom_get(node.childNodes[i],                                                   "SimpleTypeEnforcementTypes")                        if stes:                            _res['stes'] = self.policy_get_types(stes)                        else:                            _res['stes'] = []                        _res['chws'] = []                        res.append(_res)                i += 1        return res    def policy_find_reslabels_with_stetype(self, stetype):        """           Find those resource labels that hold a given STE type.        """        res = []        reslabels = self.policy_get_resourcelabels()        for resl in reslabels:            if stetype in resl['stes']:                res.append(resl['name'])        return res    def toxml(self):        dom = self.get_dom()        if dom:            return dom.toxml()        return None    def hash(self):        """ Calculate a SAH1 hash of the XML policy """        return sha.sha(self.toxml())    def save(self):        ### Save the XML policy into a file ###        rc = -xsconstants.XSERR_FILE_ERROR        name = self.get_name()        if name:            path = self.path_from_policy_name(name)            if path:                f = open(path, 'w')                if f:                    try:                        try:                            f.write(self.toxml())                            rc = 0                        except:                            pass                    finally:                        f.close()        return rc    def __write_to_file(self, suffix, data):        #write the data into a file with the given suffix        f = open(self.get_filename(suffix),"w")        if f:            try:                try:                    f.write(data)                except Exception, e:                    log.error("Error writing file: %s" % str(e))                    return -xsconstants.XSERR_FILE_ERROR            finally:                f.close()        else:            return -xsconstants.XSERR_FILE_ERROR        return xsconstants.XSERR_SUCCESS    def compile(self):        rc = self.save()        if rc == 0:            rc, mapfile, bin_pol = self.policy_create_map_and_bin()            if rc == 0:                try:                    security.mapfile_lock()                    rc = self.__write_to_file(".map", mapfile)                    if rc != 0:                        log.error("Error writing map file")                finally:                    security.mapfile_unlock()            if rc == 0:                rc = self.__write_to_file(".bin", bin_pol)                if rc != 0:                    log.error("Error writing binary policy file")        return rc    def loadintohv(self):        """            load this policy into the hypervisor            if successful,the policy's flags will indicate that the            policy is the one loaded into the hypervisor        """        if not self.isloaded():            (ret, output) = commands.getstatusoutput(                                   security.xensec_tool +                                   " loadpolicy " +                                   self.get_filename(".bin"))            if ret != 0:                return -xsconstants.XSERR_POLICY_LOAD_FAILED        return xsconstants.XSERR_SUCCESS    def isloaded(self):        """            Determine whether this policy is the active one.        """        if self.get_name() == security.get_active_policy_name():            return True        return False    def destroy(self):        """            Destroy the policy including its binary, mapping and            XML files.            This only works if the policy is not the one that's loaded        """        if self.isloaded():            return -xsconstants.XSERR_POLICY_LOADED        files = [ self.get_filename(".map",""),                  self.get_filename(".bin","") ]        for f in files:            try:                os.unlink(f)            except:                pass        if self.xendacmpolicy:            self.xendacmpolicy.destroy()        XSPolicy.destroy(self)        return xsconstants.XSERR_SUCCESS    def policy_get_domain_label(self, domid):        """           Given a domain's ID, retrieve the label it has using           its ssidref for reverse calculation.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -