⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reflection.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
        )""")        try:            metadata = MetaData(bind=testing.db)            book = Table('book', metadata, autoload=True)            assert book.c.id  in book.primary_key            assert book.c.isbn  in book.primary_key            assert book.c.series not in book.primary_key            assert len(book.primary_key) == 2        finally:            testing.db.execute("drop table book")    @testing.exclude('mysql', '<', (4, 1, 1))    def test_composite_fk(self):        """test reflection of composite foreign keys"""        meta = MetaData(testing.db)        multi = Table(            'multi', meta,            Column('multi_id', Integer, primary_key=True),            Column('multi_rev', Integer, primary_key=True),            Column('multi_hoho', Integer, primary_key=True),            Column('name', String(50), nullable=False),            Column('val', String(100)),            test_needs_fk=True,        )        multi2 = Table('multi2', meta,            Column('id', Integer, primary_key=True),            Column('foo', Integer),            Column('bar', Integer),            Column('lala', Integer),            Column('data', String(50)),            ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),            test_needs_fk=True,        )        meta.create_all()        try:            meta2 = MetaData()            table = Table('multi', meta2, autoload=True, autoload_with=testing.db)            table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db)            self.assert_tables_equal(multi, table)            self.assert_tables_equal(multi2, table2)            j = join(table, table2)            self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))        finally:            meta.drop_all()    @testing.unsupported('oracle')    def testreserved(self):        # check a table that uses an SQL reserved name doesn't cause an error        meta = MetaData(testing.db)        table_a = Table('select', meta,                       Column('not', Integer, primary_key=True),                       Column('from', String(12), nullable=False),                       UniqueConstraint('from', name='when'))        Index('where', table_a.c['from'])        # There's currently no way to calculate identifier case normalization        # in isolation, so...        if testing.against('firebird', 'oracle', 'maxdb'):            check_col = 'TRUE'        else:            check_col = 'true'        quoter = meta.bind.dialect.identifier_preparer.quote_identifier        table_b = Table('false', meta,                        Column('create', Integer, primary_key=True),                        Column('true', Integer, ForeignKey('select.not')),                        CheckConstraint('%s <> 1' % quoter(check_col),                                        name='limit'))        table_c = Table('is', meta,                        Column('or', Integer, nullable=False, primary_key=True),                        Column('join', Integer, nullable=False, primary_key=True),                        PrimaryKeyConstraint('or', 'join', name='to'))        index_c = Index('else', table_c.c.join)        meta.create_all()        index_c.drop()        meta2 = MetaData(testing.db)        try:            table_a2 = Table('select', meta2, autoload=True)            table_b2 = Table('false', meta2, autoload=True)            table_c2 = Table('is', meta2, autoload=True)        finally:            meta.drop_all()    def test_reflect_all(self):        existing = testing.db.table_names()        names = ['rt_%s' % name for name in ('a','b','c','d','e')]        nameset = set(names)        for name in names:            # be sure our starting environment is sane            self.assert_(name not in existing)        self.assert_('rt_f' not in existing)        baseline = MetaData(testing.db)        for name in names:            Table(name, baseline, Column('id', Integer, primary_key=True))        baseline.create_all()        try:            m1 = MetaData(testing.db)            self.assert_(not m1.tables)            m1.reflect()            self.assert_(nameset.issubset(set(m1.tables.keys())))            m2 = MetaData()            m2.reflect(testing.db, only=['rt_a', 'rt_b'])            self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b']))            m3 = MetaData()            c = testing.db.connect()            m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c')            self.assert_(set(m3.tables.keys()) == set(['rt_c']))            m4 = MetaData(testing.db)            try:                m4.reflect(only=['rt_a', 'rt_f'])                self.assert_(False)            except exceptions.InvalidRequestError, e:                self.assert_(e.args[0].endswith('(rt_f)'))            m5 = MetaData(testing.db)            m5.reflect(only=[])            self.assert_(not m5.tables)            m6 = MetaData(testing.db)            m6.reflect(only=lambda n, m: False)            self.assert_(not m6.tables)            m7 = MetaData(testing.db, reflect=True)            self.assert_(nameset.issubset(set(m7.tables.keys())))            try:                m8 = MetaData(reflect=True)                self.assert_(False)            except exceptions.ArgumentError, e:                self.assert_(                    e.args[0] ==                    "A bind must be supplied in conjunction with reflect=True")        finally:            baseline.drop_all()        if existing:            print "Other tables present in database, skipping some checks."        else:            m9 = MetaData(testing.db)            m9.reflect()            self.assert_(not m9.tables)class CreateDropTest(TestBase):    def setUpAll(self):        global metadata, users        metadata = MetaData()        users = Table('users', metadata,                      Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),                      Column('user_name', String(40)),                      )        addresses = Table('email_addresses', metadata,            Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True),            Column('user_id', Integer, ForeignKey(users.c.user_id)),            Column('email_address', String(40)),        )        orders = Table('orders', metadata,            Column('order_id', Integer, Sequence('order_id_seq', optional=True), primary_key = True),            Column('user_id', Integer, ForeignKey(users.c.user_id)),            Column('description', String(50)),            Column('isopen', Integer),        )        orderitems = Table('items', metadata,            Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True),            Column('order_id', INT, ForeignKey("orders")),            Column('item_name', VARCHAR(50)),        )    def test_sorter( self ):        tables = metadata.table_iterator(reverse=False)        table_names = [t.name for t in tables]        self.assert_( table_names == ['users', 'orders', 'items', 'email_addresses'] or table_names ==  ['users', 'email_addresses', 'orders', 'items'])    def testcheckfirst(self):        try:            assert not users.exists(testing.db)            users.create(bind=testing.db)            assert users.exists(testing.db)            users.create(bind=testing.db, checkfirst=True)            users.drop(bind=testing.db)            users.drop(bind=testing.db, checkfirst=True)            assert not users.exists(bind=testing.db)            users.create(bind=testing.db, checkfirst=True)            users.drop(bind=testing.db)        finally:            metadata.drop_all(bind=testing.db)    @testing.exclude('mysql', '<', (4, 1, 1))    def test_createdrop(self):        metadata.create_all(bind=testing.db)        self.assertEqual( testing.db.has_table('items'), True )        self.assertEqual( testing.db.has_table('email_addresses'), True )        metadata.create_all(bind=testing.db)        self.assertEqual( testing.db.has_table('items'), True )        metadata.drop_all(bind=testing.db)        self.assertEqual( testing.db.has_table('items'), False )        self.assertEqual( testing.db.has_table('email_addresses'), False )        metadata.drop_all(bind=testing.db)        self.assertEqual( testing.db.has_table('items'), False )    def test_tablenames(self):        from sqlalchemy.util import Set        metadata.create_all(bind=testing.db)        # we only check to see if all the explicitly created tables are there, rather than        # assertEqual -- the test db could have "extra" tables if there is a misconfigured        # template.  (*cough* tsearch2 w/ the pg windows installer.)        self.assert_(not Set(metadata.tables) - Set(testing.db.table_names()))        metadata.drop_all(bind=testing.db)class SchemaManipulationTest(TestBase):    def test_append_constraint_unique(self):        meta = MetaData()                users = Table('users', meta, Column('id', Integer))        addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer))                fk = ForeignKeyConstraint(['user_id'],[users.c.id])                addresses.append_constraint(fk)        addresses.append_constraint(fk)        assert len(addresses.c.user_id.foreign_keys) == 1        assert addresses.constraints == set([addresses.primary_key, fk])        class UnicodeReflectionTest(TestBase):    def test_basic(self):        try:            # the 'convert_unicode' should not get in the way of the reflection            # process.  reflecttable for oracle, postgres (others?) expect non-unicode            # strings in result sets/bind params            bind = engines.utf8_engine(options={'convert_unicode':True})            metadata = MetaData(bind)            if testing.against('sybase', 'maxdb', 'oracle'):                names = set(['plain'])            else:                names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66'])            for name in names:                Table(name, metadata, Column('id', Integer, Sequence(name + "_id_seq"), primary_key=True))            metadata.create_all()            reflected = set(bind.table_names())            if not names.issubset(reflected):                # Python source files in the utf-8 coding seem to normalize                # literals as NFC (and the above are explicitly NFC).  Maybe                # this database normalizes NFD on reflection.                nfc = set([unicodedata.normalize('NFC', n) for n in names])                self.assert_(nfc == names)                # Yep.  But still ensure that bulk reflection and create/drop                # work with either normalization.            r = MetaData(bind, reflect=True)            r.drop_all()            r.create_all()        finally:            metadata.drop_all()            bind.dispose()class SchemaTest(TestBase):    def test_iteration(self):        metadata = MetaData()        table1 = Table('table1', metadata,            Column('col1', Integer, primary_key=True),            schema='someschema')        table2 = Table('table2', metadata,            Column('col1', Integer, primary_key=True),            Column('col2', Integer, ForeignKey('someschema.table1.col1')),            schema='someschema')        # ensure this doesnt crash        print [t for t in metadata.table_iterator()]        buf = StringIO.StringIO()        def foo(s, p=None):            buf.write(s)        gen = create_engine(testing.db.name + "://", strategy="mock", executor=foo)        gen = gen.dialect.schemagenerator(gen.dialect, gen)        gen.traverse(table1)        gen.traverse(table2)        buf = buf.getvalue()        print buf        if testing.db.dialect.preparer(testing.db.dialect).omit_schema:            assert buf.index("CREATE TABLE table1") > -1            assert buf.index("CREATE TABLE table2") > -1        else:            assert buf.index("CREATE TABLE someschema.table1") > -1            assert buf.index("CREATE TABLE someschema.table2") > -1    @testing.unsupported('sqlite', 'firebird')    # fixme: revisit these below.    @testing.fails_on('oracle', 'mssql', 'sybase', 'access')    def test_explicit_default_schema(self):        engine = testing.db        if testing.against('mysql'):            schema = testing.db.url.database        elif testing.against('postgres'):            schema = 'public'        else:            schema = engine.dialect.get_default_schema_name(engine)        metadata = MetaData(engine)        table1 = Table('table1', metadata,                       Column('col1', Integer, primary_key=True),                       schema=schema)        table2 = Table('table2', metadata,                       Column('col1', Integer, primary_key=True),                       Column('col2', Integer,                              ForeignKey('%s.table1.col1' % schema)),                       schema=schema)        try:            metadata.create_all()            metadata.create_all(checkfirst=True)            metadata.clear()            table1 = Table('table1', metadata, autoload=True, schema=schema)            table2 = Table('table2', metadata, autoload=True, schema=schema)        finally:            metadata.drop_all()class HasSequenceTest(TestBase):    def setUpAll(self):        global metadata, users        metadata = MetaData()        users = Table('users', metadata,                      Column('user_id', Integer, Sequence('user_id_seq'), primary_key=True),                      Column('user_name', String(40)),                      )    @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase')    def test_hassequence(self):        metadata.create_all(bind=testing.db)        self.assertEqual(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)        metadata.drop_all(bind=testing.db)        self.assertEqual(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)if __name__ == "__main__":    testenv.main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -