⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tutils.py

📁 OpenWFE是一个开放源码的Java工作流引擎。它是一个完整的业务处理管理套件:一个引擎
💻 PY
字号:
## Copyright (c) 2005, John Mettraux, OpenWFE.org# All rights reserved.# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met:# # . Redistributions of source code must retain the above copyright notice, this#   list of conditions and the following disclaimer.  # # . Redistributions in binary form must reproduce the above copyright notice, #   this list of conditions and the following disclaimer in the documentation #   and/or other materials provided with the distribution.# # . Neither the name of the "OpenWFE" nor the names of its contributors may be#   used to endorse or promote products derived from this software without#   specific prior written permission.# # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE.## $Id: tutils.py 2451 2006-03-20 13:22:43Z jmettraux $### testing various flows#import sysfrom time import sleep as tsleepfrom openwfe.workitem import Launchitemfrom openwfe.workitem import StringAttributefrom openwfe.workitem import StringMapAttributefrom openwfe.rest.control import ControlSessionfrom openwfe.rest.wlclient import WorkSessionALICE = 'alice'BOB = 'bob'HOST = 'localhost'#HOST = '192.168.4.101'#HOST = '192.168.4.102'PORT = 5080CPORT = 6080TEST = '__test__'MAIN_ENGINE = 'mainEngine'## a few help methodsdef loginAs (participant):    ws = WorkSession(HOST, PORT, participant, participant)    print '    <> logged in as %s' % participant    return wsdef controlLogin ():    ws = ControlSession(HOST, CPORT, 'admin', 'admin')    print '    <> control logged in as admin'    return wsdef lookupWorkitem (workSession, storeName, testName, attempt=1, lock=1):    #print 'testName >%s<' % testName    headers = workSession.getHeaders(storeName)    #print '     headers count : %i' % len(headers)    for header in headers:        workitem = workSession.getWorkitem(storeName, header.flowExpressionId)        for field in workitem.attributes.keys():            #print 'field name >%s<' % str(field.value).strip()            #print 'field value >%s<' % workitem.attributes[field].value            if field.value == TEST and workitem.attributes[field].value == testName:                #print 'found. getting and locking...'                if lock:                    workitem = workSession.getAndLockWorkitem(storeName, header.flowExpressionId)                else:                    workitem = workSession.getWorkitem(storeName, header.flowExpressionId)                if not workitem: print 'lookupWorkitem() null workitem...'                #print '                 a %i' % attempt                return workitem    if attempt <= 40:        tsleep(0.02)        #print 'lookupWorkitem() attempt #%i' % attempt        return lookupWorkitem(workSession, storeName, testName, attempt+1, lock)    print 'lookupWorkitem() no workitem found after %i attempts' % attempt    #print '..did not find any workitem in store %s' % storeName    return Nonedef lookupWorkitem2 \    (workSession, storeName, testName, fieldName, fieldValue, attempt=1):    headers = workSession.getHeaders(storeName)    for header in headers:        workitem = workSession.getWorkitem(storeName, header.flowExpressionId)        #print '    : "%s"' % workitem.attributes.gets(TEST)        if workitem.attributes.gets(TEST) != testName:             continue        for field in workitem.attributes.keys():            if field.value == fieldName and workitem.attributes[field].value == fieldValue:                workitem = workSession.getAndLockWorkitem(storeName, header.flowExpressionId)                return workitem    if attempt <= 40:        tsleep(0.02)        return lookupWorkitem2(workSession, storeName, testName, fieldName, fieldValue, attempt+1)    return Nonedef die (message=None):    if message: print '\n !!! %s\n' % message    sys.exit(1)def printWorkitemInfo (wi):    print '  ~ workitem info :'    print '     wfid    : %s' % wi.lastExpressionId.workflowInstanceId    print '     wfeg    : %s' % wi.lastExpressionId.engineId    print '     wfd     : %s %s' % (wi.lastExpressionId.workflowDefinitionName, wi.lastExpressionId.workflowDefinitionRevision)    print '     wfdurl  : %s' % wi.lastExpressionId.workflowDefinitionUrl    #print '     flowStack size : %s' % len(wi.flowStack)def printHistory (wi):    for hi in wi.history:        print '   ~~ history item'        print '       author         : %s' % hi.author        print '       text           : %s' % hi.text        print '       date           : %s' % hi.date        print '       host           : %s' % hi.host        print '       wfd_name       : %s' % hi.workflowDefinitionName        print '       wfd_revision   : %s' % hi.workflowDefinitionRevision        print '       wf_instance_id : %s' % hi.workflowInstanceIddef sleep (seconds):    every = 5    for i in xrange(int(seconds / every)):        tsleep(every)        sys.stdout.write('.')        sys.stdout.flush()    sys.stdout.write('\n')    sys.stdout.flush()def foundWorkitem (workSession, storeName, testName, present=1):    wi = lookupWorkitem(workSession, storeName, testName)    if (present and wi == None):         die("%s failed ! (nothing in %s)" % (testName, storeName))    if (not present and wi != None):        die("%s failed ! (something in %s)" % (testName, storeName))    return widef checkstring (s1, s2):    if s1 == s2: return None    #if len(s1) != len(s2): return "Not the same length"    for i in xrange(len(s1)):        c1 = s1[i]        c2 = s2[i]        if c1 != c2:             return "Position %i : |%s| != |%s|" % (i, c1, c2)        print "  ~ '%s' == '%s' ok" % (c1, c2)    return "Unknown problem"def checkControllableCount (count, shouldBeCount):    if count != shouldBeCount:        die('wrong count of controllable expressions (%i, should be %i)' % (count, shouldBeCount))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -