📄 reflection.py
字号:
)""") try: metadata = MetaData(bind=testing.db) book = Table('book', metadata, autoload=True) assert book.c.id in book.primary_key assert book.c.isbn in book.primary_key assert book.c.series not in book.primary_key assert len(book.primary_key) == 2 finally: testing.db.execute("drop table book") @testing.exclude('mysql', '<', (4, 1, 1)) def test_composite_fk(self): """test reflection of composite foreign keys""" meta = MetaData(testing.db) multi = Table( 'multi', meta, Column('multi_id', Integer, primary_key=True), Column('multi_rev', Integer, primary_key=True), Column('multi_hoho', Integer, primary_key=True), Column('name', String(50), nullable=False), Column('val', String(100)), test_needs_fk=True, ) multi2 = Table('multi2', meta, Column('id', Integer, primary_key=True), Column('foo', Integer), Column('bar', Integer), Column('lala', Integer), Column('data', String(50)), ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), test_needs_fk=True, ) meta.create_all() try: meta2 = MetaData() table = Table('multi', meta2, autoload=True, autoload_with=testing.db) table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db) self.assert_tables_equal(multi, table) self.assert_tables_equal(multi2, table2) j = join(table, table2) self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause)) finally: meta.drop_all() @testing.unsupported('oracle') def testreserved(self): # check a table that uses an SQL reserved name doesn't cause an error meta = MetaData(testing.db) table_a = Table('select', meta, Column('not', Integer, primary_key=True), Column('from', String(12), nullable=False), UniqueConstraint('from', name='when')) Index('where', table_a.c['from']) # There's currently no way to calculate identifier case normalization # in isolation, so... if testing.against('firebird', 'oracle', 'maxdb'): check_col = 'TRUE' else: check_col = 'true' quoter = meta.bind.dialect.identifier_preparer.quote_identifier table_b = Table('false', meta, Column('create', Integer, primary_key=True), Column('true', Integer, ForeignKey('select.not')), CheckConstraint('%s <> 1' % quoter(check_col), name='limit')) table_c = Table('is', meta, Column('or', Integer, nullable=False, primary_key=True), Column('join', Integer, nullable=False, primary_key=True), PrimaryKeyConstraint('or', 'join', name='to')) index_c = Index('else', table_c.c.join) meta.create_all() index_c.drop() meta2 = MetaData(testing.db) try: table_a2 = Table('select', meta2, autoload=True) table_b2 = Table('false', meta2, autoload=True) table_c2 = Table('is', meta2, autoload=True) finally: meta.drop_all() def test_reflect_all(self): existing = testing.db.table_names() names = ['rt_%s' % name for name in ('a','b','c','d','e')] nameset = set(names) for name in names: # be sure our starting environment is sane self.assert_(name not in existing) self.assert_('rt_f' not in existing) baseline = MetaData(testing.db) for name in names: Table(name, baseline, Column('id', Integer, primary_key=True)) baseline.create_all() try: m1 = MetaData(testing.db) self.assert_(not m1.tables) m1.reflect() self.assert_(nameset.issubset(set(m1.tables.keys()))) m2 = MetaData() m2.reflect(testing.db, only=['rt_a', 'rt_b']) self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b'])) m3 = MetaData() c = testing.db.connect() m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c') self.assert_(set(m3.tables.keys()) == set(['rt_c'])) m4 = MetaData(testing.db) try: m4.reflect(only=['rt_a', 'rt_f']) self.assert_(False) except exceptions.InvalidRequestError, e: self.assert_(e.args[0].endswith('(rt_f)')) m5 = MetaData(testing.db) m5.reflect(only=[]) self.assert_(not m5.tables) m6 = MetaData(testing.db) m6.reflect(only=lambda n, m: False) self.assert_(not m6.tables) m7 = MetaData(testing.db, reflect=True) self.assert_(nameset.issubset(set(m7.tables.keys()))) try: m8 = MetaData(reflect=True) self.assert_(False) except exceptions.ArgumentError, e: self.assert_( e.args[0] == "A bind must be supplied in conjunction with reflect=True") finally: baseline.drop_all() if existing: print "Other tables present in database, skipping some checks." else: m9 = MetaData(testing.db) m9.reflect() self.assert_(not m9.tables)class CreateDropTest(TestBase): def setUpAll(self): global metadata, users metadata = MetaData() users = Table('users', metadata, Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True), Column('user_name', String(40)), ) addresses = Table('email_addresses', metadata, Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True), Column('user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(40)), ) orders = Table('orders', metadata, Column('order_id', Integer, Sequence('order_id_seq', optional=True), primary_key = True), Column('user_id', Integer, ForeignKey(users.c.user_id)), Column('description', String(50)), Column('isopen', Integer), ) orderitems = Table('items', metadata, Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True), Column('order_id', INT, ForeignKey("orders")), Column('item_name', VARCHAR(50)), ) def test_sorter( self ): tables = metadata.table_iterator(reverse=False) table_names = [t.name for t in tables] self.assert_( table_names == ['users', 'orders', 'items', 'email_addresses'] or table_names == ['users', 'email_addresses', 'orders', 'items']) def testcheckfirst(self): try: assert not users.exists(testing.db) users.create(bind=testing.db) assert users.exists(testing.db) users.create(bind=testing.db, checkfirst=True) users.drop(bind=testing.db) users.drop(bind=testing.db, checkfirst=True) assert not users.exists(bind=testing.db) users.create(bind=testing.db, checkfirst=True) users.drop(bind=testing.db) finally: metadata.drop_all(bind=testing.db) @testing.exclude('mysql', '<', (4, 1, 1)) def test_createdrop(self): metadata.create_all(bind=testing.db) self.assertEqual( testing.db.has_table('items'), True ) self.assertEqual( testing.db.has_table('email_addresses'), True ) metadata.create_all(bind=testing.db) self.assertEqual( testing.db.has_table('items'), True ) metadata.drop_all(bind=testing.db) self.assertEqual( testing.db.has_table('items'), False ) self.assertEqual( testing.db.has_table('email_addresses'), False ) metadata.drop_all(bind=testing.db) self.assertEqual( testing.db.has_table('items'), False ) def test_tablenames(self): from sqlalchemy.util import Set metadata.create_all(bind=testing.db) # we only check to see if all the explicitly created tables are there, rather than # assertEqual -- the test db could have "extra" tables if there is a misconfigured # template. (*cough* tsearch2 w/ the pg windows installer.) self.assert_(not Set(metadata.tables) - Set(testing.db.table_names())) metadata.drop_all(bind=testing.db)class SchemaManipulationTest(TestBase): def test_append_constraint_unique(self): meta = MetaData() users = Table('users', meta, Column('id', Integer)) addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer)) fk = ForeignKeyConstraint(['user_id'],[users.c.id]) addresses.append_constraint(fk) addresses.append_constraint(fk) assert len(addresses.c.user_id.foreign_keys) == 1 assert addresses.constraints == set([addresses.primary_key, fk]) class UnicodeReflectionTest(TestBase): def test_basic(self): try: # the 'convert_unicode' should not get in the way of the reflection # process. reflecttable for oracle, postgres (others?) expect non-unicode # strings in result sets/bind params bind = engines.utf8_engine(options={'convert_unicode':True}) metadata = MetaData(bind) if testing.against('sybase', 'maxdb', 'oracle'): names = set(['plain']) else: names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66']) for name in names: Table(name, metadata, Column('id', Integer, Sequence(name + "_id_seq"), primary_key=True)) metadata.create_all() reflected = set(bind.table_names()) if not names.issubset(reflected): # Python source files in the utf-8 coding seem to normalize # literals as NFC (and the above are explicitly NFC). Maybe # this database normalizes NFD on reflection. nfc = set([unicodedata.normalize('NFC', n) for n in names]) self.assert_(nfc == names) # Yep. But still ensure that bulk reflection and create/drop # work with either normalization. r = MetaData(bind, reflect=True) r.drop_all() r.create_all() finally: metadata.drop_all() bind.dispose()class SchemaTest(TestBase): def test_iteration(self): metadata = MetaData() table1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), schema='someschema') table2 = Table('table2', metadata, Column('col1', Integer, primary_key=True), Column('col2', Integer, ForeignKey('someschema.table1.col1')), schema='someschema') # ensure this doesnt crash print [t for t in metadata.table_iterator()] buf = StringIO.StringIO() def foo(s, p=None): buf.write(s) gen = create_engine(testing.db.name + "://", strategy="mock", executor=foo) gen = gen.dialect.schemagenerator(gen.dialect, gen) gen.traverse(table1) gen.traverse(table2) buf = buf.getvalue() print buf if testing.db.dialect.preparer(testing.db.dialect).omit_schema: assert buf.index("CREATE TABLE table1") > -1 assert buf.index("CREATE TABLE table2") > -1 else: assert buf.index("CREATE TABLE someschema.table1") > -1 assert buf.index("CREATE TABLE someschema.table2") > -1 @testing.unsupported('sqlite', 'firebird') # fixme: revisit these below. @testing.fails_on('oracle', 'mssql', 'sybase', 'access') def test_explicit_default_schema(self): engine = testing.db if testing.against('mysql'): schema = testing.db.url.database elif testing.against('postgres'): schema = 'public' else: schema = engine.dialect.get_default_schema_name(engine) metadata = MetaData(engine) table1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), schema=schema) table2 = Table('table2', metadata, Column('col1', Integer, primary_key=True), Column('col2', Integer, ForeignKey('%s.table1.col1' % schema)), schema=schema) try: metadata.create_all() metadata.create_all(checkfirst=True) metadata.clear() table1 = Table('table1', metadata, autoload=True, schema=schema) table2 = Table('table2', metadata, autoload=True, schema=schema) finally: metadata.drop_all()class HasSequenceTest(TestBase): def setUpAll(self): global metadata, users metadata = MetaData() users = Table('users', metadata, Column('user_id', Integer, Sequence('user_id_seq'), primary_key=True), Column('user_name', String(40)), ) @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase') def test_hassequence(self): metadata.create_all(bind=testing.db) self.assertEqual(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) metadata.drop_all(bind=testing.db) self.assertEqual(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)if __name__ == "__main__": testenv.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -