📄 mpd.py
字号:
'username' : g.activeJobs[jobid][pid]['username'], 'host' : g.myHost, 'pgm' : g.activeJobs[jobid][pid]['pgm'], 'rank' : g.activeJobs[jobid][pid]['rank'] } mpd_send_one_msg(g.rhsSocket, msgToSend) mpd_send_one_msg(g.rhsSocket, msg) elif msg['cmd'] == 'mpdallexit': g.allExiting = 1 if msg['src'] != g.myId: mpd_send_one_msg(g.rhsSocket, msg) elif msg['cmd'] == 'mpdringtest': if msg['src'] != g.myId: mpd_send_one_msg(g.rhsSocket, msg) else: numLoops = msg['numloops'] - 1 if numLoops > 0: msg['numloops'] = numLoops mpd_send_one_msg(g.rhsSocket, msg) else: mpd_send_one_msg(g.conSocket, {'cmd' : 'mpdringtest_done' }) elif msg['cmd'] == 'mpdsigjob': if msg['src'] == g.myId: mpd_send_one_msg(g.conSocket, {'cmd' : 'mpdsigjob_ack' }) else: mpd_send_one_msg(g.rhsSocket,msg) for jobid in g.activeJobs.keys(): sjobid = jobid.split(' ') # jobnum and mpdid if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ or (msg['jobalias'] and sjobid[2] == msg['jobalias']): for pid in g.activeJobs[jobid].keys(): if g.activeJobs[jobid][pid]['username'] == msg['username'] \ or g.activeJobs[jobid][pid]['username'] == 'root': g.activeJobs[jobid][pid]['signal_to_deliver'] = msg['sigtype'] kill(pid, SIGUSR1) # tell man to contact me and ask about signal elif msg['cmd'] == 'mpdkilljob': if msg['src'] == g.myId: mpd_send_one_msg(g.conSocket, {'cmd' : 'mpdkilljob_ack' }) else: mpd_send_one_msg(g.rhsSocket,msg) for jobid in g.activeJobs.keys(): sjobid = jobid.split(' ') # jobnum and mpdid if (sjobid[0] == msg['jobnum'] and sjobid[1] == msg['mpdid']) \ or (msg['jobalias'] and sjobid[2] == msg['jobalias']): for pid in g.activeJobs[jobid].keys(): if g.activeJobs[jobid][pid]['username'] == msg['username'] \ or g.activeJobs[jobid][pid]['username'] == 'root': kill(pid * (-1), SIGKILL) # neg pid -> group # del g.activeJobs[jobid] ## handled by sigchld_handler elif msg['cmd'] == 'abortjob': if msg['src'] != g.myId: mpd_send_one_msg(g.rhsSocket,msg) for jobid in g.activeJobs.keys(): if jobid == msg['jobid']: for pid in g.activeJobs[jobid].keys(): kill(pid * (-1), SIGKILL) # neg pid -> group # del g.activeJobs[jobid] ## handled by sigchld_handler else: mpd_print(1, 'unrecognized cmd from lhs: %s' % (msg) )def _do_mpdrun(msg): mpd_print(0000, "DO_MPDRUN MSG=:%s:" % msg) handled_one__any_ = 0 while 1: if msg['nstarted'] >= msg['nprocs']: break if handled_one__any_: break hosts = msg['hosts'] if g.myHost not in hosts.values() and '_any_' not in hosts.values(): break currRank = msg['nstarted'] found = 0 for ranks in hosts.keys(): (lo,hi) = ranks if hosts[ranks] == g.myHost and (currRank >= lo and currRank <= hi): found = 1 break if not found: for ranks in hosts.keys(): (lo,hi) = ranks if hosts[ranks] == '_any_' and (currRank >= lo and currRank <= hi): handled_one__any_ = 1 found = 1 break if not found: break if lo < hi: msg['hosts'][(lo+1,hi)] = msg['hosts'][ranks] del msg['hosts'][ranks] msg['nstarted'] += 1 msg['nstarted_on_this_loop'] += 1 if currRank == 0: manLhsHost = 'dummy_host' manLhsPort = 0 else: manLhsHost = msg['lhshost'] manLhsPort = msg['lhsport'] (tempSocket,tempPort) = mpd_get_inet_listen_socket('',0) msg['lhshost'] = g.myHost msg['lhsport'] = tempPort if currRank == 0: msg['host0'] = g.myHost msg['port0'] = tempPort manHost0 = msg['host0'] manPort0 = msg['port0'] if currRank == 0: jobid = str(g.nextJobInt) + ' ' + g.myId + ' ' + msg['jobalias'] g.nextJobInt += 1 msg['jobid'] = jobid else: jobid = msg['jobid'] users = msg['users'] for ranks in users.keys(): if currRank >= lo and currRank <= hi: username = users[ranks] break execs = msg['execs'] for ranks in execs.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgm = execs[ranks] break paths = msg['paths'] for ranks in paths.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pathForExec = paths[ranks] break args = msg['args'] for ranks in args.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmargs = args[ranks] break envvars = msg['envvars'] for ranks in envvars.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: pgmenvvars = envvars[ranks] break cwds = msg['cwds'] for ranks in cwds.keys(): (lo,hi) = ranks if currRank >= lo and currRank <= hi: cwd = cwds[ranks] break rc = fork() if rc == 0: mpd_set_my_id('%s_man_before_exec_%d' % (g.myHost,g.myPid) ) for sock in g.activeSockets: sock.close() setpgrp() environ['MPDMAN_MYHOST'] = g.myHost environ['MPDMAN_JOBID'] = jobid environ['MPDMAN_CLI_PGM'] = pgm environ['MPDMAN_CLI_PATH'] = pathForExec environ['MPDMAN_PGM_ARGS'] = pgmargs environ['MPDMAN_PGM_ENVVARS'] = pgmenvvars environ['MPDMAN_CWD'] = cwd environ['MPDMAN_SPAWNED'] = str(msg['spawned']) environ['MPDMAN_NPROCS'] = str(msg['nprocs']) environ['MPDMAN_MPD_LISTEN_PORT'] = str(g.myPort) environ['MPDMAN_MPD_CONF_PASSWD'] = g.configParams['password'] environ['MPDMAN_CONHOST'] = msg['conhost'] environ['MPDMAN_CONPORT'] = str(msg['conport']) environ['MPDMAN_RANK'] = str(currRank) environ['MPDMAN_LHSHOST'] = manLhsHost environ['MPDMAN_LHSPORT'] = str(manLhsPort) environ['MPDMAN_HOST0'] = manHost0 environ['MPDMAN_PORT0'] = str(manPort0) environ['MPDMAN_MY_LISTEN_PORT'] = str(tempPort) environ['MPDMAN_MY_LISTEN_FD'] = str(tempSocket.fileno()) if msg.has_key('line_labels'): environ['MPDMAN_LINE_LABELS'] = '1' else: environ['MPDMAN_LINE_LABELS'] = '0' if msg.has_key('rship'): environ['MPDMAN_RSHIP'] = msg['rship'] environ['MPDMAN_MSHIP_HOST'] = msg['mship_host'] environ['MPDMAN_MSHIP_PORT'] = str(msg['mship_port']) if getuid() == 0: pwent = getpwnam(username) uid = pwent[2] gid = pwent[3] setgroups(mpd_get_groups_for_username(username)) setregid(gid,gid) setreuid(uid,uid) execvpe(g.manager,[g.manager],environ) _exit(0); # do NOT do cleanup else: tempSocket.close() _add_active_job(jobid,username,pgm,currRank,rc) mpd_print(0000, "FORWARDING MSG=:%s:" % msg) mpd_send_one_msg(g.rhsSocket,msg) # forward it on arounddef _handle_rhs_input(): if g.allExiting: return msg = mpd_recv_one_msg(g.rhsSocket) if not msg: # lost rhs; re-knit the ring del g.activeSockets[g.rhsSocket] g.rhsSocket.close() if g.activeSockets.has_key(g.lhsSocket): del g.activeSockets[g.lhsSocket] g.lhsSocket.close() mpd_print(0000, 're-entering the ring' ) if g.entryHost: inRing = 0 numTries = 5 while not inRing and numTries > 0: rc = _enter_existing_ring() if rc < 0: # fails if next g.generation <= current sleep(2) else: inRing = 1 numTries -= 1 else: _create_ring_of_one_mpd() return 1 mpd_print(1, 'unexpected from rhs; msg=:%s:' % (msg) ) return 0def _handle_new_connection(): randHiRange = 10000 (newConnSocket,newConnAddr) = g.mySocket.accept() msg = mpd_recv_one_msg(newConnSocket) if (not msg.has_key('cmd')) or (not msg.has_key('host')) or \ (not msg.has_key('port')): mpd_print(1, 'INVALID msg from new connection :%s: msg=:%s:' % (newConnAddr,msg) ) newConnSocket.close() return if msg['cmd'] == 'request_to_enter_as_rhs': if msg['mpd_version'] != mpd_version: msgToSend = { 'cmd' : 'entry_rejected_bad_mpd_version', 'your_version' : msg['mpd_version'], 'my_version' : mpd_version } mpd_send_one_msg(newConnSocket,msgToSend) return randNumStr = '%04d' % (randrange(1,randHiRange)) # 0001-(hi-1), inclusive g.correctChallengeResponse[newConnSocket] = \ new(''.join([g.configParams['password'],randNumStr])).digest() msgToSend = { 'cmd' : 'challenge', 'randnum' : randNumStr, 'g.generation' : g.generation } # only send to rhs mpd_send_one_msg(newConnSocket,msgToSend) _add_active_socket(newConnSocket,'rhs_being_challenged', '_handle_rhs_challenge_response', msg['host'],msg['port']) elif msg['cmd'] == 'request_to_enter_as_lhs': randNumStr = '%04d' % (randrange(1,randHiRange)) # 0001-(hi-1), inclusive g.correctChallengeResponse[newConnSocket] = \ new(''.join([g.configParams['password'],randNumStr])).digest() msgToSend = { 'cmd' : 'challenge', 'randnum' : randNumStr } mpd_send_one_msg(newConnSocket,msgToSend) _add_active_socket(newConnSocket,'lhs_being_challenged', '_handle_lhs_challenge_response', msg['host'],msg['port']) elif msg['cmd'] == 'manager_needs_help': randNumStr = '%04d' % (randrange(1,randHiRange)) # 0001-(hi-1), inclusive g.correctChallengeResponse[newConnSocket] = \ new(''.join([g.configParams['password'],randNumStr])).digest() msgToSend = { 'cmd' : 'challenge', 'randnum' : randNumStr } mpd_send_one_msg(newConnSocket,msgToSend) _add_active_socket(newConnSocket,'man_being_challenged', '_handle_man_challenge_response', msg['host'],msg['port']) else: mpd_print(1, 'INVALID msg from new connection :%s: msg=:%s:' % (newConnAddr,msg) ) newConnSocket.close()def _handle_lhs_challenge_response(responseSocket): msg = mpd_recv_one_msg(responseSocket) if (not msg) or \ (not msg.has_key('cmd')) or (not msg.has_key('response')) or \ (not msg.has_key('host')) or (not msg.has_key('port')) or \ (msg['response'] != g.correctChallengeResponse[responseSocket]): mpd_print(1, 'INVALID msg for lhs response msg=:%s:' % (msg) ) msgToSend = { 'cmd' : 'invalid_response' } mpd_send_one_msg(responseSocket,msgToSend) del g.correctChallengeResponse[responseSocket] del g.activeSockets[responseSocket] responseSocket.close() else: msgToSend = { 'cmd' : 'OK_to_enter_as_lhs' } mpd_send_one_msg(responseSocket,msgToSend) if g.activeSockets.has_key(g.lhsSocket): del g.activeSockets[g.lhsSocket] g.lhsSocket.close() g.lhsSocket = responseSocket g.lhsHost = msg['host'] g.lhsPort = int(msg['port']) _add_active_socket(g.lhsSocket,'lhs','_handle_lhs_input',g.lhsHost,g.lhsPort)def _handle_rhs_challenge_response(responseSocket): msg = mpd_recv_one_msg(responseSocket) if (not msg) or \ (not msg.has_key('cmd')) or (not msg.has_key('response')) or \ (not msg.has_key('host')) or (not msg.has_key('port')) or \ (msg['response'] != g.correctChallengeResponse[responseSocket]): mpd_print(1, 'INVALID msg for rhs response msg=:%s:' % (msg) ) msgToSend = { 'cmd' : 'invalid_response' } mpd_send_one_msg(responseSocket,msgToSend) del g.correctChallengeResponse[responseSocket] del g.activeSockets[responseSocket] responseSocket.close() else: msgToSend = { 'cmd' : 'OK_to_enter_as_rhs',
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -