📄 mpd.py
字号:
'rhshost' : g.rhsHost, 'rhsport' : g.rhsPort } mpd_send_one_msg(responseSocket,msgToSend) del g.activeSockets[g.rhsSocket] g.rhsSocket.close() g.rhsSocket = responseSocket g.rhsHost = msg['host'] g.rhsPort = int(msg['port']) _add_active_socket(g.rhsSocket,'rhs','_handle_rhs_input',g.rhsHost,g.rhsPort)def _handle_man_challenge_response(responseSocket): msg = mpd_recv_one_msg(responseSocket) if (not msg) or \ (not msg.has_key('cmd')) or (not msg.has_key('response')) or \ (not msg.has_key('host')) or (not msg.has_key('port')) or \ (msg['response'] != g.correctChallengeResponse[responseSocket]): mpd_print(1, 'INVALID msg for man response msg=:%s:' % (msg) ) msgToSend = { 'cmd' : 'invalid_response' } mpd_send_one_msg(responseSocket,msgToSend) del g.correctChallengeResponse[responseSocket] del g.activeSockets[responseSocket] responseSocket.close() else: msgToSend = { 'cmd' : 'OK_to_send_requests' } mpd_send_one_msg(responseSocket,msgToSend) _add_active_socket(responseSocket, 'man_msgs','_handle_man_input', msg['host'],msg['port'])def _handle_man_msgs(manSocket): msg = mpd_recv_one_msg(manSocket) if not msg: del g.activeSockets[manSocket] manSocket.close() return if not msg.has_key('cmd'): mpd_print(1, 'INVALID msg for man request msg=:%s:' % (msg) ) msgToSend = { 'cmd' : 'invalid_msg' } mpd_send_one_msg(manSocket,msgToSend) del g.activeSockets[manSocket] manSocket.close() return if msg['cmd'] == 'spawn': msg['cmd'] = 'mpdrun' # handle much like an mpdrun from a console msg['mpdid_mpdrun_start'] = g.myId msg['nstarted_on_this_loop'] = 0 msg['first_loop'] = 1 msg['jobalias'] = '' if msg.has_key('try_0_locally'): _do_mpdrun(msg) else: mpd_send_one_msg(g.rhsSocket,msg) ## mpd_send_one_msg(manSocket, {'cmd' : 'mpdrun_ack', } ) elif msg['cmd'] == 'get_signal_to_deliver': jobid = msg['jobid'] # jobid of the job being signaled pid = int(msg['pid']) # pid of manager dooing the request sigtype = g.activeJobs[jobid][pid]['signal_to_deliver'] del g.activeJobs[jobid][pid]['signal_to_deliver'] mpd_send_one_msg(manSocket, { 'cmd' : 'signal_to_deliver', 'sigtype' : sigtype } ) else: mpd_print(1, 'INVALID request from man msg=:%s:' % (msg) ) msgToSend = { 'cmd' : 'invalid_request' } mpd_send_one_msg(manSocket,msgToSend)def _add_active_socket(socket,name,handler,host,port): g.activeSockets[socket] = _ActiveSockInfo() g.activeSockets[socket].name = name g.activeSockets[socket].handler = handler g.activeSockets[socket].rhsHost = host g.activeSockets[socket].rhsPort = portdef _add_active_job(jobid,username,pgm,rank,pid): if not g.activeJobs.has_key(jobid): g.activeJobs[jobid] = {} g.activeJobs[jobid][pid] = { 'pgm' : pgm, 'rank' : rank, 'username' : username }def _enter_existing_ring(): # connect to lhs g.lhsHost = g.entryHost g.lhsPort = g.entryPort try: g.lhsSocket = mpd_get_inet_socket_and_connect(g.lhsHost,g.lhsPort) except Exception, errmsg: mpd_raise('unable to enter existing ring at: %s %d' % (g.entryHost,g.entryPort)) _add_active_socket(g.lhsSocket,'lhs','_handle_lhs_input',g.lhsHost,g.lhsPort) msgToSend = { 'cmd' : 'request_to_enter_as_rhs', 'host' : g.myHost, 'port' : g.myPort, 'mpd_version' : mpd_version } mpd_send_one_msg(g.lhsSocket,msgToSend) msg = mpd_recv_one_msg(g.lhsSocket) if (not msg.has_key('cmd')) or \ (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')) or \ (not msg.has_key('g.generation')): mpd_raise('invalid challenge msg: %s' % (msg) ) g.generationFromMsg = int(msg['g.generation']) if g.generationFromMsg > g.generation: g.generation = g.generationFromMsg else: del g.activeSockets[g.lhsSocket] g.lhsSocket.close() return -1 response = new(''.join([g.configParams['password'],msg['randnum']])).digest() msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 'host' : g.myHost, 'port' : g.myPort } mpd_send_one_msg(g.lhsSocket,msgToSend) msg = mpd_recv_one_msg(g.lhsSocket) if (not msg.has_key('cmd')) or (msg['cmd'] != 'OK_to_enter_as_rhs'): mpd_raise('NOT OK to enter ring') if (not msg.has_key('rhshost')) or (not msg.has_key('rhsport')): mpd_raise('invalid OK msg: %s' % (msg) ) g.rhsHost = msg['rhshost'] g.rhsPort = int(msg['rhsport']) # connect to rhs g.rhsSocket = mpd_get_inet_socket_and_connect(g.rhsHost,g.rhsPort) _add_active_socket(g.rhsSocket,'rhs','_handle_rhs_input',g.rhsHost,g.rhsPort) msgToSend = { 'cmd' : 'request_to_enter_as_lhs', 'host' : g.myHost, 'port' : g.myPort } mpd_send_one_msg(g.rhsSocket,msgToSend) msg = mpd_recv_one_msg(g.rhsSocket) if (not msg.has_key('cmd')) or \ (msg['cmd'] != 'challenge') or (not msg.has_key('randnum')): mpd_raise('failed to recv challenge from rhs; msg=:%s:' % (msg) ) response = new(''.join([g.configParams['password'],msg['randnum']])).digest() msgToSend = { 'cmd' : 'challenge_response', 'response' : response, 'host' : g.myHost, 'port' : g.myPort } mpd_send_one_msg(g.rhsSocket,msgToSend) msg = mpd_recv_one_msg(g.rhsSocket) if (not msg.has_key('cmd')) or \ (msg['cmd'] != 'OK_to_enter_as_lhs'): mpd_raise('NOT OK to enter ring; msg=:%s:' % (msg) ) return 0def _create_ring_of_one_mpd(): # use a temp port and socket to avoid accidentally # accepting/handling connections by others (tempSocket,tempPort) = mpd_get_inet_listen_socket('',0) g.lhsSocket = mpd_get_inet_socket_and_connect(g.myHost,tempPort) g.lhsHost = g.myHost g.lhsPort = g.myPort _add_active_socket(g.lhsSocket,'lhs','_handle_lhs_input',g.lhsHost,g.lhsPort) (g.rhsSocket,addr) = tempSocket.accept() g.rhsHost = g.myHost g.rhsPort = g.myPort _add_active_socket(g.rhsSocket,'rhs','_handle_rhs_input',g.rhsHost,g.rhsPort) tempSocket.close() g.generation += 1def _process_configfile_params(): if getuid() == 0: # if ROOT configFilename = '/etc/mpd.conf' else: configFilename = environ['HOME'] + '/.mpd.conf' try: mode = stat(configFilename)[0] except: mode = '' if not mode: # mpd_raise('%s: config file not found' % (configFilename) ) print 'configuration file %s not found' % (configFilename) _exit(0) if (mode & 0x3f): # mpd_raise('%s: config file accessible by others' % (configFilename) ) print 'configuration file %s is accessible by others' % (configFilename) print 'change permissions to allow read and write access only by you' _exit(0) configFile = open(configFilename,'r') g.configParams = {} for line in configFile: line = line.rstrip() withoutComments = line.split('#')[0] splitLine = withoutComments.split('=') if len(splitLine) == 2: g.configParams[splitLine[0]] = splitLine[1] else: mpd_print(0, 'skipping config file line = :%s:' % (line) ) if 'password' not in g.configParams.keys(): mpd_raise('%s: configFile has no password' % (configFilename) )def _process_cmdline_args(): g.entryHost = '' g.entryPort = 0 g.tracingMPD = 0 g.allowConsole = 1 g.echoPortNum = 0 g.daemon = 0 g.bulletproof = 0 if g.configParams.has_key('idmyhost'): g.myHost = g.configParams['idmyhost'] else: g.myHost = gethostname() try: (opts,args) = getopt(argv[1:], 'h:p:i:tnedb', ['host=','port=','idmyhost','trace','noconsole','echo', 'daemon','bulletproof']) except: usage() for opt in opts: if opt[0] == '-h' or opt[0] == '--host': g.entryHost = opt[1] elif opt[0] == '-p' or opt[0] == '--port': g.entryPort = int(opt[1]) elif opt[0] == '-i' or opt[0] == '--idmyhost': g.myHost = opt[1] elif opt[0] == '-t' or opt[0] == '--trace': g.tracingMPD = 1 elif opt[0] == '-n' or opt[0] == '--noconsole': g.allowConsole = 0 elif opt[0] == '-e' or opt[0] == '--echo': g.echoPortNum = 1 elif opt[0] == '-d' or opt[0] == '--daemon': g.daemon = 1 elif opt[0] == '-b' or opt[0] == '--bulletproof': g.bulletproof = 1 else: pass ## getopt raises an exception if not recognized if (g.entryHost and not g.entryPort) or (not g.entryHost and g.entryPort): mpd_raise('host and port must be specified together')def sigchld_handler(signum,frame): (donePid,status) = waitpid(-1,WNOHANG) while donePid != 0: for jobid in g.activeJobs.keys(): if g.activeJobs[jobid].has_key(donePid): del g.activeJobs[jobid][donePid] if len(g.activeJobs[jobid]) == 0: del g.activeJobs[jobid] break try: (donePid,status) = waitpid(-1,WNOHANG) except: ## may occur if no more child processes donePid = 0def usage(): print 'mpd version %s' % str(mpd_version) print 'usage: %s -h -p -t -n -e -d -b -i' % argv[0] print ' or: %s --host --port --trace --noconsole --echo --daemon --bulletproof --idmyhost' % argv[0] print 'host and port must be specified together and tell where to enter a ring;' print ' if they are not coded, the mpd forms a stand-alone ring that other mpds' print ' may enter later' print 'trace yields lots of traces thru mpd routines; not generally useful' print 'noconsole is useful for running 2 mpds on the same machine; only one of' print ' them can have a unix socket which a console program can connect to' print 'echo says to echo the listener port for the mpd; useful in scripts' print 'daemon causes mpd to run backgrounded, with no controlling tty' print 'bulletproof says to turn bulletproofing on (experimental)' print 'idmyhost specifies an alternate hostname for the host this mpd is running on' print '.mpd.conf file must be present in home directory with read and write access' print ' only for user, and must contain at least a line with password=<password>' exit(-1)def _cleanup(): try: mpd_print(0, "CLEANING UP" ) if g.conListenSocket in g.activeSockets: # only delete if I put it there unlink(g.conListenName) except: passif __name__ == '__main__': try: g.myId = gethostname() + '_no_port_yet' + '_' + `getpid()` # chgd later mpd_check_python_version() mpd_set_my_id(g.myId) # chgd later proceduresToTrace = [] for (symbol,symtype) in globals().items(): if type(symtype) == FunctionType: proceduresToTrace.append(symbol) mpd_set_procedures_to_trace(proceduresToTrace) _process_configfile_params() _process_cmdline_args() if g.tracingMPD: settrace(mpd_trace_calls) register(_cleanup) _mpd_init() if g.bulletproof: # may use SIG_IGN on all but SIGCHLD and SIGHUP (handled above) while 1: mpdtid = Thread(target=_mpd) mpdtid.start() mpdtid.join() # only come out if exiting or thread fails if g.allExiting: break if g.conSocket: if g.activeSockets[g.conSocket]: msgToSend = { 'cmd' : 'restarting_mpd' } mpd_send_one_msg(g.conSocket,msgToSend) del g.activeSockets[g.conSocket] g.conSocket.close() g.conSocket = 0 else: # import profile # profile.run('_mpd()') _mpd() except mpdError, errmsg: print 'mpd failed (%s); cause: %s' % (g.myId,errmsg)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -