⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testtypes.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
    def testcolumns(self):        expectedResults = { 'int_column': 'int_column INTEGER',                            'smallint_column': 'smallint_column SMALLINT',                            'varchar_column': 'varchar_column VARCHAR(20)',                            'numeric_column': 'numeric_column NUMERIC(12, 3)',                            'float_column': 'float_column FLOAT(25)',                          }        db = testing.db        if testing.against('sqlite', 'oracle'):            expectedResults['float_column'] = 'float_column NUMERIC(25, 2)'        if testing.against('maxdb'):            expectedResults['numeric_column'] = (                expectedResults['numeric_column'].replace('NUMERIC', 'FIXED'))        print db.engine.__module__        testTable = Table('testColumns', MetaData(db),            Column('int_column', Integer),            Column('smallint_column', SmallInteger),            Column('varchar_column', String(20)),            Column('numeric_column', Numeric(12,3)),            Column('float_column', Float(25)),        )        for aCol in testTable.c:            self.assertEquals(                expectedResults[aCol.name],                db.dialect.schemagenerator(db.dialect, db, None, None).\                  get_column_specification(aCol))class UnicodeTest(TestBase, AssertsExecutionResults):    """tests the Unicode type.  also tests the TypeDecorator with instances in the types package."""    def setUpAll(self):        global unicode_table        metadata = MetaData(testing.db)        unicode_table = Table('unicode_table', metadata,            Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),            Column('unicode_varchar', Unicode(250)),            Column('unicode_text', UnicodeText),            Column('plain_varchar', String(250))            )        unicode_table.create()    def tearDownAll(self):        unicode_table.drop()    def tearDown(self):        unicode_table.delete().execute()    def testbasic(self):        assert unicode_table.c.unicode_varchar.type.length == 250        rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'        unicodedata = rawdata.decode('utf-8')        unicode_table.insert().execute(unicode_varchar=unicodedata,                                       unicode_text=unicodedata,                                       plain_varchar=rawdata)        x = unicode_table.select().execute().fetchone()        print 0, repr(unicodedata)        print 1, repr(x['unicode_varchar'])        print 2, repr(x['unicode_text'])        print 3, repr(x['plain_varchar'])        self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)        self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)        if isinstance(x['plain_varchar'], unicode):            # SQLLite and MSSQL return non-unicode data as unicode            self.assert_(testing.against('sqlite', 'mssql'))            self.assert_(x['plain_varchar'] == unicodedata)            print "it's %s!" % testing.db.name        else:            self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)    def testassert(self):        try:            unicode_table.insert().execute(unicode_varchar='not unicode')            assert False        except exceptions.SAWarning, e:            assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)        unicode_engine = engines.utf8_engine(options={'convert_unicode':True,                                                      'assert_unicode':True})        try:            try:                unicode_engine.execute(unicode_table.insert(), plain_varchar='im not unicode')                assert False            except exceptions.InvalidRequestError, e:                assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"            @testing.emits_warning('.*non-unicode bind')            def warns():                # test that data still goes in if warning is emitted....                unicode_table.insert().execute(unicode_varchar='not unicode')                assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])            warns()        finally:            unicode_engine.dispose()    @testing.fails_on('oracle')    def testblanks(self):        unicode_table.insert().execute(unicode_varchar=u'')        assert select([unicode_table.c.unicode_varchar]).scalar() == u''    def testengineparam(self):        """tests engine-wide unicode conversion"""        prev_unicode = testing.db.engine.dialect.convert_unicode        prev_assert = testing.db.engine.dialect.assert_unicode        try:            testing.db.engine.dialect.convert_unicode = True            testing.db.engine.dialect.assert_unicode = False            rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'            unicodedata = rawdata.decode('utf-8')            unicode_table.insert().execute(unicode_varchar=unicodedata,                                           unicode_text=unicodedata,                                           plain_varchar=rawdata)            x = unicode_table.select().execute().fetchone()            print 0, repr(unicodedata)            print 1, repr(x['unicode_varchar'])            print 2, repr(x['unicode_text'])            print 3, repr(x['plain_varchar'])            self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)            self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)            self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata)        finally:            testing.db.engine.dialect.convert_unicode = prev_unicode            testing.db.engine.dialect.convert_unicode = prev_assert    @testing.unsupported('oracle')    def testlength(self):        """checks the database correctly understands the length of a unicode string"""        teststr = u'aaa\x1234'        self.assert_(testing.db.func.length(teststr).scalar() == len(teststr))class BinaryTest(TestBase, AssertsExecutionResults):    def setUpAll(self):        global binary_table, MyPickleType        class MyPickleType(types.TypeDecorator):            impl = PickleType            def process_bind_param(self, value, dialect):                if value:                    value.stuff = 'this is modified stuff'                return value            def process_result_value(self, value, dialect):                if value:                    value.stuff = 'this is the right stuff'                return value        binary_table = Table('binary_table', MetaData(testing.db),        Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),        Column('data', Binary),        Column('data_slice', Binary(100)),        Column('misc', String(30)),        # construct PickleType with non-native pickle module, since cPickle uses relative module        # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative        # to the 'types' module        Column('pickled', PickleType),        Column('mypickle', MyPickleType)        )        binary_table.create()    def tearDown(self):        binary_table.delete().execute()    def tearDownAll(self):        binary_table.drop()    def testbinary(self):        testobj1 = pickleable.Foo('im foo 1')        testobj2 = pickleable.Foo('im foo 2')        testobj3 = pickleable.Foo('im foo 3')        stream1 =self.load_stream('binary_data_one.dat')        stream2 =self.load_stream('binary_data_two.dat')        binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat',    data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)        binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)        binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)        for stmt in (            binary_table.select(order_by=binary_table.c.primary_id),            text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db)        ):            l = stmt.execute().fetchall()            print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])            print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])            self.assertEquals(list(stream1), list(l[0]['data']))            self.assertEquals(list(stream1[0:100]), list(l[0]['data_slice']))            self.assertEquals(list(stream2), list(l[1]['data']))            self.assertEquals(testobj1, l[0]['pickled'])            self.assertEquals(testobj2, l[1]['pickled'])            self.assertEquals(testobj3.moredata, l[0]['mypickle'].moredata)            self.assertEquals(l[0]['mypickle'].stuff, 'this is the right stuff')    def load_stream(self, name, len=12579):        f = os.path.join(os.path.dirname(testenv.__file__), name)        # put a number less than the typical MySQL default BLOB size        return file(f).read(len)class ExpressionTest(TestBase, AssertsExecutionResults):    def setUpAll(self):        global test_table, meta        class MyCustomType(types.TypeEngine):            def get_col_spec(self):                return "INT"            def bind_processor(self, dialect):                def process(value):                    return value * 10                return process            def result_processor(self, dialect):                def process(value):                    return value / 10                return process            def adapt_operator(self, op):                return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op)        meta = MetaData(testing.db)        test_table = Table('test', meta,            Column('id', Integer, primary_key=True),            Column('data', String(30)),            Column('atimestamp', Date),            Column('avalue', MyCustomType))        meta.create_all()        test_table.insert().execute({'id':1, 'data':'somedata', 'atimestamp':datetime.date(2007, 10, 15), 'avalue':25})    def tearDownAll(self):        meta.drop_all()    def test_control(self):        assert testing.db.execute("select avalue from test").scalar() == 250        assert test_table.select().execute().fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]    def test_bind_adapt(self):        expr = test_table.c.atimestamp == bindparam("thedate")        assert expr.right.type.__class__ == test_table.c.atimestamp.type.__class__        assert testing.db.execute(test_table.select().where(expr), {"thedate":datetime.date(2007, 10, 15)}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]        expr = test_table.c.avalue == bindparam("somevalue")        assert expr.right.type.__class__ == test_table.c.avalue.type.__class__        assert testing.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]    def test_operator_adapt(self):        """test type-based overloading of operators"""        # test string concatenation        expr = test_table.c.data + "somedata"        assert testing.db.execute(select([expr])).scalar() == "somedatasomedata"        expr = test_table.c.id + 15        assert testing.db.execute(select([expr])).scalar() == 16        # test custom operator conversion        expr = test_table.c.avalue + 40        assert expr.type.__class__ is test_table.c.avalue.type.__class__        # + operator converted to -        # value is calculated as: (250 - (40 * 10)) / 10 == -15        assert testing.db.execute(select([expr.label('foo')])).scalar() == -15        # this one relies upon anonymous labeling to assemble result        # processing rules on the column.        assert testing.db.execute(select([expr])).scalar() == -15

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -