zxtest.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,243 行 · 第 1/3 页
PY
1,243 行
# Jython Database Specification API 2.0
#
# $Id: zxtest.py,v 1.1 2005/10/28 23:47:47 kuzman Exp $
#
# Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>

from com.ziclix.python.sql import zxJDBC
from java.util import Calendar, Date as JDate

import tempfile, os, time, runner

class zxCoreTestCase(runner.SQLTestCase):
    """Base test case that owns a database connection in ``self.db``.

    The connection is opened in setUp() through the factory described by
    ``self.factory`` and closed again in tearDown().
    """

    def setUp(self):
        """Run the parent setUp, then connect with autocommit switched off."""
        runner.SQLTestCase.setUp(self)
        self.db = self.connect()
        self.db.autocommit = 0

    def tearDown(self):
        """Close the connection, then run the parent tearDown."""
        self.db.close()
        runner.SQLTestCase.tearDown(self)

    def connect(self):
        """Open and return a new connection using ``self.factory``.

        The factory object is resolved with runner.__imp__() from
        ``self.factory.classname``; the callable named ``self.factory.method``
        is then invoked with the second element of every entry in
        ``self.factory.arguments`` as positional arguments and
        ``self.factory.keywords`` as keyword arguments.
        """
        factory = runner.__imp__(self.factory.classname)
        # only the second element of each argument entry is passed on
        args = [x[1] for x in self.factory.arguments]
        connect = getattr(factory, self.factory.method)
        return apply(connect, args, self.factory.keywords)

    def cursor(self, *args, **kws):
        """Return a cursor created by ``self.db.cursor(*args, **kws)``.

        If the test case defines a ``datahandler`` attribute it is called with
        the cursor's current datahandler and the result is installed on the
        cursor in its place.
        """
        c = apply(self.db.cursor, args, kws)
        if hasattr(self, "datahandler"):
            c.datahandler = self.datahandler(c.datahandler)
        return c

class zxJDBCTestCase(zxCoreTestCase):
    """Test case that (re)creates the ``zxtesting`` table with seven rows."""

    def setUp(self):
        """Connect, drop any old ``zxtesting`` table and build a fresh one."""
        zxCoreTestCase.setUp(self)

        c = self.cursor()

        # best effort: the drop fails when the table does not exist yet, in
        # which case the transaction is simply rolled back
        try:
            c.execute("drop table zxtesting")
            self.db.commit()
        except:
            self.db.rollback()

        try:
            c.execute("create table zxtesting (id int not null, name varchar(32), state varchar(32), primary key (id))")
            self.db.commit()
            c.execute("insert into zxtesting (id, name, state) values (1, 'test0', 'il')")
            c.execute("insert into zxtesting (id, name, state) values (2, 'test1', 'wi')")
            c.execute("insert into zxtesting (id, name, state) values (3, 'test2', 'tx')")
            c.execute("insert into zxtesting (id, name, state) values (4, 'test3', 'co')")
            c.execute("insert into zxtesting (id, name, state) values (5, 'test4', 'il')")
            c.execute("insert into zxtesting (id, name, state) values (6, 'test5', 'ca')")
            c.execute("insert into zxtesting (id, name, state) values (7, 'test6', 'wi')")
            self.db.commit()
        finally:
            c.close()

    def tearDown(self):
        """Drop ``zxtesting`` (rolling back if that fails) and disconnect."""
        c = self.cursor()
        try:
            try:
                c.execute("drop table zxtesting")
            except:
                self.db.rollback()
        finally:
            c.close()

        zxCoreTestCase.tearDown(self)

class zxAPITestCase(zxJDBCTestCase):
    """Tests of the connection and cursor API against the ``zxtesting`` table."""

    def testConnection(self):
        """testing connection"""
        assert self.db, "invalid connection"

    def testAutocommit(self):
        """testing autocommit functionality"""
        # only exercised when the driver reports transaction support
        if self.db.__connection__.getMetaData().supportsTransactions():
            self.db.autocommit = 1
            self.assertEquals(1, self.db.__connection__.getAutoCommit())
            self.db.autocommit = 0
            self.assertEquals(0, self.db.__connection__.getAutoCommit())

    def testSimpleQuery(self):
        """testing simple queries with cursor.execute(), no parameters"""
        c = self.cursor()
        try:
            c.execute("select count(*) from zxtesting")
            f = c.fetchall()
            assert len(f) == 1, "expecting one row"
            c.execute("select * from zxtesting")
            data = c.fetchone()
            # check the row just fetched; this used to re-test the result of
            # the previous query (len(f) == 1), which could never fail here
            assert data is not None, "expecting one row"
            assert data[0] == 1, "expected [1] rows, got [%d]" % (data[0])
        finally:
            c.close()

    def testNoneQuery(self):
        """testing that executing None doesn't fail"""
        c = self.cursor()
        try:
            c.execute(None)
        finally:
            c.close()

    def _test_preparedstatement(self, dynamic):
        """Prepare one statement, run it for ids 1..7, then close it.

        ``dynamic`` is passed straight to cursor().  After the statement is
        closed, executing it again must raise zxJDBC.ProgrammingError.
        """
        c = self.cursor(dynamic)
        try:
            p = c.prepare("select * from zxtesting where id = ?")
            for i in range(1, 8):
                c.execute(p, (i,))
                data = c.fetchall()
                self.assertEquals(1, len(data))
            assert not p.closed
            p.close()
            assert p.closed
            self.assertRaises(zxJDBC.ProgrammingError, c.execute, p, (1,))
        finally:
            c.close()

    def testStaticPrepare(self):
        """testing the prepare() functionality for static cursors"""
        self._test_preparedstatement(0)

    def testDynamicPrepare(self):
        """testing the prepare() functionality for dynamic cursors"""
        self._test_preparedstatement(1)

    def _test_cursorkeywords(self, *args, **kws):
        """Create a cursor with the given arguments and fetch a single row."""
        c = self.cursor(*args, **kws)
        try:
            c.execute("select * from zxtesting")
            data = c.fetchmany(1)
            assert len(data) == 1, "expecting one row"
        finally:
            c.close()

    def testCursorKeywords(self):
        """testing the creation of a cursor with keywords"""
        self._test_cursorkeywords(dynamic=1)
        self._test_cursorkeywords(dynamic=1,
            rstype=zxJDBC.TYPE_SCROLL_INSENSITIVE,
            rsconcur=zxJDBC.CONCUR_READ_ONLY
        )
        self._test_cursorkeywords(1,
            rstype=zxJDBC.TYPE_SCROLL_INSENSITIVE,
            rsconcur=zxJDBC.CONCUR_READ_ONLY
        )
        self._test_cursorkeywords(1,zxJDBC.TYPE_SCROLL_INSENSITIVE,zxJDBC.CONCUR_READ_ONLY)
        # two positional arguments (without rsconcur) are expected to be rejected
        self.assertRaises(TypeError, self.cursor, 1, zxJDBC.TYPE_SCROLL_INSENSITIVE)

    def testFileLikeCursor(self):
        """testing the cursor as a file-like object"""
        c = self.cursor()
        try:
            # Python 2 "print chevron": each statement is written to the cursor
            print >> c, "insert into zxtesting (id, name, state) values (100, 'test100', 'wa')"
            print >> c, "insert into zxtesting (id, name, state) values (101, 'test101', 'co')"
            print >> c, "insert into zxtesting (id, name, state) values (102, 'test102', 'or')"
            self.db.commit()
        finally:
            c.close()

        c = self.cursor()
        try:
            c.execute("select * from zxtesting where id in (100, 101, 102)")
            f = c.fetchall()
            self.assertEquals(3, len(f))
        finally:
            c.close()

    def testIteration(self):
        """testing the iteration protocol"""
        c = self.cursor()
        try:
            # first with a for loop
            cnt = 0
            c.execute("select * from zxtesting")
            for a in c:
                self.assertEquals(3, len(a))
                cnt += 1
            self.assertEquals(7, cnt)
            # then with a while loop
            cnt = 0
            c.execute("select * from zxtesting")
            while 1:
                try:
                    self.assertEquals(3, len(c.next()))
                except StopIteration:
                    break
                cnt += 1
            self.assertEquals(7, cnt)
        finally:
            c.close()

    def testClosingCursor(self):
        """testing that a closed cursor throws an exception"""
        c = self.cursor()
        try:
            c.execute("select * from zxtesting")
        finally:
            c.close()
        # NOTE(review): the statement is handed to execute() wrapped in a
        # 1-tuple rather than as a plain string - confirm this is intended
        self.assertRaises(zxJDBC.ProgrammingError, c.execute, ("select * from zxtesting",))

    def testClosingConnectionWithOpenCursors(self):
        """testing that a closed connection closes any open cursors"""
        c = self.cursor()
        d = self.cursor()
        e = self.cursor()
        self.db.close()
        # open a new connection so the tearDown can run
        self.db = self.connect()
        self.assertRaises(zxJDBC.ProgrammingError, c.execute, ("select * from zxtesting",))
        self.assertRaises(zxJDBC.ProgrammingError, d.execute, ("select * from zxtesting",))
        self.assertRaises(zxJDBC.ProgrammingError, e.execute, ("select * from zxtesting",))

    def testNativeSQL(self):
        """testing the connection's ability to convert sql"""
        sql = self.db.nativesql("select * from zxtesting where id = ?")
        assert sql is not None
        assert len(sql) > 0

    def testTables(self):
        """testing cursor.tables()"""
        c = self.cursor()
        try:
            c.tables(None, None, None, None)
            # let's look for zxtesting
            found = 0
            while not found:
                try:
                    # the third column of each row is compared, case-insensitively
                    found = "zxtesting" == c.next()[2].lower()
                except StopIteration:
                    break
            assert found, "expected to find 'zxtesting'"
            c.tables(None, None, "zxtesting", None)
            self.assertEquals(1, len(c.fetchall()))
            c.tables(None, None, "zxtesting", ("TABLE",))
            self.assertEquals(1, len(c.fetchall()))
            c.tables(None, None, "zxtesting", ("table",))
            self.assertEquals(1, len(c.fetchall()))
        finally:
            c.close()

    def testColumns(self):
        """testing cursor.columns()"""
        c = self.cursor()
        try:
            # deliberately copied so as to produce useful line numbers
            c.columns(None, None, "zxtesting", None)
            f = c.fetchall()
            self.assertEquals(3, c.rowcount)
            # order the rows by their fourth field so "name" lands in the middle
            f.sort(lambda x, y: cmp(x[3], y[3]))
            self.assertEquals("name", f[1][3].lower())
            # if the db engine handles mixed case, then don't ask about a different
            # case because it will fail
            if not self.db.__connection__.getMetaData().storesMixedCaseIdentifiers():
                c.columns(None, None, "ZXTESTING", None)
                f = c.fetchall()
                self.assertEquals(3, c.rowcount)
                f.sort(lambda x, y: cmp(x[3], y[3]))
                self.assertEquals("name", f[1][3].lower())
        finally:
            c.close()

    def testBestRow(self):
        """testing bestrow which finds the optimal set of columns that uniquely identify a row"""
        c = self.cursor()
        try:
            # we're really just testing that this doesn't blow up
            c.bestrow(None, None, "zxtesting")
            f = c.fetchall()
            if f:
                # we might as well see that it worked
                self.assertEquals(1, len(f))
        finally:
            c.close()

    def testTypeInfo(self):
        """testing cursor.gettypeinfo()"""
        c = self.cursor()
        try:
            c.gettypeinfo()
            f = c.fetchall()
            assert f is not None, "expected some type information, got None"
            # this worked prior to the Fetch re-write, now the client will have to bear the burden, sorry
            #c.gettypeinfo(zxJDBC.INTEGER)
            #f = c.fetchall()
            #assert f[0][1] == zxJDBC.INTEGER, "expected [%d], got [%d]" % (zxJDBC.INTEGER, f[0][1])
        finally:
            c.close()

    def _scroll_cursor(self, dynamic):
        """Return the cursor used by the scroll/rownumber/rowcount tests.

        When ``self.vendor.scroll`` is set it names the zxJDBC constant used
        as ``rstype`` (with a read-only ``rsconcur``); otherwise a plain
        cursor is created.  ``dynamic`` is passed to cursor() in both cases.
        """
        if self.vendor.scroll:
            return self.cursor(dynamic,
                rstype=getattr(zxJDBC, self.vendor.scroll),
                rsconcur=zxJDBC.CONCUR_READ_ONLY
            )
        return self.cursor(dynamic)

    def _test_scrolling(self, dynamic=0):
        """Check relative and absolute scroll() over the seven ordered rows."""
        c = self._scroll_cursor(dynamic)
        try:
            # set everything up
            c.execute("select id, name, state from zxtesting order by id")
            self.assertEquals(1, c.fetchone()[0])
            self.assertEquals(2, c.fetchone()[0])
            self.assertEquals(3, c.fetchone()[0])
            # move back two and fetch the row again
            c.scroll(-2)
            self.assertEquals(2, c.fetchone()[0])
            # move to the fifth row (0-based indexing)
            c.scroll(4, "absolute")
            self.assertEquals(5, c.fetchone()[0])
            # move back to the start
            c.scroll(-5)
            self.assertEquals(1, c.fetchone()[0])
            # move to the end
            c.scroll(6, "absolute")
            self.assertEquals(7, c.fetchone()[0])
            # make sure we get an IndexError
            self.assertRaises(IndexError, c.scroll, 1, "relative")
            self.assertRaises(IndexError, c.scroll, -1, "absolute")
            self.assertRaises(zxJDBC.ProgrammingError, c.scroll, 1, "somethingelsealltogether")
        finally:
            c.close()

    def testDynamicCursorScrolling(self):
        """testing the ability to scroll a dynamic cursor"""
        self._test_scrolling(1)

    def testStaticCursorScrolling(self):
        """testing the ability to scroll a static cursor"""
        self._test_scrolling(0)

    def _test_rownumber(self, dynamic=0):
        """Check ``rownumber`` while fetching and scrolling, and after close()."""
        c = self._scroll_cursor(dynamic)
        try:
            if not dynamic:
                # a dynamic cursor doesn't know if any rows really exist
                # maybe the 'possibility' of rows should change .rownumber to 0?
                c.execute("select * from zxtesting where 1=0")
                self.assertEquals(0, c.rownumber)
            c.execute("select * from zxtesting")
            self.assertEquals(0, c.rownumber)
            c.next()
            self.assertEquals(1, c.rownumber)
            c.next()
            self.assertEquals(2, c.rownumber)
            c.scroll(-1)
            self.assertEquals(1, c.rownumber)
            c.scroll(2, "absolute")
            self.assertEquals(2, c.rownumber)
            c.scroll(6, "absolute")
            self.assertEquals(6, c.rownumber)
        finally:
            c.close()
        # once the cursor is closed the row number is expected to be None
        self.assertEquals(None, c.rownumber)

    def testStaticRownumber(self):
        """testing a static cursor's rownumber"""
        self._test_rownumber(0)

    def testDynamicRownumber(self):
        """testing a dynamic cursor's rownumber"""
        self._test_rownumber(1)

    def _test_rowcount(self, dynamic=0):
        """Check ``rowcount`` after three fetches and after scrolling back."""
        c = self._scroll_cursor(dynamic)
        try:
            c.execute("select * from zxtesting")
            c.next()
            c.next()
            c.next()
            if dynamic:
                # dynamic cursors only know about the number of rows encountered
                self.assertEquals(3, c.rowcount)
            else:
                self.assertEquals(7, c.rowcount)
            c.scroll(-1)
            # make sure they don't change just because we scrolled backwards
            if dynamic:
                # dynamic cursors only know about the number of rows encountered
                self.assertEquals(3, c.rowcount)
            else:
                self.assertEquals(7, c.rowcount)
        finally:
            c.close()

    def testStaticRowcount(self):
        """testing a static cursor's rowcount"""
        self._test_rowcount(0)

    def testDynamicRowcount(self):
        """testing a dynamic cursor's rowcount"""
        self._test_rowcount(1)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?