📄 acm.py
字号:
config.pop(idx) break config.append(security)def get_ssid(domain): """ enables domains to retrieve the label / ssidref of a running domain """ if not on(): err("No policy active.") if isinstance(domain, str): domain_int = int(domain) elif isinstance(domain, int): domain_int = domain else: err("Illegal parameter type.") try: ssid_info = acm.getssid(int(domain_int)) except: err("Cannot determine security information.") label = ssidref2label(ssid_info["ssidref"]) return(ssid_info["policyreference"], label, ssid_info["policytype"], ssid_info["ssidref"])def get_decision(arg1, arg2): """ enables domains to retrieve access control decisions from the hypervisor Access Control Module. IN: args format = ['domid', id] or ['ssidref', ssidref] or ['access_control', ['policy', policy], ['label', label], ['type', type]] """ if not on(): err("No policy active.") #translate labels before calling low-level function if arg1[0] == 'access_control': if (arg1[1][0] != 'policy') or (arg1[2][0] != 'label') or (arg1[3][0] != 'type'): err("Argument type not supported.") ssidref = label2ssidref(arg1[2][1], arg1[1][1], arg1[3][1]) arg1 = ['ssidref', str(ssidref)] if arg2[0] == 'access_control': if (arg2[1][0] != 'policy') or (arg2[2][0] != 'label') or (arg2[3][0] != 'type'): err("Argument type not supported.") ssidref = label2ssidref(arg2[2][1], arg2[1][1], arg2[3][1]) arg2 = ['ssidref', str(ssidref)] # accept only int or string types for domid and ssidref if isinstance(arg1[1], int): arg1[1] = str(arg1[1]) if isinstance(arg2[1], int): arg2[1] = str(arg2[1]) if not isinstance(arg1[1], str) or not isinstance(arg2[1], str): err("Invalid id or ssidref type, string or int required") try: decision = acm.getdecision(arg1[0], arg1[1], arg2[0], arg2[1], ACMHOOK_sharing) except: err("Cannot determine decision.") if decision: return decision else: err("Cannot determine decision (Invalid parameter).")def has_authorization(ssidref): """ Check if the domain with the given ssidref has authorization to run on this system. To have authoriztion dom0's STE types must be a superset of that of the domain's given through its ssidref. """ rc = True dom0_ssidref = int(acm.getssid(0)['ssidref']) decision = acm.getdecision('ssidref', str(dom0_ssidref), 'ssidref', str(ssidref), ACMHOOK_authorization) if decision == "DENIED": rc = False return rcdef hv_chg_policy(bin_pol, del_array, chg_array): """ Change the binary policy in the hypervisor The 'del_array' and 'chg_array' give hints about deleted ssidrefs and changed ssidrefs which can be due to deleted VM labels or reordered VM labels """ rc = -xsconstants.XSERR_GENERAL_FAILURE errors = "" if not on(): err("No policy active.") try: rc, errors = acm.chgpolicy(bin_pol, del_array, chg_array) except Exception, e: pass if len(errors) > 0: rc = -xsconstants.XSERR_HV_OP_FAILED return rc, errorsdef hv_get_policy(): """ Gte the binary policy enforced in the hypervisor """ rc = -xsconstants.XSERR_GENERAL_FAILURE bin_pol = "" if not on(): err("No policy active.") try: rc, bin_pol = acm.getpolicy() except Exception, e: pass if len(bin_pol) == 0: bin_pol = None return rc, bin_poldef is_in_conflict(ssidref): """ Check whether the given ssidref is in conflict with any running domain. """ decision = acm.getdecision('ssidref', str(ssidref), 'ssidref', str(ssidref), ACMHOOK_conflictset) if decision == "DENIED": return True return Falsedef set_policy(xs_type, xml, flags, overwrite): """ Xend exports this function via XML-RPC """ from xen.xend import XendXSPolicyAdmin xspoladmin = XendXSPolicyAdmin.XSPolicyAdminInstance() try: acmpol, rc, errors = \ xspoladmin.add_acmpolicy_to_system(xml, int(flags), True) return rc, base64.b64encode(errors) except Exception, e: err(str(e))def reset_policy(): """ Xend exports this function via XML-RPC """ from xen.xend import XendXSPolicyAdmin xspoladmin = XendXSPolicyAdmin.XSPolicyAdminInstance() try: acmpol, rc, errors = \ xspoladmin.reset_acmpolicy() return rc, base64.b64encode(errors) except Exception, e: err(str(e))def get_policy(): """ Xend exports this function via XML-RPC """ from xen.xend import XendXSPolicyAdmin poladmin = XendXSPolicyAdmin.XSPolicyAdminInstance() try: policy = poladmin.get_loaded_policy() if policy != None: return policy.toxml(), poladmin.get_policy_flags(policy) except Exception, e: err(str(e)) return "", 0def activate_policy(flags): """ Xend exports this function via XML-RPC """ from xen.xend import XendXSPolicyAdmin poladmin = XendXSPolicyAdmin.XSPolicyAdminInstance() try: policies = poladmin.get_policies() if len(policies) > 0: flags = int(flags) irc = poladmin.activate_xspolicy(policies[0], flags) return irc except Exception, e: err("Error while activating the policy: " % str(e)) return 0def rm_bootpolicy(): """ Xend exports this function via XML-RPC """ from xen.xend import XendXSPolicyAdmin rc = XendXSPolicyAdmin.XSPolicyAdminInstance().rm_bootpolicy() if rc != xsconstants.XSERR_SUCCESS: err("Error while removing boot policy: %s" % \ str(xsconstants.xserr2string(-rc))) return rcdef get_xstype(): """ Xend exports this function via XML-RPC """ from xen.xend import XendXSPolicyAdmin return XendXSPolicyAdmin.XSPolicyAdminInstance().isXSEnabled()def get_domain_label(domain): """ Xend exports this function via XML-RPC """ from xen.xend import XendDomain dom = XendDomain.instance().domain_lookup_nr(domain) if dom: seclab = dom.get_security_label() return seclab else: err("Domain not found.")def set_domain_label(domain, seclab, old_seclab): """ Xend exports this function via XML-RPC """ from xen.xend import XendDomain dom = XendDomain.instance().domain_lookup_nr(domain) if dom: results = dom.set_security_label(seclab, old_seclab) rc, errors, old_label, new_ssidref = results return rc, new_ssidref else: err("Domain not found.")def dump_policy(): if active_policy in ['NULL', 'INACTIVE', 'INACCESSIBLE' ]: err("\'" + active_policy + "\' policy. Nothing to dump.") (ret, output) = commands.getstatusoutput(xensec_tool + " getpolicy") if ret: err("Dumping hypervisor policy failed:\n" + output) print outputdef dump_policy_file(filename, ssidref=None): ssid = "" if ssidref: ssid = " " + str(ssidref) (ret, output) = commands.getstatusoutput(xensec_tool + " dumppolicy " + filename + ssid) if ret: err("Dumping policy failed:\n" + output) print outputdef list_labels(policy_name, ltype): """ Xend exports this function via XML-RPC List the VM,resource or any kind of labels contained in the given policy. If no policy name is given, the currently active policy's label will be returned if they exist. """ if not policy_name: if active_policy in [ 'NULL', 'INACTIVE', "" ]: err("Current policy \'" + active_policy + "\' " "has no labels defined.\n") if not ltype or ltype == 'dom': condition = vm_label_re elif ltype == 'res': condition = res_label_re elif ltype == 'any': condition = all_label_re else: err("Unknown label type \'" + ltype + "\'") try: mapfile_lock() (primary, secondary, f, pol_exists) = getmapfile(policy_name) if not f: if pol_exists: err("Cannot find mapfile for policy \'" + policy_name + "\'.\n") else: err("Unknown policy \'" + policy_name + "\'") labels = [] for line in f: if condition.match(line): label = line.split()[3] if label not in labels: labels.append(label) finally: mapfile_unlock() if '__NULL_LABEL__' in labels: labels.remove('__NULL_LABEL__') return labelsdef get_res_label(resource): """Returns resource label information (policytype, label, policy) if it exists. Otherwise returns null label and policy. """ def default_res_label(): ssidref = NULL_SSIDREF if on(): label = ssidref2label(ssidref) else: label = None return (xsconstants.ACM_POLICY_ID, 'NULL', label) tmp = get_resource_label(resource) if len(tmp) == 2: policytype = xsconstants.ACM_POLICY_ID policy, label = tmp elif len(tmp) == 3: policytype, policy, label = tmp else: policytype, policy, label = default_res_label() return (policytype, label, policy)def get_res_security_details(resource): """Returns the (label, ssidref, policy) associated with a given resource from the global resource label file. """ def default_security_details(): ssidref = NULL_SSIDREF if on(): label = ssidref2label(ssidref) else: label = None policy = active_policy return (label, ssidref, policy) # find the entry associated with this resource (policytype, label, policy) = get_res_label(resource) if policy == 'NULL': log.info("Resource label for "+resource+" not in file, using DEFAULT.") return default_security_details() if policytype != xsconstants.ACM_POLICY_ID: raise VmError("Unknown policy type '%s in label for resource '%s'" % (policytype, resource)) # is this resource label for the running policy? if policy == active_policy: ssidref = label2ssidref(label, policy, 'res') elif label == xsconstants.XS_INACCESSIBLE_LABEL: ssidref = NULL_SSIDREF else: log.info("Resource label not for active policy, using DEFAULT.") return default_security_details() return (label, ssidref, policy)def security_label_to_details(seclab): """ Convert a Xen-API type of security label into details """ def default_security_details(): ssidref = NULL_SSIDREF if on(): label = ssidref2label(ssidref) else: label = None policy = active_policy return (label, ssidref, policy) (policytype, policy, label) = seclab.split(":") # is this resource label for the running policy? if policy == active_policy: ssidref = label2ssidref(label, policy, 'res') else: log.info("Resource label not for active policy, using DEFAULT.") return default_security_details() return (label, ssidref, policy)def unify_resname(resource, mustexist=True): """Makes all resource locations absolute. In case of physical resources, '/dev/' is added to local file names""" if not resource: return resource # sanity check on resource name try: (typ, resfile) = resource.split(":", 1) except: err("Resource spec '%s' contains no ':' delimiter" % resource) if typ == "tap": try: (subtype, resfile) = resfile.split(":") except: err("Resource spec '%s' contains no tap subtype" % resource) if typ in ["phy"]: if not resfile.startswith("/"): resfile = "/dev/" + resfile if mustexist: resfile = os.path.realpath(resfile) try: stats = os.lstat(resfile) if not (stat.S_ISBLK(stats[stat.ST_MODE])): err("Invalid resource") except: err("Invalid resource") if typ in [ "file", "tap" ]:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -