# tux.py -- Python code for calling Tuxedo services through a C-based Tuxedo client proxy (468 lines).
# coding=gbk

import struct, types, socket, io, threading, Queue, time
from util import log_r


class FML32:
    """In-memory FML32 field buffer with a binary encoder and decoder.

    Values are stored per field id as a list of byte strings (one entry per
    occurrence/row) in ``self.data``.

    Wire layout produced by ``tostr`` and consumed by ``fromstr``
    (all integers are 32-bit, network byte order):

    * 24-byte header: magic0, total size, direction magic (magic1 when
      ``issendbuf == 1``, otherwise magic3), field count, 8-byte magic2.
    * One record per value: field id, record length (``len(value) + 9``),
      the value, then 1-4 zero bytes so the record ends on a 4-byte boundary.
    """

    # Field identifiers. NOTE(review): the meaning of each id is inferred
    # from its name only -- confirm against the proxy/service definition.
    GPARM32_0        = 0x0a000c80
    SEND_PARMS_NUM32 = 0x0a000ce4
    RECP_PARMS_NUM32 = 0x0a000ce6
    SVC_ERR_NO32     = 0x0a000cf8
    SVC_ERR_MSG32    = 0x0a000cf9
    F_ServiceName32  = 0x0a000d03
    F_Wsnaddr32      = 0x0a000d04
    # Header constants (bytes literals; identical to str on Python 2).
    __magic0 = b'\xff\xff\xd8\xf0'
    __magic1 = b'\x00\x00\x04\x00'
    __magic2 = b'\x00\x00\x00\x00\x00\x00\x00\x10'
    __magic3 = b'\x00\x00\x10\x00'

    def __init__(self, is_send_buf=1):
        """Create an empty buffer; ``is_send_buf`` selects the header magic."""
        self.data = {}
        self.fsize = 24          # size of the fixed header
        self.issendbuf = is_send_buf
        self.fcount = 0

    def fadd(self, fid, value):
        """Append ``value`` as a new occurrence of field ``fid``.

        Returns 0 on success, -1 if ``value`` is not a byte string.
        """
        if not isinstance(value, bytes):
            return -1
        self.fcount += 1
        self.fsize += self.roundup4(len(value) + 8 + 1)
        self.data.setdefault(fid, []).append(value)
        return 0

    def fchg(self, fid, row, value):
        """Set occurrence ``row`` of field ``fid`` to ``value``.

        Rows between the current end and ``row`` are created as empty
        values. Returns 0 on success, -1 if ``value`` is not a byte string.
        """
        if not isinstance(value, bytes):
            return -1
        rows = self.data.setdefault(fid, [])
        if len(rows) > row:
            # Replace in place: swap the old record size for the new one.
            self.fsize -= self.roundup4(len(rows[row]) + 8 + 1)
            rows[row] = value
        else:
            # Pad with empty values; an empty record occupies 12 bytes.
            gap = row - len(rows)
            self.fcount += gap + 1
            self.fsize += 12 * gap
            rows.extend([b''] * gap)
            rows.append(value)
        self.fsize += self.roundup4(len(value) + 8 + 1)
        return 0

    def fget(self, fid, row):
        """Return occurrence ``row`` of ``fid`` with trailing whitespace
        stripped, or None when the field or the row does not exist."""
        try:
            return self.data[fid][row].rstrip()
        except (KeyError, IndexError):
            return None

    def focc(self, fid):
        """Return the number of occurrences stored for ``fid`` (0 if none)."""
        return len(self.data.get(fid, ()))

    def str2int(self, str):
        """Decode a 4-byte network-order string into an unsigned int."""
        (r, ) = struct.unpack('!I', str)
        return r

    def int2str(self, ii):
        """Encode an unsigned int as a 4-byte network-order string."""
        return struct.pack('!I', ii)

    def roundup4(self, ii):
        """Round ``ii`` up to the next multiple of 4."""
        return (ii + 3) & -4

    def fromstr(self, s):
        """Replace the buffer contents with the FML32 message encoded in ``s``.

        Returns True on success. Returns False, leaving the buffer empty,
        when the leading magic does not match or the input is truncated.
        """
        self.clear()
        fin = io.BytesIO(s)
        if fin.read(4) != FML32.__magic0:
            return False
        header = fin.read(20)
        if len(header) != 20:
            return False
        fsize = self.str2int(header[0:4])
        direction = header[4:8]
        remaining = self.str2int(header[8:12])
        # header[12:20] is magic2; as before it is read but not validated.

        parsed = {}
        fcount = remaining
        # Count down instead of range(fcount): the count comes from the
        # wire and must not drive a large allocation.
        while remaining > 0:
            remaining -= 1
            head = fin.read(8)
            if len(head) != 8:
                return False
            fid = self.str2int(head[0:4])
            flen0 = self.str2int(head[4:8])
            padded = self.roundup4(flen0) - 8
            payload = b''
            if padded > 0:
                payload = fin.read(padded)
                if len(payload) != padded:
                    # Truncated record: report failure instead of spinning.
                    return False
            parsed.setdefault(fid, []).append(payload[0:max(flen0 - 8 - 1, 0)])

        # Commit only after the whole message parsed cleanly.
        self.data.update(parsed)
        self.fsize = fsize
        self.fcount = fcount
        if direction == FML32.__magic1:
            self.issendbuf = 1
        elif direction == FML32.__magic3:
            self.issendbuf = 0
        else:
            self.issendbuf = 2
        return True

    def tostr(self):
        """Serialise the buffer and return it as a byte string."""
        pout = io.BytesIO()
        # Header: 4 + 4 + 4 + 4 + 8 bytes.
        pout.write(FML32.__magic0)
        pout.write(self.int2str(self.fsize))
        pout.write(FML32.__magic1 if self.issendbuf == 1 else FML32.__magic3)
        pout.write(self.int2str(self.fcount))
        pout.write(FML32.__magic2)

        # Body: records in ascending field-id order.
        for fid in sorted(self.data):
            fid_bytes = self.int2str(fid)
            for v in self.data[fid]:
                pout.write(fid_bytes)
                pout.write(self.int2str(len(v) + 8 + 1))
                pout.write(v)
                # 1-4 zero bytes: terminator plus alignment padding.
                pout.write(b'\x00' * (self.roundup4(len(v) + 1) - len(v)))
        return pout.getvalue()

    def clear(self):
        """Remove every field and reset the size and count to header-only."""
        self.data.clear()
        self.fsize = 24
        self.fcount = 0

# end of class FML32

class TuxConn:
    """Pool of TCP connections to a Tuxedo proxy, keyed by WSNADDR string.

    Each connection gets a daemon thread (``th_readtux_reply``) that reads
    reply packets. Replies to synchronous calls wake the waiting caller;
    all other replies are put on ``self.q_tuxrply`` as FML32 buffers.
    """

    def __init__(self, tuxproxy):
        """``tuxproxy`` is the address passed to ``socket.connect``."""
        self.connPool = {}                  # wsnaddr -> connection dict
        self.lockpool = threading.Lock()    # guards connPool and tux_seq
        self.q_tuxrply = Queue.Queue()      # replies to asynchronous calls
        self.tux_proxy = tuxproxy
        self.tux_seq = 1

    def _recv_exact(self, sock, count):
        """Read up to ``count`` bytes, stopping early only at end of stream."""
        chunks = []
        while count > 0:
            chunk = sock.recv(count)
            if len(chunk) == 0:
                break
            chunks.append(chunk)
            count -= len(chunk)
        return b"".join(chunks)

    def readTuxPkt(self, sock):
        """Read one packet from ``sock`` and return it, header included.

        Bytes 4-8 of the 8-byte header hold the total packet length in
        network byte order. Returns None when ``sock`` is not a socket,
        the stream ends early, or the declared length does not exceed the
        header length.
        """
        if not isinstance(sock, socket.socket):
            return None
        hlen = 8
        # recv() may legitimately return fewer bytes than requested, so
        # keep reading until the header is complete or the peer closes.
        header = self._recv_exact(sock, hlen)
        if len(header) != hlen:
            print('invalid read header len %s' % repr(header))
            return None
        (pkt_len, ) = struct.unpack('!I', header[4:8])
        if pkt_len <= hlen:
            return None
        body = self._recv_exact(sock, pkt_len - hlen)
        if len(body) != pkt_len - hlen:
            return None
        return header + body

    def _reconnect(self, cc, old_sock):
        """Replace the connection's socket with a fresh one and re-send the
        WSNADDR handshake; returns the new socket."""
        # 'with' guarantees the send lock is released even if connect fails.
        with cc['lock']:
            old_sock.close()
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(self.tux_proxy)
            hello = FML32()
            hello.fadd(FML32.F_Wsnaddr32, cc['wsn'])
            sock.sendall(hello.tostr())
            cc['sock'] = sock
        return sock

    def _fail_pending_async(self, cc):
        """Queue an error reply for every outstanding asynchronous call."""
        for lseq in list(cc['asyn_list'].keys()):
            cc['asyn_list'].pop(lseq, None)
            err = FML32()
            err.fadd(FML32.F_Wsnaddr32, str(lseq))
            err.fadd(FML32.SVC_ERR_NO32, "777701")
            err.fadd(FML32.SVC_ERR_MSG32, "tuxedo连接中断,调用失败")
            self.q_tuxrply.put(err)

    def th_readtux_reply(self, cc):
        """Thread body: receive replies on connection ``cc`` forever."""
        sock = cc['sock']
        call_list = cc['call_list']
        wsnaddr = cc['wsn']
        while True:
            s = self.readTuxPkt(sock)
            cc['tm_active'] = time.time()
            # A fresh buffer per packet: the buffer may be handed to the
            # reply queue, so it must not be overwritten by the next packet.
            x = FML32()
            x.fromstr(s)
            try:
                lseq = int(x.fget(FML32.F_Wsnaddr32, 0))
            except (TypeError, ValueError):
                # No usable sequence number: treat the connection as broken
                # and reconnect.
                # Known limitation: synchronous callers waiting on this
                # connection are not woken up.
                log_r( "wsnaddr:%s call failed, there is no call sequence no"%wsnaddr)
                sock = self._reconnect(cc, sock)
                # Tell every asynchronous caller on this connection that
                # its call failed.
                self._fail_pending_async(cc)
                continue
            if lseq in call_list:
                dc = call_list[lseq]
                print("syn call return")
                dc[1] = s
                dc[0].release()  # synchronous call: wake the waiting caller
            else:
                # Asynchronous call: hand the reply to the reply queue.
                # pop() tolerates sequence numbers that are not registered.
                cc['asyn_list'].pop(lseq, None)
                self.q_tuxrply.put(x)

    def create_conn(self, wsnaddr):
        """Connect to the proxy, send the WSNADDR handshake, register the
        connection in the pool and start its reader thread."""
        chost = self.tux_proxy
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print(repr(chost))
        try:
            sock.connect(chost)
            x = FML32()
            x.fadd(FML32.F_Wsnaddr32, wsnaddr)
            sock.sendall(x.tostr())
        except Exception:
            sock.close()  # do not leak the descriptor on a failed setup
            raise
        cc = {'sock':sock, 'seq':1, 'lock':threading.Lock(), 'call_list':{},
            'wsn':wsnaddr, 'asyn_list':{}}
        self.connPool[wsnaddr] = cc
        t = threading.Thread(target=self.th_readtux_reply, args=(cc,))
        t.daemon = True
        t.start()
        return cc

    def _acquire_conn(self, wsn):
        """Return ``(connection, sequence_number)`` for a new call,
        creating the connection on first use."""
        with self.lockpool:
            cc = self.connPool.get(wsn)
            if cc is None:
                cc = self.create_conn(wsn)
            self.tux_seq += 1
            return cc, self.tux_seq

    def _build_request(self, lseq, service_name, inParm):
        """Build the request buffer for one call.

        ``inParm`` keys: 'in_num', 'out_num', or 'COL' / 'COL.ROW' (decimal
        integers) addressing field ``GPARM32_0 + COL``, occurrence ROW.
        Keys that do not parse as integers are skipped.
        """
        req = FML32()
        req.fadd(FML32.F_Wsnaddr32, str(lseq))
        req.fadd(FML32.F_ServiceName32, service_name)
        for (k, v) in inParm.items():
            if k == 'in_num':
                req.fadd(FML32.SEND_PARMS_NUM32, str(v))
            elif k == 'out_num':
                req.fadd(FML32.RECP_PARMS_NUM32, str(v))
            else:
                col_txt, dot, row_txt = k.partition('.')
                try:
                    col = int(col_txt)
                    row = int(row_txt) if dot else 0
                except ValueError:
                    continue
                req.fchg(FML32.GPARM32_0 + col, row, str(v))
        return req

    def call_service(self, wsn, service_name, inParm, th_lock, outParm=None):
        """Call a service synchronously and return the reply as an FML32.

        ``th_lock`` must be an unlocked ``threading.Lock``; it is used to
        block until the reader thread delivers the reply and is unlocked
        again on return. When ``outParm`` is a dict it receives 'errno',
        'errmsg' and one 'COL.ROW' entry per returned GPARM value (columns
        0-49). Returns None if ``wsn`` does not start with 'WSNADDR=//'.
        """
        if not wsn.startswith("WSNADDR=//"):
            print("not a valid wsnaddr: %s" % wsn)
            return None

        cc, lseq = self._acquire_conn(wsn)
        req = self._build_request(lseq, service_name, inParm)

        call_list = cc['call_list']
        # Take the lock and register the waiter BEFORE sending, so a reply
        # that arrives immediately is routed here and its release() finds
        # the lock held.
        th_lock.acquire()
        call_list[lseq] = [th_lock, None]
        try:
            with cc['lock']:
                cc['sock'].sendall(req.tostr())
        except Exception:
            call_list.pop(lseq, None)
            th_lock.release()
            raise

        # Wait for the reply; the reader thread releases th_lock.
        th_lock.acquire()
        th_lock.release()
        s = call_list.pop(lseq)[1]
        x = FML32()
        x.fromstr(s)
        if outParm is not None:
            outParm['errno'] = x.fget(FML32.SVC_ERR_NO32, 0)
            outParm['errmsg'] = x.fget(FML32.SVC_ERR_MSG32, 0)
            for i in range(50):
                for j in range(x.focc(FML32.GPARM32_0 + i)):
                    outParm['%d.%d'%(i, j)] = x.fget(FML32.GPARM32_0 + i, j)
        return x

    def call_service_asyn(self, wsn, service_name, inParm):
        """Send a service call without waiting and return its sequence
        number; the reply is delivered through ``self.q_tuxrply``.
        Returns None if ``wsn`` does not start with 'WSNADDR=//'.
        """
        if not wsn.startswith("WSNADDR=//"):
            print("not a valid wsnaddr: %s" % wsn)
            return None

        cc, lseq = self._acquire_conn(wsn)
        req = self._build_request(lseq, service_name, inParm)
        with cc['lock']:
            cc['asyn_list'][lseq] = 0
            try:
                r = cc['sock'].sendall(req.tostr())
            except Exception:
                # Nothing was sent, so no reply will ever clear this entry.
                cc['asyn_list'].pop(lseq, None)
                raise
        print("tuxcall asyn, lseq: %d, sendall return %s"%(lseq, repr(r)))
        return lseq
# end of class TuxConn

if __name__ == "__main__":
    # Manual smoke test: one synchronous 'sSmsQuery' call through the proxy,
    # then dump the error fields, the sequence number and the first two
    # returned parameters.
    print("test me haha")
    tux = TuxConn(('10.105.3.231', 4777))

    wsn = 'WSNADDR=//10.109.2.206:28000'
    mylock = threading.Lock()
    request = {'in_num':4, 'out_num':6, '0':'13908098379', '1':'9000', '2':'0', '3':'0'}
    x = tux.call_service(wsn, 'sSmsQuery', request, mylock)

    report = (
        ('errno:', FML32.SVC_ERR_NO32),
        ('errms:', FML32.SVC_ERR_MSG32),
        ('F_Wsnaddr32:', FML32.F_Wsnaddr32),
        ('0.0:', FML32.GPARM32_0),
        ('1.0:', FML32.GPARM32_0+1),
    )
    for label, fid in report:
        print('%s %s' % (label, x.fget(fid, 0)))
    exit(0)
















def test11(proxy=("10.105.3.231", 4777), wsn="WSNADDR=//10.109.2.206:28000",
           rounds=10):
    """Manual round-trip test over a raw socket, without the reader thread.

    Connects to ``proxy``, sends the WSNADDR handshake, then issues
    ``rounds`` 'sSmsQuery' requests one after another, printing selected
    reply fields for each and finally the elapsed time in milliseconds.
    """
    x = FML32()
    # readTuxPkt is a TuxConn method; constructing a TuxConn opens no
    # connection, so it is used here only as a packet reader.
    reader = TuxConn(proxy)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect(proxy)
        x.clear()
        x.fadd(FML32.F_Wsnaddr32, wsn)
        sock.sendall(x.tostr())

        t1 = time.time()
        tux_seq = 1
        for i in range(rounds):
            x.clear()
            x.fadd(FML32.SEND_PARMS_NUM32, "4")
            x.fadd(FML32.RECP_PARMS_NUM32, "6")
            x.fadd(FML32.GPARM32_0, "13908098379")
            x.fadd(FML32.GPARM32_0+1, "9000")
            x.fadd(FML32.GPARM32_0+2, "0")
            x.fadd(FML32.GPARM32_0+3, "0")
            x.fadd(FML32.F_ServiceName32, "sSmsQuery")
            x.fadd(FML32.F_Wsnaddr32, str(tux_seq))
            tux_seq += 1
            sock.sendall(x.tostr())

            s = reader.readTuxPkt(sock)
            x.clear()
            x.fromstr(s)

            print('errno: %s' % x.fget(FML32.SVC_ERR_NO32, 0))
            print('errms: %s' % x.fget(FML32.SVC_ERR_MSG32, 0))
            print('F_Wsnaddr32: %s' % x.fget(FML32.F_Wsnaddr32, 0))
            print('0.0: %s' % x.fget(FML32.GPARM32_0, 0))
            print('1.0: %s' % x.fget(FML32.GPARM32_0+1, 0))
    finally:
        # Close the socket even when a send/receive fails part-way.
        sock.close()
    t2 = time.time()
    print((t2 - t1) * 1000.0)



        

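# Code-viewer keyboard shortcuts (web-page residue, not part of the program):
#   copy code: Ctrl + C
#   search code: Ctrl + F
#   full screen: F11
#   larger font: Ctrl + =
#   smaller font: Ctrl + -
#   show shortcuts: ?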