📄 postgres.py
字号:
# Tests for the PostgreSQL dialect: sequence-name quoting, compilation of
# UPDATE/INSERT ... RETURNING, execution of RETURNING statements, and primary
# key generation on INSERT (explicit ids, SERIAL columns and Sequence objects).
#
# NOTE(review): this module uses Python 2 syntax (``except X, e``).
import testenv; testenv.configure_for_tests()
import datetime
from sqlalchemy import *
from sqlalchemy import exceptions
from sqlalchemy.databases import postgres
from sqlalchemy.engine.strategies import MockEngineStrategy
from testlib import *
from sqlalchemy.sql import table, column


class SequenceTest(TestBase, AssertsCompiledSQL):
    """Checks how the PG identifier preparer renders sequence names."""

    def test_basic(self):
        """Format sequences with no schema, a lowercase schema, and mixed case."""
        # No schema: the bare name is expected back unchanged.
        seq = Sequence("my_seq_no_schema")
        dialect = postgres.PGDialect()
        assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema"

        # Lowercase schema and name: expected as schema.name without quotes.
        seq = Sequence("my_seq", schema="some_schema")
        assert dialect.identifier_preparer.format_sequence(seq) == "some_schema.my_seq"

        # Mixed-case schema and name: each part is expected in double quotes.
        seq = Sequence("My_Seq", schema="Some_Schema")
        assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'


class CompileTest(TestBase, AssertsCompiledSQL):
    """Compile-only checks of the ``postgres_returning`` keyword.

    The statements are built against a lightweight ``table()`` construct and
    compared with expected SQL strings; nothing in these tests calls
    ``execute()``.
    """

    def test_update_returning(self):
        """UPDATE ... RETURNING with columns, a whole table, and a function."""
        dialect = postgres.dialect()
        table1 = table('mytable',
            column('myid', Integer),
            column('name', String(128)),
            column('description', String(128)),
        )

        # Explicit list of columns.
        u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)

        # Passing the table itself is expected to expand to all of its columns.
        u = update(table1, values=dict(name='foo'), postgres_returning=[table1])
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
            "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)

        # A SQL function expression in the RETURNING list.
        u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
        self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect)

    def test_insert_returning(self):
        """INSERT ... RETURNING with columns, a whole table, and a function."""
        dialect = postgres.dialect()
        table1 = table('mytable',
            column('myid', Integer),
            column('name', String(128)),
            column('description', String(128)),
        )

        # Explicit list of columns.
        i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)

        # Passing the table itself is expected to expand to all of its columns.
        i = insert(table1, values=dict(name='foo'), postgres_returning=[table1])
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
            "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)

        # A SQL function expression in the RETURNING list.
        i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
        self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect)


class ReturningTest(TestBase, AssertsExecutionResults):
    """Executes RETURNING statements against the database in ``testing.db``.

    Each test creates the ``tables`` table, runs its statements, and drops the
    table again in a ``finally`` clause.
    """

    # Presumably restricts this class to the postgres backend -- confirm
    # against testlib.
    __only_on__ = 'postgres'

    # Presumably skips the test on PostgreSQL servers older than 8.2 --
    # confirm against testlib.
    @testing.exclude('postgres', '<', (8, 2))
    def test_update_returning(self):
        """UPDATE ... RETURNING yields the id of the single matching row."""
        meta = MetaData(testing.db)
        table = Table('tables', meta,
            Column('id', Integer, primary_key=True),
            Column('persons', Integer),
            Column('full', Boolean)
        )
        table.create()
        try:
            table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])

            # Only the first row has persons > 4, so only id 1 is expected.
            result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute()
            self.assertEqual(result.fetchall(), [(1,)])

            # Re-read both rows: row 1 was updated, row 2 was left alone.
            result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
            self.assertEqual(result2.fetchall(), [(1,True),(2,False)])
        finally:
            table.drop()

    # Presumably skips the test on PostgreSQL servers older than 8.2 --
    # confirm against testlib.
    @testing.exclude('postgres', '<', (8, 2))
    def test_insert_returning(self):
        """INSERT ... RETURNING for a column, a table, a label, and raw SQL."""
        meta = MetaData(testing.db)
        table = Table('tables', meta,
            Column('id', Integer, primary_key=True),
            Column('persons', Integer),
            Column('full', Boolean)
        )
        table.create()
        try:
            result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False})

            self.assertEqual(result.fetchall(), [(1,)])

            # Multiple inserts only return the last row
            result2 = table.insert(postgres_returning=[table]).execute(
                 [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])

            self.assertEqual(result2.fetchall(), [(3,3,True)])

            # A labeled expression: the fourth row gets id 4, so id*2 is 8.
            result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False})
            self.assertEqual([dict(row) for row in result3], [{'double_id':8}])

            # Textual SQL containing RETURNING, executed directly on the engine.
            result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons')
            self.assertEqual([dict(row) for row in result4], [{'persons': 10}])
        finally:
            table.drop()


class InsertTest(TestBase, AssertsExecutionResults):
    """Primary-key generation on INSERT for several column configurations.

    Every test defines ``testtable`` on the shared module-level ``metadata``,
    creates it, and delegates to one of the ``_assert_data_*`` helpers.
    """

    # Presumably restricts this class to the postgres backend -- confirm
    # against testlib.
    __only_on__ = 'postgres'

    def setUpAll(self):
        """Create the module-global ``metadata`` bound to ``testing.db``."""
        global metadata
        metadata = MetaData(testing.db)

    def tearDown(self):
        """Drop all tables and empty ``metadata`` so ``testtable`` can be redefined."""
        metadata.drop_all()
        metadata.tables.clear()

    def test_compiled_insert(self):
        """Execute a pre-compiled INSERT with two parameter sets."""
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))

        metadata.create_all()

        ins = table.insert(values={'data':bindparam('x')}).compile()
        ins.execute({'x':"five"}, {'x':"seven"})

        # No ids were supplied; the rows are expected to come back as 1 and 2.
        assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]

    def test_sequence_insert(self):
        """Primary key backed by an explicit ``Sequence('my_seq')``."""
        table = Table('testtable', metadata,
            Column('id', Integer, Sequence('my_seq'), primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_with_sequence(table, "my_seq")

    def test_opt_sequence_insert(self):
        """``optional=True`` sequence: checked with the autoincrement helper."""
        table = Table('testtable', metadata,
            Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_autoincrement(table)

    def test_autoincrement_insert(self):
        """Plain integer primary key: checked with the autoincrement helper."""
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_autoincrement(table)

    def test_noautoincrement_insert(self):
        """Primary key declared with ``autoincrement=False``."""
        table = Table('testtable', metadata,
            Column('id', Integer, primary_key=True, autoincrement=False),
            Column('data', String(30)))
        metadata.create_all()
        self._assert_data_noautoincrement(table)

    def _assert_data_autoincrement(self, table):
        """Run six inserts and check the SQL, bind parameters and stored rows.

        The same series is run twice: first against ``table`` as passed in,
        then against a version of the table reflected with ``autoload=True``.
        The table is emptied after each pass.
        """
        def go():
            # execute with explicit id
            r = table.insert().execute({'id':30, 'data':'d1'})
            assert r.last_inserted_ids() == [30]

            # execute with prefetch id
            r = table.insert().execute({'data':'d2'})
            assert r.last_inserted_ids() == [1]

            # executemany with explicit ids
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})

            # executemany, uses SERIAL
            table.insert().execute({'data':'d5'}, {'data':'d6'})

            # single execute, explicit id, inline
            table.insert(inline=True).execute({'id':33, 'data':'d7'})

            # single execute, inline, uses SERIAL
            table.insert(inline=True).execute({'data':'d8'})

        # note that the test framework doesn't capture the "preexecute" of a sequence
        # or default.  we just see it in the bind params.

        # Presumably assert_sql runs go() and compares the statements it issues
        # with this list -- confirm against testlib.
        self.assert_sql(testing.db, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':1, 'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d8'}]
            ),
        ])

        # Rows inserted without an id are expected to hold 1, 2, 3 and 4.
        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (1, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (2, 'd5'),
            (3, 'd6'),
            (33, 'd7'),
            (4, 'd8'),
        ]
        table.delete().execute()

        # test the same series of events using a reflected
        # version of the table
        m2 = MetaData(testing.db)
        table = Table(table.name, m2, autoload=True)

        def go():
            table.insert().execute({'id':30, 'data':'d1'})
            r = table.insert().execute({'data':'d2'})
            # The first pass used ids 1-4; the expected value continues at 5
            # rather than restarting at 1.
            assert r.last_inserted_ids() == [5]
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
            table.insert().execute({'data':'d5'}, {'data':'d6'})
            table.insert(inline=True).execute({'id':33, 'data':'d7'})
            table.insert(inline=True).execute({'data':'d8'})

        self.assert_sql(testing.db, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':5, 'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (data) VALUES (:data)",
                [{'data':'d8'}]
            ),
        ])

        # Rows inserted without an id are expected to hold 5, 6, 7 and 8.
        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (5, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (6, 'd5'),
            (7, 'd6'),
            (33, 'd7'),
            (8, 'd8'),
        ]
        table.delete().execute()

    def _assert_data_with_sequence(self, table, seqname):
        """Run six inserts against a table whose id uses the sequence ``seqname``.

        In the expected SQL, the executemany and ``inline=True`` inserts that
        supply no id render ``nextval('<seqname>')`` in the statement, while
        the plain single-row insert shows the id as a bind parameter.
        """
        def go():
            table.insert().execute({'id':30, 'data':'d1'})
            table.insert().execute({'data':'d2'})
            table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
            table.insert().execute({'data':'d5'}, {'data':'d6'})
            table.insert(inline=True).execute({'id':33, 'data':'d7'})
            table.insert(inline=True).execute({'data':'d8'})

        self.assert_sql(testing.db, go, [], with_sequences=[
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':30, 'data':'d1'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                {'id':1, 'data':'d2'}
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
                [{'data':'d5'}, {'data':'d6'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (:id, :data)",
                [{'id':33, 'data':'d7'}]
            ),
            (
                "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
                [{'data':'d8'}]
            ),
        ])

        # Rows inserted without an id are expected to hold 1, 2, 3 and 4.
        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (1, 'd2'),
            (31, 'd3'),
            (32, 'd4'),
            (2, 'd5'),
            (3, 'd6'),
            (33, 'd7'),
            (4, 'd8'),
        ]

        # can't test reflection here since the Sequence must be
        # explicitly specified

    def _assert_data_noautoincrement(self, table):
        """Check inserts into a table whose id column has no default.

        Inserts that omit the id must raise ``IntegrityError`` mentioning a
        not-null violation; inserts with explicit ids must succeed.
        """
        table.insert().execute({'id':30, 'data':'d1'})

        # Single insert without an id: ``assert False`` fails the test if the
        # insert unexpectedly succeeds.
        try:
            table.insert().execute({'data':'d2'})
            assert False
        except exceptions.IntegrityError, e:
            assert "violates not-null constraint" in str(e)

        # executemany without ids must fail the same way.
        try:
            table.insert().execute({'data':'d2'}, {'data':'d3'})
            assert False
        except exceptions.IntegrityError, e:
            assert "violates not-null constraint" in str(e)

        table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
        table.insert(inline=True).execute({'id':33, 'data':'d4'})

        # Only the rows inserted with explicit ids are expected to exist.
        assert table.select().execute().fetchall() == [
            (30, 'd1'),
            (31, 'd2'),
            (32, 'd3'),
            (33, 'd4'),
        ]
        table.delete().execute()

        # test the same series of events using a reflected
        # version of the table
        # NOTE(review): the visible source stops after the next statement; the
        # remainder of this method is not shown here.
        m2 = MetaData(testing.db)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -