⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 samba3sam.py

📁 samba最新软件
💻 PY
📖 第 1 页 / 共 3 页
字号:
#!/usr/bin/python

# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
# Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
#
# This is a Python port of the original in testprogs/ejs/samba3sam.js
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""

# NOTE(review): this listing was recovered from a page that had collapsed all
# line breaks.  Line structure (including the line breaks inside the LDIF
# literals: one attribute per line, a blank line between records) has been
# restored from Python and LDIF syntax -- confirm against the upstream file.
#
# print is always called with a single parenthesised argument so that it
# behaves the same as the original Python 2 print statement.

import os
import sys
import samba
import ldb
from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
from samba import Ldb, substitute_var
from samba.tests import LdbTestCase, TestCaseInTempDir

datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")


def read_datafile(filename):
    """Return the full contents of `filename` inside `datadir`.

    The file is closed explicitly instead of being left to the garbage
    collector.
    """
    f = open(os.path.join(datadir, filename), 'r')
    try:
        return f.read()
    finally:
        f.close()


class MapBaseTestCase(TestCaseInTempDir):
    """Common fixture: one mapping database plus three backing Target
    databases (samba3, samba4, templates) created in the temp directory."""

    def setup_data(self, obj, ldif):
        """Load `ldif` into `obj.db` after substituting `obj.substvars`."""
        self.assertTrue(ldif is not None)
        obj.db.add_ldif(substitute_var(ldif, obj.substvars))

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION control records to `ldb`.

        `s3` and `s4` are Target objects; their `basedn` and `url`
        attributes are used to build the records.
        """
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        ldb.add({"dn": "@PARTITION",
                 "partition": [s4.basedn + ":" + s4.url,
                               s3.basedn + ":" + s3.url],
                 "replicateEntries": ["@SUBCLASSES", "@ATTRIBUTES", "@INDEXLIST"]})

    def setUp(self):
        """Create the Target helpers and connect each of their databases."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            return rdn + ",sambaDomainName=TESTS," + basedn

        def make_s4dn(basedn, rdn):
            return rdn + "," + basedn

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        tempdir = self.tempdir
        print(tempdir)

        class Target:
            """Simple helper class that contains data for a specific SAM connection."""

            def __init__(self, file, basedn, dn):
                self.file = os.path.join(tempdir, file)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb()
                self._dn = dn

            def dn(self, rdn):
                # The DN builders are declared as (basedn, rdn); the
                # original call passed the arguments in the opposite order,
                # which produced DNs with their components reversed.
                return self._dn(self.basedn, rdn)

            def connect(self):
                return self.db.connect(self.url)

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        # No DN builder is supplied for templates, so its dn() must not be
        # called.
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the four database files, then run the base tearDown."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()


class Samba3SamTestCase(MapBaseTestCase):
    """Search/modify tests run against the pre-populated samba3.ldif data."""

    def setUp(self):
        """Load the LDIF fixtures, install the modules and reopen the db."""
        super(Samba3SamTestCase, self).setUp()
        ldb = Ldb(self.ldburl)
        self.setup_data(self.samba3, read_datafile("samba3.ldif"))
        self.setup_data(self.templates,
                        read_datafile("provision_samba3sam_templates.ldif"))
        ldif = read_datafile("provision_samba3sam.ldif")
        ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
        self.setup_modules(ldb, self.samba3, self.samba4)
        # Open a second handle after the control records have been written.
        self.ldb = Ldb(self.ldburl)

    def test_s3sam_search(self):
        print("Looking up by non-mapped attribute")
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["cn"], "Administrator")

        print("Looking up by mapped attribute")
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["name"], "Backup Operators")

        print("Looking up by old name of renamed attribute")
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEqual(len(msg), 0)

        print("Looking up mapped entry containing SID")
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEqual(len(msg), 1)
        print(msg[0].dn)
        self.assertEqual(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertEqual(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")

        print("Checking mapping of objectClass")
        oc = set(msg[0]["objectClass"])
        self.assertTrue(oc is not None)
        # A set cannot be indexed; test each member directly.
        for value in oc:
            self.assertTrue(value == "posixGroup" or value == "group")

        print("Looking up by objectClass")
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEqual(len(msg), 2)
        # Each result must be one of the two expected entries.
        for entry in msg:
            self.assertTrue(
                str(entry.dn) == "unixName=Administrator,ou=Users,dc=vernstok,dc=nl" or
                str(entry.dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl")

    def test_s3sam_modify(self):
        print("Adding a record that will be fallbacked")
        self.ldb.add({"dn": "cn=Foo",
                      "foo": "bar",
                      "blah": "Blie",
                      "cn": "Foo",
                      "showInAdvancedViewOnly": "TRUE"})

        print("Checking for existence of record (local)")
        # TODO: This record must be searched in the local database, which is currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
        # TODO: Actually, this version should work as well but doesn't...
        #
        #
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEqual(msg[0]["foo"], "bar")
        self.assertEqual(msg[0]["blah"], "Blie")

        print("Adding record that will be mapped")
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                      "objectClass": "user",
                      "unixName": "bin",
                      "sambaUnicodePwd": "geheim",
                      "cn": "Niemand"})

        print("Checking for existence of record (remote)")
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        print("Checking for existence of record (local && remote)")
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)           # TODO: should check with more records
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["unixName"], "bin")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        print("Checking for existence of record (local || remote)")
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        # len() returns an int, so format it rather than concatenating.
        print("got %d replies" % len(msg))
        self.assertEqual(len(msg), 1)        # TODO: should check with more records
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertTrue(msg[0]["unixName"] == "bin" or
                        msg[0]["sambaUnicodePwd"] == "geheim")

        print("Checking for data in destination database")
        # The destination is the samba3 Target created in setUp.
        msg = self.samba3.db.search("(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEqual(msg[0]["displayName"], "Niemand")

        print("Adding attribute...")
        self.ldb.modify_ldif("""dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        print("Checking whether changes are still there...")
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["description"], "Blah")

        print("Modifying attribute...")
        self.ldb.modify_ldif("""dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        print("Checking whether changes are still there...")
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["description"], "Blie")

        print("Deleting attribute...")
        self.ldb.modify_ldif("""dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        print("Checking whether changes are no longer there...")
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        # Check the result that was just fetched.
        self.assertTrue("description" not in msg[0])

        print("Renaming record...")
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Checking whether DN has changed...")
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Deleting record...")
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Checking whether record is gone...")
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 0)


class MapTestCase(MapBaseTestCase):
    """Mapping tests whose samba3 and samba4 records are created by the
    test itself (samba3.ldif is not loaded here)."""

    def setUp(self):
        """Load the provisioning LDIF, install the modules, reopen the db."""
        super(MapTestCase, self).setUp()
        ldb = Ldb(self.ldburl)
        self.setup_data(self.templates,
                        read_datafile("provision_samba3sam_templates.ldif"))
        ldif = read_datafile("provision_samba3sam.ldif")
        ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
        self.setup_modules(ldb, self.samba3, self.samba4)
        self.ldb = Ldb(self.ldburl)

    def test_map_search(self):
        print("Running search tests on mapped data")
        ldif = "dn: sambaDomainName=TESTS," + self.samba3.basedn + """
objectclass: sambaDomain
objectclass: top
sambaSID: S-1-5-21-4231626423-2410014848-2360679739
sambaNextRid: 2000
sambaDomainName: TESTS
"""
        self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars))

        print("Add a set of split records")
        ldif = """dn: """ + self.samba4.dn("cn=X") + """
objectClass: user
cn: X
codePage: x
revision: x
dnsHostName: x
nextRid: y
lastLogon: x
description: x
objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512

dn: """ + self.samba4.dn("cn=Y") + """
objectClass: top
cn: Y
codePage: x
revision: x
dnsHostName: y
nextRid: y
lastLogon: y
description: x

dn: """ + self.samba4.dn("cn=Z") + """
objectClass: top
cn: Z
codePage: x
revision: y
dnsHostName: z
nextRid: y
lastLogon: z
description: y
"""
        self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))

        print("Add a set of remote records")
        ldif = """dn: """ + self.samba3.dn("cn=A") + """
objectClass: posixAccount
cn: A
sambaNextRid: x
sambaBadPasswordCount: x
sambaLogonTime: x
description: x
sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512

dn: """ + self.samba3.dn("cn=B") + """
objectClass: top
cn:B
sambaNextRid: x
sambaBadPasswordCount: x
sambaLogonTime: y
description: x

dn: """ + self.samba3.dn("cn=C") + """
objectClass: top
cn: C
sambaNextRid: x
sambaBadPasswordCount: y
sambaLogonTime: z
description: y
"""
        # Target has no add_ldif(); the records go through its Ldb handle,
        # as for the sambaDomain record above.
        self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars))

        print("Testing search by DN")
        # NOTE(review): the listing this was recovered from is page 1 of 3
        # and stops here; the remainder of this test is not part of this
        # excerpt and has deliberately not been reconstructed.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -