📄 acmpolicy.py
字号:
""" rc = -xsconstants.XSERR_XML_PROCESSING errors = "" acmpol_old = self try: acmpol_new = ACMPolicy(xml=xml_new) except Exception: return -xsconstants.XSERR_XML_PROCESSING, errors vmlabel_map = acmpol_new.policy_get_vmlabel_translation_map() # An update requires version information in the current # and new policy. The version number of the current policy # must be the same as what is in the FromPolicy/Version node # in the new one and the current policy's name must be the # same as in FromPolicy/PolicyName # The default policy when it is set skips this step. if not acmpol_new.is_default_policy() and \ not acmpol_old.is_default_policy(): irc = self.__do_update_version_check(acmpol_new) if irc != xsconstants.XSERR_SUCCESS: return irc, errors if self.isloaded(): newvmnames = \ acmpol_new.policy_get_virtualmachinelabel_names_sorted() oldvmnames = \ acmpol_old.policy_get_virtualmachinelabel_names_sorted() del_array = "" chg_array = "" for o in oldvmnames: if o not in newvmnames: old_idx = oldvmnames.index(o) if vmlabel_map.has_key(o): #not a deletion, but a renaming new = vmlabel_map[o] new_idx = newvmnames.index(new) chg_array += struct.pack("ii", old_idx, new_idx) else: del_array += struct.pack("i", old_idx) for v in newvmnames: if v in oldvmnames: old_idx = oldvmnames.index(v) new_idx = newvmnames.index(v) if old_idx != new_idx: chg_array += struct.pack("ii", old_idx, new_idx) # VM labels indicated in the 'from' attribute of a VM or # resource node but that did not exist in the old policy # are considered bad labels. bad_renamings = set(vmlabel_map.keys()) - set(oldvmnames) if len(bad_renamings) > 0: log.error("Bad VM label renamings: %s" % list(bad_renamings)) return -xsconstants.XSERR_BAD_LABEL, errors reslabel_map = acmpol_new.policy_get_reslabel_translation_map() oldresnames = acmpol_old.policy_get_resourcelabel_names() bad_renamings = set(reslabel_map.keys()) - set(oldresnames) if len(bad_renamings) > 0: log.error("Bad resource label renamings: %s" % list(bad_renamings)) return -xsconstants.XSERR_BAD_LABEL, errors #Get binary and map from the new policy rc, pol_map, bin_pol = acmpol_new.policy_create_map_and_bin() if rc != xsconstants.XSERR_SUCCESS: log.error("Could not build the map and binary policy.") return rc, errors #Need to do / check the following: # - relabel all resources where there is a 'from' field in # the policy and mark those as unlabeled where the label # does not appear in the new policy anymore # - relabel all VMs where there is a 'from' field in the # policy and mark those as unlabeled where the label # does not appear in the new policy anymore; no running # or paused VM may be unlabeled through this # - check that under the new labeling conditions the VMs # still have access to their resources as before. Unlabeled # resources are inaccessible. If this check fails, the # update failed. # - Attempt changes in the hypervisor; if this step fails, # roll back the relabeling of resources and VMs # - Commit the relabeling of resources rc, errors = security.change_acm_policy(bin_pol, del_array, chg_array, vmlabel_map, reslabel_map, self, acmpol_new, acmpol_new.is_default_policy()) if rc == 0: # Replace the old DOM with the new one and save it self.dom = acmpol_new.dom self.compile() log.info("ACM policy update was successful") else: #Not loaded in HV self.dom = acmpol_new.dom rc = self.compile() return rc, errors def force_default_policy(klass, policy_ref): """ Force the installation of the DEFAULT policy if for example no XML of the current policy is available and the update path with comparisons of old and new policy cannot be taken. This only succeeds if only Domain-0 is running or all guest have the same ssidref as Domain-0. """ errors = "" acmpol_new = ACMPolicy(xml = get_DEFAULT_policy(), ref=policy_ref) from xen.lowlevel import acm dom0_ssidref = acm.getssid(0) del_array = "" chg_array = struct.pack("ii", dom0_ssidref['ssidref'] & 0xffff, 0x1) rc, pol_map, bin_pol = acmpol_new.policy_create_map_and_bin() if rc != xsconstants.XSERR_SUCCESS: return rc, errors, acmpol_new rc, errors = security.hv_chg_policy(bin_pol, del_array, chg_array) return rc, errors, acmpol_new force_default_policy = classmethod(force_default_policy) def get_reset_policy_xml(klass): dom0_label = security.get_ssid(0)[1] return get_DEFAULT_policy(dom0_label) get_reset_policy_xml = classmethod(get_reset_policy_xml) def __do_update_version_check(self, acmpol_new): acmpol_old = self now_vers = acmpol_old.policy_dom_get_hdr_item("Version") now_name = acmpol_old.policy_dom_get_hdr_item("PolicyName") req_oldvers = acmpol_new.policy_dom_get_frompol_item("Version") req_oldname = acmpol_new.policy_dom_get_frompol_item("PolicyName") if now_vers == "" or \ now_vers != req_oldvers or \ now_name != req_oldname: log.info("Policy rejected: %s != %s or %s != %s" % \ (now_vers,req_oldvers,now_name,req_oldname)) return -xsconstants.XSERR_VERSION_PREVENTS_UPDATE if not self.isVersionUpdate(acmpol_new): log.info("Policy rejected since new version is not an update.") return -xsconstants.XSERR_VERSION_PREVENTS_UPDATE return xsconstants.XSERR_SUCCESS def compareVersions(self, v1, v2): """ Compare two policy versions given their tuples of major and minor. Return '0' if versions are equal, '>0' if v1 > v2 and '<' if v1 < v2 """ rc = v1[0] - v2[0] if rc == 0: rc = v1[1] - v2[1] return rc def getVersionTuple(self, item="Version"): v_str = self.policy_dom_get_hdr_item(item) return self.__convVersionToTuple(v_str) def get_version(self): return self.policy_dom_get_hdr_item("Version") def isVersionUpdate(self, polnew): if self.compareVersions(polnew.getVersionTuple(), self.getVersionTuple()) > 0: return True return False def __convVersionToTuple(self, v_str): """ Convert a version string, formatted according to the scheme "%d.%d" into a tuple of (major, minor). Return (0,0) if the string is empty. """ major = 0 minor = 0 if v_str != "": tmp = v_str.split(".") major = int(tmp[0]) if len(tmp) > 1: minor = int(tmp[1]) return (major, minor) def get_policies_path(self): xoptions = XendOptions.instance() basedir = xoptions.get_xend_security_path() return basedir + "/policies/" def policy_path(self, name): prefix = self.get_policies_path() path = prefix + name.replace('.','/') _path = path.split("/") del _path[-1] mkdir.parents("/".join(_path), stat.S_IRWXU) return path def path_from_policy_name(self, name): return self.policy_path(name) + "-security_policy.xml" # # Functions interacting with the bootloader # def vmlabel_to_ssidref(self, vm_label): """ Convert a VMlabel into an ssidref given the current policy Return xsconstants.INVALID_SSIDREF if conversion failed. """ ssidref = xsconstants.INVALID_SSIDREF names = self.policy_get_virtualmachinelabel_names_sorted() try: vmidx = names.index(vm_label) ssidref = (vmidx << 16) | vmidx except: pass return ssidref def set_vm_bootlabel(self, vm_label, remove=False): parms="<>" if vm_label != "": ssidref = self.vmlabel_to_ssidref(vm_label) if ssidref == xsconstants.INVALID_SSIDREF: return -xsconstants.XSERR_BAD_LABEL parms = "0x%08x:%s:%s:%s" % \ (ssidref, xsconstants.ACM_POLICY_ID, \ self.get_name(),vm_label) else: ssidref = 0 #Identifier for removal if remove == True: parms = "<>" try: def_title = bootloader.get_default_title() bootloader.set_kernel_attval(def_title, "ssidref", parms) except: return -xsconstants.XSERR_GENERAL_FAILURE return ssidref # # Utility functions related to the policy's files # def get_filename(self, postfix, prefix=None, dotted=False): """ Create the filename for the policy. The prefix is prepended to the path. If dotted is True, then a policy name like 'a.b.c' will remain as is, otherwise it will become 'a/b/c' """ if prefix == None: prefix = self.get_policies_path() name = self.get_name() if name: p = name.split(".") path = "" if dotted: sep = "." else: sep = "/" if len(p) > 1: path = sep.join(p[0:len(p)-1]) if prefix != "" or path != "": allpath = prefix + path + sep + p[-1] + postfix else: allpath = p[-1] + postfix return allpath return None def __readfile(self, name): cont = "" filename = self.get_filename(name) f = open(filename, "r") if f: cont = f.read() f.close() return cont def get_map(self): return self.__readfile(".map") def get_bin(self): return self.__readfile(".bin") def copy_policy_file(self, suffix, destdir): spolfile = self.get_filename(suffix) dpolfile = destdir + "/" + self.get_filename(suffix,"",dotted=True) try: shutil.copyfile(spolfile, dpolfile) except Exception, e: log.error("Could not copy policy file %s to %s: %s" % (spolfile, dpolfile, str(e))) return -xsconstants.XSERR_FILE_ERROR return xsconstants.XSERR_SUCCESS # # DOM-related functions # def policy_dom_get(self, parent, key, createit=False): for node in parent.childNodes: if node.nodeType == Node.ELEMENT_NODE: if node.nodeName == key: return node if createit: self.dom_create_node(parent, key) return self.policy_dom_get(parent, key) def dom_create_node(self, parent, newname, value=" "): xml = "<a><"+newname+">"+ value +"</"+newname+"></a>" frag = minidom.parseString(xml) frag.childNodes[0].nodeType = Node.DOCUMENT_FRAGMENT_NODE parent.appendChild(frag.childNodes[0]) return frag.childNodes[0] def dom_get_node(self, path, createit=False): node = None parts = path.split("/") doc = self.get_dom() if len(parts) > 0: node = self.policy_dom_get(doc.documentElement, parts[0]) if node: i = 1 while i < len(parts): _node = self.policy_dom_get(node, parts[i], createit) if not _node: if not createit: break else: self.dom_create_node(node, parts[i]) _node = self.policy_dom_get(node, parts[i]) node = _node i += 1 return node # # Header-related functions # def policy_dom_get_header_subnode(self, nodename): node = self.dom_get_node("PolicyHeader/%s" % nodename) return node def policy_dom_get_hdr_item(self, name, default=""): node = self.policy_dom_get_header_subnode(name) if node and len(node.childNodes) > 0: return node.childNodes[0].nodeValue return default def policy_dom_get_frompol_item(self, name, default="", createit=False): node = self.dom_get_node("PolicyHeader/FromPolicy",createit) if node: node = self.policy_dom_get(node, name, createit) if node and len(node.childNodes) > 0: return node.childNodes[0].nodeValue return default def get_header_fields_map(self): header = { 'policyname' : self.policy_dom_get_hdr_item("PolicyName"), 'policyurl' : self.policy_dom_get_hdr_item("PolicyUrl"), 'reference' : self.policy_dom_get_hdr_item("Reference"), 'date' : self.policy_dom_get_hdr_item("Date"), 'namespaceurl' : self.policy_dom_get_hdr_item("NameSpaceUrl"), 'version' : self.policy_dom_get_hdr_item("Version") } return header def set_frompolicy_name(self, name): """ For tools to adapt the header of the policy """ node = self.dom_get_node("PolicyHeader/FromPolicy/PolicyName", createit=True) node.childNodes[0].nodeValue = name def set_frompolicy_version(self, version): """ For tools to adapt the header of the policy """ node = self.dom_get_node("PolicyHeader/FromPolicy/Version", createit=True) node.childNodes[0].nodeValue = version def set_policy_name(self, name): """ For tools to adapt the header of the policy """ node = self.dom_get_node("PolicyHeader/PolicyName") node.childNodes[0].nodeValue = name def set_policy_version(self, version): """ For tools to adapt the header of the policy """ node = self.dom_get_node("PolicyHeader/Version") node.childNodes[0].nodeValue = version def update_frompolicy(self, curpol):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -