📄 09_security-acm_pol_update.py
字号:
#!/usr/bin/python

# Copyright (C) International Business Machines Corp., 2007
# Author: Stefan Berger <stefanb@us.ibm.com>

# Test to exercise the xspolicy and acmpolicy classes
#
# Scenario: install an ACM policy, start two domains labeled 'red' and
# 'green', then try a series of policy updates.  Updates that would
# conflict with the running domains (or that are malformed) must be
# rejected; harmless updates must be accepted.

from XmTestLib import xapi
from XmTestLib.XenAPIDomain import XmTestAPIDomain
from XmTestLib.acm import *
from XmTestLib import *
from xen.xend import XendAPIConstants
import xen.util.xsm.xsm as security
from xen.util import xsconstants
from xen.util.acmpolicy import ACMPolicy
from xen.xend.XendDomain import DOM0_UUID
import base64
import struct
import sys
import time

if not isACMEnabled():
    SKIP("Not running this test since ACM not enabled.")

try:
    session = xapi.connect()
except:
    # Deliberately broad: any connection problem means the test is skipped.
    SKIP("Skipping this test since xm is not using the Xen-API.")


def _current_error():
    """Return str() of the exception currently being handled.

    sys.exc_info() is used instead of binding the exception in the
    'except' clause so that the file parses on every Python version.
    """
    return str(sys.exc_info()[1])


def _next_version(vers):
    """Return 'vers' with its second dot-separated component incremented.

    A version without a dot ("1") becomes "1.1"; "1.4" becomes "1.5".
    Components after the second one are dropped.
    """
    parts = vers.split('.')
    if len(parts) == 1:
        rev = 1
    else:
        rev = int(parts[1]) + 1
    return "%s.%s" % (parts[0], rev)


def _nametoxml(name, frommap):
    """Return the <Name> element for a label.

    If 'name' is a key of 'frommap', the element carries a from="..."
    attribute holding the mapped value (the callers use it as new -> old).
    """
    if name in frommap:
        return "<Name from=\"" + frommap[name] + "\">" + name + "</Name>\n"
    return "<Name>" + name + "</Name>\n"


def _set_policy(policy_xml):
    """Send 'policy_xml' to XSPolicy.set_xspolicy as an ACM policy.

    Uses the module-level 'session' and always passes XS_INST_LOAD and
    True as the last two arguments, exactly as every call site did.
    Returns the policy state record returned by the Xen-API call.
    """
    return session.xenapi.XSPolicy.set_xspolicy(xsconstants.XS_POLICY_ACM,
                                                policy_xml,
                                                xsconstants.XS_INST_LOAD,
                                                True)


def typestoxml(types):
    """Return one <Type> element per entry of 'types', newline-terminated."""
    return "".join(["<Type>" + t + "</Type>\n" for t in types])


def cfstoxml(cfss):
    """Return the <Conflict> elements for the conflict sets in 'cfss'.

    Each entry is a dict with a 'name' and a list of type names under 'chws'.
    """
    return "".join(["<Conflict name=\"" + cfs['name'] + "\">\n" +
                    typestoxml(cfs['chws']) +
                    "</Conflict>\n"
                    for cfs in cfss])


def vmlabelstoxml(vmlabels, vmfrommap):
    """Return the <VirtualMachineLabel> elements for 'vmlabels'.

    Each label is a dict with 'name', 'stes' and optionally 'chws';
    the <ChineseWallTypes> element is only written when 'chws' is present.
    'vmfrommap' maps a label name to the value of its from="..." attribute.
    """
    parts = []
    for vmlabel in vmlabels:
        parts.append("<VirtualMachineLabel>\n")
        parts.append(_nametoxml(vmlabel['name'], vmfrommap))
        parts.append("<SimpleTypeEnforcementTypes>\n" +
                     typestoxml(vmlabel['stes']) +
                     "</SimpleTypeEnforcementTypes>\n")
        if 'chws' in vmlabel:
            parts.append("<ChineseWallTypes>\n" +
                         typestoxml(vmlabel['chws']) +
                         "</ChineseWallTypes>\n")
        parts.append("</VirtualMachineLabel>\n")
    return "".join(parts)


def reslabelstoxml(reslabels, resfrommap):
    """Return the <ResourceLabel> elements for 'reslabels'.

    Each label is a dict with 'name' and 'stes'.  'resfrommap' maps a
    label name to the value of its from="..." attribute.
    """
    parts = []
    for reslabel in reslabels:
        parts.append("<ResourceLabel>\n")
        parts.append(_nametoxml(reslabel['name'], resfrommap))
        parts.append("<SimpleTypeEnforcementTypes>\n" +
                     typestoxml(reslabel['stes']) +
                     "</SimpleTypeEnforcementTypes>\n")
        parts.append("</ResourceLabel>\n")
    return "".join(parts)


def create_xml_policy(hdr, stes, chws,
                      vmlabels, vmfrommap, bootstrap,
                      reslabels, resfrommap,
                      cfss):
    """Assemble a complete <SecurityPolicyDefinition> document as a string.

    hdr        -- dict with 'name' and 'version'; 'oldname' and
                  'oldversion' are optional and describe the policy
                  being updated
    stes       -- list of simple type enforcement type names
    chws       -- list of Chinese Wall type names
    vmlabels   -- list of VM label dicts (see vmlabelstoxml)
    vmfrommap  -- from-map for VM labels
    bootstrap  -- value of the bootstrap attribute of <SubjectLabels>
    reslabels  -- list of resource label dicts (see reslabelstoxml)
    resfrommap -- from-map for resource labels
    cfss       -- list of conflict set dicts (see cfstoxml)
    """
    # The <FromPolicy> element is only written when the header names a
    # predecessor.  Previously the keys were read unconditionally, which
    # raised KeyError when no policy was installed before the test.
    # NOTE(review): assumes <FromPolicy> is optional in
    # security_policy.xsd -- confirm against the schema.
    if 'oldname' in hdr and 'oldversion' in hdr:
        frompolicy_xml = \
            " <FromPolicy>\n" + \
            " <PolicyName>" + hdr['oldname'] + "</PolicyName>\n" + \
            " <Version>" + hdr['oldversion'] + "</Version>\n" + \
            " </FromPolicy>\n"
    else:
        frompolicy_xml = ""

    hdr_xml = "<PolicyHeader>\n" + \
              " <PolicyName>" + hdr['name'] + "</PolicyName>\n" + \
              " <Version>" + hdr['version'] + "</Version>\n" + \
              frompolicy_xml + \
              "</PolicyHeader>\n"

    stes_xml = "<SimpleTypeEnforcement>\n" + \
               " <SimpleTypeEnforcementTypes>\n" + \
               typestoxml(stes) + \
               " </SimpleTypeEnforcementTypes>\n" + \
               "</SimpleTypeEnforcement>\n"

    chws_xml = "<ChineseWall>\n" + \
               " <ChineseWallTypes>\n" + \
               typestoxml(chws) + \
               " </ChineseWallTypes>\n" + \
               " <ConflictSets>\n" + \
               cfstoxml(cfss) + \
               " </ConflictSets>\n" + \
               "</ChineseWall>\n"

    subjlabel_xml = "<SubjectLabels bootstrap=\"" + bootstrap + "\">\n" + \
                    vmlabelstoxml(vmlabels, vmfrommap) + \
                    "</SubjectLabels>\n"

    objlabel_xml = "<ObjectLabels>\n" + \
                   reslabelstoxml(reslabels, resfrommap) + \
                   "</ObjectLabels>\n"

    policyxml = "<?xml version=\"1.0\" ?>\n" + \
                "<SecurityPolicyDefinition xmlns=\"http://www.ibm.com\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://www.ibm.com ../../security_policy.xsd \">\n" + \
                hdr_xml + \
                stes_xml + \
                chws_xml + \
                "<SecurityLabelTemplate>\n" + \
                subjlabel_xml + \
                objlabel_xml + \
                "</SecurityLabelTemplate>\n" + \
                "</SecurityPolicyDefinition>\n"
    return policyxml


def update_hdr(hdr):
    """ Update the version information in the header

    The current name/version become 'oldname'/'oldversion' and the
    version is bumped (see _next_version).  'hdr' is modified in place
    and also returned.
    """
    hdr['oldversion'] = hdr['version']
    hdr['oldname'] = hdr['name']
    hdr['version'] = _next_version(hdr['version'])
    return hdr


# The session established at the top of the file is reused here.
policystate = session.xenapi.XSPolicy.get_xspolicy()

if policystate['repr'] != "":
    # A policy is already installed: ours has to be an update of it.
    print("%s" % policystate['repr'])
    try:
        acmpol = ACMPolicy(xml=policystate['repr'])
    except Exception:
        FAIL("Failure from creating ACMPolicy object: %s" % _current_error())
    oldname = acmpol.policy_dom_get_hdr_item("PolicyName")
    oldvers = acmpol.policy_dom_get_hdr_item("Version")
    newvers = _next_version(oldvers)
    print("old name/version = %s/%s" % (oldname, oldvers))
else:
    oldname = None
    oldvers = None
    newvers = "1.0"

# Initialize the header of the policy
hdr = {}
hdr['name'] = "xm-test"
hdr['version'] = newvers

if oldname:
    hdr['oldname'] = oldname
    if oldvers and oldvers != "":
        hdr['oldversion'] = oldvers

stes = [ "SystemManagement", "red", "green", "blue" ]

chws = [ "SystemManagement", "red", "green", "blue" ]

bootstrap = "SystemManagement"

vm_sysmgt = { 'name' : bootstrap,
              'stes' : stes,
              'chws' : [ "SystemManagement" ] }

vm_red = { 'name' : "red" ,
           'stes' : ["red"] ,
           'chws' : ["red"] }

vm_green = { 'name' : "green" ,
             'stes' : ["green"] ,
             'chws' : ["green"] }

vm_blue = { 'name' : "blue" ,
            'stes' : ["blue"] ,
            'chws' : ["blue"] }

res_red = { 'name' : "red" ,
            'stes' : ["red"] }

res_green = { 'name' : "green" ,
              'stes' : ["green"] }

res_blue = { 'name' : "blue" ,
             'stes' : ["blue"] }

cfs_1 = { 'name' : "CFS1",
          'chws' : [ "red" , "blue" ] }

vmlabels = [ vm_sysmgt, vm_red, vm_green, vm_blue ]
vmfrommap = {}
reslabels = [ res_red, res_green, res_blue ]
resfrommap = {}
cfss = [ cfs_1 ]

vm_label_red = xsconstants.ACM_POLICY_ID + ":xm-test:red"
vm_label_green = xsconstants.ACM_POLICY_ID + ":xm-test:green"
vm_label_blue = xsconstants.ACM_POLICY_ID + ":xm-test:blue"

xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)

# Remembered so the initial policy can be re-installed at the end.
xml_good = xml

policystate = _set_policy(xml)

print("\n\npolicystate = %s" % policystate)

policystate = session.xenapi.XSPolicy.get_xspolicy()

#
# Create two non-conflicting domains and start them
#
try:
    # XmTestAPIDomain tries to establish a connection to XenD
    domain1 = XmTestAPIDomain(extraConfig={ 'security_label' : vm_label_red })
except Exception:
    SKIP("Skipping test. Error: %s" % _current_error())

vm1_uuid = domain1.get_uuid()

try:
    domain1.start(noConsole=True)
except:
    FAIL("Could not start domain1")

print("Domain 1 started")

try:
    # XmTestAPIDomain tries to establish a connection to XenD
    domain2 = XmTestAPIDomain(extraConfig={'security_label': vm_label_green })
except Exception:
    SKIP("Skipping test. Error: %s" % _current_error())

vm2_uuid = domain2.get_uuid()

try:
    domain2.start(noConsole=True)
except:
    # The message used to say "domain1" (copy-and-paste slip).
    FAIL("Could not start domain2")

print("Domain 2 started")

# Try a policy that would put the two domains into conflict
cfs_2 = { 'name' : "CFS1",
          'chws' : [ "red" , "green" ] }
cfss = [ cfs_2 ]

hdr = update_hdr(hdr)
xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)

policystate = _set_policy(xml)

print("policystate %s" % policystate)

if int(policystate['xserr']) == 0:
    FAIL("(1) Should not have been able to set this policy.")

if len(policystate['errors']) == 0:
    FAIL("Hypervisor should have reported errros.")

errors = base64.b64decode(policystate['errors'])

print("Length of errors: %d" % len(errors))
# Decoded as exactly two big-endian 32-bit ints; any other length raises.
a, b = struct.unpack("!ii", errors)
print("%08x , %08x" % (a, b))

#
# Create a faulty policy with 'red' STE missing
#
# The header is not bumped here: the previous update was rejected.
cfss = [ cfs_1 ]
stes = [ "SystemManagement", "green", "blue" ]

xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)
policystate = _set_policy(xml)

print("Result from setting faulty(!) policy with STE 'red' missing:")
print("policystate %s" % policystate)

if int(policystate['xserr']) == 0:
    FAIL("(2) Should not have been able to set this policy.")

#
# Create a policy with 'red' VMLabel missing -- should not work since it is
# in use.
#
stes = [ "SystemManagement", "red", "green", "blue" ]

vmlabels = [ vm_sysmgt, vm_green, vm_blue ]

xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)
policystate = _set_policy(xml)

print("Result from setting faulty(!) policy with VMlabel 'red' missing:")
print("policystate %s" % policystate)

if int(policystate['xserr']) == 0:
    FAIL("(3) Should not have been able to set this policy.")

#
# Create a policy with 'blue' VMLabel missing -- should work since it is NOT
# in use.
#
vmlabels = [ vm_sysmgt, vm_red, vm_green ]

xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)
policystate = _set_policy(xml)

print("Result from setting (good) policy with VMlabel 'blue' missing:")
print("policystate %s" % policystate)

if int(policystate['xserr']) != 0:
    FAIL("(4) Should have been able to set this policy: %s" % xml)

#
# Move the green VMLabel towards blue which should put the running
# domain with label blue into a conflict set
#
vmlabels = [ vm_sysmgt, vm_red, vm_blue ]

vmfrommap = { "blue" : "green" }  # new : old

hdr = update_hdr(hdr)  # Needed, since last update was successful
xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)

policystate = _set_policy(xml)

print("policystate %s" % policystate)

if int(policystate['xserr']) == 0:
    FAIL("(5) Should not have been able to set this policy.")

#
# Try to install a policy where a VM label has a faulty VM label name
#
vmfrommap = {}

vm_blue_bad = { 'name' : "blue:x" ,   # ':' not allowed
                'stes' : ["blue"],
                'chws' : ["blue"] }

vmlabels = [ vm_sysmgt, vm_red, vm_green, vm_blue_bad ]

xml = create_xml_policy(hdr, stes, chws,
                        vmlabels, vmfrommap, bootstrap,
                        reslabels, resfrommap,
                        cfss)

policystate = _set_policy(xml)

print("policystate %s" % policystate)

if int(policystate['xserr']) == 0:
    FAIL("(6) Should not have been able to set this policy.")

#
# End the test by installing the initial policy again
#
# hdr['version'] was bumped for update (5), which was rejected, so the
# version derived here (one revision lower) is used as the from-version.
major, minor = hdr['version'].split(".")
cur_version = "%s.%s" % (major, str(int(minor) - 1))

orig_acmpol = ACMPolicy(xml=xml_good)
orig_acmpol.set_frompolicy_version(cur_version)
orig_acmpol.set_policy_version(hdr['version'])

policystate = _set_policy(orig_acmpol.toxml())

if int(policystate['xserr']) != 0:
    FAIL("(END) Should have been able to set this policy.")

domain1.stop()
domain2.stop()
domain1.destroy()
domain2.destroy()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -