📄 niodevice.java
   */
  while (temp < nprocs - 1) {

    if (rank == rankList[temp]) {
      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
        logger.debug("Don't connect to itself, so continue");
      }
      temp++;
      continue;
    }

    if (rankList[temp] < rank) {

      while (!connected) {

        try {
          wChannels[index] = SocketChannel.open();
          wChannels[index].configureBlocking(true);
          if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
            logger.debug("Connecting to " + nodeList[temp] + "@" +
                         (pList[temp] + 1));
          }
        }
        catch (Exception e) {
          throw new XDevException(e);
        }

        try {
          connected = wChannels[index].connect(
              new InetSocketAddress(nodeList[temp], (pList[temp] + 1)));
        }
        catch (AlreadyConnectedException ace) {
          throw new XDevException(ace);
        }
        catch (ConnectionPendingException cpe) {
          throw new XDevException(cpe);
        }
        catch (ClosedChannelException cce) {
          throw new XDevException(cce);
        }
        catch (UnresolvedAddressException uae) {
          throw new XDevException(uae);
        }
        catch (UnsupportedAddressTypeException uate) {
          throw new XDevException(uate);
        }
        catch (SecurityException se) {
          throw new XDevException(se);
        }
        catch (IOException ioe) {
          // Keep retrying: a higher-ranked process (e.g. process 1) connects
          // to the server socket of a lower-ranked one (e.g. process 0). If
          // that process is not up yet, this exception is thrown.
          connected = false;
          if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
            logger.debug("connecting error ->" + ioe.getMessage());
          }
          continue;
        }

        try {
          wChannels[index].configureBlocking(true);
          wChannels[index].socket().setTcpNoDelay(true);
          // These settings are useful if running MPJ on gigabit ethernet.
          wChannels[index].socket().setSendBufferSize(524288);
          wChannels[index].socket().setReceiveBufferSize(524288);
        }
        catch (Exception e) {
          throw new XDevException(e);
        }

        synchronized (writableChannels) {
          writableChannels.add(wChannels[index]);
          if (writableChannels.size() == nprocs - 1) {
            writableChannels.notify();
          }
        } //end synch

        connected = true;
      } //end while

      connected = false;
    } //end if

    index++;
    temp++;
  } //end while

  index = rank;
  root = 0;
  procTree = new ProcTree();
  extent = nprocs;
  places = ProcTree.PROCTREE_A * index;

  for (int i = 1; i <= ProcTree.PROCTREE_A; i++) {
    ++places;
    int ch = (ProcTree.PROCTREE_A * index) + i + root;
    ch %= extent;
    if (places < extent) {
      procTree.child[i - 1] = ch;
      procTree.numChildren++;
    }
  }

  if (index == root) {
    procTree.isRoot = true;
  }
  else {
    procTree.isRoot = false;
    int pr = (index - 1) / ProcTree.PROCTREE_A;
    procTree.parent = pr;
  }

  procTree.root = root;

  selectorThreadStarter = new Thread(selectorThread);

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.debug("Starting the selector thread");
  }

  selectorThreadStarter.start();
  //addShutdownHook();

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.debug("testing if all peers are connected?");
  }

  count = 0;

  /* doAccept() and/or the while loop above add SocketChannels to
   * writableChannels, so access to writableChannels should be synchronized.
   */
  synchronized (writableChannels) {
    if (writableChannels.size() != nprocs - 1) {
      try {
        writableChannels.wait();
      }
      catch (Exception e) {
        throw new XDevException(e);
      }
    }
  } //end sync.

  /* This is for control-channels. */
  synchronized (readableChannels) {
    if (readableChannels.size() != nprocs - 1) {
      try {
        readableChannels.wait();
      }
      catch (Exception e) {
        throw new XDevException(e);
      }
    }
  } //end sync.

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.info("Yes, all nodes are connected to each other");
  }

  /*
   * At this point, all-to-all connectivity has been achieved. Each process
   * has all SocketChannels (for peers) in writableChannels (Vector object).
   * Now each process will send rank (the rank read from the config-file),
   * msb (most significant bits), and lsb (least significant bits) to all
   * the other processes. After receiving this info, every process will have
   * constructed worldTable (Hashtable object), which contains <k,v> pairs
   * where k = UUID of a process and v = SocketChannel object. This
   * worldTable is used extensively throughout niodev.
   */
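  /*
   * Note (illustrative, not part of the protocol description above): the
   * init message built below is a fixed 24-byte record --
   *
   *   int  header (INIT_MSG_HEADER_DATA_CHANNEL)   4 bytes
   *   int  rank                                    4 bytes
   *   long msb of this process's UUID              8 bytes
   *   long lsb of this process's UUID              8 bytes
   *
   * which is why ByteBuffer.allocate(24) holds exactly one message. A
   * receiving peer can rebuild the sender's UUID via new UUID(msb, lsb)
   * and associate it with the SocketChannel the message arrived on;
   * presumably this is what doBarrierRead does when it fills
   * worldReadableTable and worldWritableTable.
   */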
  SocketChannel socketChannel = null;
  ByteBuffer initMsgBuffer = ByteBuffer.allocate(24);

  long msb = myuuid.getMostSignificantBits();
  long lsb = myuuid.getLeastSignificantBits();

  initMsgBuffer.putInt(INIT_MSG_HEADER_DATA_CHANNEL);
  initMsgBuffer.putInt(rank);
  initMsgBuffer.putLong(msb);
  initMsgBuffer.putLong(lsb);

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.debug("rank<" + rank +
                 "> is sending its rank, msb, lsb to all data channels");
  }

  /* Write the init message on the writable (data) channels. */
  for (int i = 0; i < writableChannels.size(); i++) {

    socketChannel = writableChannels.get(i);
    initMsgBuffer.flip();

    /* Do we need to iterate here? */
    while (initMsgBuffer.hasRemaining()) {
      try {
        if (socketChannel.write(initMsgBuffer) == -1) {
          throw new XDevException(new ClosedChannelException());
        }
      }
      catch (Exception e) {
        throw new XDevException(e);
      }
    } //end while.

    _wcb.clear();
  } //end for.

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.debug("rank<" + rank + "> testing if everything is received?");
  }

  /* worldTable is accessed from doBarrierRead or here, so access to it
   * should be synchronized. */
  synchronized (worldReadableTable) {
    if (worldReadableTable.size() != nprocs - 1) {
      try {
        worldReadableTable.wait();
      }
      catch (Exception e) {
        throw new XDevException(e);
      }
    }
  } //end sync

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.debug("worldReadableTable is filled");
  }

  /* Write the init message on the readable (control) channels as well. */
  for (int i = 0; i < readableChannels.size(); i++) {

    socketChannel = readableChannels.get(i);
    initMsgBuffer.flip();

    /* Do we need to iterate here? */
    while (initMsgBuffer.hasRemaining()) {
      try {
        if (socketChannel.write(initMsgBuffer) == -1) {
          throw new XDevException(new ClosedChannelException());
        }
      }
      catch (Exception e) {
        throw new XDevException(e);
      }
    } //end while.
  } //end for.

  /* Do blocking-reads. Is this correct? It will work, but probably
   * won't scale. */
  for (int i = 0; i < writableChannels.size(); i++) {
    socketChannel = writableChannels.get(i);
    try {
      doBarrierRead(socketChannel, worldWritableTable, true);
    }
    catch (XDevException xde) {
      throw xde;
    }
  }

  synchronized (worldWritableTable) {
    if (worldWritableTable.size() != nprocs - 1) {
      try {
        worldWritableTable.wait();
      }
      catch (Exception e) {
        throw new XDevException(e);
      }
    }
  } //end sync

  if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
    logger.debug("worldWritableTable is filled");
  }

  //writableServerChannel.close();
  //readableServerChannel.close();

  pids[rank] = id;

  for (int k = 0; k < writableChannels.size(); k++) {
    writeLockTable.put(writableChannels.elementAt(k),
                       new CustomSemaphore(1));
  }

  try {
    writableServerChannel.close();
    readableServerChannel.close();
  }
  catch (Exception e) {
    throw new XDevException(e);
  }

  //System.out.println(" init " + rank);
  return pids;
  } //end init

  /**
   * Returns the id of this process.
   * @return ProcessID An object containing the UUID of the process.
   */
  public ProcessID id() {
    return id;
  }

  public int getSendOverhead() {
    return SEND_OVERHEAD;
  }

  public int getRecvOverhead() {
    return RECV_OVERHEAD;
  }
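  /*
   * Note (illustrative): the two probe variants below differ only in
   * blocking behaviour. A caller holding this device could poll the
   * non-blocking variant itself, along the lines of
   *
   *   mpjdev.Status s = null;
   *   while (s == null) {
   *     s = dev.iprobe(srcID, tag, context); // null means: nothing matching yet
   *   }
   *
   * where dev, srcID, tag and context are hypothetical caller-side names;
   * probe(..) implements exactly this busy-wait internally.
   */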
  /**
   * Non-blocking probe method.
   * @param srcID The sourceID of the sender.
   * @param tag The tag of the message.
   * @param context The integer specifying the context.
   * @return mpjdev.Status The status object, or null if no matching
   *         message was found.
   */
  public mpjdev.Status iprobe(ProcessID srcID, int tag, int context)
      throws XDevException {

    UUID dstUUID = id.uuid(), srcUUID = srcID.uuid();
    mpjdev.Status status = null;

    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
      logger.debug("---iprobe---");
      logger.debug("srcUUID:" + srcUUID + " tag:" + tag);
      logger.debug("id.uuid():" + id.uuid());
      //logger.debug("srcID.rank():" + srcID.rank());
      logger.debug("ANY_SOURCE:" + ANY_SOURCE);
      logger.debug("Looking whether this req has been posted or not");
    }

    try {
      sem.acquire();
    }
    catch (Exception e) {
      throw new XDevException(e);
    }

    NIORecvRequest request = arrQue.check(context, srcUUID, tag);

    if (request != null) {
      //now this is a tricky one ...
      status = new mpjdev.Status(request.srcUUID, //srcID.rank(),
                                 request.tag, -1, request.type,
                                 request.numEls);
    }

    sem.signal();
    return status;
  }

  /**
   * Blocking probe method.
   * @param srcID The sourceID of the sender.
   * @param tag The tag of the message.
   * @param context The integer specifying the context.
   * @return mpjdev.Status The status object.
   */
  public mpjdev.Status probe(ProcessID srcID, int tag, int context)
      throws XDevException {

    mpjdev.Status status = null;
    boolean comp = false;

    while (!comp) {
      status = this.iprobe(srcID, tag, context);
      if (status != null) {
        comp = true;
      }
    }

    return status;
  }

  private synchronized int sendCounter() {
    return ++sendCounter;
  }

  private synchronized int recvCounter() {
    return ++recvCounter;
  }

  private synchronized int hashCode(int tag, int context, int srcHash,
                                    int dstHash) {
    return tag + context * 5 + dstHash * 11 + srcHash * 17;
  }

  /**
   * Non-blocking send method.
   * @param buf The mpjbuf.Buffer object containing the data.
   * @param dstID ProcessID of the destination process.
   * @param tag The unique identifier of the message.
   * @param context An integer providing a "safe universe" for messages.
   * @return mpjdev.Request The Request object, which is later used to
   *         check the status of the message.
   */
  public mpjdev.Request isend(mpjbuf.Buffer buf, ProcessID dstID, int tag,
                              int context) throws XDevException {

    UUID dstUUID = dstID.uuid();
    UUID srcUUID = id.uuid();

    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {
      logger.info("---isend---<" + tag + ">");
      logger.debug("sender :" + id.uuid());
      logger.debug("receiver :" + dstUUID);
      logger.debug("tag :" + tag);
      //logger.debug("staticBufferSize :" + req.sBufSize );
      //logger.debug("dynamicBufferSize :" + req.dBufSize );