# tux.py -- Python code for calling tuxedo services; a C program acts as the tuxedo client in between (468 lines).
# coding=gbk
import struct, types, socket, io, threading, Queue, time
from util import log_r
class FML32:
    """Minimal FML32 field buffer that can be serialized to and parsed from bytes.

    A buffer maps a field id to a list of byte-string occurrences.  The wire
    layout written by ``tostr`` and read back by ``fromstr`` is::

        magic0 (4) | fsize (4) | direction marker (4) | fcount (4) | magic2 (8)

    followed by one record per occurrence: field id (4 bytes), field length
    (4 bytes; value length + 8 header bytes + 1 terminator), the value, and
    1-4 NUL bytes so that each record ends on a 4-byte boundary.  All
    integers are big-endian.

    Attributes:
        data: dict mapping field id -> list of values (byte strings).
        fsize: serialized size in bytes, fixed header included.
        fcount: total number of field occurrences.
        issendbuf: 1 for a send buffer, 0 for a receive buffer, 2 when a
            parsed header carried an unrecognised direction marker.
    """

    # Field identifiers referenced by this module.
    GPARM32_0 = 0x0a000c80
    SEND_PARMS_NUM32 = 0x0a000ce4
    RECP_PARMS_NUM32 = 0x0a000ce6
    SVC_ERR_NO32 = 0x0a000cf8
    SVC_ERR_MSG32 = 0x0a000cf9
    F_ServiceName32 = 0x0a000d03
    F_Wsnaddr32 = 0x0a000d04

    # Fixed byte sequences of the header (b'' literals are plain str on Python 2).
    __magic0 = b'\xff\xff\xd8\xf0'
    __magic1 = b'\x00\x00\x04\x00'
    __magic2 = b'\x00\x00\x00\x00\x00\x00\x00\x10'
    __magic3 = b'\x00\x00\x10\x00'

    __HEADER_LEN = 24     # 4 + 4 + 4 + 4 + 8 bytes of fixed header
    __RECORD_EXTRA = 9    # 4-byte id + 4-byte length + 1 terminating NUL

    def __init__(self, is_send_buf=1):
        self.data = {}
        self.fsize = FML32.__HEADER_LEN
        self.issendbuf = is_send_buf
        self.fcount = 0

    def _record_size(self, value):
        """Return the padded on-wire size of one occurrence holding *value*."""
        return self.roundup4(len(value) + FML32.__RECORD_EXTRA)

    def fadd(self, fid, value):
        """Append a new occurrence of field *fid*.

        Returns 0 on success, -1 when *value* is not a byte string.
        """
        if not isinstance(value, bytes):
            return -1
        self.data.setdefault(fid, []).append(value)
        self.fcount += 1
        self.fsize += self._record_size(value)
        return 0

    def fchg(self, fid, row, value):
        """Set occurrence *row* of field *fid*, padding with empty values.

        When *row* lies beyond the current occurrences, the missing rows are
        created as empty strings first.  Returns 0 on success, -1 when
        *value* is not a byte string.
        """
        if not isinstance(value, bytes):
            return -1
        rows = self.data.setdefault(fid, [])
        if row < len(rows):
            # Replace in place: swap the old record size for the new one.
            self.fsize -= self._record_size(rows[row])
            rows[row] = value
        else:
            gap = row - len(rows)
            self.fcount += gap + 1
            self.fsize += gap * self._record_size(b'')
            rows.extend([b''] * gap)
            rows.append(value)
        self.fsize += self._record_size(value)
        return 0

    def fget(self, fid, row):
        """Return occurrence *row* of field *fid* with trailing whitespace removed.

        Returns None when the field or the requested row does not exist.
        """
        try:
            return self.data[fid][row].rstrip()
        except (KeyError, IndexError):
            return None

    def focc(self, fid):
        """Return the number of occurrences stored for field *fid*."""
        return len(self.data.get(fid, ()))

    def str2int(self, str):
        """Decode a 4-byte big-endian unsigned integer.

        The parameter keeps its historical name (which shadows the builtin)
        so that existing keyword callers keep working.
        """
        return struct.unpack('!I', str)[0]

    def int2str(self, ii):
        """Encode *ii* as a 4-byte big-endian unsigned integer."""
        return struct.pack('!I', ii)

    def roundup4(self, ii):
        """Round *ii* up to the next multiple of 4."""
        return (ii + 3) & ~3

    def fromstr(self, s):
        """Replace the buffer content with the FML32 message serialized in *s*.

        Returns True on success.  Returns False when *s* is empty, does not
        start with the expected magic, or is truncated / malformed; in that
        case the buffer is left empty instead of raising or looping forever.
        """
        self.clear()
        header_len = FML32.__HEADER_LEN
        if not s or len(s) < header_len or s[0:4] != FML32.__magic0:
            return False
        total = len(s)
        (fsize,) = struct.unpack_from('!I', s, 4)
        marker = s[8:12]
        (fcount,) = struct.unpack_from('!I', s, 12)
        # Bytes 16..23 normally hold magic2; a mismatch is tolerated.

        # Every record carries at least an 8-byte id/length prefix, so a
        # count that cannot fit in the remaining bytes is corrupt.
        if fcount > (total - header_len) // 8:
            return False

        parsed = {}
        pos = header_len
        for _ in range(fcount):
            if pos + 8 > total:
                return False
            fid, flen = struct.unpack_from('!II', s, pos)
            end = pos + self.roundup4(flen)
            if flen < 8 or end > total:
                return False
            # The value excludes the 8-byte prefix and the trailing NUL.
            value_end = max(pos + 8, pos + flen - 1)
            parsed.setdefault(fid, []).append(s[pos + 8:value_end])
            pos = end

        self.data.update(parsed)
        self.fsize = fsize
        self.fcount = fcount
        if marker == FML32.__magic1:
            self.issendbuf = 1
        elif marker == FML32.__magic3:
            self.issendbuf = 0
        else:
            self.issendbuf = 2
        return True

    def tostr(self):
        """Serialize the buffer and return it as a byte string."""
        if self.issendbuf == 1:
            marker = FML32.__magic1
        else:
            marker = FML32.__magic3
        # Header: 4 + 4 + 4 + 4 + 8 bytes.
        parts = [
            FML32.__magic0,
            self.int2str(self.fsize),
            marker,
            self.int2str(self.fcount),
            FML32.__magic2,
        ]
        # Body: records in ascending field-id order.
        for fid in sorted(self.data):
            for value in self.data[fid]:
                parts.append(struct.pack('!II', fid, len(value) + FML32.__RECORD_EXTRA))
                parts.append(value)
                # Terminating NUL plus padding up to the 4-byte boundary.
                parts.append(b'\x00' * (self.roundup4(len(value) + 1) - len(value)))
        return b''.join(parts)

    def clear(self):
        """Remove every field and reset the size and occurrence counters."""
        self.data.clear()
        self.fsize = FML32.__HEADER_LEN
        self.fcount = 0
# end of class FML32
class TuxConn:
    """Pool of connections to a tuxedo proxy plus sync/async call helpers.

    One connection record (a dict) is kept per WSNADDR string.  Each record
    owns a socket, a send lock, the table of synchronous waiters
    (``call_list``: sequence number -> ``[lock, raw_reply]``), the set of
    outstanding asynchronous sequence numbers (``asyn_list``) and a daemon
    reader thread that dispatches replies.  Replies to asynchronous calls
    are delivered as FML32 objects through ``q_tuxrply``.
    """

    def __init__(self, tuxproxy):
        self.connPool = {}                # wsnaddr -> connection record
        self.lockpool = threading.Lock()  # guards connPool and tux_seq
        self.q_tuxrply = Queue.Queue()    # replies of asynchronous calls
        self.tux_proxy = tuxproxy         # address handed to socket.connect
        self.tux_seq = 1                  # last call sequence number issued

    def _recv_exact(self, sock, count):
        """Read up to *count* bytes, stopping early only if the peer closes."""
        chunks = []
        while count > 0:
            chunk = sock.recv(count)
            if not chunk:
                break
            chunks.append(chunk)
            count -= len(chunk)
        return b"".join(chunks)

    def readTuxPkt(self, sock):
        """Read one complete packet from *sock*.

        The packet starts with an 8-byte header whose bytes 4..7 hold the
        total packet length (big-endian, header included).  Returns the
        whole packet as a byte string, or None when *sock* is not a socket,
        the connection closes before a full packet arrived, or the declared
        length does not exceed the header size.
        """
        if not isinstance(sock, socket.socket):
            return None
        hlen = 8
        # recv() may legitimately return fewer bytes than requested, so the
        # header is collected in a loop rather than with a single call.
        header = self._recv_exact(sock, hlen)
        if len(header) != hlen:
            print('invalid read header len %s' % repr(header))
            return None
        (pkt_len,) = struct.unpack('!I', header[4:8])
        if pkt_len <= hlen:
            return None
        body = self._recv_exact(sock, pkt_len - hlen)
        if len(body) != pkt_len - hlen:
            return None
        return header + body

    def _open_session(self, wsnaddr):
        """Connect to the proxy, announce *wsnaddr* and return the socket."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(self.tux_proxy)
            hello = FML32()
            hello.fadd(FML32.F_Wsnaddr32, wsnaddr)
            sock.sendall(hello.tostr())
        except Exception:
            # Do not leak the descriptor when the handshake fails.
            sock.close()
            raise
        return sock

    def _error_reply(self, lseq):
        """Build the reply reported for call *lseq* when the connection broke."""
        err = FML32()
        err.fadd(FML32.F_Wsnaddr32, str(lseq))
        err.fadd(FML32.SVC_ERR_NO32, "777701")
        err.fadd(FML32.SVC_ERR_MSG32, "tuxedo连接中断,调用失败")
        return err

    def _fail_pending(self, async_seqs, sync_waiters):
        """Report a broken connection to every call that was in flight."""
        for lseq in async_seqs:
            self.q_tuxrply.put(self._error_reply(lseq))
        for lseq, waiter in sync_waiters:
            waiter[1] = self._error_reply(lseq).tostr()
            waiter[0].release()  # wake the blocked synchronous caller

    def _reconnect(self, cc, old_sock):
        """Replace the broken socket of *cc* and fail the calls it carried.

        Runs under the connection's send lock so that no request can be
        written while the socket is being swapped.  Returns the new socket.
        """
        with cc['lock']:
            old_sock.close()
            async_seqs = list(cc['asyn_list'].keys())
            cc['asyn_list'].clear()
            # Only waiters that have not been answered yet are failed.
            sync_waiters = [(seq, dc) for seq, dc in list(cc['call_list'].items())
                            if dc[1] is None]
            try:
                sock = self._open_session(cc['wsn'])
                cc['sock'] = sock
            finally:
                # Pending callers are notified even if reconnecting fails.
                self._fail_pending(async_seqs, sync_waiters)
        return sock

    def th_readtux_reply(self, cc):
        """Reader-thread body: receive tuxedo replies and dispatch them.

        A reply whose sequence number belongs to a synchronous waiter is
        handed to that waiter; every other reply goes to ``q_tuxrply``.  A
        packet without a usable sequence number is treated as a broken
        connection: the socket is re-established and all pending calls on
        it are failed.
        """
        sock = cc['sock']
        call_list = cc['call_list']
        wsnaddr = cc['wsn']
        while True:
            raw = self.readTuxPkt(sock)
            cc['tm_active'] = time.time()
            # A fresh buffer per packet: queued replies must not be mutated
            # when the next packet is parsed.
            reply = FML32()
            reply.fromstr(raw)
            try:
                lseq = int(reply.fget(FML32.F_Wsnaddr32, 0))
            except (TypeError, ValueError):
                log_r("wsnaddr:%s call failed, there is no call sequence no" % wsnaddr)
                sock = self._reconnect(cc, sock)
                continue
            waiter = call_list.get(lseq)
            if waiter is not None and waiter[1] is None:
                print("syn call return")
                waiter[1] = raw
                waiter[0].release()  # synchronous call: wake the waiter
            else:
                # Asynchronous call: publish the parsed reply on the queue.
                cc['asyn_list'].pop(lseq, None)
                self.q_tuxrply.put(reply)

    def create_conn(self, wsnaddr):
        """Open a connection to the tuxedo proxy and start its reader thread.

        Returns the new connection record, which is also stored in
        ``connPool`` under *wsnaddr*.
        """
        print(repr(self.tux_proxy))
        sock = self._open_session(wsnaddr)
        cc = {'sock': sock, 'seq': 1, 'lock': threading.Lock(), 'call_list': {},
              'wsn': wsnaddr, 'asyn_list': {}}
        self.connPool[wsnaddr] = cc
        reader = threading.Thread(target=self.th_readtux_reply, args=(cc,))
        reader.daemon = True
        reader.start()
        return cc

    def _acquire_conn(self, wsn):
        """Return ``(connection record, new sequence number)`` for *wsn*."""
        with self.lockpool:
            cc = self.connPool.get(wsn)
            if cc is None:
                cc = self.create_conn(wsn)
            self.tux_seq += 1
            return cc, self.tux_seq

    def _build_request(self, lseq, service_name, inParm):
        """Build the request buffer for one service call.

        *inParm* keys: ``'in_num'`` / ``'out_num'`` set the parameter
        counts; ``'<col>'`` or ``'<col>.<row>'`` set a positional
        parameter.  Keys that are not numeric are skipped.
        """
        req = FML32()
        req.fadd(FML32.F_Wsnaddr32, str(lseq))
        req.fadd(FML32.F_ServiceName32, service_name)
        for key, value in inParm.items():
            if key == 'in_num':
                req.fadd(FML32.SEND_PARMS_NUM32, str(value))
            elif key == 'out_num':
                req.fadd(FML32.RECP_PARMS_NUM32, str(value))
            else:
                col_text, dot, row_text = key.partition('.')
                try:
                    col = int(col_text)
                    row = int(row_text) if dot else 0
                except ValueError:
                    continue
                req.fchg(FML32.GPARM32_0 + col, row, str(value))
        return req

    def call_service(self, wsn, service_name, inParm, th_lock, outParm=None):
        """Call a service synchronously and return the reply as an FML32.

        *th_lock* must be an unlocked ``threading.Lock``; it is used to
        block the caller until the reader thread delivers the reply.  When
        *outParm* is a dict it receives ``'errno'``, ``'errmsg'`` and every
        returned parameter under the key ``'<col>.<row>'``.  Returns None
        when *wsn* is not a valid WSNADDR string.
        """
        if not wsn.startswith("WSNADDR=//"):
            print("not a valid wsnaddr: %s" % wsn)
            return None
        cc, lseq = self._acquire_conn(wsn)
        request = self._build_request(lseq, service_name, inParm)
        call_list = cc['call_list']

        # Take the lock and register the waiter BEFORE sending, so a fast
        # reply always finds the waiter and a held lock to release.
        th_lock.acquire()
        try:
            with cc['lock']:
                call_list[lseq] = [th_lock, None]
                cc['sock'].sendall(request.tostr())
        except Exception:
            call_list.pop(lseq, None)
            th_lock.release()
            raise

        # Blocks until the reader thread releases the lock.
        th_lock.acquire()
        th_lock.release()

        raw = call_list.pop(lseq)[1]
        reply = FML32()
        reply.fromstr(raw)
        if outParm is not None:
            outParm['errno'] = reply.fget(FML32.SVC_ERR_NO32, 0)
            outParm['errmsg'] = reply.fget(FML32.SVC_ERR_MSG32, 0)
            for col in range(50):
                fid = FML32.GPARM32_0 + col
                for row in range(reply.focc(fid)):
                    outParm['%d.%d' % (col, row)] = reply.fget(fid, row)
        return reply

    def call_service_asyn(self, wsn, service_name, inParm):
        """Call a service asynchronously and return its sequence number.

        The reply is later delivered through ``q_tuxrply``.  Returns None
        when *wsn* is not a valid WSNADDR string.
        """
        if not wsn.startswith("WSNADDR=//"):
            print("not a valid wsnaddr: %s" % wsn)
            return None
        cc, lseq = self._acquire_conn(wsn)
        request = self._build_request(lseq, service_name, inParm)
        with cc['lock']:
            cc['asyn_list'][lseq] = 0
            try:
                r = cc['sock'].sendall(request.tostr())
            except Exception:
                cc['asyn_list'].pop(lseq, None)
                raise
        print("tuxcall asyn, lseq: %d, sendall return %s" % (lseq, repr(r)))
        return lseq
# end of class TuxConn
if __name__ == "__main__":
    # Manual smoke test: one synchronous 'sSmsQuery' call through the proxy,
    # then dump the error fields, the echoed sequence number and the first
    # two returned parameters.
    print("test me haha")
    tux = TuxConn(('10.105.3.231', 4777))
    wsn = 'WSNADDR=//10.109.2.206:28000'
    mylock = threading.Lock()
    x = tux.call_service(
        wsn,
        'sSmsQuery',
        {'in_num':4, 'out_num':6, '0':'13908098379', '1':'9000', '2':'0', '3':'0'},
        mylock,
    )
    for label, fid in (
        ('errno:', FML32.SVC_ERR_NO32),
        ('errms:', FML32.SVC_ERR_MSG32),
        ('F_Wsnaddr32:', FML32.F_Wsnaddr32),
        ('0.0:', FML32.GPARM32_0),
        ('1.0:', FML32.GPARM32_0+1),
    ):
        print('%s %s' % (label, x.fget(fid, 0)))
    exit(0)
def test11():
    """Manual smoke test: ten sequential 'sSmsQuery' calls over a raw socket.

    Connects directly to the proxy, announces the WSNADDR, then sends ten
    requests one after another, printing selected reply fields for each and
    finally the elapsed time in milliseconds.
    """
    x = FML32()
    # Earlier file-based experiments, kept for reference:
    # f=file('e:/tmp/xxx1.dat', 'rb')
    # x.fromstr(f.read())
    # f.close()
    #
    # k = x.fget(FML32.GPARM32_0, 1)
    # print len(k), k
    # k = x.fget(FML32.GPARM32_0+1, 8)
    # print len(k), k
    ### test for write
    # x.clear()
    # x.fadd(FML32.GPARM32_0, 'hello')
    # x.fadd(FML32.GPARM32_0, 'dddddxxxxxxxxxx')
    # x.fadd(FML32.GPARM32_0+1, 'xxxxx')
    # x.fadd(FML32.GPARM32_0+1, 'dddddd')
    # x.fchg(FML32.GPARM32_0+1, 6, 'jlkjlkjl')
    # x.fchg(FML32.GPARM32_0+1, 8, 'hello123')
    #
    # f = file('e:/tmp/xxx2.dat', 'wb')
    # f.write(x.tostr())
    # f.close()
    import time

    proxy = ("10.105.3.231", 4777)
    # readTuxPkt is a TuxConn method; constructing a TuxConn opens no
    # connection, so this instance only serves as the packet reader.
    reader = TuxConn(proxy)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect(proxy)
        x.clear()
        x.fadd(FML32.F_Wsnaddr32, "WSNADDR=//10.109.2.206:28000")
        sock.sendall(x.tostr())
        t1 = time.time()
        tux_seq = 1
        for i in range(10):
            x.clear()
            x.fadd(FML32.SEND_PARMS_NUM32, "4")
            x.fadd(FML32.RECP_PARMS_NUM32, "6")
            x.fadd(FML32.GPARM32_0, "13908098379")
            x.fadd(FML32.GPARM32_0+1, "9000")
            x.fadd(FML32.GPARM32_0+2, "0")
            x.fadd(FML32.GPARM32_0+3, "0")
            x.fadd(FML32.F_ServiceName32, "sSmsQuery")
            x.fadd(FML32.F_Wsnaddr32, str(tux_seq))
            tux_seq += 1
            sock.sendall(x.tostr())
            s = reader.readTuxPkt(sock)
            x.clear()
            x.fromstr(s)
            for label, fid in (
                ('errno:', FML32.SVC_ERR_NO32),
                ('errms:', FML32.SVC_ERR_MSG32),
                ('F_Wsnaddr32:', FML32.F_Wsnaddr32),
                ('0.0:', FML32.GPARM32_0),
                ('1.0:', FML32.GPARM32_0+1),
            ):
                print('%s %s' % (label, x.fget(fid, 0)))
    finally:
        # Close the socket even when a send/receive step raises.
        sock.close()
    t2 = time.time()
    print((t2 - t1) * 1000.0)
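# Keyboard shortcuts (residue of the web code viewer, not part of the program):
#   copy code: Ctrl + C
#   search code: Ctrl + F
#   full screen: F11
#   larger font: Ctrl + =
#   smaller font: Ctrl + -
#   show shortcuts: ?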