📄 reflection.py
字号:
import testenv; testenv.configure_for_tests()import StringIO, unicodedatafrom sqlalchemy import *from sqlalchemy import exceptionsfrom sqlalchemy import types as sqltypesfrom testlib import *from testlib import enginesclass ReflectionTest(TestBase, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1)) def test_basic_reflection(self): meta = MetaData(testing.db) users = Table('engine_users', meta, Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20), nullable=False), Column('test1', CHAR(5), nullable=False), Column('test2', Float(5), nullable=False), Column('test3', Text), Column('test4', Numeric, nullable = False), Column('test5', DateTime), Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')), Column('test6', DateTime, nullable=False), Column('test7', Text), Column('test8', Binary), Column('test_passivedefault2', Integer, PassiveDefault("5")), Column('test9', Binary(100)), Column('test_numeric', Numeric()), test_needs_fk=True, ) addresses = Table('engine_email_addresses', meta, Column('address_id', Integer, primary_key = True), Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(20)), test_needs_fk=True, ) meta.create_all() try: meta2 = MetaData() reflected_users = Table('engine_users', meta2, autoload=True, autoload_with=testing.db) reflected_addresses = Table('engine_email_addresses', meta2, autoload=True, autoload_with=testing.db) self.assert_tables_equal(users, reflected_users) self.assert_tables_equal(addresses, reflected_addresses) finally: addresses.drop() users.drop() def test_include_columns(self): meta = MetaData(testing.db) foo = Table('foo', meta, *[Column(n, String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']]) meta.create_all() try: meta2 = MetaData(testing.db) foo = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e']) # test that cols come back in original order self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f']) for c in ('b', 'f', 'e'): assert c in foo.c for c in ('a', 'c', 'd'): assert c not in foo.c # test against a table which is already reflected meta3 = MetaData(testing.db) foo = Table('foo', meta3, autoload=True) foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], useexisting=True) self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f']) for c in ('b', 'f', 'e'): assert c in foo.c for c in ('a', 'c', 'd'): assert c not in foo.c finally: meta.drop_all() def test_unknown_types(self): meta = MetaData(testing.db) t = Table("test", meta, Column('foo', DateTime)) import sys dialect_module = sys.modules[testing.db.dialect.__module__] # we're relying on the presence of "ischema_names" in the # dialect module, else we can't test this. we need to be able # to get the dialect to not be aware of some type so we temporarily # monkeypatch. not sure what a better way for this could be, # except for an established dialect hook or dialect-specific tests if not hasattr(dialect_module, 'ischema_names'): return ischema_names = dialect_module.ischema_names t.create() dialect_module.ischema_names = {} try: try: m2 = MetaData(testing.db) t2 = Table("test", m2, autoload=True) assert False except exceptions.SAWarning: assert True @testing.emits_warning('Did not recognize type') def warns(): m3 = MetaData(testing.db) t3 = Table("test", m3, autoload=True) assert t3.c.foo.type.__class__ == sqltypes.NullType finally: dialect_module.ischema_names = ischema_names t.drop() def test_basic_override(self): meta = MetaData(testing.db) table = Table( 'override_test', meta, Column('col1', Integer, primary_key=True), Column('col2', String(20)), Column('col3', Numeric) ) table.create() meta2 = MetaData(testing.db) try: table = Table( 'override_test', meta2, Column('col2', Unicode()), Column('col4', String(30)), autoload=True) self.assert_(isinstance(table.c.col1.type, Integer)) self.assert_(isinstance(table.c.col2.type, Unicode)) self.assert_(isinstance(table.c.col4.type, String)) finally: table.drop() def test_override_pkfk(self): """test that you can override columns which contain foreign keys to other reflected tables, where the foreign key column is also a primary key column""" meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), Column('name', String(30))) addresses = Table('addresses', meta, Column('id', Integer, primary_key=True), Column('street', String(30))) meta.create_all() try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, Column('id', Integer, ForeignKey('users.id'), primary_key=True), autoload=True) u2 = Table('users', meta2, autoload=True) assert list(a2.primary_key) == [a2.c.id] assert list(u2.primary_key) == [u2.c.id] assert u2.join(a2).onclause == u2.c.id==a2.c.id meta3 = MetaData(testing.db) u3 = Table('users', meta3, autoload=True) a3 = Table('addresses', meta3, Column('id', Integer, ForeignKey('users.id'), primary_key=True), autoload=True) assert list(a3.primary_key) == [a3.c.id] assert list(u3.primary_key) == [u3.c.id] assert u3.join(a3).onclause == u3.c.id==a3.c.id finally: meta.drop_all() def test_override_nonexistent_fk(self): """test that you can override columns and create new foreign keys to other reflected tables which have no foreign keys. this is common with MySQL MyISAM tables.""" meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), Column('name', String(30))) addresses = Table('addresses', meta, Column('id', Integer, primary_key=True), Column('street', String(30)), Column('user_id', Integer)) meta.create_all() try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, Column('user_id', Integer, ForeignKey('users.id')), autoload=True) u2 = Table('users', meta2, autoload=True) assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.foreign_keys) == 1 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id assert u2.join(a2).onclause == u2.c.id==a2.c.user_id meta3 = MetaData(testing.db) u3 = Table('users', meta3, autoload=True) a3 = Table('addresses', meta3, Column('user_id', Integer, ForeignKey('users.id')), autoload=True) assert u3.join(a3).onclause == u3.c.id==a3.c.user_id meta4 = MetaData(testing.db) u4 = Table('users', meta4, Column('id', Integer, key='u_id', primary_key=True), autoload=True) a4 = Table('addresses', meta4, Column('id', Integer, key='street', primary_key=True), Column('street', String(30), key='user_id'), Column('user_id', Integer, ForeignKey('users.u_id'), key='id'), autoload=True) assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id) assert list(u4.primary_key) == [u4.c.u_id] assert len(u4.columns) == 2 assert len(u4.constraints) == 1 assert len(a4.columns) == 3 assert len(a4.constraints) == 2 finally: meta.drop_all() def test_override_existing_fk(self): """test that you can override columns and specify new foreign keys to other reflected tables, on columns which *do* already have that foreign key, and that the FK is not duped. """ meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), Column('name', String(30)), test_needs_fk=True) addresses = Table('addresses', meta, Column('id', Integer,primary_key=True), Column('user_id', Integer, ForeignKey('users.id')), test_needs_fk=True) meta.create_all() try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, Column('user_id',Integer, ForeignKey('users.id')), autoload=True) u2 = Table('users', meta2, autoload=True) assert len(a2.foreign_keys) == 1 assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.constraints) == 2 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id assert u2.join(a2).onclause == u2.c.id==a2.c.user_id meta2 = MetaData(testing.db) u2 = Table('users', meta2, Column('id', Integer, primary_key=True), autoload=True) a2 = Table('addresses', meta2, Column('id', Integer, primary_key=True), Column('user_id',Integer, ForeignKey('users.id')), autoload=True) assert len(a2.foreign_keys) == 1 assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.constraints) == 2 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id assert u2.join(a2).onclause == u2.c.id==a2.c.user_id finally: meta.drop_all() def test_use_existing(self): meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), Column('name', String(30)), test_needs_fk=True) addresses = Table('addresses', meta, Column('id', Integer,primary_key=True), Column('user_id', Integer, ForeignKey('users.id')), Column('data', String(100)), test_needs_fk=True) meta.create_all() try: meta2 = MetaData(testing.db) addresses = Table('addresses', meta2, Column('data', Unicode), autoload=True) try: users = Table('users', meta2, Column('name', Unicode), autoload=True) assert False except exceptions.InvalidRequestError, err: assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." users = Table('users', meta2, Column('name', Unicode), autoload=True, useexisting=True) assert isinstance(users.c.name.type, Unicode) assert not users.quote users = Table('users', meta2, quote=True, autoload=True, useexisting=True) assert users.quote finally: meta.drop_all() def test_pks_not_uniques(self): """test that primary key reflection not tripped up by unique indexes""" testing.db.execute(""" CREATE TABLE book ( id INTEGER NOT NULL, title VARCHAR(100) NOT NULL, series INTEGER, series_id INTEGER, UNIQUE(series, series_id), PRIMARY KEY(id) )""") try: metadata = MetaData(bind=testing.db) book = Table('book', metadata, autoload=True) assert book.c.id in book.primary_key assert book.c.series not in book.primary_key assert len(book.primary_key) == 1 finally: testing.db.execute("drop table book") def test_fk_error(self): metadata = MetaData(testing.db) slots_table = Table('slots', metadata, Column('slot_id', Integer, primary_key=True), Column('pkg_id', Integer, ForeignKey('pkgs.pkg_id')), Column('slot', String(128)), ) try: metadata.create_all() assert False except exceptions.InvalidRequestError, err: assert str(err) == "Could not find table 'pkgs' with which to generate a foreign key" def test_composite_pks(self): """test reflection of a composite primary key""" testing.db.execute(""" CREATE TABLE book ( id INTEGER NOT NULL, isbn VARCHAR(50) NOT NULL, title VARCHAR(100) NOT NULL, series INTEGER, series_id INTEGER, UNIQUE(series, series_id), PRIMARY KEY(id, isbn)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -