⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 niodevice.java

📁 MPI for java for Distributed Programming
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
     */    while (temp < nprocs - 1) {      if (rank == rankList[temp]) {        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {          logger.debug("Dont connect to itself, so contine;");        }        temp++;        continue;      }      if (rankList[temp] < rank) {        while (!connected) {          try {            wChannels[index] = SocketChannel.open();            wChannels[index].configureBlocking(true);            if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {              logger.debug("Connecting to " + nodeList[temp] +                           "@" + (pList[temp] + 1));            }          }          catch (Exception e) {            throw new XDevException(e);          }          try {            connected = wChannels[index].connect(                new InetSocketAddress(nodeList[temp], (pList[temp] + 1)));          }          catch (AlreadyConnectedException ace) {            throw new XDevException(ace);          }          catch (ConnectionPendingException cpe) {            throw new XDevException(cpe);          }          catch (ClosedChannelException cce) {            throw new XDevException(cce);          }          catch (UnresolvedAddressException uae) {            throw new XDevException(uae);          }          catch (UnsupportedAddressTypeException uate) {            throw new XDevException(uate);          }          catch (SecurityException se) {            throw new XDevException(se);          }          catch (IOException ioe) {            // this is continuing coz process 1 alwayz connect to process 0            // server socket. If process 0 is not up, then this exception            connected = false;            if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {              logger.debug("connecting error ->" + ioe.getMessage());            }            continue;          }          try {            wChannels[index].configureBlocking(true);            wChannels[index].socket().setTcpNoDelay(true);	    //these are useful if running MPJ on gigabit ethernet            wChannels[index].socket().setSendBufferSize(524288);            wChannels[index].socket().setReceiveBufferSize(524288);          }          catch (Exception e) {            throw new XDevException(e);          }          synchronized (writableChannels) {            writableChannels.add(wChannels[index]);            if (writableChannels.size() == nprocs - 1) {              writableChannels.notify();            }          } //end synch          connected = true;        } //end while        connected = false;      } //end if      index++;      temp++;    } //end while    index = rank;    root = 0;    procTree = new ProcTree();    extent = nprocs;    places = ProcTree.PROCTREE_A * index;    for (int i = 1; i <= ProcTree.PROCTREE_A; i++) {      ++places;      int ch = (ProcTree.PROCTREE_A * index) + i + root;      ch %= extent;      if (places < extent) {        procTree.child[i - 1] = ch;        procTree.numChildren++;      }    }    if (index == root) {      procTree.isRoot = true;    }    else {      procTree.isRoot = false;      int pr = (index - 1) / ProcTree.PROCTREE_A;      procTree.parent = pr;    }    procTree.root = root;    selectorThreadStarter = new Thread(selectorThread);    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("Starting the selector thread ");    }    selectorThreadStarter.start();    //addShutdownHook();    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("testing if all peers are connected?");    }    count = 0;    /* doAccept() and/or while     * loop above adds SocketChannels to writableChannels     * so access to writableChannels should be synchronized.     */    synchronized (writableChannels) {      if (writableChannels.size() != nprocs - 1) {        try {          writableChannels.wait();        }        catch (Exception e) {          throw new XDevException(e);        }      }    } //end sync.    /* This is for control-channels. */    synchronized (readableChannels) {      if (readableChannels.size() != nprocs - 1) {        try {          readableChannels.wait();        }        catch (Exception e) {          throw new XDevException(e);        }      }    } //end sync.    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info(" Yes all nodes are connected to each other ");    }    /*     * At this point, all-to-all connectivity has been acheived. Each process     * has all SocketChannels (for peers) in writableChannels (Vector object). Now     * each process will send rank(this rank is the one read from config-file),     * msb (most significant bits), lsb(least significant bits) to all the     * other processes. After receiving this info, all processes will have     * constructed worldTable (Hashtable object), which contains <k,v>, where     * k=UUID of a process, and v=SocketChannel object. This worldTable     * is used extensively throughout the niodev.     */    SocketChannel socketChannel = null;    ByteBuffer initMsgBuffer = ByteBuffer.allocate(24);    long msb = myuuid.getMostSignificantBits();    long lsb = myuuid.getLeastSignificantBits();    initMsgBuffer.putInt(INIT_MSG_HEADER_DATA_CHANNEL);    initMsgBuffer.putInt(rank);    initMsgBuffer.putLong(msb);    initMsgBuffer.putLong(lsb);    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("rank<" + rank +                   ">is sending its rank,msb,lsb, to all data channels");    }    /* Writing stuff into writable-channels */    for (int i = 0; i < writableChannels.size(); i++) {      socketChannel = writableChannels.get(i);      initMsgBuffer.flip();      /* Do we need to iterate here? */      while (initMsgBuffer.hasRemaining()) {        try {          if (socketChannel.write(initMsgBuffer) == -1) {            throw new XDevException(new ClosedChannelException());          }        }        catch (Exception e) {          throw new XDevException(e);        }      } //end while.      _wcb.clear();    } //end for.    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("rank<" + rank + "> testing if everything is received? ");    }    /* worldTable is accessed from doBarrierRead or here, so their access     * should be synchronized */    synchronized (worldReadableTable) {      if ( (worldReadableTable.size() != nprocs - 1)) {        try {          worldReadableTable.wait();        }        catch (Exception e) {          throw new XDevException(e);        }      }    } //end sync    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("worldReadableTable is filled ");    }    /* Writing stuff into readable-channels */    for (int i = 0; i < readableChannels.size(); i++) {      socketChannel = readableChannels.get(i);      initMsgBuffer.flip();      /* Do we need to iterate here? */      while (initMsgBuffer.hasRemaining()) {        try {          if (socketChannel.write(initMsgBuffer) == -1) {            throw new XDevException(new ClosedChannelException());          }        }        catch (Exception e) {          throw new XDevException(e);        }      } //end while.    } //end for.    /* Do blocking-reads, is this correct? will work but wont scale i think.     */    for (int i = 0; i < writableChannels.size(); i++) {      socketChannel = writableChannels.get(i);      try {        doBarrierRead(socketChannel, worldWritableTable, true);      }      catch (XDevException xde) {        throw xde;      }    }    synchronized (worldWritableTable) {      if ( (worldWritableTable.size() != nprocs - 1)) {        try {          worldWritableTable.wait();        }        catch (Exception e) {          throw new XDevException(e);        }      }    } //end sync    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("worldWritable is filled ");    }    //writableServerChannel.close();    //readableServerChannel.close();    pids[rank] = id;    for (int k = 0; k < writableChannels.size(); k++) {      writeLockTable.put(writableChannels.elementAt(k),                         new CustomSemaphore(1));    }    try {      writableServerChannel.close();      readableServerChannel.close();    }    catch (Exception e) {      throw new XDevException(e);    }//System.out.println(" init "+rank);    return pids;  } //end init  /**   * Returns the id of this process.   * @return ProcessID An object containing UUID of the process   */  public ProcessID id() {    return id;  }  public int getSendOverhead() {    return SEND_OVERHEAD ; 	    }    public int getRecvOverhead() {    return RECV_OVERHEAD ; 	    }  /**   * Non-Blocking probe method.   * @param srcID   * @param tag   * @param context   * @return mpjdev.Status   */  public mpjdev.Status iprobe(ProcessID srcID, int tag,                              int context) throws XDevException {    UUID dstUUID = id.uuid(), srcUUID = srcID.uuid();    mpjdev.Status status = null;    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("---iprobe---");      logger.debug("srcUUID:" + srcUUID + "tag:" + tag);      logger.debug("id.uuid():" + id.uuid());      //logger.debug("srcID.rank():" + srcID.rank());      logger.debug("ANY_SOURCE:" + ANY_SOURCE);      logger.debug("Looking whether this req has been posted or not");    }    try {      sem.acquire();    }    catch (Exception e) {      throw new XDevException(e);    }    NIORecvRequest request = arrQue.check(context, srcUUID, tag);    if (request != null) {        //now this is a tricky one ...        status = new mpjdev.Status(request.srcUUID, //srcID.rank(),                                   request.tag, -1, request.type,                                   request.numEls);    }    sem.signal();    return status;  }  /**   * Blocking probe method   * @param srcID The sourceID of the sender   * @param tag The tag of the message   * @param context The integer specifying the context   * @return mpjdev.Status The status object   */  public mpjdev.Status probe(ProcessID srcID, int tag,                             int context) throws XDevException {    mpjdev.Status status = null;    boolean comp = false;    while (!comp) {      status = this.iprobe(srcID, tag, context);      if (status != null) {        comp = true;      }    }    return status;  }  private synchronized int sendCounter() {    return++sendCounter;  }  private synchronized int recvCounter() {    return++recvCounter;  }  private synchronized int hashCode(int tag, int context, int srcHash,                                    int dstHash) {    return tag + context * 5 + dstHash * 11 + srcHash * 17;  }  /**   * Non-blocking send method   * @param buf              The mpjbuf.Buffer object containing the data.   * @param dstID            ProcessID of the destination process.   * @param tag              The unique identifier of the message.   * @param context          An integer providing "safe universe" for messages.   * @return mpjdev.Request  The Request object, which is later   *                         used to check the status of the message.   */  public mpjdev.Request isend(mpjbuf.Buffer buf, ProcessID dstID, int tag,                              int context) throws XDevException {    UUID dstUUID = dstID.uuid();        UUID srcUUID = id.uuid();    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info("---isend---<" + tag + ">");      logger.debug("sender :" + id.uuid());      logger.debug("receiver :" + dstUUID);      logger.debug("tag :" + tag);      //logger.debug("staticBufferSize :" + req.sBufSize );      //logger.debug("dynamicBufferSize :" + req.dBufSize );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -