📄 acm.py
字号:
resfile = os.path.realpath(resfile) if mustexist and not os.path.isfile(resfile): err("Invalid resource") #file: resources must be specified with absolute path #vlan resources don't start with '/' if typ != "vlan": if (not resfile.startswith("/")) or \ (mustexist and not os.path.exists(resfile)): err("Invalid resource.") # from here on absolute file names with resources if typ == "tap": typ = typ + ":" + subtype resource = typ + ":" + resfile return resourcedef res_security_check(resource, domain_label): """Checks if the given resource can be used by the given domain label. Returns 1 if the resource can be used, otherwise 0. """ rtnval = 1 # if security is on, ask the hypervisor for a decision if on(): #build canonical resource name resource = unify_resname(resource) (label, ssidref, policy) = get_res_security_details(resource) domac = ['access_control'] domac.append(['policy', active_policy]) domac.append(['label', domain_label]) domac.append(['type', 'dom']) decision = get_decision(domac, ['ssidref', str(ssidref)]) # provide descriptive error messages if decision == 'DENIED': if label == ssidref2label(NULL_SSIDREF): raise XSMError("Resource '"+resource+"' is not labeled") rtnval = 0 else: raise XSMError("Permission denied for resource '"+resource+"' because label '"+label+"' is not allowed") rtnval = 0 # security is off, make sure resource isn't labeled else: # Note, we can't canonicalise the resource here, because people using # xm without ACM are free to use relative paths. (policytype, label, policy) = get_res_label(resource) if policy != 'NULL': raise XSMError("Security is off, but '"+resource+"' is labeled") rtnval = 0 return rtnvaldef res_security_check_xapi(rlabel, rssidref, rpolicy, xapi_dom_label): """Checks if the given resource can be used by the given domain label. Returns 1 if the resource can be used, otherwise 0. """ rtnval = 1 # if security is on, ask the hypervisor for a decision if on(): if rlabel == xsconstants.XS_INACCESSIBLE_LABEL: return 0 typ, dpolicy, domain_label = xapi_dom_label.split(":") if not dpolicy or not domain_label: raise VmError("VM security label in wrong format.") if active_policy != rpolicy: raise VmError("Resource's policy '%s' != active policy '%s'" % (rpolicy, active_policy)) domac = ['access_control'] domac.append(['policy', active_policy]) domac.append(['label', domain_label]) domac.append(['type', 'dom']) decision = get_decision(domac, ['ssidref', str(rssidref)]) log.info("Access Control Decision : %s" % decision) # provide descriptive error messages if decision == 'DENIED': if rlabel == ssidref2label(NULL_SSIDREF): #raise XSMError("Resource is not labeled") rtnval = 0 else: #raise XSMError("Permission denied for resource because label '"+rlabel+"' is not allowed") rtnval = 0 # security is off, make sure resource isn't labeled else: # Note, we can't canonicalise the resource here, because people using # xm without ACM are free to use relative paths. if rpolicy != 'NULL': #raise XSMError("Security is off, but resource is labeled") rtnval = 0 return rtnvaldef validate_label_xapi(xapi_label, dom_or_res): """ Make sure that this label is part of the currently enforced policy and that it references the current policy. dom_or_res defines whether this is a VM ('res') or resource label ('res') """ tmp = xapi_label.split(":") if len(tmp) != 3: return -xsconstants.XSERR_BAD_LABEL_FORMAT policytyp, policyref, label = tmp return validate_label(policytyp, policyref, label, dom_or_res)def validate_label(policytype, policyref, label, dom_or_res): """ Make sure that this label is part of the currently enforced policy and that it reference the current policy. """ if policytype != xsconstants.ACM_POLICY_ID: return -xsconstants.XSERR_WRONG_POLICY_TYPE if not policytype or not label: return -xsconstants.XSERR_BAD_LABEL_FORMAT rc = xsconstants.XSERR_SUCCESS if label == xsconstants.XS_INACCESSIBLE_LABEL: return rc from xen.xend.XendXSPolicyAdmin import XSPolicyAdminInstance curpol = XSPolicyAdminInstance().get_loaded_policy() if not curpol or curpol.get_name() != policyref: rc = -xsconstants.XSERR_BAD_LABEL else: try: label2ssidref(label, curpol.get_name() , dom_or_res) except: rc = -xsconstants.XSERR_BAD_LABEL return rcdef set_resource_label_xapi(resource, reslabel_xapi, oldlabel_xapi): """Assign a resource label to a resource @param resource: The name of a resource, i.e., "phy:/dev/hda", or "tap:qcow:/path/to/file.qcow" @param reslabel_xapi: A resource label foramtted as in all other parts of the Xen-API, i.e., ACM:xm-test:blue" @rtype: int @return Success (0) or failure value (< 0) """ olabel = "" if reslabel_xapi == "": return rm_resource_label(resource, oldlabel_xapi) rc = validate_label_xapi(reslabel_xapi, 'res') if rc != xsconstants.XSERR_SUCCESS: return rc if oldlabel_xapi not in [ "" ]: tmp = oldlabel_xapi.split(":") if len(tmp) != 3: return -xsconstants.XSERR_BAD_LABEL_FORMAT otyp, opolicyref, olabel = tmp # Only ACM is supported if otyp != xsconstants.ACM_POLICY_ID and \ otyp != xsconstants.INVALID_POLICY_PREFIX + \ xsconstants.ACM_POLICY_ID: return -xsconstants.XSERR_WRONG_POLICY_TYPE typ, policyref, label = reslabel_xapi.split(":") return set_resource_label(resource, typ, policyref, label, olabel)def is_resource_in_use(resource): """ Domain-0 'owns' resources of type 'VLAN', the rest are owned by the guests. """ from xen.xend import XendDomain lst = [] if resource.startswith('vlan'): from xen.xend.XendXSPolicyAdmin import XSPolicyAdminInstance curpol = XSPolicyAdminInstance().get_loaded_policy() policytype, label, policy = get_res_label(resource) if curpol and \ policytype == xsconstants.ACM_POLICY_ID and \ policy == curpol.get_name() and \ label in curpol.policy_get_resourcelabel_names(): # VLAN is in use. lst.append(XendDomain.instance(). get_vm_by_uuid(XendDomain.DOM0_UUID)) else: dominfos = XendDomain.instance().list('all') for dominfo in dominfos: if is_resource_in_use_by_dom(dominfo, resource): lst.append(dominfo) return lstdef devices_equal(res1, res2, mustexist=True): """ Determine whether two devices are equal """ return (unify_resname(res1, mustexist) == unify_resname(res2, mustexist))def is_resource_in_use_by_dom(dominfo, resource): """ Determine whether a resources is in use by a given domain @return True or False """ if not dominfo.domid: return False if dominfo._stateGet() not in [ DOM_STATE_RUNNING ]: return False devs = dominfo.info['devices'] uuids = devs.keys() for uuid in uuids: dev = devs[uuid] if len(dev) >= 2 and dev[1].has_key('uname'): # dev[0] is type, i.e. 'vbd' if devices_equal(dev[1]['uname'], resource, mustexist=False): log.info("RESOURCE IN USE: Domain %d uses %s." % (dominfo.domid, resource)) return True return Falsedef get_domain_resources(dominfo): """ Collect all resources of a domain in a map where each entry of the map is a list. Entries are strored in the following formats: tap:qcow:/path/xyz.qcow """ resources = { 'vbd' : [], 'tap' : [], 'vif' : []} devs = dominfo.info['devices'] uuids = devs.keys() for uuid in uuids: dev = devs[uuid] typ = dev[0] if typ in [ 'vbd', 'tap' ]: resources[typ].append(dev[1]['uname']) if typ in [ 'vif' ]: sec_lab = dev[1].get('security_label') if sec_lab: resources[typ].append(sec_lab) else: # !!! This should really get the label of the domain # or at least a resource label that has the same STE type # as the domain has from xen.util.acmpolicy import ACM_LABEL_UNLABELED resources[typ].append("%s:%s:%s" % (xsconstants.ACM_POLICY_ID, active_policy, ACM_LABEL_UNLABELED)) return resourcesdef resources_compatible_with_vmlabel(xspol, dominfo, vmlabel): """ Check whether the resources' labels are compatible with the given VM label. This is a function to be used when for example a running domain is to get the new label 'vmlabel' """ if not xspol: return False try: resfile_lock() try: access_control = dictio.dict_read("resources", res_label_filename) except: # No labeled resources -> must be compatible return True return __resources_compatible_with_vmlabel(xspol, dominfo, vmlabel, access_control) finally: resfile_unlock() return Falsedef __resources_compatible_with_vmlabel(xspol, dominfo, vmlabel, access_control, is_policy_update=False): """ Check whether the resources' labels are compatible with the given VM label. The access_control parameter provides a dictionary of the resource name to resource label mappings under which the evaluation should be done. Call this only for a paused or running domain. """ def collect_labels(reslabels, s_label, polname): if len(s_label) != 3 or polname != s_label[1]: return False label = s_label[2] if not label in reslabels: reslabels.append(label) return True resources = get_domain_resources(dominfo) reslabels = [] # all resource labels polname = xspol.get_name() for key, value in resources.items(): if key in [ 'vbd', 'tap' ]: for res in resources[key]: if not res in access_control: label = [xsconstants.ACM_POLICY_ID, xspol.get_name(), ACM_LABEL_UNLABELED] else: label = access_control[res] if not collect_labels(reslabels, label, polname): return False elif key in [ 'vif' ]: for xapi_label in value: label = xapi_label.split(":") from xen.util.acmpolicy import ACM_LABEL_UNLABELED if not (is_policy_update and \ label[2] == ACM_LABEL_UNLABELED): if not collect_labels(reslabels, label, polname): return False else: log.error("Unhandled device type: %s" % key) return False # Check that all resource labes have a common STE type with the # vmlabel if len(reslabels) > 0: rc = xspol.policy_check_vmlabel_against_reslabels(vmlabel, reslabels) else: rc = True log.info("vmlabel=%s, reslabels=%s, rc=%s" % (vmlabel, reslabels, str(rc))) return rc;def set_resource_label(resource, policytype, policyref, reslabel, \ oreslabel = None): """ Xend exports this function via XML-RPC. Assign a label to a resource If the old label (oreslabel) is given, then the resource must have that old label. A resource label may be changed if - the resource is not in use @param resource : The name of a resource, i.e., "phy:/dev/hda" @param policyref : The name of the policy @param reslabel : the resource label within the policy @param oreslabel : optional current resource label @rtype: int @return Success (0) or failure value (< 0) """ try: resource = unify_resname(resource, mustexist=False) except Exception: return -xsconstants.XSERR_BAD_RESOURCE_FORMAT try: resfile_lock() mapfile_lock() if reslabel not in [ '', xsconstants.XS_INACCESSIBLE_LABEL ]: ssidref = label2ssidref(reslabel, policyref, 'res') domains = is_resource_in_use(resource) if len(domains) > 0: return -xsconstants.XSERR_RESOURCE_IN_USE access_control = {} try: access_control = dictio.dict_read("resources", res_label_filename) except: pass if oreslabel: if not access_control.has_key(resource): return -xsconstants.XSERR_BAD_LABEL tmp = access_control[resource] if len(tmp) != 3: return -xsconstants.XSERR_BAD_LABEL if tmp[2] != oreslabel: return -xsconstants.XSERR_BAD_LABEL if resource.startswith('vlan:'): for key, value in access_control.items(): if value == tuple([policytype, policyref, reslabel]) and \ key.startswith('vlan:'): return -xsconstants.XSERR_BAD_LABEL if reslabel == xsconstants.XS_INACCESSIBLE_LABEL: policytype = xsconstants.ACM_POLICY_ID policyref = '*' if reslabel != "": new_entry = { resource : tuple([policytype, policyref, reslabel])} access_control.update(new_entry) command = "add" reslbl = ":".join([policytype, policyref, reslabel]) else: if access_control.has_key(resource): del access_control[resource] command = "remove" reslbl = "" run_resource_label_change_script(resource, reslbl, command) dictio.dict_write(access_control, "resources", res_label_filename) finally: resfile_unlock() mapfile_unlock() return xsconstants.XSERR_SUCCESSdef rm_resource_label(resource, oldlabel_xapi): """Remove a resource label from a physical resource @param resource: The name of a resource, i.e., "phy:/dev/hda" @rtype: int @return Success (0) or failure value (< 0) """ tmp = oldlabel_xapi.split(":") if len(tmp) != 3: return -xsconstants.XSERR_BAD_LABEL_FORMAT otyp, opolicyref, olabel = tmp # Only ACM is supported if otyp != xsconstants.ACM_POLICY_ID and \ otyp != xsconstants.INVALID_POLICY_PREFIX + xsconstants.ACM_POLICY_ID: return -xsconstants.XSERR_WRONG_POLICY_TYPE return set_resource_label(resource, "", "", "", olabel)def get_resource_label_xapi(resource): """Get the assigned resource label of a physical resource in the format used by then Xen-API, i.e., "ACM:xm-test:blue" @rtype: string @return the string representing policy type, policy name and label of the resource """
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -