📄 mpjdaemon.java
字号:
now = true; noSwitch = false; } } if(noSwitch) { jvmArgs.add("-cp"); if(loader.equals("useLocalLoader")) { jvmArgs.add("."+File.pathSeparator+""+ mpjHomeDir+"/lib/loader2.jar"+ File.pathSeparator+""+mpjHomeDir+"/lib/mpj.jar"+ File.pathSeparator+""+mpjHomeDir+"/lib/log4j-1.2.11.jar"+ File.pathSeparator+""+mpjHomeDir+"/lib/wrapper.jar" ); } else if(loader.equals("useRemoteLoader")) { jvmArgs.add("."+File.pathSeparator+""+ mpjHomeDir+"/lib/loader2.jar"+ File.pathSeparator+""+mpjHomeDir+"/lib/log4j-1.2.11.jar"+ File.pathSeparator+""+mpjHomeDir+"/lib/wrapper.jar" ); } } jArgs = jvmArgs.toArray(new String[0]); for(int e=0 ; e<jArgs.length; e++) { if(DEBUG && logger.isDebugEnabled()) { logger.debug("modified: jArgs["+e+"]="+jArgs[e]); } } String[] aArgs = appArgs.toArray(new String[0]); String[] ex = new String[ (8+jArgs.length+aArgs.length) ]; ex[0] = "java"; for(int i=0 ; i< jArgs.length ; i++) { ex[i+1] = jArgs[i]; } int indx = jArgs.length+1; ex[indx] = "runtime.daemon.ThreadedWrapper" ; indx++ ; ex[indx] = URL; indx++ ; ex[indx] = Integer.toString(processes); indx++ ; ex[indx] = deviceName; indx++; ex[indx] = loader; indx++; ex[indx] = mpjURL ; indx++; if(className != null) { ex[indx] = className; } else { ex[indx] = "dummy" ; //this is JAR case ..this arg will never //be used ... } for(int i=0 ; i< aArgs.length ; i++) { ex[i+8+jArgs.length] = aArgs[i]; } for (int i = 0; i < ex.length; i++) { if(DEBUG && logger.isDebugEnabled()) { logger.debug(i+": "+ ex[i]); } } /*... Making the command finishes here ...*/ if(DEBUG && logger.isDebugEnabled()) { logger.debug("creating process-builder object "); } ProcessBuilder pb = new ProcessBuilder(ex); pb.directory(new File(wdir)); //Map<String, String> m = pb.environment(); //for(String str : m.values()) { // if(DEBUG && logger.isDebugEnabled()) { // logger.debug("str : "+str); // } //} pb.redirectErrorStream(true); if(DEBUG && logger.isDebugEnabled()) { logger.debug("starting the ThreadedWrapper."); } Process p = null; try { p = pb.start(); } catch (Exception e) { e.printStackTrace(); } synchronized (processVector) { processVector[0] = p; } if(DEBUG && logger.isDebugEnabled()) { logger.debug ("started the ThreadedWrapper."); } outputHandlerSem.acquireUninterruptibly() ; pos = 0; workers[0] = new Thread(outputHandler); workers[0].start(); //System.out.println("calling join" + hostName); workers[0].join(); //System.out.println("called join" + hostName); } //end else (ThreadedWrapper case) MPJProcessPrintStream.stop(); synchronized (processVector) { for (int i = 0; i < processVector.length; i++) { processVector[i].destroy(); } kill_signal = false; } workers = null; if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Stopping the output"); } try { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Checking whether peerChannel is closed or what ?" + peerChannel.isOpen()); } if (peerChannel.isOpen()) { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Closing it ..."+peerChannel ); } peerChannel.close(); } if(DEBUG && logger.isDebugEnabled()) { logger.debug("Was already closed, or i closed it"); } } catch (Exception e) { e.printStackTrace() ; continue; } restoreVariables() ; if(DEBUG && logger.isDebugEnabled()) { logger.debug("\n\n ** .. execution ends .. ** \n\n"); } } //end while(true) } private void restoreVariables() { pos = 0; jvmArgs.clear(); appArgs.clear(); wdir = null ; URL = null; mpjURL = null; deviceName = null; className = null ; processes = 0; processVector = null; loader = null; } private synchronized void waitToStartExecution () { while (waitToStartExecution) { try { this.wait(); } catch (Exception e) { e.printStackTrace(); } } waitToStartExecution = true ; } static boolean matchMe(String line) throws Exception { if(!line.contains("@") || line.startsWith("#") ) { return false; } StringTokenizer token = new StringTokenizer(line, "@"); boolean found = false; String hostName = token.nextToken() ; InetAddress host = null ; try { host = InetAddress.getByName(hostName) ; } catch(Exception e){ return false; } Enumeration<NetworkInterface> cards = NetworkInterface.getNetworkInterfaces() ; foundIt: while(cards.hasMoreElements()) { NetworkInterface card = cards.nextElement() ; Enumeration<InetAddress> addresses = card.getInetAddresses(); while(addresses.hasMoreElements()) { InetAddress address = addresses.nextElement() ; if(host.getHostName().equals(address.getHostName()) || host.getHostAddress().equals(address.getHostAddress())) { found = true; break foundIt; } } } return found; } private synchronized void startExecution () { waitToStartExecution = false; this.notify(); } private void createLogger(String homeDir) throws MPJRuntimeException { if(logger == null) { DailyRollingFileAppender fileAppender = null ; try { fileAppender = new DailyRollingFileAppender( new PatternLayout( " %-5p %c %x - %m\n" ), homeDir+"/logs/daemon-"+hostName+".log", "yyyy-MM-dd-a" ); Logger rootLogger = Logger.getRootLogger() ; rootLogger.addAppender( fileAppender); LoggerRepository rep = rootLogger.getLoggerRepository() ; rootLogger.setLevel ((Level) Level.ALL ); logger = Logger.getLogger( "mpjdaemon" ); } catch(Exception e) { throw new MPJRuntimeException(e) ; } } } private void serverSocketInit() { ServerSocketChannel serverChannel; try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Binding the serverSocketChannel @" + D_SER_PORT); } serverChannel.socket().bind(new InetSocketAddress(D_SER_PORT)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (Exception cce) { cce.printStackTrace(); if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Exception in serverSocketInit()" + cce.getMessage()); } System.exit(0); } } private void doAccept(SelectableChannel keyChannel) { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("---doAccept---"); } try { peerChannel = ( (ServerSocketChannel) keyChannel).accept(); if(DEBUG && logger.isDebugEnabled()) { logger.debug ("peerChannel " + peerChannel); } } catch (IOException ioe) { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("IOException in doAccept"); } System.exit(0); } try { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("configuring the channel to be non-blocking"); } peerChannel.configureBlocking(false); } catch (IOException ioe) { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("IOException in doAccept"); } System.exit(0); } try { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Registering for OP_READ & OP_WRITE event"); } peerChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } catch (ClosedChannelException cce) { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("Closed Channel Exception in doAccept"); } System.exit(0); } try { peerChannel.socket().setTcpNoDelay(true); } catch (Exception e) {} } Runnable selectorThread = new Runnable() { /* This is selector thread */ public void run() { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("selector Thread started "); } Set readyKeys = null; Iterator readyItor = null; SelectionKey key = null; SelectableChannel keyChannel = null; /* why are these required here? */ SocketChannel socketChannel = null; ByteBuffer lilBuffer = ByteBuffer.allocateDirect(8); ByteBuffer lilBuffer2 = ByteBuffer.allocateDirect(4); ByteBuffer buffer = ByteBuffer.allocateDirect(1000); byte[] lilArray = new byte[4]; try { while (selector.select() > -1) { readyKeys = selector.selectedKeys(); readyItor = readyKeys.iterator(); while (readyItor.hasNext()) { key = (SelectionKey) readyItor.next(); readyItor.remove(); keyChannel = (SelectableChannel) key.channel(); if(DEBUG && logger.isDebugEnabled()) { logger.debug ("\n---selector EVENT---"); } if (key.isAcceptable() && selectorAcceptConnect) { doAccept(keyChannel); } else if (key.isConnectable()) { /* * why would this method be called? * At the daemon, this event is not generated .. */ try { socketChannel = (SocketChannel) keyChannel; } catch (NoConnectionPendingException e) { continue; } if (socketChannel.isConnectionPending()) { try { socketChannel.finishConnect(); } catch (IOException e) { continue; } } //doConnect(socketChannel); } else if (key.isReadable()) { if(DEBUG && logger.isDebugEnabled()) { logger.debug ("READ_EVENT"); } socketChannel = (SocketChannel) keyChannel; int readInt = -1 ; if(DEBUG && logger.isDebugEnabled()) { logger.debug("lilBuffer "+ lilBuffer); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -