⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xendxspolicy.py

📁 xen虚拟机源代码安装包
💻 PY
字号:
#============================================================================# This library is free software; you can redistribute it and/or# modify it under the terms of version 2.1 of the GNU Lesser General Public# License as published by the Free Software Foundation.## This library is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU# Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public# License along with this library; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA#============================================================================# Copyright (c) 2007 IBM Corporation# Copyright (c) 2006 Xensource#============================================================================import base64import loggingfrom xen.xend import XendDomainfrom xen.xend.XendBase import XendBasefrom xen.xend.XendError import *from xen.xend.XendAPIConstants import *from xen.xend.XendXSPolicyAdmin import XSPolicyAdminInstancefrom xen.util import xsconstantsimport xen.util.xsm.xsm as securitylog = logging.getLogger("xend.XendXSPolicy")log.setLevel(logging.TRACE)class XendXSPolicy(XendBase):    """ Administration class for an XSPolicy. """    def getClass(self):        return "XSPolicy"    def getMethods(self):        methods = ['activate_xspolicy']        return XendBase.getMethods() + methods    def getFuncs(self):        funcs = [ 'get_xstype',                  'set_xspolicy',                  'reset_xspolicy',                  'get_xspolicy',                  'rm_xsbootpolicy',                  'get_resource_label',                  'set_resource_label',                  'get_labeled_resources',                  'can_run' ]        return XendBase.getFuncs() + funcs    getClass    = classmethod(getClass)    getMethods  = classmethod(getMethods)    getFuncs    = classmethod(getFuncs)    def __init__(self, xspol, record, uuid):        """ xspol = actual XSPolicy  object """        self.xspol = xspol        XendBase.__init__(self, uuid, record)    def get_record(self):        xspol_record = {          'uuid'   : self.get_uuid(),          'flags'  : XSPolicyAdminInstance().get_policy_flags(self.xspol),          'repr'   : self.xspol.toxml(),          'type'   : self.xspol.get_type(),        }        return xspol_record    def get_xstype(self):        return XSPolicyAdminInstance().isXSEnabled()    def set_xspolicy(self, xstype, xml, flags, overwrite):        ref = ""        xstype = int(xstype)        flags  = int(flags)        polstate = { 'xs_ref': "", 'repr'   : "", 'type'   : 0,                     'flags' : 0 , 'version': 0 , 'errors' : "", 'xserr' : 0 }        if xstype == xsconstants.XS_POLICY_ACM:            poladmin = XSPolicyAdminInstance()            try:                (xspol, rc, errors) = poladmin.add_acmpolicy_to_system(                                                                   xml, flags,                                                                   overwrite)                if rc != 0:                    polstate.update( { 'xserr' : rc,                                       'errors': base64.b64encode(errors) } )                else:                    ref = xspol.get_ref()                    polstate = {                      'xs_ref' : ref,                      'flags'  : poladmin.get_policy_flags(xspol),                      'type'   : xstype,                      'repr'   : "",                      'version': xspol.get_version(),                      'errors' : base64.b64encode(errors),                      'xserr'  : rc,                    }            except Exception, e:                raise        else:            raise SecurityError(-xsconstants.XSERR_POLICY_TYPE_UNSUPPORTED)        return polstate    def reset_xspolicy(self, xstype):        xstype = int(xstype)        polstate = { 'xs_ref': "", 'repr'   : "", 'type'   : 0,                     'flags' : 0 , 'version': 0 , 'errors' : "", 'xserr' : 0 }        if xstype == xsconstants.XS_POLICY_ACM:            poladmin = XSPolicyAdminInstance()            try:                (xspol, rc, errors) = poladmin.reset_acmpolicy()                if rc != 0:                    polstate.update( { 'xserr' : rc,                                       'errors': base64.b64encode(errors) } )                else:                    ref = xspol.get_ref()                    polstate = {                      'xs_ref' : ref,                      'flags'  : poladmin.get_policy_flags(xspol),                      'type'   : xstype,                      'repr'   : "",                      'version': xspol.get_version(),                      'errors' : base64.b64encode(errors),                      'xserr'  : rc,                    }            except Exception, e:                raise        else:            raise SecurityError(-xsconstants.XSERR_POLICY_TYPE_UNSUPPORTED)        return polstate    def activate_xspolicy(self, flags):        flags = int(flags)        rc = -xsconstants.XSERR_GENERAL_FAILURE        poladmin = XSPolicyAdminInstance()        try:            rc = poladmin.activate_xspolicy(self.xspol, flags)        except Exception, e:            log.info("Activate_policy: %s" % str(e))        if rc != flags:            raise SecurityError(rc)        return flags    def get_xspolicy(self):        polstate = { 'xs_ref' : "",                     'repr'   : "",                     'type'   : 0,                     'flags'  : 0,                     'version': "",                     'errors' : "",                     'xserr'  : 0 }        poladmin = XSPolicyAdminInstance()        refs = poladmin.get_policies_refs()        # Will return one or no policy        if refs and len(refs) > 0:            ref = refs[0]            xspol = XSPolicyAdminInstance().policy_from_ref(ref)            if xspol:                polstate = {                  'xs_ref' : ref,                  'repr'   : xspol.toxml(),                  'type'   : xspol.get_type(),                  'flags'  : poladmin.get_policy_flags(xspol),                  'version': xspol.get_version(),                  'errors' : "",                  'xserr'  : 0,                }        return polstate    def rm_xsbootpolicy(self):        rc = XSPolicyAdminInstance().rm_bootpolicy()        if rc != xsconstants.XSERR_SUCCESS:            raise SecurityError(rc)    def get_labeled_resources(self):        return security.get_labeled_resources_xapi()    def set_resource_label(self, resource, sec_lab, old_lab):        rc = security.set_resource_label_xapi(resource, sec_lab, old_lab)        if rc != xsconstants.XSERR_SUCCESS:            raise SecurityError(rc)    def get_resource_label(self, resource):        res = security.get_resource_label_xapi(resource)        return res    def can_run(self, sec_label):        irc = security.validate_label_xapi(sec_label, 'dom')        if irc != xsconstants.XSERR_SUCCESS:            raise SecurityError(irc)        return security.check_can_run(sec_label)    get_xstype      = classmethod(get_xstype)    get_xspolicy    = classmethod(get_xspolicy)    set_xspolicy    = classmethod(set_xspolicy)    reset_xspolicy  = classmethod(reset_xspolicy)    rm_xsbootpolicy = classmethod(rm_xsbootpolicy)    set_resource_label = classmethod(set_resource_label)    get_resource_label = classmethod(get_resource_label)    get_labeled_resources = classmethod(get_labeled_resources)    can_run = classmethod(can_run)class XendACMPolicy(XendXSPolicy):    """ Administration class of an ACMPolicy """    def getClass(self):        return "ACMPolicy"    def getAttrRO(self):        attrRO = [ 'xml',                   'map',                   'binary',                   'header' ]        return XendXSPolicy.getAttrRO() + attrRO    def getFuncs(self):        funcs = [ 'get_enforced_binary', 'get_VM_ssidref' ]        return XendBase.getFuncs() + funcs    getClass    = classmethod(getClass)    getAttrRO   = classmethod(getAttrRO)    getFuncs    = classmethod(getFuncs)    def __init__(self, acmpol, record, uuid):        """ acmpol = actual ACMPolicy object """        self.acmpol = acmpol        XendXSPolicy.__init__(self, acmpol, record, uuid)    def get_record(self):        polstate = {          'uuid'   : self.get_uuid(),          'flags'  : XSPolicyAdminInstance().get_policy_flags(self.acmpol),          'repr'   : self.acmpol.toxml(),          'type'   : self.acmpol.get_type(),        }        return polstate    def get_header(self):        header = {          'policyname'   : "", 'policyurl'    : "", 'reference'    : "",          'date'         : "", 'namespaceurl' : "", 'version'      : "",        }        try:            header = self.acmpol.get_header_fields_map()        except:            pass        return header    def get_xml(self):        return self.acmpol.toxml()    def get_map(self):        return self.acmpol.get_map()    def get_binary(self):        polbin = self.acmpol.get_bin()        return base64.b64encode(polbin)    def get_VM_ssidref(self, vm_ref):        dom = XendDomain.instance().get_vm_by_uuid(vm_ref)        if not dom:            raise InvalidHandleError("VM", vm_ref)        if dom._stateGet() not in [ XEN_API_VM_POWER_STATE_RUNNING, \                                    XEN_API_VM_POWER_STATE_PAUSED ]:            raise VMBadState("Domain is not running or paused.")        ssid = security.get_ssid(dom.getDomid())        if not ssid:            raise SecurityError(-xsconstants.XSERR_GENERAL_FAILURE)        return ssid[3]    def get_enforced_binary(self):        polbin = XSPolicyAdminInstance(). \                   get_enforced_binary(xsconstants.XS_POLICY_ACM)        if polbin:            return base64.b64encode(polbin)        return None    get_enforced_binary = classmethod(get_enforced_binary)    get_VM_ssidref = classmethod(get_VM_ssidref)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -