⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dbapi20.py

📁 python联接mysql驱动 python联接mysql驱动
💻 PY
📖 第 1 页 / 共 3 页
字号:
                'results are exhausted'            )            self.failUnless(cur.rowcount in (-1,6))            # Same as above, using cursor.arraysize            cur.arraysize=4            cur.execute('select name from %sbooze' % self.table_prefix)            r = cur.fetchmany() # Should get 4 rows            self.assertEqual(len(r),4,                'cursor.arraysize not being honoured by fetchmany'                )            r = cur.fetchmany() # Should get 2 more            self.assertEqual(len(r),2)            r = cur.fetchmany() # Should be an empty sequence            self.assertEqual(len(r),0)            self.failUnless(cur.rowcount in (-1,6))            cur.arraysize=6            cur.execute('select name from %sbooze' % self.table_prefix)            rows = cur.fetchmany() # Should get all rows            self.failUnless(cur.rowcount in (-1,6))            self.assertEqual(len(rows),6)            self.assertEqual(len(rows),6)            rows = [r[0] for r in rows]            rows.sort()                      # Make sure we get the right data back out            for i in range(0,6):                self.assertEqual(rows[i],self.samples[i],                    'incorrect data retrieved by cursor.fetchmany'                    )            rows = cur.fetchmany() # Should return an empty list            self.assertEqual(len(rows),0,                'cursor.fetchmany should return an empty sequence if '                'called after the whole result set has been fetched'                )            self.failUnless(cur.rowcount in (-1,6))            self.executeDDL2(cur)            cur.execute('select name from %sbarflys' % self.table_prefix)            r = cur.fetchmany() # Should get empty sequence            self.assertEqual(len(r),0,                'cursor.fetchmany should return an empty sequence if '                'query retrieved no rows'                )            self.failUnless(cur.rowcount in (-1,0))        finally:            con.close()    def test_fetchall(self):        con = self._connect()        try:            cur = con.cursor()            # cursor.fetchall should raise an Error if called            # without executing a query that may return rows (such            # as a select)            self.assertRaises(self.driver.Error, cur.fetchall)            self.executeDDL1(cur)            for sql in self._populate():                cur.execute(sql)            # cursor.fetchall should raise an Error if called            # after executing a a statement that cannot return rows            self.assertRaises(self.driver.Error,cur.fetchall)            cur.execute('select name from %sbooze' % self.table_prefix)            rows = cur.fetchall()            self.failUnless(cur.rowcount in (-1,len(self.samples)))            self.assertEqual(len(rows),len(self.samples),                'cursor.fetchall did not retrieve all rows'                )            rows = [r[0] for r in rows]            rows.sort()            for i in range(0,len(self.samples)):                self.assertEqual(rows[i],self.samples[i],                'cursor.fetchall retrieved incorrect rows'                )            rows = cur.fetchall()            self.assertEqual(                len(rows),0,                'cursor.fetchall should return an empty list if called '                'after the whole result set has been fetched'                )            self.failUnless(cur.rowcount in (-1,len(self.samples)))            self.executeDDL2(cur)            cur.execute('select name from %sbarflys' % self.table_prefix)            rows = cur.fetchall()            self.failUnless(cur.rowcount in (-1,0))            self.assertEqual(len(rows),0,                'cursor.fetchall should return an empty list if '                'a select query returns no rows'                )                    finally:            con.close()        def test_mixedfetch(self):        con = self._connect()        try:            cur = con.cursor()            self.executeDDL1(cur)            for sql in self._populate():                cur.execute(sql)            cur.execute('select name from %sbooze' % self.table_prefix)            rows1  = cur.fetchone()            rows23 = cur.fetchmany(2)            rows4  = cur.fetchone()            rows56 = cur.fetchall()            self.failUnless(cur.rowcount in (-1,6))            self.assertEqual(len(rows23),2,                'fetchmany returned incorrect number of rows'                )            self.assertEqual(len(rows56),2,                'fetchall returned incorrect number of rows'                )            rows = [rows1[0]]            rows.extend([rows23[0][0],rows23[1][0]])            rows.append(rows4[0])            rows.extend([rows56[0][0],rows56[1][0]])            rows.sort()            for i in range(0,len(self.samples)):                self.assertEqual(rows[i],self.samples[i],                    'incorrect data retrieved or inserted'                    )        finally:            con.close()    def help_nextset_setUp(self,cur):        ''' Should create a procedure called deleteme            that returns two result sets, first the 	    number of rows in booze then "name from booze"        '''        raise NotImplementedError,'Helper not implemented'        #sql="""        #    create procedure deleteme as        #    begin        #        select count(*) from booze        #        select name from booze        #    end        #"""        #cur.execute(sql)    def help_nextset_tearDown(self,cur):        'If cleaning up is needed after nextSetTest'        raise NotImplementedError,'Helper not implemented'        #cur.execute("drop procedure deleteme")    def test_nextset(self):        con = self._connect()        try:            cur = con.cursor()            if not hasattr(cur,'nextset'):                return            try:                self.executeDDL1(cur)                sql=self._populate()                for sql in self._populate():                    cur.execute(sql)                self.help_nextset_setUp(cur)                cur.callproc('deleteme')                numberofrows=cur.fetchone()                assert numberofrows[0]== len(self.samples)                assert cur.nextset()                names=cur.fetchall()                assert len(names) == len(self.samples)                s=cur.nextset()                assert s == None,'No more return sets, should return None'            finally:                self.help_nextset_tearDown(cur)        finally:            con.close()    def test_nextset(self):        raise NotImplementedError,'Drivers need to override this test'    def test_arraysize(self):        # Not much here - rest of the tests for this are in test_fetchmany        con = self._connect()        try:            cur = con.cursor()            self.failUnless(hasattr(cur,'arraysize'),                'cursor.arraysize must be defined'                )        finally:            con.close()    def test_setinputsizes(self):        con = self._connect()        try:            cur = con.cursor()            cur.setinputsizes( (25,) )            self._paraminsert(cur) # Make sure cursor still works        finally:            con.close()    def test_setoutputsize_basic(self):        # Basic test is to make sure setoutputsize doesn't blow up        con = self._connect()        try:            cur = con.cursor()            cur.setoutputsize(1000)            cur.setoutputsize(2000,0)            self._paraminsert(cur) # Make sure the cursor still works        finally:            con.close()    def test_setoutputsize(self):        # Real test for setoutputsize is driver dependant        raise NotImplementedError,'Driver need to override this test'    def test_None(self):        con = self._connect()        try:            cur = con.cursor()            self.executeDDL1(cur)            cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)            cur.execute('select name from %sbooze' % self.table_prefix)            r = cur.fetchall()            self.assertEqual(len(r),1)            self.assertEqual(len(r[0]),1)            self.assertEqual(r[0][0],None,'NULL value not returned as None')        finally:            con.close()    def test_Date(self):        d1 = self.driver.Date(2002,12,25)        d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))        # Can we assume this? API doesn't specify, but it seems implied        # self.assertEqual(str(d1),str(d2))    def test_Time(self):        t1 = self.driver.Time(13,45,30)        t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))        # Can we assume this? API doesn't specify, but it seems implied        # self.assertEqual(str(t1),str(t2))    def test_Timestamp(self):        t1 = self.driver.Timestamp(2002,12,25,13,45,30)        t2 = self.driver.TimestampFromTicks(            time.mktime((2002,12,25,13,45,30,0,0,0))            )        # Can we assume this? API doesn't specify, but it seems implied        # self.assertEqual(str(t1),str(t2))    def test_Binary(self):        b = self.driver.Binary('Something')        b = self.driver.Binary('')    def test_STRING(self):        self.failUnless(hasattr(self.driver,'STRING'),            'module.STRING must be defined'            )    def test_BINARY(self):        self.failUnless(hasattr(self.driver,'BINARY'),            'module.BINARY must be defined.'            )    def test_NUMBER(self):        self.failUnless(hasattr(self.driver,'NUMBER'),            'module.NUMBER must be defined.'            )    def test_DATETIME(self):        self.failUnless(hasattr(self.driver,'DATETIME'),            'module.DATETIME must be defined.'            )    def test_ROWID(self):        self.failUnless(hasattr(self.driver,'ROWID'),            'module.ROWID must be defined.'            )

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -