📄 niodevice.java
字号:
/* Some error conditions */ if(mpi.MPI.DEBUG && logger.isDebugEnabled()) { if(ww > (request.sBufSize+request.bufoffset)) { logger.fatal(" Fatal-Bug (1) <" + request.tag + ">"); logger.fatal("request.staticBuffer " + request.staticBuffer); System.exit(1); } if (request.staticBuffer.hasRemaining()) { logger.fatal(" Bug (1) <" + request.tag + ">"); logger.fatal("request.staticBuffer " + request.staticBuffer); System.exit(1); } if (request.staticBuffer.position() != (request.sBufSize + request.bufoffset)) { logger.fatal(" Bug (2) <" + request.tag + ">"); logger.fatal("request.staticBuffer " + request.staticBuffer); System.exit(1); } if (request.staticBuffer.position() != request.staticBuffer.limit()) { logger.fatal(" Bug (3) <" + request.tag + ">"); logger.fatal("request.staticBuffer " + request.staticBuffer); System.exit(1); } } //error condition }//end while writing. } //end writing static section. //stop = System.nanoTime() ; //intv = stop - strt ; //strt = stop; //logger.debug("isend_packing_time_route1 <"+intv/1000); //strt = System.nanoTime() - strt ; //logger.debug("isend_writing_time <"+intv/1000); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("request.dBufSize <" + request.dBufSize + ">"); } /* Writing the dynamic section of the buffer */ if (request.dynamicBuffer != null && request.dBufSize > 0) { RawBuffer rawBuffer = BufferFactory.create(request.dBufSize) ; ByteBuffer buffer = ((NIOBuffer) rawBuffer).getBuffer() ; buffer.position(0); buffer.limit(request.dBufSize) ; buffer.put(request.dynamicBuffer, 0, request.dBufSize); buffer.flip(); ww = 0; w = 0; while (buffer.hasRemaining()) { if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("buffer (1)<" + buffer + ">"); } if ( (w = socketChannel.write(buffer)) == -1) { throw new ClosedChannelException(); } ww += w; if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("buffer (2)<" + buffer + ">"); } } //end while. if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { if (buffer.hasRemaining()) { logger.fatal(" Bug (4) <" + request.tag + ">"); logger.fatal("buffer " + buffer); System.exit(1); } if (buffer.position() != (request.dBufSize)) { logger.fatal("Bug (5) <" + request.tag + ">"); logger.fatal("buffer " + buffer); System.exit(1); } if (buffer.position() != buffer.limit()) { logger.fatal("Bug (6) <" + request.tag + ">"); logger.fatal("buffer " + buffer); System.exit(1); } } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("written down bytes " + buffer.position()); } BufferFactory.destroy(rawBuffer); } //end writing dynamic section if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("--eagerSend finishes--"); } } //end eagerSend public mpjdev.Request peek() throws XDevException { return completedList.remove() ; } static CompletedList completedList = new CompletedList() ; static class CompletedList { NIORequest front, back ; int size ; /** * Remove request from any position in the completedList */ synchronized void remove(NIORequest request) { if(request.inCompletedList) { if(front == back) { front = null; back = null; } else if(front == request) { front.prevCompleted.nextCompleted= front.nextCompleted; front.nextCompleted.prevCompleted = front.prevCompleted; front = front.prevCompleted; } else if(back == request) { back.prevCompleted.nextCompleted = back.nextCompleted; back.nextCompleted.prevCompleted = back.prevCompleted; back = back.nextCompleted; } else { request.prevCompleted.nextCompleted = request.nextCompleted; request.nextCompleted.prevCompleted = request.prevCompleted; } request.inCompletedList = false; size-- ; System.out.println(" size "+size); } } /** * Remove request from the front of completedList * Wait until a request is found */ synchronized NIORequest remove() { while(listEmpty()) { try { wait(); }catch(Exception e) {} } NIORequest oldFront = null ; oldFront = front ; if(front == back) { front = null ; back = null ; } else { front.prevCompleted.nextCompleted = front.nextCompleted ; front.nextCompleted.prevCompleted = front.prevCompleted ; front = front.prevCompleted ; } oldFront.inCompletedList = false ; size -- ; System.out.println(" size "+size); return oldFront ; } /** * Add request at the front of completedList */ synchronized void add(NIORequest request) { if(listEmpty()) { front = request; back = request; request.nextCompleted = request; request.prevCompleted = request; } else { front.nextCompleted.prevCompleted = request; request.nextCompleted = front.nextCompleted ; front.nextCompleted = request ; request.prevCompleted = front ; back = request ; } size++ ; System.out.println(" size "+size); request.inCompletedList = true ; notify(); } boolean listEmpty() { return(front==null && back==null); } } /** * iwaitany public static mpjdev.Status iwaitany(mpjdev.Request[] requests) { boolean found = false; boolean inActive = true ; mpjdev.Status completedStatus = null ; // check if there is a valid request which could be peeked for(int i=0 ; i< requests.length ; i++) { if(requests[i] != null) { inActive = false; } } if(inActive) { return null; } do { for(int j=0 ; j <requests.length ; j++) { if(requests[j] == null) { continue; } completedStatus = requests[j].itest() ; if(completedStatus == null) { continue; } completedStatus = requests[j].iwait() ; completedStatus.index = j; found = true ; break ; } } while(!found); return completedStatus ; } */ /** * This method is the non-blocking recv method. * @param buf The mpjbuf.Buffer object where the user wishes to receive * the actual message * @param srcID The process id of the sending process * @param tag The unique identifier of the message * @return Status The status object containing the details of recv * @throws MPJException If the buffer is null or the src is insane * @throws IOException If some I/O error occurs * @throws java.lang.IllegalArgumentException */ public mpjdev.Status recv(mpjbuf.Buffer buf, ProcessID srcID, int tag, int context) throws XDevException { //System.out.println("recv calling "+tag); mpjdev.Status status = new mpjdev.Status(srcID.uuid(), tag, -1); Request request = irecv(buf, srcID, tag, context, status); //System.out.println("recv called "+tag); return request.iwait(); } /** * Blocking receive method. * @param buf The mpjbuf.Buffer objereceive the actual message * @param srcID The process id of the sending process * @param tag The unique identifier of the message * @param context An integer that provides "safe communication" universe * @return mpjdev.Status The status object containing the details of recv */ public Request irecv(mpjbuf.Buffer buf, ProcessID srcID, int tag, int context, mpjdev.Status status) throws XDevException { UUID dstUUID = id.uuid(); UUID srcUUID = srcID.uuid(); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("---irecv---<" + tag + ">"); logger.debug("Looking whether this req has been posted or not"); } //long strt = System.nanoTime() ; //long stop = 0L, intv = 0L; try { sem.acquire(); }catch(Exception e){ throw new XDevException(e); } NIORecvRequest request = null; try { request = arrQue.rem(context, srcUUID, tag); } catch (Exception e) { throw new XDevException(e); } if (request != null) { //stop = System.nanoTime() ; //intv = stop - strt ; //strt = stop; //logger.debug("irecv_determing_its_posted <"+intv/1000); /* * some stuff is only known when the recv is posted ...so setting * that kinda stuff in the next few lines */ request.staticBuffer = ( (NIOBuffer) buf.getStaticBuffer()).getBuffer(); request.status = status; //request.rank_source = srcID.rank(); if (request.srcUUID.equals(dstUUID)) { sem.signal(); if(mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug(" request.sendRequest.staticBuffer "+ request.sendRequest.staticBuffer) ; logger.debug(" request.sendRequest.bufoffset "+ request.sendRequest.bufoffset) ; logger.debug(" request.sBufSize "+request.sBufSize ); } buf.copy(request.sendRequest.staticBuffer, request.sendRequest.bufoffset, request.sBufSize, 0, request.sendRequest.dynamicBuffer, request.dBufSize); //completedList.add(request); //completedList.add(request.sendRequest); request.setCompleted(true); request.sendRequest.setCompleted(true); return request; } else if ( ( (request.sBufSize + request.dBufSize) <= psl) && request.commMode == STD_COMM_MODE) { /* * (Eager-Send), * The message has already been copied to xdev buffad so we * just need to copy it from the xdevBuffer to the user buffer */ if (request.sBufSize > 0) { ByteBuffer eagerBuffer = ((NIOBuffer)request.eagerBuffer).getBuffer(); request.staticBuffer.position(0); request.staticBuffer.limit(request.sBufSize); eagerBuffer.limit(request.sBufSize) ; eagerBuffer.position(0) ; request.staticBuffer.put(eagerBuffer); BufferFactory.destroy( request.eagerBuffer) ; } //end if if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("setting the buf size " + request.sBufSize); } buf.setSize(request.sBufSize); if (request.dBufSize > 0) { buf.setDynamicBuffer(request.dynamicBuffer); } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("removed "); } //completedList.add( request ); request.notifyMe(); sem.signal(); //stop = System.nanoTime() ; //intv = stop - strt ; //strt = stop; //logger.debug("irecv_copying_from_devM <"+intv/1000); return request; } //end eager-send looop. else if ( ( ( (request.sBufSize + request.dBufSize) > psl) && request.commMode == STD_COMM_MODE) || (request.commMode == SYNC_COMM_MODE)) { /* * (Rendezous), writing the ctrl msg back, because * (1) we have received the control message from the sender, * (2)a matching receive is also posted */ request.buffer = buf; SocketChannel tc = worldReadableTable.get(request.srcUUID); SocketChannel c = worldWritableTable.get(request.srcUUID); recvMap.put(new Integer(request.recvCounter),request); sem.signal(); CustomSemaphore wLock = writeLockTable.get(c); try { wLock.acquire(); }catch(Exception e){} rendezCtrlMsgR2S(c, request); wLock.signal(); return request; } } /* * There is no matchin
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -