📄 acmpolicy.py
字号:
""" try: mgmt_dom = security.get_ssid(domid) except: return "" return self.policy_get_domain_label_by_ssidref(int(mgmt_dom[3])) def policy_get_domain_label_by_ssidref(self, ssidref): """ Given an ssidref, find the corresponding VM label """ chwall_ref = ssidref & 0xffff try: allvmtypes = self.policy_get_virtualmachinelabel_names_sorted() except: return None return allvmtypes[chwall_ref] def policy_get_domain_label_formatted(self, domid): label = self.policy_get_domain_label(domid) if label == "": label = ACM_LABEL_UNLABELED return "%s:%s:%s" % (xsconstants.ACM_POLICY_ID, self.get_name(), label) def policy_get_domain_label_by_ssidref_formatted(self, ssidref): label = self.policy_get_domain_label_by_ssidref(ssidref) if label == "": return "" return "%s:%s:%s" % (xsconstants.ACM_POLICY_ID, self.get_name(), label) def policy_create_map_and_bin(self): """ Create the policy's map and binary files -- compile the policy. """ def roundup8(len): return ((len + 7) & ~7) rc = xsconstants.XSERR_SUCCESS mapfile = "" primpolcode = ACM_POLICY_UNDEFINED secpolcode = ACM_POLICY_UNDEFINED unknown_ste = set() unknown_chw = set() unlabeled_ste = "__NULL_LABEL__" unlabeled_chw = "__NULL_LABEL__" rc = self.validate() if rc: return rc, "", "" stes = self.policy_get_stetypes_types() if stes: stes.sort() chws = self.policy_get_chwall_types() if chws: chws.sort() vms = self.policy_get_virtualmachinelabels() bootstrap = self.policy_get_bootstrap_vmlabel() vmlabels = self.policy_get_virtualmachinelabel_names_sorted() if bootstrap not in vmlabels: log.error("Bootstrap label '%s' not found among VM labels '%s'." \ % (bootstrap, vmlabels)) return -xsconstants.XSERR_POLICY_INCONSISTENT, "", "" vms_with_chws = [] chws_by_vm = { ACM_LABEL_UNLABELED : [] } for v in vms: if v.has_key("chws"): vms_with_chws.append(v["name"]) chws_by_vm[v["name"]] = v["chws"] if bootstrap in vms_with_chws: vms_with_chws.remove(bootstrap) vms_with_chws.sort() vms_with_chws.insert(0, bootstrap) else: vms_with_chws.sort() if ACM_LABEL_UNLABELED in vms_with_chws: unlabeled_chw = ACM_LABEL_UNLABELED vms_with_chws.remove(ACM_LABEL_UNLABELED) ; # @1 vms_with_stes = [] stes_by_vm = { ACM_LABEL_UNLABELED : [] } for v in vms: if v.has_key("stes"): vms_with_stes.append(v["name"]) stes_by_vm[v["name"]] = v["stes"] if bootstrap in vms_with_stes: vms_with_stes.remove(bootstrap) vms_with_stes.sort() vms_with_stes.insert(0, bootstrap) else: vms_with_stes.sort() if ACM_LABEL_UNLABELED in vms_with_stes: unlabeled_ste = ACM_LABEL_UNLABELED vms_with_stes.remove(ACM_LABEL_UNLABELED) ; # @2 resnames = self.policy_get_resourcelabel_names() resnames.sort() stes_by_res = {} res = self.policy_get_resourcelabels() for r in res: if r.has_key("stes"): stes_by_res[r["name"]] = r["stes"] if ACM_LABEL_UNLABELED in resnames: resnames.remove(ACM_LABEL_UNLABELED) # check for duplicate labels if len(vmlabels) != len(set(vmlabels)) or \ len(resnames) != len(set(resnames)) or \ len(stes) != len(set(stes)) or \ len(chws) != len(set(chws)): return -xsconstants.XSERR_POLICY_HAS_DUPLICATES, "", "" max_chw_ssids = 1 + len(vms_with_chws) max_chw_types = 1 + len(vms_with_chws) max_ste_ssids = 1 + len(vms_with_stes) + len(resnames) max_ste_types = 1 + len(vms_with_stes) + len(resnames) mapfile = "POLICYREFERENCENAME %s\n" % self.get_name() mapfile += "MAGIC %08x\n" % ACM_MAGIC mapfile += "POLICFILE %s\n" % \ self.path_from_policy_name(self.get_name()) mapfile += "BINARYFILE %s\n" % self.get_filename(".bin") mapfile += "MAX-CHWALL-TYPES %08x\n" % len(chws) mapfile += "MAX-CHWALL-SSIDS %08x\n" % max_chw_ssids mapfile += "MAX-CHWALL-LABELS %08x\n" % max_chw_ssids mapfile += "MAX-STE-TYPES %08x\n" % len(stes) mapfile += "MAX-STE-SSIDS %08x\n" % max_ste_ssids mapfile += "MAX-STE-LABELS %08x\n" % max_ste_ssids mapfile += "\n" if chws: mapfile += \ "PRIMARY CHWALL\n" primpolcode = ACM_CHINESE_WALL_POLICY if stes: mapfile += \ "SECONDARY STE\n" else: mapfile += \ "SECONDARY NULL\n" secpolcode = ACM_SIMPLE_TYPE_ENFORCEMENT_POLICY else: if stes: mapfile += \ "PRIMARY STE\n" primpolcode = ACM_SIMPLE_TYPE_ENFORCEMENT_POLICY mapfile += \ "SECONDARY NULL\n" mapfile += "\n" if len(vms_with_chws) > 0: mapfile += \ "LABEL->SSID ANY CHWALL %-20s %x\n" % \ (unlabeled_chw, 0) i = 0 for v in vms_with_chws: mapfile += \ "LABEL->SSID VM CHWALL %-20s %x\n" % \ (v, i+1) i += 1 mapfile += "\n" if len(vms_with_stes) > 0 or len(resnames) > 0: mapfile += \ "LABEL->SSID ANY STE %-20s %08x\n" % \ (unlabeled_ste, 0) i = 0 for v in vms_with_stes: mapfile += \ "LABEL->SSID VM STE %-20s %x\n" % (v, i+1) i += 1 j = 0 for r in resnames: mapfile += \ "LABEL->SSID RES STE %-20s %x\n" % (r, j+i+1) j += 1 mapfile += "\n" if vms_with_chws: mapfile += \ "SSID->TYPE CHWALL %08x\n" % 0 i = 1 for v in vms_with_chws: mapfile += \ "SSID->TYPE CHWALL %08x" % i for c in chws_by_vm[v]: mapfile += " %s" % c mapfile += "\n" i += 1 mapfile += "\n" if len(vms_with_stes) > 0 or len(resnames) > 0: mapfile += \ "SSID->TYPE STE %08x\n" % 0 i = 1 for v in vms_with_stes: mapfile += \ "SSID->TYPE STE %08x" % i for s in stes_by_vm[v]: mapfile += " %s" % s mapfile += "\n" i += 1 for r in resnames: mapfile += \ "SSID->TYPE STE %08x" % i for s in stes_by_res[r]: mapfile += " %s" % s mapfile += "\n" i += 1 mapfile += "\n" if chws: i = 0 while i < len(chws): mapfile += \ "TYPE CHWALL %-20s %d\n" % (chws[i], i) i += 1 mapfile += "\n" if stes: i = 0 while i < len(stes): mapfile += \ "TYPE STE %-20s %d\n" % (stes[i], i) i += 1 mapfile += "\n" mapfile += "\n" # Build header with policy name length = roundup8(4 + len(self.get_name()) + 1) polname = self.get_name(); pr_bin = struct.pack("!i", len(polname)+1) pr_bin += polname; while len(pr_bin) < length: pr_bin += "\x00" # Build chinese wall part vms_with_chws.insert(0, ACM_LABEL_UNLABELED) cfses_names = self.policy_get_chwall_cfses_names_sorted() cfses = self.policy_get_chwall_cfses() chwformat = "!iiiiiiiii" max_chw_cfs = len(cfses) chw_ssid_offset = struct.calcsize(chwformat) chw_confset_offset = chw_ssid_offset + \ 2 * len(chws) * max_chw_types chw_running_types_offset = 0 chw_conf_agg_offset = 0 chw_bin = struct.pack(chwformat, ACM_CHWALL_VERSION, ACM_CHINESE_WALL_POLICY, len(chws), max_chw_ssids, max_chw_cfs, chw_ssid_offset, chw_confset_offset, chw_running_types_offset, chw_conf_agg_offset) chw_bin_body = "" # VMs that are listed and their chinese walls for v in vms_with_chws: for c in chws: unknown_chw |= (set(chws_by_vm[v]) - set(chws)) if c in chws_by_vm[v]: chw_bin_body += struct.pack("!h",1) else: chw_bin_body += struct.pack("!h",0) # Conflict sets -- they need to be processed in alphabetical order for cn in cfses_names: if cn == "" or cn is None: return -xsconstants.XSERR_BAD_CONFLICTSET, "", "" i = 0 while i < len(cfses): if cfses[i]['name'] == cn: conf = cfses[i]['chws'] break i += 1 for c in chws: if c in conf: chw_bin_body += struct.pack("!h",1) else: chw_bin_body += struct.pack("!h",0) del cfses[i] if len(cfses) != 0: return -xsconstants.XSERR_BAD_CONFLICTSET, "", "" chw_bin += chw_bin_body while len(chw_bin) < roundup8(len(chw_bin)): chw_bin += "\x00" # Build STE part vms_with_stes.insert(0, ACM_LABEL_UNLABELED) # Took out in @2 steformat="!iiiii" ste_bin = struct.pack(steformat, ACM_STE_VERSION, ACM_SIMPLE_TYPE_ENFORCEMENT_POLICY, len(stes), max_ste_types, struct.calcsize(steformat)) ste_bin_body = "" if stes: # VMs that are listed and their STE types for v in vms_with_stes: unknown_ste |= (set(stes_by_vm[v]) - set(stes)) for s in stes: if s in stes_by_vm[v]: ste_bin_body += struct.pack("!h",1) else: ste_bin_body += struct.pack("!h",0) for r in resnames: unknown_ste |= (set(stes_by_res[r]) - set(stes)) for s in stes: if s in stes_by_res[r]: ste_bin_body += struct.pack("!h",1) else: ste_bin_body += struct.pack("!h",0) ste_bin += ste_bin_body; while len(ste_bin) < roundup8(len(ste_bin)): ste_bin += "\x00" #Write binary header: headerformat="!iiiiiiiiii20s" totallen_bin = struct.calcsize(headerformat) + \ len(pr_bin) + len(chw_bin) + len(ste_bin) polref_offset = struct.calcsize(headerformat) primpoloffset = polref_offset + len(pr_bin) if primpolcode == ACM_CHINESE_WALL_POLICY: secpoloffset = primpoloffset + len(chw_bin) elif primpolcode == ACM_SIMPLE_TYPE_ENFORCEMENT_POLICY: secpoloffset = primpoloffset + len(ste_bin) else: secpoloffset = primpoloffset (major, minor) = self.getVersionTuple() hdr_bin = struct.pack(headerformat, ACM_MAGIC, ACM_POLICY_VERSION, totallen_bin, polref_offset, primpolcode, primpoloffset, secpolcode, secpoloffset, major, minor, self.hash().digest()) all_bin = array.array('B') for s in [ hdr_bin, pr_bin, chw_bin, ste_bin ]: for c in s: all_bin.append(ord(c)) log.info("Compiled policy: rc = %s" % hex(rc)) if len(unknown_ste) > 0: log.info("The following STEs in VM/res labels were unknown:" \ " %s" % list(unknown_ste)) rc = -xsconstants.XSERR_BAD_LABEL if len(unknown_chw) > 0: log.info("The following Ch. Wall types in labels were unknown:" \ " %s" % list(unknown_chw)) rc = -xsconstants.XSERR_BAD_LABEL return rc, mapfile, all_bin.tostring() def validate_enforced_policy_hash(self): """ verify that the policy hash embedded in the binary policy that is currently enforce matches the one of the XML policy. """ if self.hash().digest() != self.get_enforced_policy_hash(): raise Exception('Policy hashes do not match') def get_enforced_policy_hash(self): binpol = self.get_enforced_binary() headerformat="!iiiiiiiiii20s" res = struct.unpack(headerformat, binpol[:60]) if len(res) >= 11: return res[10] return None def get_enforced_binary(self): rc, binpol = security.hv_get_policy() if rc != 0: raise SecurityError(-xsconstants.XSERR_HV_OP_FAILED) return binpol get_enforced_binary = classmethod(get_enforced_binary)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -