⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 niodevice.java

📁 MPI for java for Distributed Programming
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        NIORecvRequest next = send.arrNext[i];        NIORecvRequest prev = send.arrPrev[i];        prev.arrNext[i] = next;        next.arrPrev[i] = prev;      }    }    NIORecvRequest check(int context, UUID srcUUID, int tag) {      Key key = new Key(context, srcUUID, tag);      return get(key);    }    void add(NIORecvRequest request) {      request.arrKeys = new NIODevice.Key[] {          new NIODevice.Key(request.context, request.srcUUID, request.tag),          new NIODevice.Key(request.context,                            request.srcUUID, xdev.Device.ANY_TAG),          new NIODevice.Key(request.context, xdev.Device.ANY_SRC.uuid(),                            request.tag),          new NIODevice.Key(request.context, xdev.Device.ANY_SRC.uuid(),                            xdev.Device.ANY_TAG)      };      for (int i = 0; i < request.arrKeys.length; i++) {        add(i, request.arrKeys[i], request);      }    }  }  class Key {    private int context, tag;    private UUID srcUUID;    Key(int context, UUID srcUUID, int tag) {      this.context = context;      this.srcUUID = srcUUID;      this.tag = tag;    }    public int hashCode() {      return tag + context * 5 + srcUUID.hashCode() * 17;    }    public boolean equals(Object obj) {      if (obj instanceof Key) {        Key other = (Key) obj;        return (other.context == context) && (srcUUID.equals(other.srcUUID))            && (other.tag == tag);      }      return false;    }  }  ArrvQueue arrQue = new ArrvQueue();  /*   * Name of machine where this xdev process is running   */  String localHostName = null;  /* Server Socket Channel */  ServerSocketChannel writableServerChannel = null;  ServerSocketChannel readableServerChannel = null;  ByteBuffer rcb = ByteBuffer.allocate(45);  ByteBuffer rendezBuffer = ByteBuffer.allocate(8);  ByteBuffer rendez_send_buffer = ByteBuffer.allocate(17);  ByteBuffer wcb = ByteBuffer.allocate(49);  ByteBuffer e_wcb = ByteBuffer.allocate(49);  static ByteBuffer _wcb = 
ByteBuffer.allocate(21); //eendezCtrlMsgR2S ...  ByteBuffer s_wcb = ByteBuffer.allocate(20); //rendezCtrlMsgR2S ...  /* Threads for two selectors */  Thread selectorThreadStarter = null;  int psl = 0, nprocs = 0, rank = 0, size = 0, my_server_port = 0;  ProcessID[] pids = null;  ProcessID id = null;  /*   * This integer is used as the header to send initial control messages   */  private final int INIT_MSG_HEADER_DATA_CHANNEL = -21;  private final int INIT_MSG_HEADER_CTRL_CHANNEL = -20;  private final int RENDEZ_CTRL_MSG_LENGTH = 4;  private final int ACK_LENGTH = 17;  private final int CTRL_MSG_LENGTH = 45;    int SEND_OVERHEAD = CTRL_MSG_LENGTH + 4 ;  int RECV_OVERHEAD = 0;   private final int STD_COMM_MODE = 3;  private final int SYNC_COMM_MODE = 2;  private final boolean NO_ACK_RECEIVED = false;  private final boolean REQ_NOT_COMPLETED = false;  private final boolean RECV_POSTED = true;  private final int READY_TO_SEND = -24;  private static final int ACK_HEADER = -23;  private final int RENDEZ_HEADER = -22;  private final int SEND_ACK_TO_SENDER = -80;  private final int RECV_IN_USER_MEMORY = -81;  private final int RECV_IN_DEV_MEMORY = -82;  private final int MORE_TO_WRITE = -83;  private final int MORE_TO_READ = -84;  private String mpjHomeDir = null;  SocketChannel msgReceivedFrom; //what is this doing here?  boolean finished = false;  public NIODevice() {    this.deviceName = "niodev";   }  /**   * Initializes niodev.   * @param args Arguments to NIODevice.   * @return ProcessID[] An array of ProcessIDs.   */  public ProcessID[] init(String args[]) throws XDevException {    /*     *     * The init method reads names/ports/ranks from a config file. It finds     * its own entry in the config file (by comparing ranks), and creates     * a server socket at the port specified for that entry. Also, it creates     * another server socket at (portspecified+1). 
It connects to the server     * sockets (not one, two server sockets) of processes with rank lower     * than its own.     *     * At the end of this process, each process is connected to every     * other process with two socketChannels. The reason for two     * channels is that every process has a writable and a readable channel.     * The writable channel is in blocking mode, whereas the readable     * channel is in non-blocking mode. In terms of data structures,     * 'writableChannels' (Vector) contains all writable channels, and     * 'readableChannels' (Vector) contains all readable channels for     * every process. The next step is that each process sends its own rank     * and ProcessID to all the other processes. At the end of this, each process     * knows about all the peers and has ProcessID (key), SocketChannel (val)     * entries in 'worldWritableTable' and 'worldReadableTable'.     *     * As the names suggest, worldWritableTable is used for writing messages     * into channels, and worldReadableTable is used for receiving. The     * selector-thread generates events for the worldReadableTable     * SocketChannels, whereas the ones (SocketChannels) in     * worldWritableTable have nothing to do with the selector thread as they     * are in blocking mode.     
*     */    if (args.length < 3) {      throw new XDevException("Usage: " +        "java NIODevice <myrank> <conf_file> <device_name>"+           "conf_file can be, ../conf/xdev.conf <Local>"+           "OR http://holly.dsg.port.ac.uk:15000/xdev.conf <Remote>");    }    rank = Integer.parseInt(args[0]);    UUID myuuid = UUID.randomUUID();    id = new ProcessID(myuuid); //, rank);    Map<String,String> map = System.getenv() ;    mpjHomeDir = map.get("MPJ_HOME");    try {      localaddr = InetAddress.getLocalHost();      localHostName = localaddr.getHostName();      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.info("--init method of niodev is called--");        logger.info("Address: " + localaddr);        logger.info("Name :" + localHostName);        logger.info("rank :" + rank);      }    }    catch (UnknownHostException unkhe) {      throw new XDevException(unkhe);    }    ConfigReader reader = null;    try {      reader = new ConfigReader(args[1]);      nprocs = (new Integer(reader.readNoOfProc())).intValue();      psl = (new Integer(reader.readIntAsString())).intValue();      if(psl < 12) {        logger.debug("lowest possible psl is 12 bytes"); 	              psl = 12;  	            }    }    catch (Exception config_error) {      throw new XDevException(config_error);    }    pids = new ProcessID[nprocs];    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info("total processes:<" + nprocs);      logger.info("protocolSwitchLimit :<" + psl);    }    String[] nodeList = new String[nprocs];    int[] pList = new int[nprocs];    int[] rankList = new int[nprocs];    int count = 0;    while (count < nprocs) {      String line = null;      try {        line = reader.readLine();      }      catch (IOException ioe) {        throw new XDevException(ioe);      }      if (line == null || line.equals("") || line.equals("#")) {        continue;      }      line = line.trim();      StringTokenizer tokenizer = new StringTokenizer(line, "@");      
nodeList[count] = tokenizer.nextToken();      pList[count] = (new Integer(tokenizer.nextToken())).intValue();      rankList[count] = (new Integer(tokenizer.nextToken())).intValue();      count++;    }    reader.close();    /* Open the selector */    try {      selector = Selector.open();    }    catch (IOException ioe) {      throw new XDevException(ioe);    }    /* Create server socket */    SocketChannel[] rChannels = new SocketChannel[nodeList.length - 1];    try {      writableServerChannel = ServerSocketChannel.open();      writableServerChannel.configureBlocking(false);      writableServerChannel.socket().bind(new InetSocketAddress(pList[rank]));      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.debug("created writableServerChannel on port " + pList[rank]);      }      writableServerChannel.register(selector, SelectionKey.OP_ACCEPT);    }    catch (IOException ioe) {      throw new XDevException(ioe);    }    my_server_port = pList[rank];    /* Create control server socket */    SocketChannel[] wChannels = new SocketChannel[nodeList.length - 1];    try {      readableServerChannel = ServerSocketChannel.open();      readableServerChannel.configureBlocking(false);      readableServerChannel.socket().bind(          new InetSocketAddress( (pList[rank] + 1)));      readableServerChannel.register(selector, SelectionKey.OP_ACCEPT);      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.debug("created readableServerChannel on port " +                     (pList[rank] + 1));      }    }    catch (IOException ioe) {      throw new XDevException(ioe);    }    /* This is connection-code for data-channels. */    boolean connected = false;    int temp = 0, index = 0;    /*     * This while loop is connecting to server sockets of other     * peers. 
If there are 4 processes, process 0 will not connect     * to any process, process 1 will connect to process 0, process     * 2 will connect to pro 0&1, and process 3 will connect to pro     * 0&1&2     */    while (temp < nprocs - 1) {      if (rank == rankList[temp]) {        temp++;        continue;      }      if (rankList[temp] < rank) {        while (!connected) {          try {            rChannels[index] = SocketChannel.open();            rChannels[index].configureBlocking(true);          }          catch (Exception e) {            throw new XDevException(e);          }          if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {            logger.debug("Connecting to " + nodeList[temp] + "@" + pList[temp]);          }          try {            connected = rChannels[index].connect(                new InetSocketAddress(nodeList[temp], pList[temp]));          }          catch (AlreadyConnectedException ace) {            throw new XDevException(ace);          }          catch (ConnectionPendingException cpe) {            throw new XDevException(cpe);          }          catch (ClosedChannelException cce) {            throw new XDevException(cce);          }          catch (UnresolvedAddressException uae) {            throw new XDevException(uae);          }          catch (UnsupportedAddressTypeException uate) {            throw new XDevException(uate);          }          catch (SecurityException se) {            throw new XDevException(se);          }          catch (IOException ioe) {            // this is continuing coz process 1 alwayz connect to process 0            // server socket. 
If process 0 is not up, then this exception            connected = false;            if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {              logger.debug("connecting error ->" + ioe.getMessage());            }            continue;          }          try {            rChannels[index].configureBlocking(false);            rChannels[index].register(selector,                                      SelectionKey.OP_READ);            rChannels[index].socket().setTcpNoDelay(true);	    //these are useful if running MPJ on gigabit ethernet.            rChannels[index].socket().setSendBufferSize(524288);            rChannels[index].socket().setReceiveBufferSize(524288);          }          catch (Exception e) {            throw new XDevException(e);          }          synchronized (readableChannels) {            readableChannels.add(rChannels[index]);            if (readableChannels.size() == nprocs - 1) {              readableChannels.notify();            }          } //end synch          connected = true;        } //end while        connected = false;      } //end if      index++;      temp++;    } //end while    /* This is connection-code for control-channels. */    connected = false;    temp = 0;    index = 0;    /*     * This while loop is connecting to server sockets of other     * peers. If there are 4 processes, process 0 will not connect     * to any process, process 1 will connect to process 0, process     * 2 will connect to pro 0&1, and process 3 will connect to pro     * 0&1&2

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -