zxtest.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,243 行 · 第 1/3 页
PY
1,243 行
def testTableTypeInfo(self): """testing cursor.gettabletypeinfo()""" c = self.cursor() try: c.gettabletypeinfo() c.fetchall() assert c.rowcount > 0, "expected some table types" finally: c.close() def testTupleParams(self): """testing the different ways to pass params to execute()""" c = self.cursor() try: self.assertRaises(zxJDBC.ProgrammingError, c.execute, "select * from zxtesting where id = ?", params=4) c.execute("select * from zxtesting where id = ?", params=[4]) c.execute("select * from zxtesting where id = ?", params=(4,)) finally: c.close() def testConnectionAttribute(self): """testing the getting and setting of cursor.connection""" c = self.cursor() try: from com.ziclix.python.sql import PyConnection assert isinstance(c.connection, PyConnection), "expected PyConnection" self.assertRaises(TypeError, setattr, (c, "connection", None), None) finally: c.close() def testFetchMany(self): """testing cursor.fetchmany()""" c = self.cursor() try: c.execute("select * from zxtesting") data = c.fetchmany(6) assert len(data) == 6, "expected [6] rows, got [%d]" % (len(data)) c.execute("select * from zxtesting") data = c.fetchmany(16) assert len(data) == 7, "expected [7] rows, got [%d]" % (len(data)) finally: c.close() def testQueryWithParameter(self): """testing query by parameter""" c = self.cursor() try: c.execute("select name from zxtesting where state = ?", [("il",)], {0:zxJDBC.VARCHAR}) data = c.fetchall() assert len(data) == 2, "expected [2] rows, got [%d]" % (len(data)) c.execute("select name from zxtesting where state = ?", [("co",)], {0:zxJDBC.VARCHAR}) data = c.fetchall() assert len(data) == 1, "expected [1] row, got [%d]" % (len(data)) finally: c.close() def testInsertWithFile(self): """testing insert with file""" assert self.has_table("texttable"), "missing attribute texttable" fp = open(tempfile.mktemp(), "w") c = self.cursor() try: try: c.execute(self.table("texttable")[1]) data = fp.name * 300 data = data[:3500] fp.write(data) fp.flush() fp.close() fp = open(fp.name, "r") c.execute("insert into %s (a, b) values (?, ?)" % (self.table("texttable")[0]), [(0, fp)], {1:zxJDBC.LONGVARCHAR}) self.db.commit() c.execute("select b from %s" % (self.table("texttable")[0])) f = c.fetchall() assert len(f) == 1, "expected [1] row, got [%d]" % (len(f)) assert len(f[0][0]) == len(data), "expected [%d], got [%d]" % (len(data), len(f[0][0])) assert data == f[0][0], "failed to retrieve the same text as inserted" except Exception, e: raise e finally: c.execute("drop table %s" % (self.table("texttable")[0])) c.close() self.db.commit() fp.close() os.remove(fp.name) def calendar(self): c = Calendar.getInstance() c.setTime(JDate()) return c def testDate(self): """testing creation of Date""" # Java uses milliseconds and Python uses seconds, so adjust the time accordingly # seeded with Java c = self.calendar() o = zxJDBC.DateFromTicks(c.getTime().getTime() / 1000L) v = zxJDBC.Date(c.get(Calendar.YEAR), c.get(Calendar.MONTH) + 1, c.get(Calendar.DATE)) assert o.equals(v), "incorrect date conversion using java, got [%ld], expected [%ld]" % (v.getTime(), o.getTime()) # seeded with Python t = time.time() l = time.localtime(t) o = zxJDBC.DateFromTicks(t) v = zxJDBC.Date(l[0], l[1], l[2]) assert o.equals(v), "incorrect date conversion, got [%ld], expected [%ld]" % (v.getTime(), o.getTime()) def testTime(self): """testing creation of Time""" # Java uses milliseconds and Python uses seconds, so adjust the time accordingly # seeded with Java c = self.calendar() o = zxJDBC.TimeFromTicks(c.getTime().getTime() / 1000L) v = zxJDBC.Time(c.get(Calendar.HOUR), c.get(Calendar.MINUTE), c.get(Calendar.SECOND)) assert o.equals(v), "incorrect date conversion using java, got [%ld], expected [%ld]" % (v.getTime(), o.getTime()) # seeded with Python #t = time.time() #l = time.localtime(t) #o = zxJDBC.TimeFromTicks(t) #v = zxJDBC.Time(l[3], l[4], l[5]) #assert o.equals(v), "incorrect date conversion using python, got [%ld], expected [%ld]" % (v.getTime(), o.getTime()) def testTimestamp(self): """testing creation of Timestamp""" # Java uses milliseconds and Python uses seconds, so adjust the time accordingly # seeded with Java c = self.calendar() o = zxJDBC.TimestampFromTicks(c.getTime().getTime() / 1000L) v = zxJDBC.Timestamp(c.get(Calendar.YEAR), c.get(Calendar.MONTH) + 1, c.get(Calendar.DATE), c.get(Calendar.HOUR), c.get(Calendar.MINUTE), c.get(Calendar.SECOND)) assert o.equals(v), "incorrect date conversion using java, got [%ld], expected [%ld]" % (v.getTime(), o.getTime()) # seeded with Python #t = time.time() #l = time.localtime(t) #o = zxJDBC.TimestampFromTicks(t) #v = zxJDBC.Timestamp(l[0], l[1], l[2], l[3], l[4], l[5]) #assert o.equals(v), "incorrect date conversion using python, got [%ld], expected [%ld]" % (v.getTime(), o.getTime()) def _test_precision(self, (tabname, sql), diff, values, attr): try: c = self.cursor() try: c.execute("drop table %s" % (tabname)) self.db.commit() except: self.db.rollback() finally: c.close() try: c = self.cursor() c.execute(sql) c.execute("insert into %s (a, b) values (?, ?)" % (tabname), map(lambda x: (0, x), values)) c.execute("select a, b from %s" % (tabname)) f = c.fetchall() assert len(values) == len(f), "mismatched result set length" for i in range(0, len(f)): v = values[i] if attr: v = getattr(v, attr)() msg = "expected [%0.10f], got [%0.10f] for index [%d] of [%d]" % (v, f[i][1], (i+1), len(f)) assert diff(f[i][1], values[i]) < 0.01, msg self.db.commit() finally: c.close() try: c = self.cursor() try: c.execute("drop table %s" % (tabname)) self.db.commit() except: self.db.rollback() finally: c.close() def testFloat(self): """testing value of float""" assert self.has_table("floattable"), "missing attribute floattable" values = [4.22, 123.44, 292.09, 33.2, 102.00, 445] self._test_precision(self.table("floattable"), lambda x, y: x-y, values, None) def testBigDecimal(self): """testing value of BigDecimal""" assert self.has_table("floattable"), "missing attribute floattable" from java.math import BigDecimal values = [BigDecimal(x).setScale(2, BigDecimal.ROUND_UP) for x in [4.22, 123.44, 292.09, 33.2, 102.00, 445]] self._test_precision(self.table("floattable"), lambda x, y, b=BigDecimal: b(x).subtract(y).doubleValue(), values, "doubleValue") def testBigDecimalConvertedToDouble(self): """testing value of BigDecimal when converted to double""" assert self.has_table("floattable"), "missing attribute floattable" from java.math import BigDecimal values = [BigDecimal(x).setScale(2, BigDecimal.ROUND_UP) for x in [4.22, 123.44, 292.09, 33.2, 102.00, 445]] self._test_precision(self.table("floattable"), lambda x, y: x - y.doubleValue(), values, "doubleValue") def testNextset(self): """testing nextset""" c = self.cursor() try: c.execute("select * from zxtesting where id = ?", [(3,), (4,)]) f = c.fetchall() assert f, "expected results, got None" assert len(f) == 1, "expected [1], got [%d]" % (len(f)) assert c.nextset(), "expected next set, got None" f = c.fetchall() assert f, "expected results after call to nextset(), got None" assert len(f) == 1, "expected [1], got [%d]" % (len(f)) finally: c.close() def testJavaUtilList(self): """testing parameterized values in a java.util.List""" c = self.cursor() try: from java.util import LinkedList a = LinkedList() a.add((3,)) c.execute("select * from zxtesting where id = ?", a) f = c.fetchall() assert len(f) == 1, "expected [1], got [%d]" % (len(f)) finally: c.close() def testUpdateCount(self): """testing update count functionality""" c = self.cursor() try: c.execute("insert into zxtesting values (?, ?, ?)", [(500, 'bz', 'or')]) assert c.updatecount == 1, "expected [1], got [%d]" % (c.updatecount) c.execute("select * from zxtesting") self.assertEquals(None, c.updatecount) # there's a *feature* in the mysql engine where it returns 0 for delete if there is no # where clause, regardless of the actual value. using a where clause forces it to calculate # the appropriate value c.execute("delete from zxtesting where 1>0") assert c.updatecount == 8, "expected [8], got [%d]" % (c.updatecount) c.execute("update zxtesting set name = 'nothing'") self.assertEquals(0, c.updatecount) finally: c.close() def _test_time(self, (tabname, sql), factory, values, _type, _cmp=cmp, datahandler=None): c = self.cursor() if datahandler: c.datahandler = datahandler(c.datahandler) try: c.execute(sql) dates = map(lambda x, f=factory: apply(f, x), values) for a in dates: c.execute("insert into %s values (1, ?)" % (tabname), [(a,)], {0:_type}) self.db.commit() c.execute("select * from %s where b = ?" % (tabname), [(dates[0],)], {0:_type}) f = c.fetchall() assert len(f) == 1, "expected length [1], got [%d]" % (len(f)) assert _cmp(f[0][1], dates[0]) == 0, "expected date [%s], got [%s]" % (str(dates[0]), str(f[0][1])) c.execute("delete from %s where b = ?" % (tabname), [(dates[1],)], {0:_type}) self.db.commit() c.execute("select * from %s" % (tabname)) f = c.fetchall() self.assertEquals(len(f), len(dates) - 1) finally: c.execute("drop table %s" % (tabname)) c.close() self.db.commit() def testUpdateSelectByDate(self): """testing insert, update, query and delete by java.sql.Date""" assert self.has_table("datetable"), "missing attribute datetable" def _cmp_(x, y): xt = (x.getYear(), x.getMonth(), x.getDay()) yt = (y.getYear(), y.getMonth(), y.getDay()) return not xt == yt values = [(1996, 6, 22), (2000, 11, 12), (2000, 1, 12), (1999, 9, 24)] self._test_time(self.table("datetable"), zxJDBC.Date, values, zxJDBC.DATE, _cmp_) def testUpdateSelectByTime(self): """testing insert, update, query and delete by java.sql.Time""" assert self.has_table("timetable"), "missing attribute timetable" def _cmp_(x, y): xt = (x.getHours(), x.getMinutes(), x.getSeconds()) yt = (y.getHours(), y.getMinutes(), y.getSeconds()) return not xt == yt values = [(10, 11, 12), (3, 1, 12), (22, 9, 24)] self._test_time(self.table("timetable"), zxJDBC.Time, values, zxJDBC.TIME, _cmp_) def testUpdateSelectByTimestamp(self): """testing insert, update, query and delete by java.sql.Timestamp""" assert self.has_table("timestamptable"), "missing attribute timestamptable" def _cmp_(x, y): xt = (x.getYear(), x.getMonth(), x.getDay(), x.getHours(), x.getMinutes(), x.getSeconds()) yt = (y.getYear(), y.getMonth(), y.getDay(), y.getHours(), y.getMinutes(), y.getSeconds()) return not xt == yt values = [(1996, 6, 22, 10, 11, 12), (2000, 11, 12, 3, 1, 12), (2001, 1, 12, 4, 9, 24)] self._test_time(self.table("timestamptable"), zxJDBC.Timestamp, values, zxJDBC.TIMESTAMP, _cmp_) def testOrderOfArgsMaxRowsOnly(self): """testing execute with max rows only""" c = self.cursor() try: # maxrows only (SAPDB doesn't support maxrows as of version 7.2.0) c.execute("select * from zxtesting", maxrows=3) f = c.fetchall() assert len(f) == 3, "expected length [3], got [%d]" % (len(f)) finally: c.close() self.db.commit() def testOrderOfArgs(self): """testing execute with different argument orderings""" c = self.cursor() try: # bindings and params flipped c.execute("select * from zxtesting where id = ?", bindings={0:zxJDBC.INTEGER}, params=[(3,)]) f = c.fetchall() assert len(f) == 1, "expected length [1], got [%d]" % (len(f)) # bindings and params flipped, empty params c.execute("select * from zxtesting where id = ?", bindings={}, params=[(3,)]) f = c.fetchall() assert len(f) == 1, "expected length [1], got [%d]" % (len(f)) # bindings and params flipped, empty params, empty bindings c.execute("select * from zxtesting where id = 3", bindings={}, params=[]) f = c.fetchall() assert len(f) == 1, "expected length [1], got [%d]" % (len(f)) finally: c.close() self.db.commit() def testMaxrows(self): """testing maxrows""" c = self.cursor() try: c.execute("select * from zxtesting", maxrows=3) self.assertEquals(3, len(c.fetchall())) c.execute("select * from zxtesting where id > ?", (1,), maxrows=3) self.assertEquals(3, len(c.fetchall())) c.execute("select count(*) from zxtesting") f = c.fetchall() num = f[0][0] c.execute("select * from zxtesting", maxrows=0) self.assertEquals(num, len(c.fetchall())) finally: c.close() self.db.commit() def testPrimaryKey(self): """testing for primary key information""" c = self.cursor() try: c.primarykeys(None, None, "zxtesting") f = c.fetchall() assert len(f) == 1, "expected [1], got [%d]" % (len(f)) assert f[0][3].lower() == "id", "expected [id], got [%s]" % (f[0][3]) finally: c.close() self.db.commit() def testForeignKey(self): """testing for foreign key information""" pass def testIndexInfo(self): """testing index information""" c = self.cursor() try: c.statistics(None, None, "zxtesting", 0, 0) f = c.fetchall() assert f is not None, "expected some values" # filter out any indicies with name None f = filter(lambda x: x[5], f) assert len(f) == 1, "expected [1], got [%d]" % (len(f)) finally: c.close() def _test_fetching(self, dynamic=0): c = self.cursor(dynamic) try: # make sure None if the result is an empty result set c.execute("select * from zxtesting where 1<0") self.assertEquals(None, c.fetchone()) # make sure an empty sequence if the result is an empty result set c.execute("select * from zxtesting where 1<0") self.assertEquals([], c.fetchmany()) # make sure an empty sequence if the result is an empty result set c.execute("select * from zxtesting where 1<0") self.assertEquals([], c.fetchall()) # test some arraysize features c.execute("select * from zxtesting") f = c.fetchmany() assert len(f) == c.arraysize, "expecting [%d] rows, got [%d]" % (c.arraysize, len(f)) c.execute("select * from zxtesting") c.arraysize = 4 f = c.fetchmany() assert len(f) == 4, "expecting [4] rows, got [%d]" % (len(f)) c.execute("select * from zxtesting")
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?