📄 mysql.py
字号:
assert t.c.c3.default.arg == 'abc' finally: def_table.drop() @testing.exclude('mysql', '<', (5, 0, 0)) @testing.uses_deprecated('Using String type with no length') def test_type_reflection(self): # (ask_for, roundtripped_as_if_different) specs = [( String(), mysql.MSText(), ), ( String(1), mysql.MSString(1), ), ( String(3), mysql.MSString(3), ), ( Text(), mysql.MSText(), ), ( Unicode(), mysql.MSText(), ), ( Unicode(1), mysql.MSString(1), ), ( Unicode(3), mysql.MSString(3), ), ( UnicodeText(), mysql.MSText(), ), ( mysql.MSChar(1), ), ( mysql.MSChar(3), ), ( NCHAR(2), mysql.MSChar(2), ), ( mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only ( mysql.MSNVarChar(22), mysql.MSString(22), ), ( SmallInteger(), mysql.MSSmallInteger(), ), ( SmallInteger(4), mysql.MSSmallInteger(4), ), ( mysql.MSSmallInteger(), ), ( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ), ( Binary(3), mysql.MSBlob(3), ), ( Binary(), mysql.MSBlob() ), ( mysql.MSBinary(3), mysql.MSBinary(3), ), ( mysql.MSVarBinary(3),), ( mysql.MSVarBinary(), mysql.MSBlob()), ( mysql.MSTinyBlob(),), ( mysql.MSBlob(),), ( mysql.MSBlob(1234), mysql.MSBlob()), ( mysql.MSMediumBlob(),), ( mysql.MSLongBlob(),), ( mysql.MSEnum("''","'fleem'"), ), ] columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] db = testing.db m = MetaData(db) t_table = Table('mysql_types', m, *columns) try: m.create_all() m2 = MetaData(db) rt = Table('mysql_types', m2, autoload=True) try: db.execute('CREATE OR REPLACE VIEW mysql_types_v ' 'AS SELECT * from mysql_types') rv = Table('mysql_types_v', m2, autoload=True) expected = [len(c) > 1 and c[1] or c[0] for c in specs] # Early 5.0 releases seem to report more "general" for columns # in a view, e.g. char -> varchar, tinyblob -> mediumblob # # Not sure exactly which point version has the fix. if db.dialect.server_version_info(db.connect()) < (5, 0, 11): tables = rt, else: tables = rt, rv for table in tables: for i, reflected in enumerate(table.c): assert isinstance(reflected.type, type(expected[i])) finally: db.execute('DROP VIEW mysql_types_v') finally: m.drop_all() def test_autoincrement(self): meta = MetaData(testing.db) try: Table('ai_1', meta, Column('int_y', Integer, primary_key=True), Column('int_n', Integer, PassiveDefault('0'), primary_key=True)) Table('ai_2', meta, Column('int_y', Integer, primary_key=True), Column('int_n', Integer, PassiveDefault('0'), primary_key=True)) Table('ai_3', meta, Column('int_n', Integer, PassiveDefault('0'), primary_key=True, autoincrement=False), Column('int_y', Integer, primary_key=True)) Table('ai_4', meta, Column('int_n', Integer, PassiveDefault('0'), primary_key=True, autoincrement=False), Column('int_n2', Integer, PassiveDefault('0'), primary_key=True, autoincrement=False)) Table('ai_5', meta, Column('int_y', Integer, primary_key=True), Column('int_n', Integer, PassiveDefault('0'), primary_key=True, autoincrement=False)) Table('ai_6', meta, Column('o1', String(1), PassiveDefault('x'), primary_key=True), Column('int_y', Integer, primary_key=True)) Table('ai_7', meta, Column('o1', String(1), PassiveDefault('x'), primary_key=True), Column('o2', String(1), PassiveDefault('x'), primary_key=True), Column('int_y', Integer, primary_key=True)) Table('ai_8', meta, Column('o1', String(1), PassiveDefault('x'), primary_key=True), Column('o2', String(1), PassiveDefault('x'), primary_key=True)) meta.create_all() table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', 'ai_5', 'ai_6', 'ai_7', 'ai_8'] mr = MetaData(testing.db) mr.reflect(only=table_names) for tbl in [mr.tables[name] for name in table_names]: for c in tbl.c: if c.name.startswith('int_y'): assert c.autoincrement elif c.name.startswith('int_n'): assert not c.autoincrement tbl.insert().execute() if 'int_y' in tbl.c: assert select([tbl.c.int_y]).scalar() == 1 assert list(tbl.select().execute().fetchone()).count(1) == 1 else: assert 1 not in list(tbl.select().execute().fetchone()) finally: meta.drop_all() def assert_eq(self, got, wanted): if got != wanted: print "Expected %s" % wanted print "Found %s" % got self.assertEqual(got, wanted)class SQLTest(TestBase, AssertsCompiledSQL): """Tests MySQL-dialect specific compilation.""" __dialect__ = mysql.dialect() def test_precolumns(self): dialect = self.__dialect__ def gen(distinct=None, prefixes=None): kw = {} if distinct is not None: kw['distinct'] = distinct if prefixes is not None: kw['prefixes'] = prefixes return str(select(['q'], **kw).compile(dialect=dialect)) self.assertEqual(gen(None), 'SELECT q') self.assertEqual(gen(True), 'SELECT DISTINCT q') self.assertEqual(gen(1), 'SELECT DISTINCT q') self.assertEqual(gen('diSTInct'), 'SELECT DISTINCT q') self.assertEqual(gen('DISTINCT'), 'SELECT DISTINCT q') # Standard SQL self.assertEqual(gen('all'), 'SELECT ALL q') self.assertEqual(gen('distinctrow'), 'SELECT DISTINCTROW q') # Interaction with MySQL prefix extensions self.assertEqual( gen(None, ['straight_join']), 'SELECT straight_join q') self.assertEqual( gen('all', ['HIGH_PRIORITY SQL_SMALL_RESULT']), 'SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q') self.assertEqual( gen(True, ['high_priority', sql.text('sql_cache')]), 'SELECT high_priority sql_cache DISTINCT q') def test_limit(self): t = sql.table('t', sql.column('col1'), sql.column('col2')) self.assert_compile( select([t]).limit(10).offset(20), "SELECT t.col1, t.col2 FROM t LIMIT 20, 10" ) self.assert_compile( select([t]).limit(10), "SELECT t.col1, t.col2 FROM t LIMIT 10") self.assert_compile( select([t]).offset(10), "SELECT t.col1, t.col2 FROM t LIMIT 10, 18446744073709551615" ) def test_update_limit(self): t = sql.table('t', sql.column('col1'), sql.column('col2')) self.assert_compile( t.update(values={'col1':123}), "UPDATE t SET col1=%s" ) self.assert_compile( t.update(values={'col1':123}, mysql_limit=5), "UPDATE t SET col1=%s LIMIT 5" ) self.assert_compile( t.update(values={'col1':123}, mysql_limit=None), "UPDATE t SET col1=%s" ) self.assert_compile( t.update(t.c.col2==456, values={'col1':123}, mysql_limit=1), "UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1" ) def test_cast(self): t = sql.table('t', sql.column('col')) m = mysql specs = [ (Integer, "CAST(t.col AS SIGNED INTEGER)"), (INT, "CAST(t.col AS SIGNED INTEGER)"), (m.MSInteger, "CAST(t.col AS SIGNED INTEGER)"), (m.MSInteger(unsigned=True), "CAST(t.col AS UNSIGNED INTEGER)"), (SmallInteger, "CAST(t.col AS SIGNED INTEGER)"), (m.MSSmallInteger, "CAST(t.col AS SIGNED INTEGER)"), (m.MSTinyInteger, "CAST(t.col AS SIGNED INTEGER)"), # 'SIGNED INTEGER' is a bigint, so this is ok. (m.MSBigInteger, "CAST(t.col AS SIGNED INTEGER)"), (m.MSBigInteger(unsigned=False), "CAST(t.col AS SIGNED INTEGER)"), (m.MSBigInteger(unsigned=True), "CAST(t.col AS UNSIGNED INTEGER)"), (m.MSBit, "t.col"), # this is kind of sucky. thank you default arguments! (NUMERIC, "CAST(t.col AS DECIMAL(10, 2))"), (DECIMAL, "CAST(t.col AS DECIMAL(10, 2))"), (Numeric, "CAST(t.col AS DECIMAL(10, 2))"), (m.MSNumeric, "CAST(t.col AS DECIMAL(10, 2))"), (m.MSDecimal, "CAST(t.col AS DECIMAL(10, 2))"), (FLOAT, "t.col"), (Float, "t.col"), (m.MSFloat, "t.col"), (m.MSDouble, "t.col"), (m.MSReal, "t.col"), (TIMESTAMP, "CAST(t.col AS DATETIME)"), (DATETIME, "CAST(t.col AS DATETIME)"), (DATE, "CAST(t.col AS DATE)"), (TIME, "CAST(t.col AS TIME)"), (DateTime, "CAST(t.col AS DATETIME)"), (Date, "CAST(t.col AS DATE)"), (Time, "CAST(t.col AS TIME)"), (m.MSDateTime, "CAST(t.col AS DATETIME)"), (m.MSDate, "CAST(t.col AS DATE)"), (m.MSTime, "CAST(t.col AS TIME)"), (m.MSTimeStamp, "CAST(t.col AS DATETIME)"), (m.MSYear, "t.col"), (m.MSYear(2), "t.col"), (Interval, "t.col"), (String, "CAST(t.col AS CHAR)"), (Unicode, "CAST(t.col AS CHAR)"), (UnicodeText, "CAST(t.col AS CHAR)"), (VARCHAR, "CAST(t.col AS CHAR)"), (NCHAR, "CAST(t.col AS CHAR)"), (CHAR, "CAST(t.col AS CHAR)"), (CLOB, "CAST(t.col AS CHAR)"), (TEXT, "CAST(t.col AS CHAR)"), (String(32), "CAST(t.col AS CHAR(32))"), (Unicode(32), "CAST(t.col AS CHAR(32))"), (CHAR(32), "CAST(t.col AS CHAR(32))"), (m.MSString, "CAST(t.col AS CHAR)"), (m.MSText, "CAST(t.col AS CHAR)"), (m.MSTinyText, "CAST(t.col AS CHAR)"), (m.MSMediumText, "CAST(t.col AS CHAR)"), (m.MSLongText, "CAST(t.col AS CHAR)"), (m.MSNChar, "CAST(t.col AS CHAR)"), (m.MSNVarChar, "CAST(t.col AS CHAR)"), (Binary, "CAST(t.col AS BINARY)"), (BLOB, "CAST(t.col AS BINARY)"), (m.MSBlob, "CAST(t.col AS BINARY)"), (m.MSBlob(32), "CAST(t.col AS BINARY)"), (m.MSTinyBlob, "CAST(t.col AS BINARY)"), (m.MSMediumBlob, "CAST(t.col AS BINARY)"), (m.MSLongBlob, "CAST(t.col AS BINARY)"), (m.MSBinary, "CAST(t.col AS BINARY)"), (m.MSBinary(32), "CAST(t.col AS BINARY)"), (m.MSVarBinary, "CAST(t.col AS BINARY)"), (m.MSVarBinary(32), "CAST(t.col AS BINARY)"), # maybe this could be changed to something more DWIM, needs # testing (Boolean, "t.col"), (BOOLEAN, "t.col"), (m.MSBoolean, "t.col"), (m.MSEnum, "t.col"), (m.MSEnum("'1'", "'2'"), "t.col"), (m.MSSet, "t.col"), (m.MSSet("'1'", "'2'"), "t.col"), ] for type_, expected in specs: self.assert_compile(cast(t.c.col, type_), expected)def colspec(c): return testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None).get_column_specification(c)if __name__ == "__main__": testenv.main()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -