⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 acm.py

📁 xen虚拟机源代码安装包
💻 PY
📖 第 1 页 / 共 4 页
字号:
            config.pop(idx)            break        config.append(security)def get_ssid(domain):    """    enables domains to retrieve the label / ssidref of a running domain    """    if not on():        err("No policy active.")    if isinstance(domain, str):        domain_int = int(domain)    elif isinstance(domain, int):        domain_int = domain    else:        err("Illegal parameter type.")    try:        ssid_info = acm.getssid(int(domain_int))    except:        err("Cannot determine security information.")    label = ssidref2label(ssid_info["ssidref"])    return(ssid_info["policyreference"],           label,           ssid_info["policytype"],           ssid_info["ssidref"])def get_decision(arg1, arg2):    """    enables domains to retrieve access control decisions from    the hypervisor Access Control Module.    IN: args format = ['domid', id] or ['ssidref', ssidref]    or ['access_control', ['policy', policy], ['label', label], ['type', type]]    """    if not on():        err("No policy active.")    #translate labels before calling low-level function    if arg1[0] == 'access_control':        if (arg1[1][0] != 'policy') or (arg1[2][0] != 'label') or (arg1[3][0] != 'type'):            err("Argument type not supported.")        ssidref = label2ssidref(arg1[2][1], arg1[1][1], arg1[3][1])        arg1 = ['ssidref', str(ssidref)]    if arg2[0] == 'access_control':        if (arg2[1][0] != 'policy') or (arg2[2][0] != 'label') or (arg2[3][0] != 'type'):            err("Argument type not supported.")        ssidref = label2ssidref(arg2[2][1], arg2[1][1], arg2[3][1])        arg2 = ['ssidref', str(ssidref)]    # accept only int or string types for domid and ssidref    if isinstance(arg1[1], int):        arg1[1] = str(arg1[1])    if isinstance(arg2[1], int):        arg2[1] = str(arg2[1])    if not isinstance(arg1[1], str) or not isinstance(arg2[1], str):        err("Invalid id or ssidref type, string or int required")    try:        decision = acm.getdecision(arg1[0], arg1[1], arg2[0], arg2[1],                                   ACMHOOK_sharing)    except:        err("Cannot determine decision.")    if decision:        return decision    else:        err("Cannot determine decision (Invalid parameter).")def has_authorization(ssidref):    """ Check if the domain with the given ssidref has authorization to        run on this system. To have authoriztion dom0's STE types must        be a superset of that of the domain's given through its ssidref.    """    rc = True    dom0_ssidref = int(acm.getssid(0)['ssidref'])    decision = acm.getdecision('ssidref', str(dom0_ssidref),                               'ssidref', str(ssidref),                               ACMHOOK_authorization)    if decision == "DENIED":        rc = False    return rcdef hv_chg_policy(bin_pol, del_array, chg_array):    """        Change the binary policy in the hypervisor        The 'del_array' and 'chg_array' give hints about deleted ssidrefs        and changed ssidrefs which can be due to deleted VM labels        or reordered VM labels    """    rc = -xsconstants.XSERR_GENERAL_FAILURE    errors = ""    if not on():        err("No policy active.")    try:        rc, errors = acm.chgpolicy(bin_pol, del_array, chg_array)    except Exception, e:        pass    if len(errors) > 0:        rc = -xsconstants.XSERR_HV_OP_FAILED    return rc, errorsdef hv_get_policy():    """        Gte the binary policy enforced in the hypervisor    """    rc = -xsconstants.XSERR_GENERAL_FAILURE    bin_pol = ""    if not on():        err("No policy active.")    try:        rc, bin_pol = acm.getpolicy()    except Exception, e:        pass    if len(bin_pol) == 0:        bin_pol = None    return rc, bin_poldef is_in_conflict(ssidref):    """ Check whether the given ssidref is in conflict with any running        domain.    """    decision = acm.getdecision('ssidref', str(ssidref),                               'ssidref', str(ssidref),                               ACMHOOK_conflictset)    if decision == "DENIED":        return True    return Falsedef set_policy(xs_type, xml, flags, overwrite):    """        Xend exports this function via XML-RPC    """    from xen.xend import XendXSPolicyAdmin    xspoladmin = XendXSPolicyAdmin.XSPolicyAdminInstance()    try:        acmpol, rc, errors = \             xspoladmin.add_acmpolicy_to_system(xml,                                                int(flags),                                                True)        return rc, base64.b64encode(errors)    except Exception, e:        err(str(e))def reset_policy():    """       Xend exports this function via XML-RPC    """    from xen.xend import XendXSPolicyAdmin    xspoladmin = XendXSPolicyAdmin.XSPolicyAdminInstance()    try:        acmpol, rc, errors = \             xspoladmin.reset_acmpolicy()        return rc, base64.b64encode(errors)    except Exception, e:        err(str(e))def get_policy():    """        Xend exports this function via XML-RPC    """    from xen.xend import XendXSPolicyAdmin    poladmin = XendXSPolicyAdmin.XSPolicyAdminInstance()    try:        policy = poladmin.get_loaded_policy()        if policy != None:            return policy.toxml(), poladmin.get_policy_flags(policy)    except Exception, e:        err(str(e))    return "", 0def activate_policy(flags):    """        Xend exports this function via XML-RPC    """    from xen.xend import XendXSPolicyAdmin    poladmin = XendXSPolicyAdmin.XSPolicyAdminInstance()    try:        policies = poladmin.get_policies()        if len(policies) > 0:           flags = int(flags)           irc = poladmin.activate_xspolicy(policies[0], flags)           return irc    except Exception, e:        err("Error while activating the policy: " % str(e))    return 0def rm_bootpolicy():    """        Xend exports this function via XML-RPC    """    from xen.xend import XendXSPolicyAdmin    rc = XendXSPolicyAdmin.XSPolicyAdminInstance().rm_bootpolicy()    if rc != xsconstants.XSERR_SUCCESS:        err("Error while removing boot policy: %s" % \            str(xsconstants.xserr2string(-rc)))    return rcdef get_xstype():    """        Xend exports this function via XML-RPC    """    from xen.xend import XendXSPolicyAdmin    return XendXSPolicyAdmin.XSPolicyAdminInstance().isXSEnabled()def get_domain_label(domain):    """        Xend exports this function via XML-RPC    """    from xen.xend import XendDomain    dom = XendDomain.instance().domain_lookup_nr(domain)    if dom:        seclab = dom.get_security_label()        return seclab    else:        err("Domain not found.")def set_domain_label(domain, seclab, old_seclab):    """        Xend exports this function via XML-RPC    """    from xen.xend import XendDomain    dom = XendDomain.instance().domain_lookup_nr(domain)    if dom:        results = dom.set_security_label(seclab, old_seclab)        rc, errors, old_label, new_ssidref = results        return rc, new_ssidref    else:        err("Domain not found.")def dump_policy():    if active_policy in ['NULL', 'INACTIVE', 'INACCESSIBLE' ]:        err("\'" + active_policy + "\' policy. Nothing to dump.")    (ret, output) = commands.getstatusoutput(xensec_tool + " getpolicy")    if ret:        err("Dumping hypervisor policy failed:\n" + output)    print outputdef dump_policy_file(filename, ssidref=None):    ssid = ""    if ssidref:        ssid = " " + str(ssidref)    (ret, output) = commands.getstatusoutput(xensec_tool + " dumppolicy " +                                             filename + ssid)    if ret:        err("Dumping policy failed:\n" + output)    print outputdef list_labels(policy_name, ltype):    """        Xend exports this function via XML-RPC        List the VM,resource or any kind of labels contained in the        given policy. If no policy name is given, the currently        active policy's label will be returned if they exist.    """    if not policy_name:        if active_policy in [ 'NULL', 'INACTIVE', "" ]:            err("Current policy \'" + active_policy + "\' "                "has no labels defined.\n")    if not ltype or ltype == 'dom':        condition = vm_label_re    elif ltype == 'res':        condition = res_label_re    elif ltype == 'any':        condition = all_label_re    else:        err("Unknown label type \'" + ltype + "\'")    try:        mapfile_lock()        (primary, secondary, f, pol_exists) = getmapfile(policy_name)        if not f:            if pol_exists:                err("Cannot find mapfile for policy \'" + policy_name + "\'.\n")            else:                err("Unknown policy \'" + policy_name + "\'")        labels = []        for line in f:            if condition.match(line):                label = line.split()[3]                if label not in labels:                    labels.append(label)    finally:        mapfile_unlock()    if '__NULL_LABEL__' in labels:        labels.remove('__NULL_LABEL__')    return labelsdef get_res_label(resource):    """Returns resource label information (policytype, label, policy) if       it exists. Otherwise returns null label and policy.    """    def default_res_label():        ssidref = NULL_SSIDREF        if on():            label = ssidref2label(ssidref)        else:            label = None        return (xsconstants.ACM_POLICY_ID, 'NULL', label)    tmp = get_resource_label(resource)    if len(tmp) == 2:        policytype = xsconstants.ACM_POLICY_ID        policy, label = tmp    elif len(tmp) == 3:        policytype, policy, label = tmp    else:        policytype, policy, label = default_res_label()    return (policytype, label, policy)def get_res_security_details(resource):    """Returns the (label, ssidref, policy) associated with a given       resource from the global resource label file.    """    def default_security_details():        ssidref = NULL_SSIDREF        if on():            label = ssidref2label(ssidref)        else:            label = None        policy = active_policy        return (label, ssidref, policy)    # find the entry associated with this resource    (policytype, label, policy) = get_res_label(resource)    if policy == 'NULL':        log.info("Resource label for "+resource+" not in file, using DEFAULT.")        return default_security_details()    if policytype != xsconstants.ACM_POLICY_ID:        raise VmError("Unknown policy type '%s in label for resource '%s'" %                      (policytype, resource))    # is this resource label for the running policy?    if policy == active_policy:        ssidref = label2ssidref(label, policy, 'res')    elif label == xsconstants.XS_INACCESSIBLE_LABEL:        ssidref = NULL_SSIDREF    else:        log.info("Resource label not for active policy, using DEFAULT.")        return default_security_details()    return (label, ssidref, policy)def security_label_to_details(seclab):    """ Convert a Xen-API type of security label into details """    def default_security_details():        ssidref = NULL_SSIDREF        if on():            label = ssidref2label(ssidref)        else:            label = None        policy = active_policy        return (label, ssidref, policy)    (policytype, policy, label) = seclab.split(":")    # is this resource label for the running policy?    if policy == active_policy:        ssidref = label2ssidref(label, policy, 'res')    else:        log.info("Resource label not for active policy, using DEFAULT.")        return default_security_details()    return (label, ssidref, policy)def unify_resname(resource, mustexist=True):    """Makes all resource locations absolute. In case of physical    resources, '/dev/' is added to local file names"""    if not resource:        return resource    # sanity check on resource name    try:        (typ, resfile) = resource.split(":", 1)    except:        err("Resource spec '%s' contains no ':' delimiter" % resource)    if typ == "tap":        try:            (subtype, resfile) = resfile.split(":")        except:            err("Resource spec '%s' contains no tap subtype" % resource)    if typ in ["phy"]:        if not resfile.startswith("/"):            resfile = "/dev/" + resfile        if mustexist:            resfile = os.path.realpath(resfile)            try:                stats = os.lstat(resfile)                if not (stat.S_ISBLK(stats[stat.ST_MODE])):                    err("Invalid resource")            except:                err("Invalid resource")    if typ in [ "file", "tap" ]:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -