📄 niodevice.java
字号:
//logger.debug("req.sendCounter :" + req.sendCounter ); } if (dstUUID.equals(srcUUID)) { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("sender and receiver are same process "); } try { sem.acquire(); }catch(Exception e){ e.printStackTrace() ; } NIORecvRequest recvRequest = recvQueue.rem(context, srcUUID, tag); NIOSendRequest sendRequest = new NIOSendRequest(tag, //NO_ACK_RECEIVED, id(), dstID, buf, context, STD_COMM_MODE, -1); if (recvRequest != null) { sem.signal(); recvRequest.type = sendRequest.type; recvRequest.numEls = sendRequest.numEls; recvRequest.buffer.setSize(sendRequest.sBufSize); recvRequest.sBufSize = sendRequest.sBufSize; recvRequest.dBufSize = sendRequest.dBufSize; /* copy the dynamic portion */ recvRequest.buffer.setDynamicBuffer(sendRequest.dynamicBuffer); /* copy the static portion */ recvRequest.staticBuffer.limit(recvRequest.sBufSize); recvRequest.staticBuffer.position(0); sendRequest.staticBuffer.limit(recvRequest.sBufSize + sendRequest.bufoffset ); sendRequest.staticBuffer.position(sendRequest.bufoffset); recvRequest.staticBuffer.put(sendRequest.staticBuffer); recvRequest.staticBuffer.flip(); /* comms complete */ //completedList.add(sendRequest); //completedList.add(recvRequest); recvRequest.setCompleted(true); sendRequest.setCompleted(true); return sendRequest; } else { recvRequest = new NIORecvRequest(id.uuid(), tag, false, context, sendRequest.sBufSize, sendRequest.dBufSize, sendRequest.commMode, null // (socketChannel) , sendRequest.numEls, sendRequest.type, -1, -1, srcUUID); recvRequest.sendRequest = sendRequest; arrQue.add(recvRequest); sem.signal(); return sendRequest; } } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("isend with remote process connections"); } NIOSendRequest req = new NIOSendRequest(tag, id(), dstID, buf, context, STD_COMM_MODE, sendCounter()); SocketChannel channel = worldWritableTable.get(dstUUID); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("channel :" + channel); } if ( (req.sBufSize + req.dBufSize) <= psl) { /* Eager-Send Procotol */ if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("get writeLock for this channel"); } CustomSemaphore wLock = writeLockTable.get(channel); try { wLock.acquire(); eagerSend(req, channel); wLock.signal(); //completedList.add( req ); req.notifyMe(); } catch (Exception e) { throw new XDevException(e); } } /* Rendezvous Protocol */ else if ( (req.sBufSize + req.dBufSize) > psl) { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("rendezvous protocol."); logger.debug(" get send-comms set lock "); } try { sLock.acquire(); }catch(Exception e){ } sendMap.put(new Integer(req.sendCounter), req); sLock.signal(); CustomSemaphore wLock = writeLockTable.get(channel); try { wLock.acquire(); rendezCtrlMsgSend(req, channel); wLock.signal(); } catch (Exception e) { throw new XDevException(e); } } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("---isend ends---<" + tag + ">"); } return req; } //end isend. /** * Non-blocking synchronous send. */ public mpjdev.Request issend(mpjbuf.Buffer buf, ProcessID dstID, int tag, int context) throws XDevException { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("---issend---"); } NIOSendRequest req = null; SocketChannel channel = null; UUID dstUUID = dstID.uuid(); UUID srcUUID = id.uuid(); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("---isend---<" + tag + ">"); logger.debug("sender :" + id.uuid()); logger.debug("receiver :" + dstUUID); logger.debug("tag :" + tag); //logger.debug("staticBufferSize :" + req.sBufSize ); //logger.debug("dynamicBufferSize :" + req.dBufSize ); //logger.debug("req.sendCounter :" + req.sendCounter ); } if (dstUUID.equals(srcUUID)) { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("sender and receiver are same process "); } try { sem.acquire(); }catch(Exception e){} NIORecvRequest recvRequest = recvQueue.rem(context, srcUUID, tag); NIOSendRequest sendRequest = new NIOSendRequest(tag, id(), dstID, buf, context, SYNC_COMM_MODE, -1); if (recvRequest != null) { sem.signal(); recvRequest.type = sendRequest.type; recvRequest.numEls = sendRequest.numEls; recvRequest.buffer.setSize(sendRequest.sBufSize); recvRequest.sBufSize = sendRequest.sBufSize; recvRequest.dBufSize = sendRequest.dBufSize; /* copy the dynamic portion */ recvRequest.buffer.setDynamicBuffer(sendRequest.dynamicBuffer); /* copy the static portion */ recvRequest.staticBuffer.limit(recvRequest.sBufSize); recvRequest.staticBuffer.position(0); sendRequest.staticBuffer.limit(recvRequest.sBufSize + sendRequest.bufoffset ); sendRequest.staticBuffer.position(sendRequest.bufoffset); recvRequest.staticBuffer.put(sendRequest.staticBuffer); recvRequest.staticBuffer.flip(); /* comms complete */ //completedList.add(sendRequest); //completedList.add(recvRequest); recvRequest.setCompleted(true); sendRequest.setCompleted(true); return sendRequest; } else { recvRequest = new NIORecvRequest(id.uuid(), tag, false, context, sendRequest.sBufSize, sendRequest.dBufSize, sendRequest.commMode, null // (socketChannel) , sendRequest.numEls, sendRequest.type, -1, -1, srcUUID); recvRequest.sendRequest = sendRequest; arrQue.add(recvRequest); sem.signal(); return sendRequest; } } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("isend with remote process connections"); } req = new NIOSendRequest(tag, id(), dstID, buf, context, SYNC_COMM_MODE, sendCounter()); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("sender :" + id.uuid()); logger.debug("receiver :" + dstUUID); logger.debug("tag :" + tag); logger.debug("staticBufferSize :" + req.sBufSize); logger.debug("dynamicBufferSize :" + req.dBufSize); logger.debug("buffset :" + 0); logger.debug("Rendezous(isend), calling rendezCtrlMsgSend"); } channel = worldWritableTable.get(dstUUID); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("channel (can never be null) " + channel); } try { sLock.acquire(); }catch(Exception e){} sendMap.put(new Integer(req.sendCounter), req); sLock.signal(); CustomSemaphore wLock = writeLockTable.get(channel); try { wLock.acquire(); rendezCtrlMsgSend(req, channel); wLock.signal(); } catch (Exception e) { throw new XDevException(e); } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("---issend ends---<" + tag + ">"); } return req; } //end issend /** * Blocking send method. * @param buf The mpjbuf.Buffer object containing the data. * @param dstID ProcessID of the destination * @param tag The unique identifier of the message * @param context An integer providing "safe universe" for messages. * @throws MPJException If the buffer is null, dest process ID is insane. * @throws java.nio.BufferOverflowException * @throws ReadOnlyBufferException * @throws IOException If some I/O error occurs */ public void send(mpjbuf.Buffer buf, ProcessID dstID, int tag, int context) throws XDevException { Request request = isend(buf, dstID, tag, context); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("Calling request.iwait() in send method, it may not return"); } request.iwait(); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("Called request.iwait() in sng this, means it returned"); } } /** * Blocking synchronous send */ public void ssend(mpjbuf.Buffer buf, ProcessID dstID, int tag, int context) throws XDevException { Request request = issend(buf, dstID, tag, context); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("Calling request.iwait() in send method, it may not return"); } request.iwait(); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("Called request.iwait()eeing this, means it returned"); } } /** * This method is used by the sender to send the control message to * the receiver */ private void rendezCtrlMsgSend(NIOSendRequest request, SocketChannel socketChannel) throws Exception { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("---rendezCtrlMsgSend---"); logger.debug(" request.tag " + request.tag); } request.staticBuffer.limit( request.bufoffset ) ; request.staticBuffer.position( 0 ); request.staticBuffer.putInt(READY_TO_SEND); request.staticBuffer.putLong(id().uuid().getMostSignificantBits()); request.staticBuffer.putLong(id().uuid().getLeastSignificantBits()); request.staticBuffer.putInt(request.tag); request.staticBuffer.putInt(request.sBufSize); request.staticBuffer.putInt(request.dBufSize); request.staticBuffer.putInt(request.commMode); request.staticBuffer.putInt(request.context); request.staticBuffer.putInt(request.numEls); request.staticBuffer.putInt(request.sendCounter); request.staticBuffer.put( (byte) request.type.getCode()); request.staticBuffer.limit( request.bufoffset) ; request.staticBuffer.position( 0 ); int w = 0; int ww = 0; while (request.staticBuffer.hasRemaining()) { if ( (w = socketChannel.write(request.staticBuffer)) == -1) { throw new ClosedChannelException(); } ww += w; } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("---rendezCtrlMsgSend ENDS ---"); } } //end rendezCtrlMsgSend private void eagerSend(NIOSendRequest request, SocketChannel socketChannel) throws Exception { //long strt = System.nanoTime() ; //long stop = 0L, intv = 0L; int w = 0, ww = 0; if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("---eagerSend---"); logger.debug(" request.bufoffset "+request.bufoffset); } request.staticBuffer.limit( request.bufoffset ) ; request.staticBuffer.position( 0 ); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("sendBuffer "+request.staticBuffer) ; } request.staticBuffer.putInt(READY_TO_SEND); request.staticBuffer.putLong(id().uuid().getMostSignificantBits()); request.staticBuffer.putLong(id().uuid().getLeastSignificantBits()); request.staticBuffer.putInt(request.tag); request.staticBuffer.putInt(request.sBufSize); request.staticBuffer.putInt(request.dBufSize); request.staticBuffer.putInt(request.commMode); request.staticBuffer.putInt(request.context); request.staticBuffer.putInt(request.numEls); request.staticBuffer.putInt(request.sendCounter); request.staticBuffer.put( (byte) request.type.getCode() ); //stop = System.nanoTime() ; //intv = stop - strt ; //strt = stop; //logger.debug("isend_packing_time_route1 <"+intv/1000); /* Writing the static section of the buffer */ if (request.sBufSize > 0) { request.staticBuffer.limit(request.sBufSize + request.bufoffset ); request.staticBuffer.position(0); w = 0; ww = 0; while (request.staticBuffer.hasRemaining()) { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("request.staticBuffer (1)<" + request.staticBuffer + "> w=" + w); } if ( (w = socketChannel.write(request.staticBuffer)) == -1) { throw new ClosedChannelException(); } ww += w ; if(mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("request.staticBuffer (2)<"+request.staticBuffer + "> w=" + w); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -