⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpjdaemon.java

📁 MPI for java for Distributed Programming
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            now = true;	    noSwitch = false;	  }	  	}	if(noSwitch) {	  jvmArgs.add("-cp");	  if(loader.equals("useLocalLoader")) {	    jvmArgs.add("."+File.pathSeparator+""+	              mpjHomeDir+"/lib/loader2.jar"+                      File.pathSeparator+""+mpjHomeDir+"/lib/mpj.jar"+                      File.pathSeparator+""+mpjHomeDir+"/lib/log4j-1.2.11.jar"+                      File.pathSeparator+""+mpjHomeDir+"/lib/wrapper.jar"		  	  );	  }	  else if(loader.equals("useRemoteLoader")) {	    jvmArgs.add("."+File.pathSeparator+""+	      mpjHomeDir+"/lib/loader2.jar"+              File.pathSeparator+""+mpjHomeDir+"/lib/log4j-1.2.11.jar"+              File.pathSeparator+""+mpjHomeDir+"/lib/wrapper.jar"	    );	  }	}	jArgs = jvmArgs.toArray(new String[0]);	for(int e=0 ; e<jArgs.length; e++) {          if(DEBUG && logger.isDebugEnabled()) {             logger.debug("modified: jArgs["+e+"]="+jArgs[e]);			  }	}		String[] aArgs = appArgs.toArray(new String[0]);        String[] ex =		new String[ (8+jArgs.length+aArgs.length) ];	ex[0] = "java";		for(int i=0 ; i< jArgs.length ; i++) {	  ex[i+1] = jArgs[i];	}	int indx = jArgs.length+1;		ex[indx] = "runtime.daemon.ThreadedWrapper" ; indx++ ; 	ex[indx] = URL; indx++ ; 	ex[indx] = Integer.toString(processes); indx++ ; 	ex[indx] = deviceName; indx++;	ex[indx] = loader; indx++;	ex[indx] = mpjURL ; indx++;		if(className != null) {	  ex[indx] = className;   	}	else {	  ex[indx] = "dummy" ; //this is JAR case ..this arg will never 	                               //be used ...	}		for(int i=0 ; i< aArgs.length ; i++) {	  ex[i+8+jArgs.length] = aArgs[i];	}		          for (int i = 0; i < ex.length; i++) {          if(DEBUG && logger.isDebugEnabled()) {             logger.debug(i+": "+ ex[i]);	  }        } 		/*... Making the command finishes here ...*/        if(DEBUG && logger.isDebugEnabled()) {           logger.debug("creating process-builder object ");	}        ProcessBuilder pb = new ProcessBuilder(ex);	pb.directory(new File(wdir)); 	//Map<String, String> m = pb.environment(); 	//for(String str : m.values()) {        //  if(DEBUG && logger.isDebugEnabled()) {         //    logger.debug("str : "+str);			//  }	//}        pb.redirectErrorStream(true);        if(DEBUG && logger.isDebugEnabled()) {           logger.debug("starting the ThreadedWrapper.");	}        Process p = null;        try {          p = pb.start();        }        catch (Exception e) {          e.printStackTrace();        }        synchronized (processVector) {          processVector[0] = p;        }        if(DEBUG && logger.isDebugEnabled()) {           logger.debug ("started the ThreadedWrapper.");	}        outputHandlerSem.acquireUninterruptibly() ;         pos = 0;        workers[0] = new Thread(outputHandler);        workers[0].start();                //System.out.println("calling join" + hostName);        workers[0].join();        //System.out.println("called join" + hostName);      } //end else (ThreadedWrapper case)      MPJProcessPrintStream.stop();      synchronized (processVector) {        for (int i = 0; i < processVector.length; i++) {          processVector[i].destroy();        }        kill_signal = false;      }      workers = null;      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("Stopping the output");      }      try {        if(DEBUG && logger.isDebugEnabled()) {           logger.debug ("Checking whether peerChannel is closed or what ?" +                    peerChannel.isOpen());	}        if (peerChannel.isOpen()) {          if(DEBUG && logger.isDebugEnabled()) {             logger.debug ("Closing it ..."+peerChannel );	  }          peerChannel.close();        }        if(DEBUG && logger.isDebugEnabled()) {           logger.debug("Was already closed, or i closed it");	}      }      catch (Exception e) {         e.printStackTrace() ;         continue;      }      restoreVariables() ;       if(DEBUG && logger.isDebugEnabled()) {         logger.debug("\n\n ** .. execution ends .. ** \n\n");      }    } //end while(true)  }  private void restoreVariables() {    pos = 0;     jvmArgs.clear();    appArgs.clear();     wdir = null ;     URL = null;    mpjURL = null;     deviceName = null;    className = null ;    processes = 0;    processVector = null;    loader = null;   }   private synchronized void waitToStartExecution () {    while (waitToStartExecution) {      try {        this.wait();      }      catch (Exception e) {        e.printStackTrace();      }    }     waitToStartExecution = true ;   }  static boolean matchMe(String line) throws Exception {    if(!line.contains("@") || line.startsWith("#") ) {      return false;    }    StringTokenizer token = new StringTokenizer(line, "@");	      boolean found = false;     String hostName = token.nextToken() ;    InetAddress host = null ;        try {          host = InetAddress.getByName(hostName) ;    }    catch(Exception e){      return false;   	        }    Enumeration<NetworkInterface> cards =            NetworkInterface.getNetworkInterfaces() ;        foundIt:     while(cards.hasMoreElements()) {      NetworkInterface card = cards.nextElement() ;      Enumeration<InetAddress> addresses = card.getInetAddresses();      while(addresses.hasMoreElements()) {        InetAddress address = addresses.nextElement() ;        if(host.getHostName().equals(address.getHostName()) ||            host.getHostAddress().equals(address.getHostAddress())) {          found = true;          break foundIt;        }      }    }    return found;   }  private synchronized void startExecution () {    waitToStartExecution = false;    this.notify();  }    private void createLogger(String homeDir) throws MPJRuntimeException {      if(logger == null) {      DailyRollingFileAppender fileAppender = null ;      try {        fileAppender = new DailyRollingFileAppender(                            new PatternLayout(                            " %-5p %c %x - %m\n" ),                            homeDir+"/logs/daemon-"+hostName+".log",                            "yyyy-MM-dd-a" );        Logger rootLogger = Logger.getRootLogger() ;        rootLogger.addAppender( fileAppender);        LoggerRepository rep =  rootLogger.getLoggerRepository() ;        rootLogger.setLevel ((Level) Level.ALL );        logger = Logger.getLogger( "mpjdaemon" );      }      catch(Exception e) {        throw new MPJRuntimeException(e) ;      }    }  }  private void serverSocketInit() {    ServerSocketChannel serverChannel;    try {      selector = Selector.open();      serverChannel = ServerSocketChannel.open();      serverChannel.configureBlocking(false);      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("Binding the serverSocketChannel @" + D_SER_PORT);      }      serverChannel.socket().bind(new InetSocketAddress(D_SER_PORT));      serverChannel.register(selector, SelectionKey.OP_ACCEPT);    }    catch (Exception cce) {      cce.printStackTrace();      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("Exception in serverSocketInit()" + cce.getMessage());      }      System.exit(0);    }  }  private void doAccept(SelectableChannel keyChannel) {    if(DEBUG && logger.isDebugEnabled()) {       logger.debug ("---doAccept---");    }    try {      peerChannel = ( (ServerSocketChannel) keyChannel).accept();      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("peerChannel " + peerChannel);      }    }    catch (IOException ioe) {      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("IOException in doAccept");      }      System.exit(0);    }    try {      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("configuring the channel to be non-blocking");      }      peerChannel.configureBlocking(false);    }    catch (IOException ioe) {      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("IOException in doAccept");      }      System.exit(0);    }    try {      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("Registering for OP_READ & OP_WRITE event");      }      peerChannel.register(selector,                           SelectionKey.OP_READ | SelectionKey.OP_WRITE);    }    catch (ClosedChannelException cce) {      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("Closed Channel Exception in doAccept");      }      System.exit(0);    }    try {      peerChannel.socket().setTcpNoDelay(true);    }    catch (Exception e) {}  }  Runnable selectorThread = new Runnable() {    /* This is selector thread */    public void run() {      if(DEBUG && logger.isDebugEnabled()) {         logger.debug ("selector Thread started ");      }      Set readyKeys = null;      Iterator readyItor = null;      SelectionKey key = null;      SelectableChannel keyChannel = null;      /* why are these required here? */      SocketChannel socketChannel = null;      ByteBuffer lilBuffer = ByteBuffer.allocateDirect(8);      ByteBuffer lilBuffer2 = ByteBuffer.allocateDirect(4);      ByteBuffer buffer = ByteBuffer.allocateDirect(1000);      byte[] lilArray = new byte[4];      try {        while (selector.select() > -1) {          readyKeys = selector.selectedKeys();          readyItor = readyKeys.iterator();          while (readyItor.hasNext()) {            key = (SelectionKey) readyItor.next();            readyItor.remove();            keyChannel = (SelectableChannel) key.channel();            if(DEBUG && logger.isDebugEnabled()) {               logger.debug ("\n---selector EVENT---");	    }            if (key.isAcceptable() && selectorAcceptConnect) {              doAccept(keyChannel);            }            else if (key.isConnectable()) {              /*               * why would this method be called?               * At the daemon, this event is not generated ..               */              try {                socketChannel = (SocketChannel) keyChannel;              }              catch (NoConnectionPendingException e) {                continue;              }              if (socketChannel.isConnectionPending()) {                try {                  socketChannel.finishConnect();                }                catch (IOException e) {                  continue;                }              }              //doConnect(socketChannel);            }            else if (key.isReadable()) {              if(DEBUG && logger.isDebugEnabled()) {                 logger.debug ("READ_EVENT");	      }	                    socketChannel = (SocketChannel) keyChannel;	                    int readInt = -1 ; 	                    if(DEBUG && logger.isDebugEnabled()) {                 logger.debug("lilBuffer "+ lilBuffer);         	      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -