📄 old_dbconn.py
字号:
# -*- coding:gb2312 -*-
#!/usr/bin/env python
from sqlalchemy import *
S_CONN = 'mysql://root:root@127.0.0.1/keyword'
class DbBase:
# s_conn = 'mysql://root:root@192.168.200.9/keyword'
def __init__( self, s_conn = S_CONN ):
self.__conn = None
self.setconn( s_conn )
#function:
def setconn(self, s_conn ):
engine = create_engine( s_conn, echo = True)
self.__conn = engine.connect()
def getConn( self ):
return self.__conn
#function:
def closeConn( self ):
if ( None != self.__conn ):
self.__conn.close()
self.__conn = None
def closeResult ( self ,result ):
if ( None != result ):
result.close()
result = None
#function: def query(self, s_sql ):
return self.__execute(s_sql)
#function:
#return :
def insert( self, s_sql ):
return self.__execute( s_sql )
#function:
#return :
def update(self, s_sql ):
return self.__execute( s_sql )
#function:
#return :
def delete(self, s_sql ):
return self.__execute( s_sql )
#function:
#return :
def __execute( self, s_sql ):
print "s_sql = [" + s_sql + "]"
return self.__conn.execute( s_sql )
class DbOperator( DbBase ):
def __init__( self, s_conn = S_CONN ):
DbBase.__init__( self, s_conn )
#
def query( self, table, s_fieldKeys = "*", dictWhere = None ):
s_where = self.buildWhere( dictWhere )
sql = "select " + s_fieldKeys + " from " + table + " " + s_where
DbBase.query( self, sql )
#function:
#return :
def insert( self, table, dictField ):
s_values= self.__getDictValue2Str( dictField )
s_keys = self.__getDictKeys2Str( dictField )
sql = "insert into " + table + s_keys + " values " + s_values
print sql
DbBase.insert( self, sql )
#function:
#return :
def update( self, table, dictUpdtField, dictWhere = None ):
s_setField = self.__getSetField2Str( dictField )
s_where = self.buildWhere( dictWhere )
sql = "update " + table + " set " + s_setField + " " + s_where
DbBase.update( self, sql )
#function:
#return :
def delete( self, table, dictWhere = None ):
s_where = self.buildWhere( dictWhere )
sql = "delete from " + table + " " + s_where
DbBase.delete( self, sql )
#
def buildWhere( self, dictWhere ):
if ( None == dictWhere ):
return ""
s_where = "where "
if ( self.__isNotDict(dictWhere) ): #type(dictWhere).__name__ != "dict" ):
s_where+= dictWhere
else:
keys = dictWhere.keys()
ksLen = len( keys )
format = ""
for key in keys:
if ( self.__isNotStr( dictWhere[ key ] ) ): #type(dictWhere[ key ]) != "str" ):
format = "%s = %s"
else:
format = "%s = \'%s\'"
s_where += format % ( key, dictWhere[ key ] )
if ( ksLen >= 2 ):
s_where += " and "
ksLen -= 1
print s_where
return s_where
#"(k1,k2)","(k1)"
def __getDictKeys2Str( self, mydict ):
s_keys = ""
keys = mydict.keys()
format = ""
ksLen = len( keys )
#for key in keys:
########
if ( len( keys ) == 1 ):
s_keys += keys[0]
else:
format = "(" + (ksLen -1) * "%s," + "%s" +")"
tempks = tuple(keys) #"%s" % keys
s_keys = format % tuple(keys)
print s_keys
return s_keys
#
#"(v1,v2)","(v1)"
def __getDictValue2Str( self, mydict ):
keys = mydict.keys()
ksLen = len( keys )
format = ""
s_values = "("
for key in keys:
if ( self.__isNotStr( mydict[ key ] ) ): ##type(mydict[ key ]).__name__ != "str" ):
format = "%s"
else:
format = "\'%s\'"
s_values += format % mydict[ key ]
if ( ksLen >= 2 ):
s_values += ","
ksLen -= 1
#
s_values += ")"
return s_values
#"flg = \'1\',ff = 2"
def __getSetField2Str( self, dictField ):
s_set = ""
if ( self.__isNotDict( dictField ) ): #type(dictField).__name__ != "dict" ):
s_set += dictField
else:
keys = dictField.keys()
ksLen = len( keys )
format = ""
for key in keys:
if ( self.__isNotDict( dictField[key] ) ): #type(dictField[key]).__name__ != "str"):
format = "%s = %s"
else:
format = "%s = \'%s\'"
s_set += format % ( key,dictField[key] )
if ( ksLen >= 2):
s_set += ","
ksLen -= 1
return s_set
def __isNotStr( self, obj ):
return not self.__cmpType( obj,"str")
#
def __isNotDict( self, obj ):
return not self.__cmpType( obj,"dict")
#
def __cmpType( self, obj, s_type ):
if (type(obj).__name__ == s_type ):
return True
else:
return False
##############################修改!!!!!!!!####################################
#测试针对特定的表:
#sql table
#+-------------+----------------------+------+-----+---------+----------------+
#| Field | Type | Null | Key | Default | Extra |
#+-------------+----------------------+------+-----+---------+----------------+
#| id | int(11) | NO | PRI | NULL | auto_increment |
#| title | varchar(1024) | NO | | | |
#| template | longtext | NO | | | |
#| show_number | smallint(5) unsigned | NO | | | |
#| brief | longtext | NO | | | |
#+-------------+----------------------+------+-----+---------+----------------+
import unittest
import types
import time
class TestSqlBase( unittest.TestCase ):
#t_db is Db' obj
#cmp_q = ( title, '33', 44, '55')
def __init__( self, t_db, table, fieldKey, cmp_q , where = None ):
self.fieldKey = fieldKey
self.table = table
if ( None == where ):
where = fieldKey
#
self.t_db = t_db
self.s_sql_q = "select title,template,show_number,brief from " + table + " where title = \'" + fieldKey + "\'"
self.s_sql_i = "insert into " + table + " (title,template,show_number,brief) values (\'" + fieldKey + "\',\'33\',44,\'55\')"
self.s_sql_u = "update " + table + " set title = \'" + fieldKey + "\'' where title = \'"+ where + "\'"
self.s_sql_d = "delete from " + table + " where title = \'" + where + "\'"
#
self.cmp_q = cmp_q
def insert( self ):
try:
rt = self.t_db.insert( self.s_sql_i )
print "insert succeed"
except Exception, e:
print "insert false"
#self.query()
#
def query( self ):
#print "self.s_sql_q===============" + self.s_sql_q
result = self.t_db.query( self.s_sql_q )
self.assertEqual( result.fetchone() , self.cmp_q )
#
def update( self, fieldKey_old, fieldKey_new ):
self.s_sql_u = "update " + self.table + " set title = \'" + fieldKey_new + "\' where title = \'"+ fieldKey_old + "\'"
try:
self.t_db.update( self.s_sql_u )
print "update succeed"
except Exception, e:
print "update false"
self.s_sql_q = "select title,template,show_number,brief from " + self.table + " where title = \'" + fieldKey_new + "\'"
#self.query()
#
def delete( self ):
try:
self.t_db.delete( self.s_sql_d )
print "delete succeed"
except Exception, e:
print "delete false"
#self.query()
class TestDbKnownCase ( unittest.TestCase ):
def setUp( self ):
self.table = "adv_style"
self.t_db = DbBase( S_CONN ) #'mysql://root:root@127.0.0.1/keyword' )
def tearDown( self ):
self.table= None
self.t_db = None
#t_db, table, fieldKey,cmp_q
def testQuery( self ):
title = time.ctime()
cmp_q = ( title, '33', 44, '55')
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
TestSqlBase( self.t_db, self.table, title, cmp_q ).query()
def testInsert( self ):
title = time.ctime()
cmp_q = ( title, '33', 44, '55')
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
TestSqlBase( self.t_db, self.table, title, cmp_q ).query()
#
def testUpdate( self ):
title = time.ctime()
cmp_q = ( title, '33', 44, '55')
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
title_u = title + "update"
cmp_q = ( title_u, '33', 44, '55')
TestSqlBase(self.t_db, self.table, title, cmp_q).update( title, title_u )
TestSqlBase( self.t_db, self.table, title_u, cmp_q ).query()
#
def testDelete( self ):
title = time.ctime()
cmp_q = None#
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
TestSqlBase( self.t_db, self.table, title, cmp_q ).delete()
TestSqlBase( self.t_db, self.table, title, cmp_q ).query()
#
def testCloseConn( self ):
pass
def testClossResult( self ):
pass
class TestDbBadCase ( unittest.TestCase ):
def setUp( self ):
self.table = "adv_style"
self.badTb = self.table + "bad"
self.t_db = DbBase( S_CONN )# 'mysql://root:root@127.0.0.1/keyword' )
def tearDown( self ):
self.table= None
self.t_db = None
#
def testSetConn( self ):
pass
def testRollback( self ):
pass
def testQuery( self ):
title = time.ctime()
cmp_q = None
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
title_bad = title + "bad"
TestSqlBase( self.t_db, self.table, title_bad, cmp_q ).query()
#
def testInsert( self ):
title = time.ctime()
cmp_q = None
#table is bad table
TestSqlBase( self.t_db, self.badTb, title, cmp_q ).insert()#insert to bad table
#
TestSqlBase( self.t_db, self.table, title, cmp_q ).query()
#
def testUpdate( self ):
title = time.ctime()
cmp_q = None
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
title_u = title + "update"
title_bad= title + "bad"
TestSqlBase(self.t_db, self.table, title, cmp_q).update( title_bad, title_u )
#None
TestSqlBase( self.t_db, self.table, title_bad, cmp_q ).query()
cmp_q = (title, '33', 44, '55')
TestSqlBase( self.t_db, self.table, title, cmp_q ).query()
#
def testDelete( self ):
title = time.ctime()
cmp_q = None#( title, '33', 44, '55')
TestSqlBase( self.t_db, self.table, title, cmp_q ).insert()
title_d_bad = title + "bad"
TestSqlBase( self.t_db, self.table, title_d_bad, cmp_q ).delete()
#
cmp_q = ( title, '33', 44, '55')
TestSqlBase( self.t_db, self.table, title, cmp_q ).query()
#
def testCloseConn( self ):
pass
#
def testClossResult( self ):
pass
#
def suite():
suite = unittest.TestSuite()
#test succeed case
testInsertCase = TestDbKnownCase("testInsert")
suite.addTest( testInsertCase )
# testQueryCase = TestDbKnownCase("testQuery")
suite.addTest( testQueryCase )
#
testUpdateCase = TestDbKnownCase("testUpdate")
suite.addTest( testUpdateCase )
#
testDeleteCase = TestDbKnownCase("testDelete")
suite.addTest( testDeleteCase )
#
#
#test false case testRollbackCase = TestDbBadCase("testRollback")
suite.addTest( testRollbackCase )
testBadInsertCase = TestDbBadCase("testInsert")
suite.addTest( testBadInsertCase)
testBadQueryCase = TestDbBadCase("testQuery")
suite.addTest( testBadQueryCase )
testBadUpdateCase = TestDbBadCase("testUpdate")
suite.addTest( testBadUpdateCase )
#
testBadDeleteCase = TestDbBadCase("testDelete")
suite.addTest( testBadDeleteCase )
#
return suite
if (__name__ == "__main__"):
#unittest.main(defaultTest = 'suite')
#
#########################DbOperator基本功能测试############################
#########################DbOperator以及api使用方法实例#####################
table = "adv_style"
dbop = DbOperator()
# dbop.query( table )
#table, dictField
dbop.insert( table,{"title":"2222222", "template":'5555555555', "show_number":22, "brief":'55555555555'} )
# #def delete( self, table, dictWhere = None ):
# dbop.delete( table ,{"id":'90',"title":"2"})
# #def update( self, table, dictField, dictWhere = None ):
# dbop.update( table, {"id":400},{"id":90})
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -