📄 defaults.py
字号:
import testenv; testenv.configure_for_tests()import datetimefrom sqlalchemy import *from sqlalchemy import exceptions, schema, utilfrom sqlalchemy.orm import mapper, create_sessionfrom testlib import *class DefaultTest(TestBase): def setUpAll(self): global t, f, f2, ts, currenttime, metadata, default_generator db = testing.db metadata = MetaData(db) default_generator = {'x':50} def mydefault(): default_generator['x'] += 1 return default_generator['x'] def myupdate_with_ctx(ctx): conn = ctx.connection return conn.execute(select([text('13')])).scalar() def mydefault_using_connection(ctx): conn = ctx.connection try: return conn.execute(select([text('12')])).scalar() finally: # ensure a "close()" on this connection does nothing, # since its a "branched" connection conn.close() use_function_defaults = testing.against('postgres', 'oracle') is_oracle = testing.against('oracle') # select "count(1)" returns different results on different DBs # also correct for "current_date" compatible as column default, value differences currenttime = func.current_date(type_=Date, bind=db) if is_oracle: ts = db.scalar(select([func.trunc(func.sysdate(), literal_column("'DAY'"), type_=Date).label('today')])) assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime) f = select([func.length('abcdef')], bind=db).scalar() f2 = select([func.length('abcdefghijk')], bind=db).scalar() # TODO: engine propigation across nested functions not working currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db, type_=Date) def1 = currenttime def2 = func.trunc(text("sysdate"), literal_column("'DAY'"), type_=Date) deftype = Date elif use_function_defaults: f = select([func.length('abcdef')], bind=db).scalar() f2 = select([func.length('abcdefghijk')], bind=db).scalar() def1 = currenttime if testing.against('maxdb'): def2 = text("curdate") else: def2 = text("current_date") deftype = Date ts = db.func.current_date().scalar() else: f = select([func.length('abcdef')], bind=db).scalar() f2 = select([func.length('abcdefghijk')], bind=db).scalar() def1 = def2 = "3" ts = 3 deftype = Integer t = Table('default_test1', metadata, # python function Column('col1', Integer, primary_key=True, default=mydefault), # python literal Column('col2', String(20), default="imthedefault", onupdate="im the update"), # preexecute expression Column('col3', Integer, default=func.length('abcdef'), onupdate=func.length('abcdefghijk')), # SQL-side default from sql expression Column('col4', deftype, PassiveDefault(def1)), # SQL-side default from literal expression Column('col5', deftype, PassiveDefault(def2)), # preexecute + update timestamp Column('col6', Date, default=currenttime, onupdate=currenttime), Column('boolcol1', Boolean, default=True), Column('boolcol2', Boolean, default=False), # python function which uses ExecutionContext Column('col7', Integer, default=mydefault_using_connection, onupdate=myupdate_with_ctx), # python builtin Column('col8', Date, default=datetime.date.today, onupdate=datetime.date.today) ) t.create() def tearDownAll(self): t.drop() def tearDown(self): default_generator['x'] = 50 t.delete().execute() def test_bad_argsignature(self): ex_msg = \ "ColumnDefault Python function takes zero or one positional arguments" def fn1(x, y): pass def fn2(x, y, z=3): pass class fn3(object): def __init__(self, x, y): pass class FN4(object): def __call__(self, x, y): pass fn4 = FN4() for fn in fn1, fn2, fn3, fn4: try: c = ColumnDefault(fn) assert False, str(fn) except exceptions.ArgumentError, e: assert str(e) == ex_msg def test_argsignature(self): def fn1(): pass def fn2(): pass def fn3(x=1): pass def fn4(x=1, y=2, z=3): pass fn5 = list class fn6(object): def __init__(self, x): pass class fn6(object): def __init__(self, x, y=3): pass class FN7(object): def __call__(self, x): pass fn7 = FN7() class FN8(object): def __call__(self, x, y=3): pass fn8 = FN8() for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8: c = ColumnDefault(fn) def teststandalone(self): c = testing.db.engine.contextual_connect() x = c.execute(t.c.col1.default) y = t.c.col2.default.execute() z = c.execute(t.c.col3.default) self.assert_(50 <= x <= 57) self.assert_(y == 'imthedefault') self.assert_(z == f) self.assert_(f2==11) def testinsert(self): r = t.insert().execute() assert r.lastrow_has_defaults() assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]) r = t.insert(inline=True).execute() assert r.lastrow_has_defaults() assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]) t.insert().execute() t.insert().execute() ctexec = select([currenttime.label('now')], bind=testing.db).scalar() l = t.select().execute() today = datetime.date.today() self.assertEquals(l.fetchall(), [ (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), ]) def testinsertmany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( if (testing.against('mysql') and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): return r = t.insert().execute({}, {}, {}) ctexec = currenttime.scalar() l = t.select().execute() today = datetime.date.today() self.assert_(l.fetchall() == [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today)]) def testinsertvalues(self): t.insert(values={'col3':50}).execute() l = t.select().execute() self.assert_(l.fetchone()['col3'] == 50) def testupdatemany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( if (testing.against('mysql') and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): return t.insert().execute({}, {}, {}) t.update(t.c.col1==bindparam('pkval')).execute( {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False}, ) t.update(t.c.col1==bindparam('pkval')).execute( {'pkval':51,}, {'pkval':52,}, {'pkval':53,}, ) l = t.select().execute() ctexec = currenttime.scalar() today = datetime.date.today() self.assert_(l.fetchall() == [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today), (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today), (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today)]) def testupdate(self): r = t.insert().execute() pk = r.last_inserted_ids()[0] t.update(t.c.col1==pk).execute(col4=None, col5=None) ctexec = currenttime.scalar() l = t.select(t.c.col1==pk).execute() l = l.fetchone() self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today())) self.assert_(f2==11) def testupdatevalues(self): r = t.insert().execute() pk = r.last_inserted_ids()[0] t.update(t.c.col1==pk, values={'col3': 55}).execute() l = t.select(t.c.col1==pk).execute() l = l.fetchone() self.assert_(l['col3'] == 55) @testing.fails_on_everything_except('postgres') def testpassiveoverride(self): """primarily for postgres, tests that when we get a primary key column back from reflecting a table which has a default value on it, we pre-execute that PassiveDefault upon insert, even though PassiveDefault says "let the database execute this", because in postgres we must have all the primary key values in memory before insert; otherwise we cant locate the just inserted row.""" try: meta = MetaData(testing.db) testing.db.execute("""
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -