📄 testtypes.py
字号:
import testenv; testenv.configure_for_tests()import datetime, os, pickleable, refrom sqlalchemy import *from sqlalchemy import exceptions, types, utilfrom sqlalchemy.sql import operatorsimport sqlalchemy.engine.url as urlfrom sqlalchemy.databases import mssql, oracle, mysql, postgres, firebirdfrom testlib import *class AdaptTest(TestBase): def testadapt(self): e1 = url.URL('postgres').get_dialect()() e2 = url.URL('mysql').get_dialect()() e3 = url.URL('sqlite').get_dialect()() e4 = url.URL('firebird').get_dialect()() type = String(40) t1 = type.dialect_impl(e1) t2 = type.dialect_impl(e2) t3 = type.dialect_impl(e3) t4 = type.dialect_impl(e4) impls = [t1, t2, t3, t4] for i,ta in enumerate(impls): for j,tb in enumerate(impls): if i == j: assert ta == tb # call me paranoid... :) else: assert ta != tb def testmsnvarchar(self): dialect = mssql.MSSQLDialect() # run the test twice to insure the caching step works too for x in range(0, 1): col = Column('', Unicode(length=10)) dialect_type = col.type.dialect_impl(dialect) assert isinstance(dialect_type, mssql.MSNVarchar) assert dialect_type.get_col_spec() == 'NVARCHAR(10)' def testoracletext(self): dialect = oracle.OracleDialect() class MyDecoratedType(types.TypeDecorator): impl = String def copy(self): return MyDecoratedType() col = Column('', MyDecoratedType) dialect_type = col.type.dialect_impl(dialect) assert isinstance(dialect_type.impl, oracle.OracleText), repr(dialect_type.impl) def testoracletimestamp(self): dialect = oracle.OracleDialect() t1 = oracle.OracleTimestamp t2 = oracle.OracleTimestamp() t3 = types.TIMESTAMP assert isinstance(dialect.type_descriptor(t1), oracle.OracleTimestamp) assert isinstance(dialect.type_descriptor(t2), oracle.OracleTimestamp) assert isinstance(dialect.type_descriptor(t3), oracle.OracleTimestamp) def testmysqlbinary(self): dialect = mysql.MySQLDialect() t1 = mysql.MSVarBinary t2 = mysql.MSVarBinary() assert isinstance(dialect.type_descriptor(t1), mysql.MSVarBinary) assert isinstance(dialect.type_descriptor(t2), mysql.MSVarBinary) def teststringadapt(self): """test that String with no size becomes TEXT, *all* others stay as varchar/String""" oracle_dialect = oracle.OracleDialect() mysql_dialect = mysql.MySQLDialect() postgres_dialect = postgres.PGDialect() firebird_dialect = firebird.FBDialect() for dialect, start, test in [ (oracle_dialect, String(), oracle.OracleText), (oracle_dialect, VARCHAR(), oracle.OracleString), (oracle_dialect, String(50), oracle.OracleString), (oracle_dialect, Unicode(), oracle.OracleText), (oracle_dialect, UnicodeText(), oracle.OracleText), (oracle_dialect, NCHAR(), oracle.OracleString), (mysql_dialect, String(), mysql.MSText), (mysql_dialect, VARCHAR(), mysql.MSString), (mysql_dialect, String(50), mysql.MSString), (mysql_dialect, Unicode(), mysql.MSText), (mysql_dialect, UnicodeText(), mysql.MSText), (mysql_dialect, NCHAR(), mysql.MSNChar), (postgres_dialect, String(), postgres.PGText), (postgres_dialect, VARCHAR(), postgres.PGString), (postgres_dialect, String(50), postgres.PGString), (postgres_dialect, Unicode(), postgres.PGText), (postgres_dialect, UnicodeText(), postgres.PGText), (postgres_dialect, NCHAR(), postgres.PGString), (firebird_dialect, String(), firebird.FBText), (firebird_dialect, VARCHAR(), firebird.FBString), (firebird_dialect, String(50), firebird.FBString), (firebird_dialect, Unicode(), firebird.FBText), (firebird_dialect, UnicodeText(), firebird.FBText), (firebird_dialect, NCHAR(), firebird.FBString), ]: assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))class UserDefinedTest(TestBase): """tests user-defined types.""" def testbasic(self): print users.c.goofy4.type print users.c.goofy4.type.dialect_impl(testing.db.dialect) print users.c.goofy4.type.dialect_impl(testing.db.dialect).get_col_spec() def testprocessing(self): global users users.insert().execute(user_id = 2, goofy = 'jack', goofy2='jack', goofy3='jack', goofy4=u'jack', goofy5=u'jack', goofy6='jack', goofy7=u'jack', goofy8=12, goofy9=12) users.insert().execute(user_id = 3, goofy = 'lala', goofy2='lala', goofy3='lala', goofy4=u'lala', goofy5=u'lala', goofy6='lala', goofy7=u'lala', goofy8=15, goofy9=15) users.insert().execute(user_id = 4, goofy = 'fred', goofy2='fred', goofy3='fred', goofy4=u'fred', goofy5=u'fred', goofy6='fred', goofy7=u'fred', goofy8=9, goofy9=9) l = users.select().execute().fetchall() for assertstr, assertint, assertint2, row in zip( ["BIND_INjackBIND_OUT", "BIND_INlalaBIND_OUT", "BIND_INfredBIND_OUT"], [1200, 1500, 900], [1800, 2250, 1350], l ): for col in row[1:8]: self.assertEquals(col, assertstr) self.assertEquals(row[8], assertint) self.assertEquals(row[9], assertint2) for col in (row[4], row[5], row[7]): assert isinstance(col, unicode) def setUpAll(self): global users, metadata class MyType(types.TypeEngine): def get_col_spec(self): return "VARCHAR(100)" def bind_processor(self, dialect): def process(value): return "BIND_IN"+ value return process def result_processor(self, dialect): def process(value): return value + "BIND_OUT" return process def adapt(self, typeobj): return typeobj() class MyDecoratedType(types.TypeDecorator): impl = String def bind_processor(self, dialect): impl_processor = super(MyDecoratedType, self).bind_processor(dialect) or (lambda value:value) def process(value): return "BIND_IN"+ impl_processor(value) return process def result_processor(self, dialect): impl_processor = super(MyDecoratedType, self).result_processor(dialect) or (lambda value:value) def process(value): return impl_processor(value) + "BIND_OUT" return process def copy(self): return MyDecoratedType() class MyNewUnicodeType(types.TypeDecorator): impl = Unicode def process_bind_param(self, value, dialect): return "BIND_IN" + value def process_result_value(self, value, dialect): return value + "BIND_OUT" def copy(self): return MyNewUnicodeType(self.impl.length) class MyNewIntType(types.TypeDecorator): impl = Integer def process_bind_param(self, value, dialect): return value * 10 def process_result_value(self, value, dialect): return value * 10 def copy(self): return MyNewIntType() class MyNewIntSubClass(MyNewIntType): def process_result_value(self, value, dialect): return value * 15 def copy(self): return MyNewIntSubClass() class MyUnicodeType(types.TypeDecorator): impl = Unicode def bind_processor(self, dialect): impl_processor = super(MyUnicodeType, self).bind_processor(dialect) or (lambda value:value) def process(value): return "BIND_IN"+ impl_processor(value) return process def result_processor(self, dialect): impl_processor = super(MyUnicodeType, self).result_processor(dialect) or (lambda value:value) def process(value): return impl_processor(value) + "BIND_OUT" return process def copy(self): return MyUnicodeType(self.impl.length) class LegacyType(types.TypeEngine): def get_col_spec(self): return "VARCHAR(100)" def convert_bind_param(self, value, dialect): return "BIND_IN"+ value def convert_result_value(self, value, dialect): return value + "BIND_OUT" def adapt(self, typeobj): return typeobj() class LegacyUnicodeType(types.TypeDecorator): impl = Unicode def convert_bind_param(self, value, dialect): return "BIND_IN" + super(LegacyUnicodeType, self).convert_bind_param(value, dialect) def convert_result_value(self, value, dialect): return super(LegacyUnicodeType, self).convert_result_value(value, dialect) + "BIND_OUT" def copy(self): return LegacyUnicodeType(self.impl.length) metadata = MetaData(testing.db) users = Table('type_users', metadata, Column('user_id', Integer, primary_key = True), # totall custom type Column('goofy', MyType, nullable = False), # decorated type with an argument, so its a String Column('goofy2', MyDecoratedType(50), nullable = False), # decorated type without an argument, it will adapt_args to TEXT Column('goofy3', MyDecoratedType, nullable = False), Column('goofy4', MyUnicodeType, nullable = False), Column('goofy5', LegacyUnicodeType, nullable = False), Column('goofy6', LegacyType, nullable = False), Column('goofy7', MyNewUnicodeType, nullable = False), Column('goofy8', MyNewIntType, nullable = False), Column('goofy9', MyNewIntSubClass, nullable = False), ) metadata.create_all() def tearDownAll(self): metadata.drop_all()class ColumnsTest(TestBase, AssertsExecutionResults):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -