📄 acm.py
字号:
#===========================================================================# This library is free software; you can redistribute it and/or# modify it under the terms of version 2.1 of the GNU Lesser General Public# License as published by the Free Software Foundation.## This library is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU# Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public# License along with this library; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA#============================================================================# Copyright (C) 2006 International Business Machines Corp.# Author: Reiner Sailer# Author: Bryan D. Payne <bdpayne@us.ibm.com># Author: Stefan Berger <stefanb@us.ibm.com>#============================================================================import commandsimport loggingimport os, string, reimport threadingimport structimport statimport base64from xen.lowlevel import acmfrom xen.xend import sxpfrom xen.xend import XendConstantsfrom xen.xend import XendOptionsfrom xen.xend.XendLogging import logfrom xen.xend.XendError import VmErrorfrom xen.util import dictio, xsconstantsfrom xen.xend.XendConstants import *#global directories and tools for security managementinstall_policy_dir_prefix = "/etc/xen/acm-security/policies"security_dir_prefix = XendOptions.instance().get_xend_security_path()policy_dir_prefix = security_dir_prefix + "/policies"res_label_filename = policy_dir_prefix + "/resource_labels"boot_filename = "/boot/grub/menu.lst"altboot_filename = "/boot/grub/grub.conf"xensec_tool = "/usr/sbin/xensec_tool"#global patterns for map file#police_reference_tagname = "POLICYREFERENCENAME"primary_entry_re = re.compile("\s*PRIMARY\s+.*", re.IGNORECASE)secondary_entry_re = re.compile("\s*SECONDARY\s+.*", re.IGNORECASE)label_template_re = re.compile(".*security_label_template.xml", re.IGNORECASE)mapping_filename_re = re.compile(".*\.map", re.IGNORECASE)policy_reference_entry_re = re.compile("\s*POLICYREFERENCENAME\s+.*", re.IGNORECASE)vm_label_re = re.compile("\s*LABEL->SSID\s.+[VM|ANY]\s+.*", re.IGNORECASE)res_label_re = re.compile("\s*LABEL->SSID\s+RES\s+.*", re.IGNORECASE)all_label_re = re.compile("\s*LABEL->SSID\s+.*", re.IGNORECASE)access_control_re = re.compile("\s*access_control\s*=", re.IGNORECASE)#global patterns for boot configuration filexen_title_re = re.compile("\s*title\s+XEN", re.IGNORECASE)any_title_re = re.compile("\s*title\s", re.IGNORECASE)xen_kernel_re = re.compile("\s*kernel.*xen.*\.gz", re.IGNORECASE)kernel_ver_re = re.compile("\s*module.*vmlinuz", re.IGNORECASE)any_module_re = re.compile("\s*module\s", re.IGNORECASE)empty_line_re = re.compile("^\s*$")binary_name_re = re.compile(".*[chwall|ste|chwall_ste].*\.bin", re.IGNORECASE)policy_name_re = re.compile(".*[chwall|ste|chwall_ste].*", re.IGNORECASE)#decision hooks known to the hypervisorACMHOOK_sharing = 1ACMHOOK_authorization = 2ACMHOOK_conflictset = 3#other global variablesNULL_SSIDREF = 0#general Rlock for map files; only one lock for all mapfiles__mapfile_lock = threading.RLock()__resfile_lock = threading.RLock()log = logging.getLogger("xend.util.security")#Functions exported through XML-RPCxmlrpc_exports = [ 'set_resource_label', 'get_resource_label', 'list_labels', 'get_labeled_resources', 'set_policy', 'reset_policy', 'get_policy', 'activate_policy', 'rm_bootpolicy', 'get_xstype', 'get_domain_label', 'set_domain_label']# Our own exception definition. It is masked (pass) if raised and# whoever raises this exception must provide error information.class XSMError(Exception): def __init__(self,value): self.value = value def __str__(self): return repr(self.value)def err(msg): """Raise ACM exception. """ raise XSMError(msg)active_policy = Nonedef mapfile_lock(): __mapfile_lock.acquire()def mapfile_unlock(): __mapfile_lock.release()def resfile_lock(): __resfile_lock.acquire()def resfile_unlock(): __resfile_lock.release()def refresh_security_policy(): """ retrieves security policy """ global active_policy active_policy = 'INACCESSIBLE' if os.access("/proc/xen/privcmd", os.R_OK|os.W_OK): try: active_policy = acm.policy() except: active_policy = "INACTIVE"def get_active_policy_name(): refresh_security_policy() return active_policy# now set active_policyrefresh_security_policy()def on(): """ returns none if security policy is off (not compiled), any string otherwise, use it: if not security.on() ... """ if get_active_policy_name() not in ['INACTIVE', 'NULL', '']: return xsconstants.XS_POLICY_ACM return 0def calc_dom_ssidref_from_info(info): """ Calculate a domain's ssidref from the security_label in its info. This function is called before the domain is started and makes sure that: - the type of the policy is the same as indicated in the label - the name of the policy is the same as indicated in the label - calculates an up-to-date ssidref for the domain The latter is necessary since the domain's ssidref could have changed due to changes to the policy. """ import xen.xend.XendConfig if isinstance(info, xen.xend.XendConfig.XendConfig): if info.has_key('security_label'): seclab = info['security_label'] tmp = seclab.split(":") if len(tmp) != 3: raise VmError("VM label '%s' in wrong format." % seclab) typ, policyname, vmlabel = seclab.split(":") if typ != xsconstants.ACM_POLICY_ID: raise VmError("Policy type '%s' must be changed." % typ) if get_active_policy_name() != policyname: raise VmError("Active policy '%s' different than " "what in VM's label ('%s')." % (get_active_policy_name(), policyname)) ssidref = label2ssidref(vmlabel, policyname, "dom") return ssidref else: return 0x0 raise VmError("security.calc_dom_ssidref_from_info: info of type '%s'" "not supported." % type(info))def getmapfile(policyname): """ in: if policyname is None then the currently active hypervisor policy is used out: 1. primary policy, 2. secondary policy, 3. open file descriptor for mapping file, and 4. True if policy file is available, False otherwise """ if not policyname: policyname = get_active_policy_name() map_file_ok = False primary = None secondary = None #strip last part of policy as file name part policy_dir_list = string.split(policyname, ".") policy_file = policy_dir_list.pop() if len(policy_dir_list) > 0: policy_dir = string.join(policy_dir_list, "/") + "/" else: policy_dir = "" map_filename = policy_dir_prefix + "/" + policy_dir + policy_file + ".map" # check if it is there, if not check if policy file is there if not os.path.isfile(map_filename): policy_filename = policy_dir_prefix + "/" + policy_dir + policy_file + "-security_policy.xml" if not os.path.isfile(policy_filename): err("Policy file \'" + policy_filename + "\' not found.") else: err("Mapping file \'" + map_filename + "\' not found.") f = open(map_filename) for line in f: if policy_reference_entry_re.match(line): l = line.split() if (len(l) == 2) and (l[1] == policyname): map_file_ok = True elif primary_entry_re.match(line): l = line.split() if len(l) == 2: primary = l[1] elif secondary_entry_re.match(line): l = line.split() if len(l) == 2: secondary = l[1] f.close() f = open(map_filename) if map_file_ok and primary and secondary: return (primary, secondary, f, True) else: err("Mapping file inconsistencies found.")def ssidref2label(ssidref_var): """ returns labelname corresponding to ssidref; maps current policy to default directory to find mapping file """ #1. translated permitted input formats if isinstance(ssidref_var, str): ssidref_var.strip() if ssidref_var[0:2] == "0x": ssidref = int(ssidref_var[2:], 16) else: ssidref = int(ssidref_var) elif isinstance(ssidref_var, int): ssidref = ssidref_var else: err("Instance type of ssidref not supported (must be of type 'str' or 'int')") if ssidref == 0: from xen.util.acmpolicy import ACM_LABEL_UNLABELED return ACM_LABEL_UNLABELED try: mapfile_lock() (primary, secondary, f, pol_exists) = getmapfile(None) if not f: if (pol_exists): err("Mapping file for policy not found.") else: err("Policy file for \'" + get_active_policy_name() + "\' not found.") #2. get labelnames for both ssidref parts pri_ssid = ssidref & 0xffff sec_ssid = ssidref >> 16 pri_null_ssid = NULL_SSIDREF & 0xffff sec_null_ssid = NULL_SSIDREF >> 16 pri_labels = [] sec_labels = [] labels = [] for line in f: l = line.split() if (len(l) < 5) or (l[0] != "LABEL->SSID"): continue if primary and (l[2] == primary) and (int(l[4], 16) == pri_ssid): pri_labels.append(l[3]) if secondary and (l[2] == secondary) and (int(l[4], 16) == sec_ssid): sec_labels.append(l[3]) f.close() finally: mapfile_unlock() #3. get the label that is in both lists (combination must be a single label) if (primary == "CHWALL") and (pri_ssid == pri_null_ssid) and (sec_ssid != sec_null_ssid): labels = sec_labels elif (secondary == "CHWALL") and (pri_ssid != pri_null_ssid) and (sec_ssid == sec_null_ssid): labels = pri_labels elif secondary == "NULL": labels = pri_labels else: for i in pri_labels: for j in sec_labels: if (i==j): labels.append(i) if len(labels) != 1: err("Label for ssidref \'" + str(ssidref) + "\' unknown or not unique in policy \'" + active_policy + "\'") return labels[0]def label2ssidref(labelname, policyname, typ): """ returns ssidref corresponding to labelname; maps current policy to default directory to find mapping file """ if policyname in ['NULL', 'INACTIVE', 'INACCESSIBLE' ]: err("Cannot translate labels for \'" + policyname + "\' policy.") allowed_types = ['ANY'] if typ == 'dom': allowed_types.append('VM') elif typ == 'res': allowed_types.append('RES') else: err("Invalid type. Must specify 'dom' or 'res'.") try: mapfile_lock() (primary, secondary, f, pol_exists) = getmapfile(policyname) #2. get labelnames for ssidref parts and find a common label pri_ssid = [] sec_ssid = [] for line in f: l = line.split() if (len(l) < 5) or (l[0] != "LABEL->SSID"): continue if primary and (l[1] in allowed_types) and \ (l[2] == primary) and \ (l[3] == labelname): pri_ssid.append(int(l[4], 16)) if secondary and (l[1] in allowed_types) and \ (l[2] == secondary) and \ (l[3] == labelname): sec_ssid.append(int(l[4], 16)) f.close() if (typ == 'res') and (primary == "CHWALL") and (len(pri_ssid) == 0): pri_ssid.append(NULL_SSIDREF) elif (typ == 'res') and (secondary == "CHWALL") and \ (len(sec_ssid) == 0): sec_ssid.append(NULL_SSIDREF) #3. sanity check and composition of ssidref if (len(pri_ssid) == 0) or ((len(sec_ssid) == 0) and \ (secondary != "NULL")): err("Label \'" + labelname + "\' not found.") elif (len(pri_ssid) > 1) or (len(sec_ssid) > 1): err("Label \'" + labelname + "\' not unique in policy (policy error)") if secondary == "NULL": return pri_ssid[0] else: return (sec_ssid[0] << 16) | pri_ssid[0] finally: mapfile_unlock()def refresh_ssidref(config): """ looks up ssidref from security field and refreshes the value if label exists """ #called by dom0, policy could have changed after xen.utils.security was initialized refresh_security_policy() security = None if isinstance(config, dict): security = config['security'] elif isinstance(config, list): security = sxp.child_value(config, 'security') else: err("Instance type of config parameter not supported.") if not security: #nothing to do (no security label attached) return config policyname = None labelname = None # compose new security field for idx in range(0, len(security)): if security[idx][0] == 'ssidref': security.pop(idx) break elif security[idx][0] == 'access_control': for jdx in [1, 2]: if security[idx][jdx][0] == 'label': labelname = security[idx][jdx][1] elif security[idx][jdx][0] == 'policy': policyname = security[idx][jdx][1] else: err("Illegal field in access_control") #verify policy is correct if active_policy != policyname: err("Policy \'" + str(policyname) + "\' in label does not match active policy \'" + str(active_policy) +"\'!") new_ssidref = label2ssidref(labelname, policyname, 'dom') if not new_ssidref: err("SSIDREF refresh failed!") security.append([ 'ssidref',str(new_ssidref)]) security = ['security', security ] for idx in range(0,len(config)): if config[idx][0] == 'security':
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -