⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 defaults.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
import testenv; testenv.configure_for_tests()import datetimefrom sqlalchemy import *from sqlalchemy import exceptions, schema, utilfrom sqlalchemy.orm import mapper, create_sessionfrom testlib import *class DefaultTest(TestBase):    def setUpAll(self):        global t, f, f2, ts, currenttime, metadata, default_generator        db = testing.db        metadata = MetaData(db)        default_generator = {'x':50}        def mydefault():            default_generator['x'] += 1            return default_generator['x']        def myupdate_with_ctx(ctx):            conn = ctx.connection            return conn.execute(select([text('13')])).scalar()        def mydefault_using_connection(ctx):            conn = ctx.connection            try:                return conn.execute(select([text('12')])).scalar()            finally:                # ensure a "close()" on this connection does nothing,                # since its a "branched" connection                conn.close()        use_function_defaults = testing.against('postgres', 'oracle')        is_oracle = testing.against('oracle')        # select "count(1)" returns different results on different DBs        # also correct for "current_date" compatible as column default, value differences        currenttime = func.current_date(type_=Date, bind=db)        if is_oracle:            ts = db.scalar(select([func.trunc(func.sysdate(), literal_column("'DAY'"), type_=Date).label('today')]))            assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime)            f = select([func.length('abcdef')], bind=db).scalar()            f2 = select([func.length('abcdefghijk')], bind=db).scalar()            # TODO: engine propigation across nested functions not working            currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db, type_=Date)            def1 = currenttime            def2 = func.trunc(text("sysdate"), literal_column("'DAY'"), type_=Date)            deftype = Date        elif use_function_defaults:            f = select([func.length('abcdef')], bind=db).scalar()            f2 = select([func.length('abcdefghijk')], bind=db).scalar()            def1 = currenttime            if testing.against('maxdb'):                def2 = text("curdate")            else:                def2 = text("current_date")            deftype = Date            ts = db.func.current_date().scalar()        else:            f = select([func.length('abcdef')], bind=db).scalar()            f2 = select([func.length('abcdefghijk')], bind=db).scalar()            def1 = def2 = "3"            ts = 3            deftype = Integer        t = Table('default_test1', metadata,            # python function            Column('col1', Integer, primary_key=True, default=mydefault),            # python literal            Column('col2', String(20), default="imthedefault", onupdate="im the update"),            # preexecute expression            Column('col3', Integer, default=func.length('abcdef'), onupdate=func.length('abcdefghijk')),            # SQL-side default from sql expression            Column('col4', deftype, PassiveDefault(def1)),            # SQL-side default from literal expression            Column('col5', deftype, PassiveDefault(def2)),            # preexecute + update timestamp            Column('col6', Date, default=currenttime, onupdate=currenttime),            Column('boolcol1', Boolean, default=True),            Column('boolcol2', Boolean, default=False),            # python function which uses ExecutionContext            Column('col7', Integer, default=mydefault_using_connection, onupdate=myupdate_with_ctx),            # python builtin            Column('col8', Date, default=datetime.date.today, onupdate=datetime.date.today)        )        t.create()    def tearDownAll(self):        t.drop()    def tearDown(self):        default_generator['x'] = 50        t.delete().execute()    def test_bad_argsignature(self):        ex_msg = \          "ColumnDefault Python function takes zero or one positional arguments"        def fn1(x, y): pass        def fn2(x, y, z=3): pass        class fn3(object):            def __init__(self, x, y):                pass        class FN4(object):            def __call__(self, x, y):                pass        fn4 = FN4()        for fn in fn1, fn2, fn3, fn4:            try:                c = ColumnDefault(fn)                assert False, str(fn)            except exceptions.ArgumentError, e:                assert str(e) == ex_msg    def test_argsignature(self):        def fn1(): pass        def fn2(): pass        def fn3(x=1): pass        def fn4(x=1, y=2, z=3): pass        fn5 = list        class fn6(object):            def __init__(self, x):                pass        class fn6(object):            def __init__(self, x, y=3):                pass        class FN7(object):            def __call__(self, x):                pass        fn7 = FN7()        class FN8(object):            def __call__(self, x, y=3):                pass        fn8 = FN8()        for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:            c = ColumnDefault(fn)    def teststandalone(self):        c = testing.db.engine.contextual_connect()        x = c.execute(t.c.col1.default)        y = t.c.col2.default.execute()        z = c.execute(t.c.col3.default)        self.assert_(50 <= x <= 57)        self.assert_(y == 'imthedefault')        self.assert_(z == f)        self.assert_(f2==11)    def testinsert(self):        r = t.insert().execute()        assert r.lastrow_has_defaults()        assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])        r = t.insert(inline=True).execute()        assert r.lastrow_has_defaults()        assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])        t.insert().execute()        t.insert().execute()        ctexec = select([currenttime.label('now')], bind=testing.db).scalar()        l = t.select().execute()        today = datetime.date.today()        self.assertEquals(l.fetchall(), [            (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),            (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),            (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),            (54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),            ])    def testinsertmany(self):        # MySQL-Python 1.2.2 breaks functions in execute_many :(        if (testing.against('mysql') and            testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):            return        r = t.insert().execute({}, {}, {})        ctexec = currenttime.scalar()        l = t.select().execute()        today = datetime.date.today()        self.assert_(l.fetchall() == [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today)])    def testinsertvalues(self):        t.insert(values={'col3':50}).execute()        l = t.select().execute()        self.assert_(l.fetchone()['col3'] == 50)    def testupdatemany(self):        # MySQL-Python 1.2.2 breaks functions in execute_many :(        if (testing.against('mysql') and            testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):            return        t.insert().execute({}, {}, {})        t.update(t.c.col1==bindparam('pkval')).execute(            {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False},        )        t.update(t.c.col1==bindparam('pkval')).execute(            {'pkval':51,},            {'pkval':52,},            {'pkval':53,},        )        l = t.select().execute()        ctexec = currenttime.scalar()        today = datetime.date.today()        self.assert_(l.fetchall() == [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today), (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today), (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today)])    def testupdate(self):        r = t.insert().execute()        pk = r.last_inserted_ids()[0]        t.update(t.c.col1==pk).execute(col4=None, col5=None)        ctexec = currenttime.scalar()        l = t.select(t.c.col1==pk).execute()        l = l.fetchone()        self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today()))        self.assert_(f2==11)    def testupdatevalues(self):        r = t.insert().execute()        pk = r.last_inserted_ids()[0]        t.update(t.c.col1==pk, values={'col3': 55}).execute()        l = t.select(t.c.col1==pk).execute()        l = l.fetchone()        self.assert_(l['col3'] == 55)    @testing.fails_on_everything_except('postgres')    def testpassiveoverride(self):        """primarily for postgres, tests that when we get a primary key column back        from reflecting a table which has a default value on it, we pre-execute        that PassiveDefault upon insert, even though PassiveDefault says        "let the database execute this", because in postgres we must have all the primary        key values in memory before insert; otherwise we cant locate the just inserted row."""        try:            meta = MetaData(testing.db)            testing.db.execute("""

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -