📄 samba3sam.py
字号:
#!/usr/bin/python

# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
# Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
#
# This is a Python port of the original in testprogs/ejs/samba3sam.js
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""

import os
import sys
import samba
import ldb
from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
from samba import Ldb, substitute_var
from samba.tests import LdbTestCase, TestCaseInTempDir

datadir = os.path.join(os.path.dirname(__file__),
                       "../../../../../testdata/samba3")


def _read_datafile(filename):
    """Return the full text of `filename` located inside `datadir`.

    The file handle is closed before returning.
    """
    with open(os.path.join(datadir, filename), 'r') as f:
        return f.read()


class MapBaseTestCase(TestCaseInTempDir):
    """Common fixture: three tdb-backed databases plus a front-end ldb file.

    setUp() creates ``self.samba4``, ``self.samba3`` and ``self.templates``
    (each a small ``Target`` helper holding a connected ``Ldb``) and records
    ``self.ldbfile`` / ``self.ldburl`` for the database the tests open.
    """

    def setup_data(self, obj, ldif):
        """Load `ldif` into ``obj.db`` after substituting ``obj.substvars``."""
        self.assertTrue(ldif is not None)
        obj.db.add_ldif(substitute_var(ldif, obj.substvars))

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION control records to `ldb`.

        `s3` and `s4` are the targets whose ``basedn`` / ``url`` attributes
        are written into the mapping and partition records.
        """
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        ldb.add({"dn": "@PARTITION",
                 "partition": [s4.basedn + ":" + s4.url,
                               s3.basedn + ":" + s3.url],
                 "replicateEntries": ["@SUBCLASSES", "@ATTRIBUTES",
                                      "@INDEXLIST"]})

    def setUp(self):
        """Create and connect the samba3, samba4 and templates targets."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            return rdn + ",sambaDomainName=TESTS," + basedn

        def make_s4dn(basedn, rdn):
            return rdn + "," + basedn

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        tempdir = self.tempdir
        print(tempdir)

        class Target:
            """Simple helper class that contains data for a specific SAM connection."""

            def __init__(self, filename, basedn, dn):
                self.file = os.path.join(tempdir, filename)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb()
                self._dn = dn

            def dn(self, rdn):
                """Return the full DN for `rdn` below this target's base DN."""
                # The DN builders take (basedn, rdn); passing them the other
                # way round yields a reversed DN such as "dc=...,cn=X".
                return self._dn(self.basedn, rdn)

            def connect(self):
                return self.db.connect(self.url)

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        # No DN builder is supplied here, so templates.dn() must not be called.
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the database files created by setUp()."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()


class Samba3SamTestCase(MapBaseTestCase):
    """Search and modify tests run against pre-populated Samba3 data."""

    def setUp(self):
        """Load the LDIF fixtures, install the modules and reopen the ldb."""
        super(Samba3SamTestCase, self).setUp()
        setup_db = Ldb(self.ldburl)
        self.setup_data(self.samba3, _read_datafile("samba3.ldif"))
        self.setup_data(self.templates,
                        _read_datafile("provision_samba3sam_templates.ldif"))
        ldif = _read_datafile("provision_samba3sam.ldif")
        setup_db.add_ldif(substitute_var(ldif, self.samba4.substvars))
        self.setup_modules(setup_db, self.samba3, self.samba4)
        # A second handle is opened after the control records are written;
        # this is the one the tests use.
        self.ldb = Ldb(self.ldburl)

    def test_s3sam_search(self):
        """Search by non-mapped, mapped and renamed attributes and by class."""
        print("Looking up by non-mapped attribute")
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["cn"], "Administrator")

        print("Looking up by mapped attribute")
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["name"], "Backup Operators")

        print("Looking up by old name of renamed attribute")
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEqual(len(msg), 0)

        print("Looking up mapped entry containing SID")
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEqual(len(msg), 1)
        print(msg[0].dn)
        self.assertEqual(str(msg[0].dn),
                         "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertEqual(msg[0]["objectSid"],
                         "S-1-5-21-4231626423-2410014848-2360679739-552")

        print("Checking mapping of objectClass")
        oc = set(msg[0]["objectClass"])
        self.assertTrue(oc is not None)
        # A set cannot be indexed; check each member directly.
        for value in oc:
            self.assertTrue(value in ("posixGroup", "group"))

        print("Looking up by objectClass")
        msg = self.ldb.search(
            expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEqual(len(msg), 2)
        expected_dns = ("unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                        "unixName=nobody,ou=Users,dc=vernstok,dc=nl")
        for entry in msg:
            self.assertTrue(str(entry.dn) in expected_dns)

    def test_s3sam_modify(self):
        """Add, modify, rename and delete local and mapped records."""
        print("Adding a record that will be fallbacked")
        self.ldb.add({"dn": "cn=Foo",
                      "foo": "bar",
                      "blah": "Blie",
                      "cn": "Foo",
                      "showInAdvancedViewOnly": "TRUE"})

        print("Checking for existence of record (local)")
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # TODO: Actually, a plain (non-base) search for (cn=Foo) should work
        # as well but doesn't...
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                              scope=SCOPE_BASE,
                              attrs=['foo', 'blah', 'cn',
                                     'showInAdvancedViewOnly'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEqual(msg[0]["foo"], "bar")
        self.assertEqual(msg[0]["blah"], "Blie")

        print("Adding record that will be mapped")
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                      "objectClass": "user",
                      "unixName": "bin",
                      "sambaUnicodePwd": "geheim",
                      "cn": "Niemand"})

        print("Checking for existence of record (remote)")
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName', 'cn', 'dn',
                                     'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        print("Checking for existence of record (local && remote)")
        msg = self.ldb.search(
            expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["unixName"], "bin")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        print("Checking for existence of record (local || remote)")
        msg = self.ldb.search(
            expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        print("got %d replies" % len(msg))
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(msg[0]["cn"], "Niemand")
        # Either half of the OR filter may be the one that matched.
        self.assertTrue(msg[0]["unixName"] == "bin" or
                        msg[0]["sambaUnicodePwd"] == "geheim")

        print("Checking for data in destination database")
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["sambaSID"],
                         "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEqual(msg[0]["displayName"], "Niemand")

        print("Adding attribute...")
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        print("Checking whether changes are still there...")
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["description"], "Blah")

        print("Modifying attribute...")
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        print("Checking whether changes are still there...")
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["description"], "Blie")

        print("Deleting attribute...")
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        print("Checking whether changes are no longer there...")
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertTrue("description" not in msg[0])

        print("Renaming record...")
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Checking whether DN has changed...")
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Deleting record...")
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Checking whether record is gone...")
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 0)


class MapTestCase(MapBaseTestCase):
    """Mapping tests that create their own split and remote records."""

    def setUp(self):
        """Load the templates and provision LDIF, then install the modules."""
        super(MapTestCase, self).setUp()
        setup_db = Ldb(self.ldburl)
        self.setup_data(self.templates,
                        _read_datafile("provision_samba3sam_templates.ldif"))
        ldif = _read_datafile("provision_samba3sam.ldif")
        setup_db.add_ldif(substitute_var(ldif, self.samba4.substvars))
        self.setup_modules(setup_db, self.samba3, self.samba4)
        # A second handle is opened after the control records are written;
        # this is the one the tests use.
        self.ldb = Ldb(self.ldburl)

    def test_map_search(self):
        """Populate split and remote records, then run the search tests."""
        print("Running search tests on mapped data")
        ldif = """
dn: """ + "sambaDomainName=TESTS,""" + self.samba3.basedn + """
objectclass: sambaDomain
objectclass: top
sambaSID: S-1-5-21-4231626423-2410014848-2360679739
sambaNextRid: 2000
sambaDomainName: TESTS"""
        self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars))

        print("Add a set of split records")
        ldif = """
dn: """ + self.samba4.dn("cn=X") + """
objectClass: user
cn: X
codePage: x
revision: x
dnsHostName: x
nextRid: y
lastLogon: x
description: x
objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512

dn: """ + self.samba4.dn("cn=Y") + """
objectClass: top
cn: Y
codePage: x
revision: x
dnsHostName: y
nextRid: y
lastLogon: y
description: x

dn: """ + self.samba4.dn("cn=Z") + """
objectClass: top
cn: Z
codePage: x
revision: y
dnsHostName: z
nextRid: y
lastLogon: z
description: y
"""
        self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))

        print("Add a set of remote records")
        ldif = """
dn: """ + self.samba3.dn("cn=A") + """
objectClass: posixAccount
cn: A
sambaNextRid: x
sambaBadPasswordCount: x
sambaLogonTime: x
description: x
sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512

dn: """ + self.samba3.dn("cn=B") + """
objectClass: top
cn:B
sambaNextRid: x
sambaBadPasswordCount: x
sambaLogonTime: y
description: x

dn: """ + self.samba3.dn("cn=C") + """
objectClass: top
cn: C
sambaNextRid: x
sambaBadPasswordCount: y
sambaLogonTime: z
description: y
"""
        # Target exposes its connection as .db; it has no add_ldif of its own.
        self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars))

        print("Testing search by DN")
        # NOTE(review): the reviewed excerpt ends here; any further steps of
        # this test were not visible and are intentionally not reconstructed.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -