📄 testtypes.py
字号:
def testcolumns(self): expectedResults = { 'int_column': 'int_column INTEGER', 'smallint_column': 'smallint_column SMALLINT', 'varchar_column': 'varchar_column VARCHAR(20)', 'numeric_column': 'numeric_column NUMERIC(12, 3)', 'float_column': 'float_column FLOAT(25)', } db = testing.db if testing.against('sqlite', 'oracle'): expectedResults['float_column'] = 'float_column NUMERIC(25, 2)' if testing.against('maxdb'): expectedResults['numeric_column'] = ( expectedResults['numeric_column'].replace('NUMERIC', 'FIXED')) print db.engine.__module__ testTable = Table('testColumns', MetaData(db), Column('int_column', Integer), Column('smallint_column', SmallInteger), Column('varchar_column', String(20)), Column('numeric_column', Numeric(12,3)), Column('float_column', Float(25)), ) for aCol in testTable.c: self.assertEquals( expectedResults[aCol.name], db.dialect.schemagenerator(db.dialect, db, None, None).\ get_column_specification(aCol))class UnicodeTest(TestBase, AssertsExecutionResults): """tests the Unicode type. also tests the TypeDecorator with instances in the types package.""" def setUpAll(self): global unicode_table metadata = MetaData(testing.db) unicode_table = Table('unicode_table', metadata, Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True), Column('unicode_varchar', Unicode(250)), Column('unicode_text', UnicodeText), Column('plain_varchar', String(250)) ) unicode_table.create() def tearDownAll(self): unicode_table.drop() def tearDown(self): unicode_table.delete().execute() def testbasic(self): assert unicode_table.c.unicode_varchar.type.length == 250 rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' unicodedata = rawdata.decode('utf-8') unicode_table.insert().execute(unicode_varchar=unicodedata, unicode_text=unicodedata, plain_varchar=rawdata) x = unicode_table.select().execute().fetchone() print 0, repr(unicodedata) print 1, repr(x['unicode_varchar']) print 2, repr(x['unicode_text']) print 3, repr(x['plain_varchar']) self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) if isinstance(x['plain_varchar'], unicode): # SQLLite and MSSQL return non-unicode data as unicode self.assert_(testing.against('sqlite', 'mssql')) self.assert_(x['plain_varchar'] == unicodedata) print "it's %s!" % testing.db.name else: self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata) def testassert(self): try: unicode_table.insert().execute(unicode_varchar='not unicode') assert False except exceptions.SAWarning, e: assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e) unicode_engine = engines.utf8_engine(options={'convert_unicode':True, 'assert_unicode':True}) try: try: unicode_engine.execute(unicode_table.insert(), plain_varchar='im not unicode') assert False except exceptions.InvalidRequestError, e: assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'" @testing.emits_warning('.*non-unicode bind') def warns(): # test that data still goes in if warning is emitted.... unicode_table.insert().execute(unicode_varchar='not unicode') assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )]) warns() finally: unicode_engine.dispose() @testing.fails_on('oracle') def testblanks(self): unicode_table.insert().execute(unicode_varchar=u'') assert select([unicode_table.c.unicode_varchar]).scalar() == u'' def testengineparam(self): """tests engine-wide unicode conversion""" prev_unicode = testing.db.engine.dialect.convert_unicode prev_assert = testing.db.engine.dialect.assert_unicode try: testing.db.engine.dialect.convert_unicode = True testing.db.engine.dialect.assert_unicode = False rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' unicodedata = rawdata.decode('utf-8') unicode_table.insert().execute(unicode_varchar=unicodedata, unicode_text=unicodedata, plain_varchar=rawdata) x = unicode_table.select().execute().fetchone() print 0, repr(unicodedata) print 1, repr(x['unicode_varchar']) print 2, repr(x['unicode_text']) print 3, repr(x['plain_varchar']) self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata) finally: testing.db.engine.dialect.convert_unicode = prev_unicode testing.db.engine.dialect.convert_unicode = prev_assert @testing.unsupported('oracle') def testlength(self): """checks the database correctly understands the length of a unicode string""" teststr = u'aaa\x1234' self.assert_(testing.db.func.length(teststr).scalar() == len(teststr))class BinaryTest(TestBase, AssertsExecutionResults): def setUpAll(self): global binary_table, MyPickleType class MyPickleType(types.TypeDecorator): impl = PickleType def process_bind_param(self, value, dialect): if value: value.stuff = 'this is modified stuff' return value def process_result_value(self, value, dialect): if value: value.stuff = 'this is the right stuff' return value binary_table = Table('binary_table', MetaData(testing.db), Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True), Column('data', Binary), Column('data_slice', Binary(100)), Column('misc', String(30)), # construct PickleType with non-native pickle module, since cPickle uses relative module # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative # to the 'types' module Column('pickled', PickleType), Column('mypickle', MyPickleType) ) binary_table.create() def tearDown(self): binary_table.delete().execute() def tearDownAll(self): binary_table.drop() def testbinary(self): testobj1 = pickleable.Foo('im foo 1') testobj2 = pickleable.Foo('im foo 2') testobj3 = pickleable.Foo('im foo 3') stream1 =self.load_stream('binary_data_one.dat') stream2 =self.load_stream('binary_data_two.dat') binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3) binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2) binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None) for stmt in ( binary_table.select(order_by=binary_table.c.primary_id), text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db) ): l = stmt.execute().fetchall() print type(stream1), type(l[0]['data']), type(l[0]['data_slice']) print len(stream1), len(l[0]['data']), len(l[0]['data_slice']) self.assertEquals(list(stream1), list(l[0]['data'])) self.assertEquals(list(stream1[0:100]), list(l[0]['data_slice'])) self.assertEquals(list(stream2), list(l[1]['data'])) self.assertEquals(testobj1, l[0]['pickled']) self.assertEquals(testobj2, l[1]['pickled']) self.assertEquals(testobj3.moredata, l[0]['mypickle'].moredata) self.assertEquals(l[0]['mypickle'].stuff, 'this is the right stuff') def load_stream(self, name, len=12579): f = os.path.join(os.path.dirname(testenv.__file__), name) # put a number less than the typical MySQL default BLOB size return file(f).read(len)class ExpressionTest(TestBase, AssertsExecutionResults): def setUpAll(self): global test_table, meta class MyCustomType(types.TypeEngine): def get_col_spec(self): return "INT" def bind_processor(self, dialect): def process(value): return value * 10 return process def result_processor(self, dialect): def process(value): return value / 10 return process def adapt_operator(self, op): return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op) meta = MetaData(testing.db) test_table = Table('test', meta, Column('id', Integer, primary_key=True), Column('data', String(30)), Column('atimestamp', Date), Column('avalue', MyCustomType)) meta.create_all() test_table.insert().execute({'id':1, 'data':'somedata', 'atimestamp':datetime.date(2007, 10, 15), 'avalue':25}) def tearDownAll(self): meta.drop_all() def test_control(self): assert testing.db.execute("select avalue from test").scalar() == 250 assert test_table.select().execute().fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)] def test_bind_adapt(self): expr = test_table.c.atimestamp == bindparam("thedate") assert expr.right.type.__class__ == test_table.c.atimestamp.type.__class__ assert testing.db.execute(test_table.select().where(expr), {"thedate":datetime.date(2007, 10, 15)}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)] expr = test_table.c.avalue == bindparam("somevalue") assert expr.right.type.__class__ == test_table.c.avalue.type.__class__ assert testing.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)] def test_operator_adapt(self): """test type-based overloading of operators""" # test string concatenation expr = test_table.c.data + "somedata" assert testing.db.execute(select([expr])).scalar() == "somedatasomedata" expr = test_table.c.id + 15 assert testing.db.execute(select([expr])).scalar() == 16 # test custom operator conversion expr = test_table.c.avalue + 40 assert expr.type.__class__ is test_table.c.avalue.type.__class__ # + operator converted to - # value is calculated as: (250 - (40 * 10)) / 10 == -15 assert testing.db.execute(select([expr.label('foo')])).scalar() == -15 # this one relies upon anonymous labeling to assemble result # processing rules on the column. assert testing.db.execute(select([expr])).scalar() == -15
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -