📄 mpd.py
字号:
#!/usr/bin/env pythonfrom sys import stdout, argv, settrace, exitfrom os import environ, getpid, fork, execvpe, setpgrp, waitpid, kill, chdir, \ setsid, getuid, setuid, setreuid, setregid, setgroups, \ umask, close, access, path, stat, unlink, \ R_OK, X_OK, WNOHANG, _exitfrom pwd import getpwnamfrom socket import socket, AF_UNIX, SOCK_STREAM, gethostnamefrom select import select, errorfrom getopt import getoptfrom types import FunctionTypefrom signal import signal, SIGCHLD, SIGKILL, SIGUSR1, SIGHUP, SIG_IGNfrom atexit import registerfrom time import sleepfrom random import seed, randrangefrom syslog import syslogfrom md5 import newfrom threading import Threadfrom mpdlib import mpd_print, mpd_print_tb, mpd_get_ranks_in_binary_tree, \ mpd_send_one_msg, mpd_recv_one_msg, \ mpd_get_inet_listen_socket, mpd_get_inet_socket_and_connect, \ mpd_set_procedures_to_trace, mpd_trace_calls, mpd_raise, mpdError, \ mpd_get_my_username, mpd_get_groups_for_username, \ mpd_set_my_id, mpd_check_python_version, mpd_versionclass _ActiveSockInfo: passclass g: # global data items passdef _mpd_init(): global stdout close(0) g.myPid = getpid() # print "mpd version 0.8 for Edi" (g.mySocket,g.myPort) = mpd_get_inet_listen_socket('',0) if g.echoPortNum: # do this before becoming a daemon print g.myPort stdout.flush() g.myId = '%s_%d' % (g.myHost,g.myPort) mpd_set_my_id(g.myId) # get manager path before doing chdir as daemon below g.fullDirName = path.abspath(path.split(argv[0])[0]) # normalize for platform also g.manager = path.normpath(g.fullDirName + '/mpdman.py') if not access(g.manager,X_OK): mpd_raise('cannot execute manager %s' % (g.manager) ) if g.daemon: # see if I should become a daemon with no controlling tty rc = fork() if rc != 0: # parent exits; child in background exit(0) setsid() # become session leader; no controlling tty signal(SIGHUP,SIG_IGN) # make sure no sighup when leader ends ## leader exits; svr4: make sure do not get another controlling tty rc = fork() if rc != 0: exit(0) chdir("/") # free up filesys for umount umask(0) g.logfileName = '/tmp/mpd2.logfile_' + mpd_get_my_username() logfile = open(g.logfileName,'w') stdout = logfile stderr = logfile print >>stdout, 'logfile for mpd with pid %d' % getpid() stdout.flush() mpd_print(0, 'starting ') g.activeSockets = {} _add_active_socket(g.mySocket, 'my (%s) listener socket' % g.myId, # name '_handle_new_connection', # handler '',0) # host,port g.nextJobInt = 1 g.activeJobs = {} seed() g.correctChallengeResponse = {} g.conListenSocket = 0 g.conSocket = 0 g.allExiting = 0 if g.allowConsole: g.conListenName = '/tmp/mpd2.console_' + mpd_get_my_username() consoleAlreadyExists = 0 if access(g.conListenName,R_OK): # if console is there, see if mpd is listening tempSocket = socket(AF_UNIX,SOCK_STREAM) # note: UNIX socket try: tempSocket.connect(g.conListenName) consoleAlreadyExists = 1 except Exception, errmsg: tempSocket.close() unlink(g.conListenName) if consoleAlreadyExists: # mpd_raise('an mpd is already running with console at %s' % (g.conListenName) ) print 'An mpd is already running with console at %s on %s. ' % (g.conListenName, g.myHost) print 'Start mpd with the -n option for second mpd on same host.' _exit(0) g.conListenSocket = socket(AF_UNIX,SOCK_STREAM) # UNIX g.conListenSocket.bind(g.conListenName) g.conListenSocket.listen(1) _add_active_socket(g.conListenSocket, 'my (%s) console listen socket' % g.myId, # name 'handled-inline', # handler g.conListenName,0) # host,port g.generation = 0 # will chg when enter the ring if g.entryHost: _enter_existing_ring() else: _create_ring_of_one_mpd() signal(SIGCHLD,sigchld_handler)def _mpd(): global stdout # Main Loop done = 0 while not done: socketsToSelect = g.activeSockets.keys() try: (inReadySockets,None,None) = select(socketsToSelect,[],[],30) except error, errmsg: if isinstance(errmsg,Exception) and errmsg[0] == 4: # interrupted system call continue else: mpd_raise('select failed: errmsg=:%s:' % (errmsg) ) for readySocket in inReadySockets: if readySocket not in g.activeSockets.keys(): # deleted on another iteration ? continue if readySocket == g.mySocket: _handle_new_connection() elif readySocket == g.lhsSocket: _handle_lhs_input() if g.allExiting: # got mpdallexit command done = 1 break # out of for loop, then out of while elif readySocket == g.rhsSocket: _handle_rhs_input() # ignoring rc=1 which means we re-entered ring elif readySocket == g.conListenSocket: _handle_console_connection() elif readySocket == g.conSocket: _handle_console_input() elif g.activeSockets[readySocket].name == 'rhs_being_challenged': _handle_rhs_challenge_response(readySocket) elif g.activeSockets[readySocket].name == 'lhs_being_challenged': _handle_lhs_challenge_response(readySocket) elif g.activeSockets[readySocket].name == 'man_being_challenged': _handle_man_challenge_response(readySocket) elif g.activeSockets[readySocket].name == 'man_msgs': _handle_man_msgs(readySocket) else: mpd_raise('unknown ready socket %s' % \ (`g.activeSockets[readySocket].name`) )def _handle_console_connection(): if not g.conSocket: (g.conSocket,newConnAddr) = g.conListenSocket.accept() _add_active_socket(g.conSocket, 'my (%s) console socket' % g.myId, # name '_handle_console_input', # handler g.conSocket,0) # host,port else: mpd_print(1, 'rejecting console; already have one' ) (tempSocket,newConnAddr) = g.conListenSocket.accept() msgToSend = { 'cmd' : 'already_have_a_console' } mpd_send_one_msg(tempSocket,msgToSend) tempSocket.close()def _handle_console_input(): msg = mpd_recv_one_msg(g.conSocket) if not msg: mpd_print(0000, 'console has disappeared; closing it') del g.activeSockets[g.conSocket] g.conSocket.close() g.conSocket = 0 return if not msg.has_key('cmd'): mpd_print(1, 'console sent bad msg :%s:' % msg) mpd_send_one_msg(g.rhsSocket,'cmd','invalid_msg_received_from_you') del g.activeSockets[g.conSocket] g.conSocket.close() g.conSocket = 0 return if msg['cmd'] == 'mpdrun': msg['mpdid_mpdrun_start'] = g.myId msg['nstarted_on_this_loop'] = 0 msg['first_loop'] = 1 if msg.has_key('try_0_locally'): _do_mpdrun(msg) else: mpd_send_one_msg(g.rhsSocket,msg) # send ack after job is going elif msg['cmd'] == 'get_mpd_version': msg = { 'cmd' : 'mpd_version_response', 'mpd_version' : mpd_version } mpd_send_one_msg(g.conSocket,msg) elif msg['cmd'] == 'mpdtrace': msgToSend = { 'cmd' : 'mpdtrace_info', 'dest' : g.myId, 'id' : g.myId, 'lhs' : '%s_%d' % (g.lhsHost,g.lhsPort), 'rhs' : '%s_%d' % (g.rhsHost,g.rhsPort) } mpd_send_one_msg(g.rhsSocket,msgToSend) msgToSend = { 'cmd' : 'mpdtrace_trailer', 'dest' : g.myId } mpd_send_one_msg(g.rhsSocket,msgToSend) # do not send an ack to console now; will send trace info later elif msg['cmd'] == 'mpdallexit': g.allExiting = 1 mpd_send_one_msg(g.rhsSocket, {'cmd' : 'mpdallexit', 'src' : g.myId} ) mpd_send_one_msg(g.conSocket, {'cmd' : 'mpdallexit_ack'} ) elif msg['cmd'] == 'mpdringtest': msg['src'] = g.myId mpd_send_one_msg(g.rhsSocket, msg) # do not send an ack to console now; will send ringtest info later elif msg['cmd'] == 'mpdlistjobs': msgToSend = { 'cmd' : 'local_mpdid', 'id' : g.myId } mpd_send_one_msg(g.conSocket,msgToSend) for jobid in g.activeJobs.keys(): for pid in g.activeJobs[jobid]: msgToSend = { 'cmd' : 'mpdlistjobs_info', 'dest' : g.myId, 'jobid' : jobid, 'username' : g.activeJobs[jobid][pid]['username'], 'host' : g.myHost, 'pid' : str(pid), 'pgm' : g.activeJobs[jobid][pid]['pgm'], 'rank' : g.activeJobs[jobid][pid]['rank'] } mpd_send_one_msg(g.rhsSocket, msgToSend) msgToSend = { 'cmd' : 'mpdlistjobs_trailer', 'dest' : g.myId } mpd_send_one_msg(g.rhsSocket,msgToSend) # do not send an ack to console now; will send listjobs info later elif msg['cmd'] == 'mpdkilljob': msg['src'] = g.myId if msg['mpdid'] == '': msg['mpdid'] = g.myId mpd_send_one_msg(g.rhsSocket, msg) # send ack to console after I get this msg back and do the kill myself elif msg['cmd'] == 'mpdsigjob': msg['src'] = g.myId if msg['mpdid'] == '': msg['mpdid'] = g.myId mpd_send_one_msg(g.rhsSocket, msg) # send ack to console after I get this msg back else: msgToSend = { 'cmd' : 'invalid_msg_received_from_you' } mpd_send_one_msg(g.conSocket,msgToSend) badMsg = 'invalid msg received from console: %s' % (str(msg)) mpd_print(1, badMsg) syslog(badMsg)def _handle_lhs_input(): msg = mpd_recv_one_msg(g.lhsSocket) mpd_print(0000, "MPD LHS GOT MSG :%s:" % msg) if not msg: # lost lhs; don't worry mpd_print(0, "CLOSING g.lhsSocket ", g.lhsSocket ) del g.activeSockets[g.lhsSocket] g.lhsSocket.close() return if msg['cmd'] == 'mpdrun': if msg.has_key('mpdid_mpdrun_start') and msg['mpdid_mpdrun_start'] == g.myId: if msg['nstarted'] == msg['nprocs']: if g.conSocket: mpd_send_one_msg(g.conSocket, {'cmd' : 'mpdrun_ack', } ) return if not msg['first_loop'] and msg['nstarted_on_this_loop'] == 0: if msg.has_key('jobid'): mpd_send_one_msg(g.rhsSocket, {'cmd':'abortjob', 'src' : g.myId, 'jobid' : msg['jobid']}) if g.conSocket: mpd_send_one_msg(g.conSocket, {'cmd' : 'job_failed', 'reason' : 'some_procs_not_started'} ) return msg['first_loop'] = 0 _do_mpdrun(msg) elif msg['cmd'] == 'mpdtrace_info': if msg['dest'] == g.myId: mpd_send_one_msg(g.conSocket,msg) else: mpd_send_one_msg(g.rhsSocket,msg) elif msg['cmd'] == 'mpdtrace_trailer': if msg['dest'] == g.myId: mpd_send_one_msg(g.conSocket,msg) else: msgToSend = { 'cmd' : 'mpdtrace_info', 'dest' : msg['dest'], 'id' : g.myId, 'lhs' : '%s_%d' % (g.lhsHost,g.lhsPort), 'rhs' : '%s_%d' % (g.rhsHost,g.rhsPort) } mpd_send_one_msg(g.rhsSocket, msgToSend) mpd_send_one_msg(g.rhsSocket, msg) elif msg['cmd'] == 'mpdlistjobs_info': if msg['dest'] == g.myId: mpd_send_one_msg(g.conSocket,msg) else: mpd_send_one_msg(g.rhsSocket,msg) elif msg['cmd'] == 'mpdlistjobs_trailer': if msg['dest'] == g.myId: mpd_send_one_msg(g.conSocket,msg) else: for jobid in g.activeJobs.keys(): for pid in g.activeJobs[jobid]: msgToSend = { 'cmd' : 'mpdlistjobs_info', 'dest' : msg['dest'], 'jobid' : jobid,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -