⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 old_dbconn.py

📁 sqlalchemy连接数据库,以及如何保持一个连接:类似于java中的单一对象(one object)。
💻 PY
字号:
# -*- coding:gb2312 -*- 
#!/usr/bin/env python

from sqlalchemy import *

# Default SQLAlchemy connection URL, used by DbBase/DbOperator when the caller
# does not pass one.
# NOTE(review): the user name and password are hard-coded here; consider
# reading them from configuration or the environment instead.
S_CONN = 'mysql://root:root@127.0.0.1/keyword'

class DbBase:
    """Hold one SQLAlchemy connection and run raw SQL strings on it.

    The connection is opened in the constructor and kept until closeConn()
    is called, so every statement issued through the same instance shares it.
    """
    #   s_conn = 'mysql://root:root@192.168.200.9/keyword'
    def __init__(self, s_conn=S_CONN):
        """Open a connection to the database described by the URL s_conn."""
        self.__conn = None
        self.setconn(s_conn)

    def setconn(self, s_conn):
        """Create an engine for s_conn and store a fresh connection from it."""
        engine = create_engine(s_conn, echo=True)
        self.__conn = engine.connect()

    def getConn(self):
        """Return the stored connection (None after closeConn())."""
        return self.__conn

    def closeConn(self):
        """Close the stored connection, if any, and forget it."""
        if self.__conn is not None:
            self.__conn.close()
            self.__conn = None

    def closeResult(self, result):
        """Close a result object returned by one of the execute helpers.

        Passing None is accepted and does nothing.
        """
        if result is not None:
            result.close()

    def query(self, s_sql):
        """Execute the SQL string s_sql and return the result object."""
        return self.__execute(s_sql)

    def insert(self, s_sql):
        """Execute the SQL string s_sql and return the result object."""
        return self.__execute(s_sql)

    def update(self, s_sql):
        """Execute the SQL string s_sql and return the result object."""
        return self.__execute(s_sql)

    def delete(self, s_sql):
        """Execute the SQL string s_sql and return the result object."""
        return self.__execute(s_sql)

    def __execute(self, s_sql):
        """Trace s_sql to stdout, then run it on the stored connection."""
        # Single-argument print(...) produces the same output whether it is
        # parsed as a statement or as a function call.
        print("s_sql = [" + s_sql + "]")
        return self.__conn.execute(s_sql)
        
class DbOperator(DbBase):
    """Build simple SQL statements from table names and dicts, then run them.

    Every public method assembles a SQL string and hands it to the matching
    DbBase method, whose result it returns.

    NOTE(review): statements are built by plain string formatting; values are
    only wrapped in single quotes, never escaped. Do not pass untrusted input.
    """

    def __init__(self, s_conn=S_CONN):
        """Open the connection through DbBase."""
        DbBase.__init__(self, s_conn)

    def query(self, table, s_fieldKeys="*", dictWhere=None):
        """Run ``select <s_fieldKeys> from <table> [where ...]``.

        dictWhere is either None (no filter), a ready-made condition string,
        or a dict of column -> value pairs that are combined with ``and``.
        Returns whatever DbBase.query returns.
        """
        s_where = self.buildWhere(dictWhere)
        sql = "select " + s_fieldKeys + " from " + table + " " + s_where
        return DbBase.query(self, sql)

    def insert(self, table, dictField):
        """Run ``insert into <table>(cols) values (vals)`` built from dictField.

        Returns whatever DbBase.insert returns.
        """
        s_values = self.__getDictValue2Str(dictField)
        s_keys = self.__getDictKeys2Str(dictField)
        sql = "insert into " + table + s_keys + " values " + s_values
        print(sql)
        return DbBase.insert(self, sql)

    def update(self, table, dictUpdtField, dictWhere=None):
        """Run ``update <table> set ... [where ...]``.

        dictUpdtField is a dict of column -> new value, or a ready-made
        ``set`` clause string. dictWhere is interpreted as in buildWhere().
        Returns whatever DbBase.update returns.
        """
        s_setField = self.__getSetField2Str(dictUpdtField)
        s_where = self.buildWhere(dictWhere)
        sql = "update " + table + " set " + s_setField + " " + s_where
        return DbBase.update(self, sql)

    def delete(self, table, dictWhere=None):
        """Run ``delete from <table> [where ...]``.

        With dictWhere left as None no filter is added, so the statement
        applies to every row. Returns whatever DbBase.delete returns.
        """
        s_where = self.buildWhere(dictWhere)
        sql = "delete from " + table + " " + s_where
        return DbBase.delete(self, sql)

    def buildWhere(self, dictWhere):
        """Return a ``where`` clause for dictWhere ("" when it is None).

        A non-dict argument is appended verbatim after ``where``; a dict is
        turned into ``col = value`` terms joined by `` and ``, with str
        values wrapped in single quotes.
        """
        if dictWhere is None:
            return ""

        if self.__isNotDict(dictWhere):
            s_where = "where " + dictWhere
        else:
            conditions = []
            for key in dictWhere:
                value = dictWhere[key]
                if self.__isNotStr(value):
                    fmt = "%s = %s"
                else:
                    fmt = "%s = '%s'"
                conditions.append(fmt % (key, value))
            s_where = "where " + " and ".join(conditions)
        print(s_where)
        return s_where

    def __getDictKeys2Str(self, mydict):
        """Return the dict keys as a column list: "(k1,k2)" or "(k1)"."""
        # The parentheses are emitted for a single key as well; without them
        # the key would be glued straight onto the table name in insert().
        s_keys = "(" + ",".join(["%s" % (key,) for key in mydict]) + ")"
        print(s_keys)
        return s_keys

    def __getDictValue2Str(self, mydict):
        """Return the dict values as a value list: "(v1,v2)" or "(v1)".

        str values are wrapped in single quotes, everything else is
        formatted with %s. Iteration order matches __getDictKeys2Str.
        """
        parts = []
        for key in mydict:
            value = mydict[key]
            if self.__isNotStr(value):
                fmt = "%s"
            else:
                fmt = "'%s'"
            # Wrap in a 1-tuple so a tuple value is formatted as one item.
            parts.append(fmt % (value,))
        return "(" + ",".join(parts) + ")"

    def __getSetField2Str(self, dictField):
        """Return a ``set`` clause body such as "flg = '1',ff = 2".

        A non-dict argument is returned unchanged (it is assumed to already
        be a clause string).
        """
        if self.__isNotDict(dictField):
            return "" + dictField

        assignments = []
        for key in dictField:
            value = dictField[key]
            # Quoting depends on whether the value is a string.
            if self.__isNotStr(value):
                fmt = "%s = %s"
            else:
                fmt = "%s = '%s'"
            assignments.append(fmt % (key, value))
        return ",".join(assignments)

    def __isNotStr(self, obj):
        """Return True unless obj's type is named "str"."""
        return not self.__cmpType(obj, "str")

    def __isNotDict(self, obj):
        """Return True unless obj's type is named "dict"."""
        return not self.__cmpType(obj, "dict")

    def __cmpType(self, obj, s_type):
        """Return True when the name of obj's exact type equals s_type."""
        return type(obj).__name__ == s_type
    
############################## MODIFY!!!!!!!! ####################################
# The tests below target one specific table:
# sql table
#+-------------+----------------------+------+-----+---------+----------------+
#| Field       | Type                 | Null | Key | Default | Extra          |
#+-------------+----------------------+------+-----+---------+----------------+
#| id          | int(11)              | NO   | PRI | NULL    | auto_increment |
#| title       | varchar(1024)        | NO   |     |         |                |
#| template    | longtext             | NO   |     |         |                |
#| show_number | smallint(5) unsigned | NO   |     |         |                |
#| brief       | longtext             | NO   |     |         |                |
#+-------------+----------------------+------+-----+---------+----------------+
import unittest
import types
import time

class TestSqlBase(unittest.TestCase):
    """Helper that runs one canned insert/query/update/delete on a table.

    It subclasses TestCase only to borrow assertEqual; it is instantiated by
    hand from the real test cases, never by a test loader.

    t_db     -- object exposing insert/query/update/delete(sql) (e.g. DbBase)
    table    -- table name used in every statement
    fieldKey -- value of the ``title`` column to insert / select
    cmp_q    -- row expected from query(), e.g. (title, '33', 44, '55'),
                or None when no row is expected
    where    -- ``title`` value used to filter update/delete; defaults to
                fieldKey
    """

    # Tell pytest/nose-style collectors to skip this class: its constructor
    # does not take a method name, so it cannot be instantiated by a loader.
    __test__ = False

    def __init__(self, t_db, table, fieldKey, cmp_q, where=None):
        # Initialise the TestCase machinery so assertEqual works for every
        # pair of values. TestCase needs the name of an existing method;
        # "query" is the one that performs the assertion.
        unittest.TestCase.__init__(self, "query")

        self.fieldKey = fieldKey
        self.table = table
        if where is None:
            where = fieldKey

        self.t_db = t_db
        self.s_sql_q = ("select title,template,show_number,brief from " + table
                        + " where title = '" + fieldKey + "'")
        self.s_sql_i = ("insert into " + table
                        + " (title,template,show_number,brief) values ('"
                        + fieldKey + "','33',44,'55')")
        self.s_sql_u = ("update " + table + " set title = '" + fieldKey
                        + "' where title = '" + where + "'")
        self.s_sql_d = "delete from " + table + " where title = '" + where + "'"

        self.cmp_q = cmp_q

    def insert(self):
        """Run the canned insert; report the outcome on stdout.

        Failures are swallowed on purpose: the bad-case tests insert into a
        table that does not exist and carry on.
        """
        try:
            self.t_db.insert(self.s_sql_i)
        except Exception:
            print("insert false")
        else:
            print("insert succeed")

    def query(self):
        """Run the canned select and assert its first row equals cmp_q."""
        result = self.t_db.query(self.s_sql_q)
        self.assertEqual(result.fetchone(), self.cmp_q)

    def update(self, fieldKey_old, fieldKey_new):
        """Rename title fieldKey_old to fieldKey_new; failures are reported only.

        Afterwards the canned select is re-pointed at fieldKey_new so that a
        following query() looks for the renamed row.
        """
        self.s_sql_u = ("update " + self.table + " set title = '" + fieldKey_new
                        + "' where title = '" + fieldKey_old + "'")
        try:
            self.t_db.update(self.s_sql_u)
        except Exception:
            print("update false")
        else:
            print("update succeed")
        self.s_sql_q = ("select title,template,show_number,brief from "
                        + self.table + " where title = '" + fieldKey_new + "'")

    def delete(self):
        """Run the canned delete; report the outcome on stdout."""
        try:
            self.t_db.delete(self.s_sql_d)
        except Exception:
            print("delete false")
        else:
            print("delete succeed")

class TestDbKnownCase(unittest.TestCase):
    """Happy-path tests: DbBase statements against the ``adv_style`` table.

    Each test uses time.ctime() as the row title.
    NOTE(review): ctime() has one-second resolution, so tests that run within
    the same second share a title and may see each other's rows.
    """

    def setUp(self):
        self.table = "adv_style"
        self.t_db = DbBase(S_CONN)    # 'mysql://root:root@127.0.0.1/keyword'

    def tearDown(self):
        # Close the connection opened in setUp instead of merely dropping the
        # reference, so each test releases its connection.
        if self.t_db is not None:
            self.t_db.closeConn()
        self.table = None
        self.t_db = None

    def _sql_case(self, title, cmp_q):
        """Build a TestSqlBase helper bound to this test's db and table."""
        return TestSqlBase(self.t_db, self.table, title, cmp_q)

    def testQuery(self):
        """Insert a row, then expect to read it back."""
        title = time.ctime()
        cmp_q = (title, '33', 44, '55')
        self._sql_case(title, cmp_q).insert()
        self._sql_case(title, cmp_q).query()

    def testInsert(self):
        """Insert a row, then expect to read it back."""
        title = time.ctime()
        cmp_q = (title, '33', 44, '55')
        self._sql_case(title, cmp_q).insert()
        self._sql_case(title, cmp_q).query()

    def testUpdate(self):
        """Insert a row, rename its title, then expect the renamed row."""
        title = time.ctime()
        cmp_q = (title, '33', 44, '55')
        self._sql_case(title, cmp_q).insert()
        title_u = title + "update"
        cmp_q = (title_u, '33', 44, '55')
        self._sql_case(title, cmp_q).update(title, title_u)
        self._sql_case(title_u, cmp_q).query()

    def testDelete(self):
        """Insert a row, delete it, then expect the select to find nothing."""
        title = time.ctime()
        cmp_q = None
        self._sql_case(title, cmp_q).insert()
        self._sql_case(title, cmp_q).delete()
        self._sql_case(title, cmp_q).query()

    def testCloseConn(self):
        # Placeholder: not implemented yet.
        pass

    def testClossResult(self):
        # Placeholder: not implemented yet.
        pass
class TestDbBadCase(unittest.TestCase):
    """Failure-path tests: statements that must not affect ``adv_style``.

    Each test uses time.ctime() as the row title and derives non-matching
    titles / a non-existing table name by appending "bad".
    NOTE(review): ctime() has one-second resolution, so tests that run within
    the same second share a title and may see each other's rows.
    """

    def setUp(self):
        self.table = "adv_style"
        self.badTb = self.table + "bad"
        self.t_db = DbBase(S_CONN)    # 'mysql://root:root@127.0.0.1/keyword'

    def tearDown(self):
        # Close the connection opened in setUp instead of merely dropping the
        # reference, so each test releases its connection.
        if self.t_db is not None:
            self.t_db.closeConn()
        self.table = None
        self.t_db = None

    def _sql_case(self, table, title, cmp_q):
        """Build a TestSqlBase helper bound to this test's db."""
        return TestSqlBase(self.t_db, table, title, cmp_q)

    def testSetConn(self):
        # Placeholder: not implemented yet.
        pass

    def testRollback(self):
        # Placeholder: not implemented yet.
        pass

    def testQuery(self):
        """Selecting a title that was never inserted must return no row."""
        title = time.ctime()
        cmp_q = None
        self._sql_case(self.table, title, cmp_q).insert()
        title_bad = title + "bad"
        self._sql_case(self.table, title_bad, cmp_q).query()

    def testInsert(self):
        """An insert aimed at the bad table name must leave the real table
        without the row."""
        title = time.ctime()
        cmp_q = None
        # Insert into the bad table name; the helper swallows the error.
        self._sql_case(self.badTb, title, cmp_q).insert()
        self._sql_case(self.table, title, cmp_q).query()

    def testUpdate(self):
        """An update filtered on a non-matching title must change nothing."""
        title = time.ctime()
        cmp_q = None
        self._sql_case(self.table, title, cmp_q).insert()
        title_u = title + "update"
        title_bad = title + "bad"
        self._sql_case(self.table, title, cmp_q).update(title_bad, title_u)
        # No row carries the bad title ...
        self._sql_case(self.table, title_bad, cmp_q).query()
        # ... and the original row is still there, unchanged.
        cmp_q = (title, '33', 44, '55')
        self._sql_case(self.table, title, cmp_q).query()

    def testDelete(self):
        """A delete filtered on a non-matching title must keep the row."""
        title = time.ctime()
        cmp_q = None
        self._sql_case(self.table, title, cmp_q).insert()
        title_d_bad = title + "bad"
        self._sql_case(self.table, title_d_bad, cmp_q).delete()
        cmp_q = (title, '33', 44, '55')
        self._sql_case(self.table, title, cmp_q).query()

    def testCloseConn(self):
        # Placeholder: not implemented yet.
        pass

    def testClossResult(self):
        # Placeholder: not implemented yet.
        pass
#
def suite():
    """Return a TestSuite with the known-good cases followed by the bad cases.

    Order: TestDbKnownCase insert, query, update, delete; then TestDbBadCase
    rollback, insert, query, update, delete.
    """
    tests = unittest.TestSuite()

    # Cases expected to succeed.
    for name in ("testInsert", "testQuery", "testUpdate", "testDelete"):
        tests.addTest(TestDbKnownCase(name))

    # Cases exercising failure paths.
    for name in ("testRollback", "testInsert", "testQuery",
                 "testUpdate", "testDelete"):
        tests.addTest(TestDbBadCase(name))

    return tests
    
if __name__ == "__main__":
    # unittest.main(defaultTest='suite')
    #
    # DbOperator basic functionality check, doubling as API usage examples.
    table = "adv_style"

    dbop = DbOperator()
    try:
        # dbop.query(table)
        # insert(table, dictField)
        dbop.insert(table, {"title": "2222222", "template": '5555555555',
                            "show_number": 22, "brief": '55555555555'})
        # delete(table, dictWhere=None)
        # dbop.delete(table, {"id": '90', "title": "2"})
        # update(table, dictUpdtField, dictWhere=None)
        # dbop.update(table, {"id": 400}, {"id": 90})
    finally:
        # Release the connection even when a statement above fails.
        dbop.closeConn()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -