📄 database.py
字号:
## Copyright (c) 2006, 2007 Canonical## Written by Gustavo Niemeyer <gustavo@niemeyer.net>## This file is part of Storm Object Relational Mapper.## Storm is free software; you can redistribute it and/or modify# it under the terms of the GNU Lesser General Public License as# published by the Free Software Foundation; either version 2.1 of# the License, or (at your option) any later version.## Storm is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public License# along with this program. If not, see <http://www.gnu.org/licenses/>.#import sysimport newimport gcfrom storm.exceptions import ClosedErrorfrom storm.variables import Variableimport storm.databasefrom storm.database import *from storm.uri import URIfrom storm.expr import *from tests.helper import TestHelpermarker = object()class RawConnection(object): closed = False def __init__(self, executed): self.executed = executed def cursor(self): return RawCursor(executed=self.executed) def commit(self): self.executed.append("COMMIT") def rollback(self): self.executed.append("ROLLBACK") def close(self): self.executed.append("CCLOSE")class RawCursor(object): def __init__(self, arraysize=1, executed=None): self.arraysize = arraysize if executed is None: self.executed = [] else: self.executed = executed self._fetchone_data = [("fetchone%d" % i,) for i in range(3)] self._fetchall_data = [("fetchall%d" % i,) for i in range(2)] self._fetchmany_data = [("fetchmany%d" % i,) for i in range(5)] def close(self): self.executed.append("RCLOSE") def execute(self, statement, params=marker): self.executed.append((statement, params)) def fetchone(self): if self._fetchone_data: return self._fetchone_data.pop(0) return None def fetchall(self): result = self._fetchall_data self._fetchall_data = [] return result def fetchmany(self): result = self._fetchmany_data[:self.arraysize] del self._fetchmany_data[:self.arraysize] return resultclass DatabaseTest(TestHelper): def setUp(self): TestHelper.setUp(self) self.database = Database() def test_connect(self): self.assertRaises(NotImplementedError, self.database.connect)class ConnectionTest(TestHelper): def setUp(self): TestHelper.setUp(self) self.executed = [] self.database = Database() self.raw_connection = RawConnection(self.executed) self.connection = Connection(self.database, self.raw_connection) def test_execute(self): result = self.connection.execute("something") self.assertTrue(isinstance(result, Result)) self.assertEquals(self.executed, [("something", marker)]) def test_execute_params(self): result = self.connection.execute("something", (1,2,3)) self.assertTrue(isinstance(result, Result)) self.assertEquals(self.executed, [("something", (1,2,3))]) def test_execute_noresult(self): result = self.connection.execute("something", noresult=True) self.assertEquals(result, None) self.assertEquals(self.executed, [("something", marker), "RCLOSE"]) def test_execute_convert_param_style(self): self.connection.execute("'?' ? '?' ? '?'") self.assertEquals(self.executed, [("'?' ? '?' ? '?'", marker)]) def test_execute_convert_param_style(self): class MyConnection(Connection): _param_mark = "%s" connection = MyConnection(self.database, RawConnection(self.executed)) result = connection.execute("'?' ? '?' ? '?'") self.assertEquals(self.executed, [("'?' %s '?' %s '?'", marker)]) # TODO: Unsupported for now. #result = connection.execute("$$?$$ ? $asd$'?$asd$ ? '?'") #self.assertEquals(self.executed, # [("'?' %s '?' %s '?'", marker), # ("$$?$$ %s $asd'?$asd$ %s '?'", marker)]) def test_execute_select(self): select = Select([SQLToken("column1"), SQLToken("column2")], tables=[SQLToken("table1"), SQLToken("table2")]) result = self.connection.execute(select) self.assertTrue(isinstance(result, Result)) self.assertEquals(self.executed, [("SELECT column1, column2 FROM table1, table2", marker)]) def test_execute_select_and_params(self): select = Select(["column1", "column2"], tables=["table1", "table2"]) self.assertRaises(ValueError, self.connection.execute, select, ("something",)) def test_execute_closed(self): self.connection.close() self.assertRaises(ClosedError, self.connection.execute, "SELECT 1") def test_commit(self): self.connection.commit() self.assertEquals(self.executed, ["COMMIT"]) def test_rollback(self): self.connection.rollback() self.assertEquals(self.executed, ["ROLLBACK"]) def test_close(self): self.connection.close() self.assertEquals(self.executed, ["CCLOSE"]) def test_close_twice(self): self.connection.close() self.connection.close() self.assertEquals(self.executed, ["CCLOSE"]) def test_close_deallocates_raw_connection(self): refs_before = len(gc.get_referrers(self.raw_connection)) self.connection.close() refs_after = len(gc.get_referrers(self.raw_connection)) self.assertEquals(refs_after, refs_before-1) def test_del_deallocates_raw_connection(self): refs_before = len(gc.get_referrers(self.raw_connection)) self.connection.__del__() refs_after = len(gc.get_referrers(self.raw_connection)) self.assertEquals(refs_after, refs_before-1) def test_wb_del_with_previously_deallocated_connection(self): self.connection._raw_connection = None self.connection.__del__() def test_get_insert_identity(self): result = self.connection.execute("INSERT") self.assertRaises(NotImplementedError, result.get_insert_identity, None, None)class ResultTest(TestHelper): def setUp(self): TestHelper.setUp(self) self.executed = [] self.raw_cursor = RawCursor(executed=self.executed) self.result = Result(None, self.raw_cursor) def test_get_one(self): self.assertEquals(self.result.get_one(), ("fetchone0",)) self.assertEquals(self.result.get_one(), ("fetchone1",)) self.assertEquals(self.result.get_one(), ("fetchone2",)) self.assertEquals(self.result.get_one(), None) def test_get_all(self): self.assertEquals(self.result.get_all(), [("fetchall0",), ("fetchall1",)]) self.assertEquals(self.result.get_all(), []) def test_iter_arraysize_1(self): self.assertEquals([item for item in self.result], [("fetchone0",), ("fetchone1",), ("fetchone2",),]) def test_iter_arraysize_2(self): result = Result(None, RawCursor(2)) self.assertEquals([item for item in result], [("fetchmany0",), ("fetchmany1",), ("fetchmany2",), ("fetchmany3",), ("fetchmany4",)]) def test_set_variable(self): variable = Variable() self.result.set_variable(variable, marker) self.assertEquals(variable.get(), marker) def test_close(self): self.result.close() self.assertEquals(self.executed, ["RCLOSE"]) def test_close_twice(self): self.result.close() self.result.close() self.assertEquals(self.executed, ["RCLOSE"]) def test_close_deallocates_raw_cursor(self): refs_before = len(gc.get_referrers(self.raw_cursor)) self.result.close() refs_after = len(gc.get_referrers(self.raw_cursor)) self.assertEquals(refs_after, refs_before-1) def test_del_deallocates_raw_cursor(self): refs_before = len(gc.get_referrers(self.raw_cursor)) self.result.__del__() refs_after = len(gc.get_referrers(self.raw_cursor)) self.assertEquals(refs_after, refs_before-1) def test_wb_del_with_previously_deallocated_cursor(self): self.result._raw_cursor = None self.result.__del__()class CreateDatabaseTest(TestHelper): def setUp(self): TestHelper.setUp(self) self.db_module = new.module("db_module") self.uri = None def create_from_uri(uri): self.uri = uri return "RESULT" self.db_module.create_from_uri = create_from_uri sys.modules["storm.databases.db_module"] = self.db_module def tearDown(self): del sys.modules["storm.databases.db_module"] TestHelper.tearDown(self) def test_create_database_with_str(self): create_database("db_module:db") self.assertTrue(self.uri) self.assertEquals(self.uri.scheme, "db_module") self.assertEquals(self.uri.database, "db") def test_create_database_with_unicode(self): create_database(u"db_module:db") self.assertTrue(self.uri) self.assertEquals(self.uri.scheme, "db_module") self.assertEquals(self.uri.database, "db") def test_create_database_with_uri(self): uri = URI("db_module:db") create_database(uri) self.assertTrue(self.uri is uri)class RegisterSchemeTest(TestHelper): uri = None def tearDown(self): if 'factory' in storm.database._database_schemes: del storm.database._database_schemes['factory'] TestHelper.tearDown(self) def test_register_scheme(self): def factory(uri): self.uri = uri return "FACTORY RESULT" register_scheme('factory', factory) self.assertEqual(storm.database._database_schemes['factory'], factory) # Check that we can create databases that use this scheme ... result = create_database('factory:foobar') self.assertEqual(result, "FACTORY RESULT") self.assertTrue(self.uri) self.assertEqual(self.uri.scheme, 'factory') self.assertEqual(self.uri.database, 'foobar')
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -