📄 acmpolicy.py
字号:
self.set_frompolicy_name(curpol.policy_dom_get_hdr_item("PolicyName")) version = curpol.policy_dom_get_hdr_item("Version") self.set_frompolicy_version(version) (maj, minor) = self.__convVersionToTuple(version) self.set_policy_version("%s.%s" % (maj, minor+1)) # # Get all types that are part of a node # def policy_get_types(self, node): strings = [] i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == "Type" and \ len(node.childNodes[i].childNodes) > 0: strings.append(node.childNodes[i].childNodes[0].nodeValue) i += 1 return strings # # Simple Type Enforcement-related functions # def policy_get_stetypes_node(self): node = self.dom_get_node("SimpleTypeEnforcement/SimpleTypeEnforcementTypes") return node def policy_get_stetypes_types(self): strings = [] node = self.policy_get_stetypes_node() if node: strings = self.policy_get_types(node) return strings # # Chinese Wall Type-related functions # def policy_get_chwall_types(self): strings = [] node = self.dom_get_node("ChineseWall/ChineseWallTypes") if node: strings = self.policy_get_types(node) return strings def policy_get_chwall_cfses(self): cfs = [] node = self.dom_get_node("ChineseWall/ConflictSets") if node: i = 0 while i < len(node.childNodes): _cfs = {} if node.childNodes[i].nodeName == "Conflict": _cfs['name'] = node.childNodes[i].getAttribute('name') _cfs['chws'] = self.policy_get_types(node.childNodes[i]) cfs.append(_cfs) i += 1 return cfs def policy_get_chwall_cfses_names_sorted(self): """ Return the list of all conflict set names in alphabetical order. """ cfs_names = [] node = self.dom_get_node("ChineseWall/ConflictSets") if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == "Conflict": n = node.childNodes[i].getAttribute('name') #it better have a name! if n: cfs_names.append(n) i += 1 cfs_names.sort() return cfs_names # # Subject Label-related functions # def policy_get_bootstrap_vmlabel(self): node = self.dom_get_node("SecurityLabelTemplate/SubjectLabels") if node: vmlabel = node.getAttribute("bootstrap") return vmlabel # Get the names of all virtual machine labels; returns an array def policy_get_virtualmachinelabel_names(self): strings = [] node = self.dom_get_node("SecurityLabelTemplate/SubjectLabels") if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == "VirtualMachineLabel": name = self.policy_dom_get(node.childNodes[i], "Name") if len(name.childNodes) > 0: strings.append(name.childNodes[0].nodeValue) i += 1 return strings def policy_sort_virtualmachinelabel_names(self, vmnames): bootstrap = self.policy_get_bootstrap_vmlabel() if bootstrap not in vmnames: raise SecurityError(-xsconstants.XSERR_POLICY_INCONSISTENT) vmnames.remove(bootstrap) vmnames.sort() vmnames.insert(0, bootstrap) if ACM_LABEL_UNLABELED in vmnames: vmnames.remove(ACM_LABEL_UNLABELED) vmnames.insert(0, ACM_LABEL_UNLABELED) return vmnames def policy_get_virtualmachinelabel_names_sorted(self): """ Get a sorted list of VMlabel names. The bootstrap VM's label will be the first one in that list, followed by an alphabetically sorted list of VM label names """ vmnames = self.policy_get_virtualmachinelabel_names() res = self.policy_sort_virtualmachinelabel_names(vmnames) if res[0] != ACM_LABEL_UNLABELED: res.insert(0, ACM_LABEL_UNLABELED) return res def policy_get_virtualmachinelabels(self): """ Get a list of all virtual machine labels in this policy """ res = [] node = self.dom_get_node("SecurityLabelTemplate/SubjectLabels") if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == "VirtualMachineLabel": name = self.policy_dom_get(node.childNodes[i], "Name") if len(name.childNodes) > 0: _res = {} _res['type'] = xsconstants.ACM_LABEL_VM _res['name'] = name.childNodes[0].nodeValue stes = self.policy_dom_get(node.childNodes[i], "SimpleTypeEnforcementTypes") if stes: _res['stes'] = self.policy_get_types(stes) else: _res['stes'] = [] chws = self.policy_dom_get(node.childNodes[i], "ChineseWallTypes") if chws: _res['chws'] = self.policy_get_types(chws) else: _res['chws'] = [] res.append(_res) i += 1 return res def policy_get_stes_of_vmlabel(self, vmlabel): """ Get a list of all STEs of a given VMlabel """ return self.__policy_get_stes_of_labeltype(vmlabel, "/SubjectLabels", "VirtualMachineLabel") def policy_get_stes_of_resource(self, reslabel): """ Get a list of all resources of a given VMlabel """ return self.__policy_get_stes_of_labeltype(reslabel, "/ObjectLabels", "ResourceLabel") def __policy_get_stes_of_labeltype(self, label, path, labeltype): node = self.dom_get_node("SecurityLabelTemplate" + path) if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == labeltype: name = self.policy_dom_get(node.childNodes[i], "Name") if len(name.childNodes) > 0 and \ name.childNodes[0].nodeValue == label: stes = self.policy_dom_get(node.childNodes[i], "SimpleTypeEnforcementTypes") if not stes: return [] return self.policy_get_types(stes) i += 1 return [] def policy_check_vmlabel_against_reslabels(self, vmlabel, resources): """ Check whether the given vmlabel is compatible with the given resource labels. Do this by getting all the STEs of the vmlabel and the STEs of the resources. Any STE type of the VM label must match an STE type of the resource. """ vm_stes = self.policy_get_stes_of_vmlabel(vmlabel) if len(vm_stes) == 0: return False for res in resources: res_stes = self.policy_get_stes_of_resource(res) if len(res_stes) == 0 or \ len( set(res_stes).intersection( set(vm_stes) ) ) == 0: return False return True def __policy_get_label_translation_map(self, path, labeltype): res = {} node = self.dom_get_node("SecurityLabelTemplate/" + path) if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == labeltype: name = self.policy_dom_get(node.childNodes[i], "Name") from_name = name.getAttribute("from") if from_name and len(name.childNodes) > 0: res.update({from_name : name.childNodes[0].nodeValue}) i += 1 return res def policy_get_vmlabel_translation_map(self): """ Get a dictionary of virtual machine mappings from their old VMlabel name to the new VMlabel name. """ return self.__policy_get_label_translation_map("SubjectLabels", "VirtualMachineLabel") def policy_get_reslabel_translation_map(self): """ Get a dictionary of resource mappings from their old resource label name to the new resource label name. """ return self.__policy_get_label_translation_map("ObjectLabels", "ResourceLabel") # # Object Label-related functions # def policy_get_resourcelabel_names(self): """ Get the names of all resource labels in an array but only those that actually have types """ strings = [] node = self.dom_get_node("SecurityLabelTemplate/ObjectLabels") if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == "ResourceLabel": name = self.policy_dom_get(node.childNodes[i], "Name") stes = self.policy_dom_get(node.childNodes[i], "SimpleTypeEnforcementTypes") if stes and len(name.childNodes) > 0: strings.append(name.childNodes[0].nodeValue) i += 1 return strings def policy_get_resourcelabels(self): """ Get all information about all resource labels of this policy. """ res = [] node = self.dom_get_node("SecurityLabelTemplate/ObjectLabels") if node: i = 0 while i < len(node.childNodes): if node.childNodes[i].nodeName == "ResourceLabel": name = self.policy_dom_get(node.childNodes[i], "Name") if len(name.childNodes) > 0: _res = {} _res['type'] = xsconstants.ACM_LABEL_RES _res['name'] = name.childNodes[0].nodeValue stes = self.policy_dom_get(node.childNodes[i], "SimpleTypeEnforcementTypes") if stes: _res['stes'] = self.policy_get_types(stes) else: _res['stes'] = [] _res['chws'] = [] res.append(_res) i += 1 return res def policy_find_reslabels_with_stetype(self, stetype): """ Find those resource labels that hold a given STE type. """ res = [] reslabels = self.policy_get_resourcelabels() for resl in reslabels: if stetype in resl['stes']: res.append(resl['name']) return res def toxml(self): dom = self.get_dom() if dom: return dom.toxml() return None def hash(self): """ Calculate a SAH1 hash of the XML policy """ return sha.sha(self.toxml()) def save(self): ### Save the XML policy into a file ### rc = -xsconstants.XSERR_FILE_ERROR name = self.get_name() if name: path = self.path_from_policy_name(name) if path: f = open(path, 'w') if f: try: try: f.write(self.toxml()) rc = 0 except: pass finally: f.close() return rc def __write_to_file(self, suffix, data): #write the data into a file with the given suffix f = open(self.get_filename(suffix),"w") if f: try: try: f.write(data) except Exception, e: log.error("Error writing file: %s" % str(e)) return -xsconstants.XSERR_FILE_ERROR finally: f.close() else: return -xsconstants.XSERR_FILE_ERROR return xsconstants.XSERR_SUCCESS def compile(self): rc = self.save() if rc == 0: rc, mapfile, bin_pol = self.policy_create_map_and_bin() if rc == 0: try: security.mapfile_lock() rc = self.__write_to_file(".map", mapfile) if rc != 0: log.error("Error writing map file") finally: security.mapfile_unlock() if rc == 0: rc = self.__write_to_file(".bin", bin_pol) if rc != 0: log.error("Error writing binary policy file") return rc def loadintohv(self): """ load this policy into the hypervisor if successful,the policy's flags will indicate that the policy is the one loaded into the hypervisor """ if not self.isloaded(): (ret, output) = commands.getstatusoutput( security.xensec_tool + " loadpolicy " + self.get_filename(".bin")) if ret != 0: return -xsconstants.XSERR_POLICY_LOAD_FAILED return xsconstants.XSERR_SUCCESS def isloaded(self): """ Determine whether this policy is the active one. """ if self.get_name() == security.get_active_policy_name(): return True return False def destroy(self): """ Destroy the policy including its binary, mapping and XML files. This only works if the policy is not the one that's loaded """ if self.isloaded(): return -xsconstants.XSERR_POLICY_LOADED files = [ self.get_filename(".map",""), self.get_filename(".bin","") ] for f in files: try: os.unlink(f) except: pass if self.xendacmpolicy: self.xendacmpolicy.destroy() XSPolicy.destroy(self) return xsconstants.XSERR_SUCCESS def policy_get_domain_label(self, domid): """ Given a domain's ID, retrieve the label it has using its ssidref for reverse calculation.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -