zxtest.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,243 行 · 第 1/3 页

PY
1,243
字号
	def testTableTypeInfo(self):		"""testing cursor.gettabletypeinfo()"""		c = self.cursor()		try:			c.gettabletypeinfo()			c.fetchall()			assert c.rowcount > 0, "expected some table types"		finally:			c.close()	def testTupleParams(self):		"""testing the different ways to pass params to execute()"""		c = self.cursor()		try:			self.assertRaises(zxJDBC.ProgrammingError, c.execute, "select * from zxtesting where id = ?", params=4)			c.execute("select * from zxtesting where id = ?", params=[4])			c.execute("select * from zxtesting where id = ?", params=(4,))		finally:			c.close()	def testConnectionAttribute(self):		"""testing the getting and setting of cursor.connection"""		c = self.cursor()		try:			from com.ziclix.python.sql import PyConnection			assert isinstance(c.connection, PyConnection), "expected PyConnection"			self.assertRaises(TypeError, setattr, (c, "connection", None), None)		finally:			c.close()	def testFetchMany(self):		"""testing cursor.fetchmany()"""		c = self.cursor()		try:			c.execute("select * from zxtesting")			data = c.fetchmany(6)			assert len(data) == 6, "expected [6] rows, got [%d]" % (len(data))			c.execute("select * from zxtesting")			data = c.fetchmany(16)			assert len(data) == 7, "expected [7] rows, got [%d]" % (len(data))		finally:			c.close()	def testQueryWithParameter(self):		"""testing query by parameter"""		c = self.cursor()		try:			c.execute("select name from zxtesting where state = ?", [("il",)], {0:zxJDBC.VARCHAR})			data = c.fetchall()			assert len(data) == 2, "expected [2] rows, got [%d]" % (len(data))			c.execute("select name from zxtesting where state = ?", [("co",)], {0:zxJDBC.VARCHAR})			data = c.fetchall()			assert len(data) == 1, "expected [1] row, got [%d]" % (len(data))		finally:			c.close()	def testInsertWithFile(self):		"""testing insert with file"""		assert self.has_table("texttable"), "missing attribute texttable"		fp = open(tempfile.mktemp(), "w")		c = self.cursor()		try:			try:				c.execute(self.table("texttable")[1])				data = fp.name * 300				data = data[:3500]				fp.write(data)				fp.flush()				fp.close()				fp = open(fp.name, "r")				c.execute("insert into %s (a, b) values (?, ?)" % (self.table("texttable")[0]), [(0, fp)], {1:zxJDBC.LONGVARCHAR})				self.db.commit()				c.execute("select b from %s" % (self.table("texttable")[0]))				f = c.fetchall()				assert len(f) == 1, "expected [1] row, got [%d]" % (len(f))				assert len(f[0][0]) == len(data), "expected [%d], got [%d]" % (len(data), len(f[0][0]))				assert data == f[0][0], "failed to retrieve the same text as inserted"			except Exception, e:				raise e		finally:			c.execute("drop table %s" % (self.table("texttable")[0]))			c.close()			self.db.commit()			fp.close()			os.remove(fp.name)	def calendar(self):		c = Calendar.getInstance()		c.setTime(JDate())		return c	def testDate(self):		"""testing creation of Date"""		# Java uses milliseconds and Python uses seconds, so adjust the time accordingly		# seeded with Java		c = self.calendar()		o = zxJDBC.DateFromTicks(c.getTime().getTime() / 1000L)		v = zxJDBC.Date(c.get(Calendar.YEAR), c.get(Calendar.MONTH) + 1, c.get(Calendar.DATE))		assert o.equals(v), "incorrect date conversion using java, got [%ld], expected [%ld]" % (v.getTime(), o.getTime())		# seeded with Python		t = time.time()		l = time.localtime(t)		o = zxJDBC.DateFromTicks(t)		v = zxJDBC.Date(l[0], l[1], l[2])		assert o.equals(v), "incorrect date conversion, got [%ld], expected [%ld]" % (v.getTime(), o.getTime())	def testTime(self):		"""testing creation of Time"""		# Java uses milliseconds and Python uses seconds, so adjust the time accordingly		# seeded with Java		c = self.calendar()		o = zxJDBC.TimeFromTicks(c.getTime().getTime() / 1000L)		v = zxJDBC.Time(c.get(Calendar.HOUR), c.get(Calendar.MINUTE), c.get(Calendar.SECOND))		assert o.equals(v), "incorrect date conversion using java, got [%ld], expected [%ld]" % (v.getTime(), o.getTime())		# seeded with Python		#t = time.time()		#l = time.localtime(t)		#o = zxJDBC.TimeFromTicks(t)		#v = zxJDBC.Time(l[3], l[4], l[5])		#assert o.equals(v), "incorrect date conversion using python, got [%ld], expected [%ld]" % (v.getTime(), o.getTime())	def testTimestamp(self):		"""testing creation of Timestamp"""		# Java uses milliseconds and Python uses seconds, so adjust the time accordingly		# seeded with Java		c = self.calendar()		o = zxJDBC.TimestampFromTicks(c.getTime().getTime() / 1000L)		v = zxJDBC.Timestamp(c.get(Calendar.YEAR), c.get(Calendar.MONTH) + 1, c.get(Calendar.DATE),			c.get(Calendar.HOUR), c.get(Calendar.MINUTE), c.get(Calendar.SECOND))		assert o.equals(v), "incorrect date conversion using java, got [%ld], expected [%ld]" % (v.getTime(), o.getTime())		# seeded with Python		#t = time.time()		#l = time.localtime(t)		#o = zxJDBC.TimestampFromTicks(t)		#v = zxJDBC.Timestamp(l[0], l[1], l[2], l[3], l[4], l[5])		#assert o.equals(v), "incorrect date conversion using python, got [%ld], expected [%ld]" % (v.getTime(), o.getTime())	def _test_precision(self, (tabname, sql), diff, values, attr):		try:			c = self.cursor()			try:				c.execute("drop table %s" % (tabname))				self.db.commit()			except:				self.db.rollback()		finally:			c.close()		try:			c = self.cursor()			c.execute(sql)			c.execute("insert into %s (a, b) values (?, ?)" % (tabname), map(lambda x: (0, x), values))			c.execute("select a, b from %s" % (tabname))			f = c.fetchall()			assert len(values) == len(f), "mismatched result set length"			for i in range(0, len(f)):				v = values[i]				if attr: v = getattr(v, attr)()				msg = "expected [%0.10f], got [%0.10f] for index [%d] of [%d]" % (v, f[i][1], (i+1), len(f))				assert diff(f[i][1], values[i]) < 0.01, msg			self.db.commit()		finally:			c.close()			try:				c = self.cursor()				try:					c.execute("drop table %s" % (tabname))					self.db.commit()				except:					self.db.rollback()			finally:				c.close()	def testFloat(self):		"""testing value of float"""		assert self.has_table("floattable"), "missing attribute floattable"		values = [4.22, 123.44, 292.09, 33.2, 102.00, 445]		self._test_precision(self.table("floattable"), lambda x, y: x-y, values, None)	def testBigDecimal(self):		"""testing value of BigDecimal"""		assert self.has_table("floattable"), "missing attribute floattable"		from java.math import BigDecimal		values = [BigDecimal(x).setScale(2, BigDecimal.ROUND_UP) for x in [4.22, 123.44, 292.09, 33.2, 102.00, 445]]		self._test_precision(self.table("floattable"), lambda x, y, b=BigDecimal: b(x).subtract(y).doubleValue(), values, "doubleValue")	def testBigDecimalConvertedToDouble(self):		"""testing value of BigDecimal when converted to double"""		assert self.has_table("floattable"), "missing attribute floattable"		from java.math import BigDecimal		values = [BigDecimal(x).setScale(2, BigDecimal.ROUND_UP) for x in [4.22, 123.44, 292.09, 33.2, 102.00, 445]]		self._test_precision(self.table("floattable"), lambda x, y: x - y.doubleValue(), values, "doubleValue")	def testNextset(self):		"""testing nextset"""		c = self.cursor()		try:			c.execute("select * from zxtesting where id = ?", [(3,), (4,)])			f = c.fetchall()			assert f, "expected results, got None"			assert len(f) == 1, "expected [1], got [%d]" % (len(f))			assert c.nextset(), "expected next set, got None"			f = c.fetchall()			assert f, "expected results after call to nextset(), got None"			assert len(f) == 1, "expected [1], got [%d]" % (len(f))		finally:			c.close()	def testJavaUtilList(self):		"""testing parameterized values in a java.util.List"""		c = self.cursor()		try:			from java.util import LinkedList			a = LinkedList()			a.add((3,))			c.execute("select * from zxtesting where id = ?", a)			f = c.fetchall()			assert len(f) == 1, "expected [1], got [%d]" % (len(f))		finally:			c.close()	def testUpdateCount(self):		"""testing update count functionality"""		c = self.cursor()		try:			c.execute("insert into zxtesting values (?, ?, ?)", [(500, 'bz', 'or')])			assert c.updatecount == 1, "expected [1], got [%d]" % (c.updatecount)			c.execute("select * from zxtesting")			self.assertEquals(None, c.updatecount)			# there's a *feature* in the mysql engine where it returns 0 for delete if there is no			#  where clause, regardless of the actual value.  using a where clause forces it to calculate			#  the appropriate value			c.execute("delete from zxtesting where 1>0")			assert c.updatecount == 8, "expected [8], got [%d]" % (c.updatecount)			c.execute("update zxtesting set name = 'nothing'")			self.assertEquals(0, c.updatecount)		finally:			c.close()	def _test_time(self, (tabname, sql), factory, values, _type, _cmp=cmp, datahandler=None):		c = self.cursor()		if datahandler: c.datahandler = datahandler(c.datahandler)		try:			c.execute(sql)			dates = map(lambda x, f=factory: apply(f, x), values)			for a in dates:				c.execute("insert into %s values (1, ?)" % (tabname), [(a,)], {0:_type})			self.db.commit()			c.execute("select * from %s where b = ?" % (tabname), [(dates[0],)], {0:_type})			f = c.fetchall()			assert len(f) == 1, "expected length [1], got [%d]" % (len(f))			assert _cmp(f[0][1], dates[0]) == 0, "expected date [%s], got [%s]" % (str(dates[0]), str(f[0][1]))			c.execute("delete from %s where b = ?" % (tabname), [(dates[1],)], {0:_type})			self.db.commit()			c.execute("select * from %s" % (tabname))			f = c.fetchall()			self.assertEquals(len(f), len(dates) - 1)		finally:			c.execute("drop table %s" % (tabname))			c.close()			self.db.commit()	def testUpdateSelectByDate(self):		"""testing insert, update, query and delete by java.sql.Date"""		assert self.has_table("datetable"), "missing attribute datetable"		def _cmp_(x, y):			xt = (x.getYear(), x.getMonth(), x.getDay())			yt = (y.getYear(), y.getMonth(), y.getDay())			return not xt == yt		values = [(1996, 6, 22), (2000, 11, 12), (2000, 1, 12), (1999, 9, 24)]		self._test_time(self.table("datetable"), zxJDBC.Date, values, zxJDBC.DATE, _cmp_)	def testUpdateSelectByTime(self):		"""testing insert, update, query and delete by java.sql.Time"""		assert self.has_table("timetable"), "missing attribute timetable"		def _cmp_(x, y):			xt = (x.getHours(), x.getMinutes(), x.getSeconds())			yt = (y.getHours(), y.getMinutes(), y.getSeconds())			return not xt == yt		values = [(10, 11, 12), (3, 1, 12), (22, 9, 24)]		self._test_time(self.table("timetable"), zxJDBC.Time, values, zxJDBC.TIME, _cmp_)	def testUpdateSelectByTimestamp(self):		"""testing insert, update, query and delete by java.sql.Timestamp"""		assert self.has_table("timestamptable"), "missing attribute timestamptable"		def _cmp_(x, y):			xt = (x.getYear(), x.getMonth(), x.getDay(), x.getHours(), x.getMinutes(), x.getSeconds())			yt = (y.getYear(), y.getMonth(), y.getDay(), y.getHours(), y.getMinutes(), y.getSeconds())			return not xt == yt		values = [(1996, 6, 22, 10, 11, 12), (2000, 11, 12, 3, 1, 12), (2001, 1, 12, 4, 9, 24)]		self._test_time(self.table("timestamptable"), zxJDBC.Timestamp, values, zxJDBC.TIMESTAMP, _cmp_)	def testOrderOfArgsMaxRowsOnly(self):		"""testing execute with max rows only"""		c = self.cursor()		try:			# maxrows only (SAPDB doesn't support maxrows as of version 7.2.0)			c.execute("select * from zxtesting", maxrows=3)			f = c.fetchall()			assert len(f) == 3, "expected length [3], got [%d]" % (len(f))		finally:			c.close()			self.db.commit()	def testOrderOfArgs(self):		"""testing execute with different argument orderings"""		c = self.cursor()		try:			# bindings and params flipped			c.execute("select * from zxtesting where id = ?", bindings={0:zxJDBC.INTEGER}, params=[(3,)])			f = c.fetchall()			assert len(f) == 1, "expected length [1], got [%d]" % (len(f))			# bindings and params flipped, empty params			c.execute("select * from zxtesting where id = ?", bindings={}, params=[(3,)])			f = c.fetchall()			assert len(f) == 1, "expected length [1], got [%d]" % (len(f))			# bindings and params flipped, empty params, empty bindings			c.execute("select * from zxtesting where id = 3", bindings={}, params=[])			f = c.fetchall()			assert len(f) == 1, "expected length [1], got [%d]" % (len(f))		finally:			c.close()			self.db.commit()	def testMaxrows(self):		"""testing maxrows"""		c = self.cursor()		try:			c.execute("select * from zxtesting", maxrows=3)			self.assertEquals(3, len(c.fetchall()))			c.execute("select * from zxtesting where id > ?", (1,), maxrows=3)			self.assertEquals(3, len(c.fetchall()))			c.execute("select count(*) from zxtesting")			f = c.fetchall()			num = f[0][0]			c.execute("select * from zxtesting", maxrows=0)			self.assertEquals(num, len(c.fetchall()))		finally:			c.close()			self.db.commit()	def testPrimaryKey(self):		"""testing for primary key information"""		c = self.cursor()		try:			c.primarykeys(None, None, "zxtesting")			f = c.fetchall()			assert len(f) == 1, "expected [1], got [%d]" % (len(f))			assert f[0][3].lower() == "id", "expected [id], got [%s]" % (f[0][3])		finally:			c.close()			self.db.commit()	def testForeignKey(self):		"""testing for foreign key information"""		pass	def testIndexInfo(self):		"""testing index information"""		c = self.cursor()		try:			c.statistics(None, None, "zxtesting", 0, 0)			f = c.fetchall()			assert f is not None, "expected some values"			# filter out any indicies with name None			f = filter(lambda x: x[5], f)			assert len(f) == 1, "expected [1], got [%d]" % (len(f))		finally:			c.close()	def _test_fetching(self, dynamic=0):		c = self.cursor(dynamic)		try:			# make sure None if the result is an empty result set			c.execute("select * from zxtesting where 1<0")			self.assertEquals(None, c.fetchone())			# make sure an empty sequence if the result is an empty result set			c.execute("select * from zxtesting where 1<0")			self.assertEquals([], c.fetchmany())			# make sure an empty sequence if the result is an empty result set			c.execute("select * from zxtesting where 1<0")			self.assertEquals([], c.fetchall())			# test some arraysize features			c.execute("select * from zxtesting")			f = c.fetchmany()			assert len(f) == c.arraysize, "expecting [%d] rows, got [%d]" % (c.arraysize, len(f))			c.execute("select * from zxtesting")			c.arraysize = 4			f = c.fetchmany()			assert len(f) == 4, "expecting [4] rows, got [%d]" % (len(f))			c.execute("select * from zxtesting")

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?