⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpdman.py

📁 刚才是说明,现在是安装程序:在 Linux 环境下进行编程的 MPICH 安装文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
#!/usr/bin/env pythonfrom os     import environ, getpid, pipe, fork, fdopen, read, write, close, dup2, \                   chdir, execvpe, kill, waitpid, _exitfrom sys    import exitfrom socket import gethostname, fromfd, AF_INET, SOCK_STREAMfrom select import select, errorfrom re     import findall, subfrom signal import signal, SIGKILL, SIGUSR1, SIGTSTP, SIGCONT, SIGCHLD, SIG_DFL, SIG_IGNfrom md5    import newfrom mpdlib import mpd_set_my_id, mpd_print, mpd_print_tb, mpd_get_ranks_in_binary_tree, \                   mpd_send_one_line, mpd_recv_one_line, mpd_send_one_msg, mpd_recv_one_msg, \                   mpd_get_inet_listen_socket, mpd_get_inet_socket_and_connect, \                   mpd_get_my_username, mpd_raise, mpdError, mpd_versionglobal get_sigtype_from_mpddef mpdman():    global get_sigtype_from_mpd    get_sigtype_from_mpd = 0    signal(SIGUSR1,sigusr1_handler)    signal(SIGCHLD,SIG_DFL)  # reset mpd's values    myHost = environ['MPDMAN_MYHOST']    myRank = int(environ['MPDMAN_RANK'])    myId = myHost + '_mpdman_' + str(myRank)    spawned = int(environ['MPDMAN_SPAWNED'])    if spawned:        myId = myId + '_s'    mpd_set_my_id(myId)    try:        chdir(environ['MPDMAN_CWD'])    except Exception, errmsg:        errmsg =  '%s: invalid dir: %s' % (myId,environ['MPDMAN_CWD'])        # print errmsg    ## may syslog it in some cases ?    
clientPgm = environ['MPDMAN_CLI_PGM']    clientPgmArgs = environ['MPDMAN_PGM_ARGS']    clientPgmArgs = findall('\S+',clientPgmArgs)    mpd_print(0000, "ARGS=", clientPgmArgs )    clientPgmEnv = environ['MPDMAN_PGM_ENVVARS']    clientPgmEnv = findall('\S+',clientPgmEnv)    mpd_print(0000, 'entering mpdman to exec %s' % (clientPgm) )    jobid = environ['MPDMAN_JOBID']    nprocs = int(environ['MPDMAN_NPROCS'])    mpdPort = int(environ['MPDMAN_MPD_LISTEN_PORT'])    mpdConfPasswd = environ['MPDMAN_MPD_CONF_PASSWD']    environ['MPDMAN_MPD_CONF_PASSWD'] = ''  ## do NOT pass it on to clients    conHost = environ['MPDMAN_CONHOST']    conPort = int(environ['MPDMAN_CONPORT'])    lhsHost = environ['MPDMAN_LHSHOST']    lhsPort = int(environ['MPDMAN_LHSPORT'])    host0 = environ['MPDMAN_HOST0']        # only used by right-most man    port0 = int(environ['MPDMAN_PORT0'])   # only used by right-most man    myPort = int(environ['MPDMAN_MY_LISTEN_PORT'])    listenFD = int(environ['MPDMAN_MY_LISTEN_FD'])    mpd_print(0000, "lhost=%s lport=%d h0=%s p0=%d" % (lhsHost,lhsPort,host0,port0) )    listenSocket = fromfd(listenFD,AF_INET,SOCK_STREAM)    close(listenFD)    socketsToSelect = { listenSocket : 1 }    lineLabels = int(environ['MPDMAN_LINE_LABELS'])    startStdoutLineLabel = 1    startStderrLineLabel = 1    myLineLabel = str(myRank) + ': '    # set up pmi stuff early in case I was spawned    kvsname_template = 'kvs_' + host0 + '_' + str(port0) + '_'    default_kvsname = kvsname_template + '0'    default_kvsname = sub('\.','_',default_kvsname)  # chg magpie.cs to magpie_cs    exec('%s = {}' % (default_kvsname) )    kvs_next_id = 1    pmiCollectiveJob = 0    if nprocs == 1:  # one-man ring        lhsSocket = mpd_get_inet_socket_and_connect(host0,port0)  # to myself        (rhsSocket,rhsAddr) = listenSocket.accept()    else:        if myRank == 0:            for i in range(2):    # accept lhs and rhs                (tempSocket,tempAddr) = listenSocket.accept()                msg = 
mpd_recv_one_msg(tempSocket)                if msg['cmd'] == 'i_am_lhs':                    (lhsSocket,lhsAddr) = (tempSocket,tempAddr)                else:                    (rhsSocket,rhsAddr) = (tempSocket,tempAddr)        else:            lhsSocket = mpd_get_inet_socket_and_connect(lhsHost,lhsPort)            mpd_send_one_msg(lhsSocket, { 'cmd' : 'i_am_rhs' } )            if myRank == (nprocs-1):              # right-most man                rhsSocket = mpd_get_inet_socket_and_connect(host0,port0)                mpd_send_one_msg(rhsSocket, { 'cmd' : 'i_am_lhs' } )            else:                (rhsSocket,rhsAddr) = listenSocket.accept()                msg = mpd_recv_one_msg(rhsSocket)  # drain out the i_am_... msg    socketsToSelect[lhsSocket] = 1    socketsToSelect[rhsSocket] = 1    if myRank == 0:        conSocket = mpd_get_inet_socket_and_connect(conHost,conPort)  # for cntl msgs        socketsToSelect[conSocket] = 1        if spawned:            msgToSend = { 'cmd' : 'spawned_child' }            mpd_send_one_msg(conSocket,msgToSend)        stdoutToConSocket = mpd_get_inet_socket_and_connect(conHost,conPort)        if spawned:            msgToSend = { 'cmd' : 'child_in_stdout_tree', 'from_rank' : myRank }            mpd_send_one_msg(stdoutToConSocket,msgToSend)        stderrToConSocket = mpd_get_inet_socket_and_connect(conHost,conPort)        if spawned:            msgToSend = { 'cmd' : 'child_in_stderr_tree', 'from_rank' : myRank }            mpd_send_one_msg(stderrToConSocket,msgToSend)    else:        conSocket = 0    (clientListenSocket,clientListenPort) = mpd_get_inet_listen_socket('',0)    (pipe_read_cli_stdout,pipe_write_cli_stdout) = pipe()    (pipe_read_cli_stderr,pipe_write_cli_stderr) = pipe()    (pipe_cli_end,pipe_man_end) = pipe()    clientPid = fork()    if clientPid == 0:        mpd_set_my_id(gethostname() + '_man_before_exec_client_' + `getpid()`)        lhsSocket.close()        rhsSocket.close()        listenSocket.close()        if 
conSocket:            conSocket.close()        # to simply print on the mpd's tty:        #     comment out the next lines        close(pipe_read_cli_stdout)        dup2(pipe_write_cli_stdout,1)  # closes fd 1 (stdout) if open        close(pipe_write_cli_stdout)        close(pipe_read_cli_stderr)        dup2(pipe_write_cli_stderr,2)  # closes fd 2 (stderr) if open        close(pipe_write_cli_stderr)        msg = read(pipe_cli_end,2)        if msg != 'go':            mpd_raise('%s: invalid go msg from man :%s:' % (myId,msg) )        close(pipe_cli_end)        (pmiSocket,pmiAddr) = clientListenSocket.accept()        pmiFile = fdopen(pmiSocket.fileno(),'r')	msg = mpd_recv_one_line(pmiFile)        ## mpd_print(0000, "recvd pmi handshake=:%s:" % msg )        if not msg  or  msg != 'cmd=pmi_handler\n':    # handshake            mpd_raise('%d: invalid msg from handler :%s:' % (myRank,msg) )        clientPgmArgs = [clientPgm] + clientPgmArgs        environ['PATH'] = environ['MPDMAN_CLI_PATH']        environ['PMI_FD'] = str(pmiSocket.fileno())        environ['PMI_SIZE'] = str(nprocs)        environ['PMI_RANK'] = str(myRank)        environ['PMI_DEBUG'] = str(0)        for envvar in clientPgmEnv:            (envkey,envval) = envvar.split('=')            environ[envkey] = envval        ## mpd_print(0000, 'execing clientPgm=:%s:' % (clientPgm) )        try:            execvpe(clientPgm,clientPgmArgs,environ)    # client        except Exception, errmsg:            ## mpd_raise('execvpe failed for client %s; errmsg=:%s:' % (clientPgm,errmsg) )            print '%s: could not run %s; probably executable file not found' % (myId,clientPgm)            exit(0)        _exit(0)  # just in case (does no cleanup)    close(pipe_write_cli_stdout)    close(pipe_write_cli_stderr)    clientStdoutFD = pipe_read_cli_stdout    clientStdoutFile = fdopen(clientStdoutFD,'r')    socketsToSelect[clientStdoutFD] = 1    clientStderrFD = pipe_read_cli_stderr    clientStderrFile = 
fdopen(clientStderrFD,'r')    socketsToSelect[clientStderrFD] = 1    clientListenSocket.close()    numWithIO = 2    # stdout and stderr so far    waitPids = [clientPid]    # connect to the client telling it that we are providing pmi service    pmiSocket = mpd_get_inet_socket_and_connect('localhost',clientListenPort)    pmiFile = fdopen(pmiSocket.fileno(),'r')    mpd_send_one_line(pmiSocket,'cmd=pmi_handler\n')  # handshake    socketsToSelect[pmiSocket] = 1    # begin setup of stdio tree    (parent,lchild,rchild) = mpd_get_ranks_in_binary_tree(myRank,nprocs)    spawnedChildSockets = []    childrenStdoutTreeSockets = []    childrenStderrTreeSockets = []    if lchild >= 0:        numWithIO += 2    # stdout and stderr from child        msgToSend = { 'cmd' : 'info_for_parent_in_tree',                      'to_rank' : str(lchild),                      'parent_host' : myHost,                      'parent_port' : myPort }        mpd_send_one_msg(rhsSocket,msgToSend)    if rchild >= 0:        numWithIO += 2    # stdout and stderr from child        msgToSend = { 'cmd' : 'info_for_parent_in_tree',                      'to_rank' : str(rchild),                      'parent_host' : myHost,                      'parent_port' : myPort }        mpd_send_one_msg(rhsSocket,msgToSend)    if myRank == 0:        parentStdoutSocket = stdoutToConSocket        parentStderrSocket = stderrToConSocket        msgToSend = { 'cmd' : 'jobgo' }        mpd_send_one_msg(rhsSocket,msgToSend)    else:        parentStdoutSocket = 0        parentStderrSocket = 0    if environ.has_key('MPDMAN_RSHIP'):        rship = environ['MPDMAN_RSHIP']        # (rshipSocket,rshipPort) = mpd_get_inet_listen_socket('',0)        rshipPid = fork()        if rshipPid == 0:            environ['MPDCP_MSHIP_HOST'] = environ['MPDMAN_MSHIP_HOST']            environ['MPDCP_MSHIP_PORT'] = environ['MPDMAN_MSHIP_PORT']            environ['MPDCP_MSHIP_NPROCS'] = str(nprocs)            environ['MPDCP_CLI_PID'] = str(clientPid)       
     try:                execvpe(rship,[rship],environ)            except Exception, errmsg:                # make sure my error msgs get to console                dup2(parentStdoutSocket.fileno(),1)  # closes fd 1 (stdout) if open                dup2(parentStderrSocket.fileno(),2)  # closes fd 2 (stderr) if open                mpd_raise('execvpe failed for copgm %s; errmsg=:%s:' % (rship,errmsg) )            _exit(0);  # do NOT do cleanup        # rshipSocket.close()        waitPids.append(rshipPid)    pmiBarrierInRecvd = 0    holdingPMIBarrierLoop1 = 0    holdingEndBarrierLoop1 = 0    endBarrierDone = 0    numDone = 0    mpdSocket = 0    while not endBarrierDone:        if get_sigtype_from_mpd:            mpdSocket = mpd_get_inet_socket_and_connect('localhost',mpdPort)            msgToSend = { 'cmd' : 'manager_needs_help', 'host' : myHost, 'port' : myPort }            mpd_send_one_msg(mpdSocket,msgToSend)            msg = mpd_recv_one_msg(mpdSocket)            if (not msg.has_key('cmd')) or  \               (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')):                mpd_raise('%s: failed to recv challenge from rhs; msg=:%s:' % (myId,msg) )            response = new(''.join([mpdConfPasswd,msg['randnum']])).digest()            msgToSend = { 'cmd' : 'challenge_response', 'response' : response,                          'host' : myHost, 'port' : myPort }            mpd_send_one_msg(mpdSocket,msgToSend)            msg = mpd_recv_one_msg(mpdSocket)            if (not msg.has_key('cmd'))  or  (msg['cmd'] != 'OK_to_send_requests'):                mpd_raise('%s: NOT OK to send requests to mpd; msg=:%s:' % (myId,msg) )            msgToSend = { 'cmd' : 'get_signal_to_deliver', 'pid' : `getpid()`,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -