⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 niodevice.java

📁 MPI for java for Distributed Programming
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        /* Some error conditions */        if(mpi.MPI.DEBUG && logger.isDebugEnabled()) {	  if(ww > (request.sBufSize+request.bufoffset)) {            logger.fatal(" Fatal-Bug (1) <" + request.tag + ">");	    logger.fatal("request.staticBuffer " + request.staticBuffer);	    System.exit(1);          }	  if (request.staticBuffer.hasRemaining()) {            logger.fatal(" Bug (1) <" + request.tag + ">");	    logger.fatal("request.staticBuffer " + request.staticBuffer);	    System.exit(1);	  }          if (request.staticBuffer.position() !=	                 (request.sBufSize + request.bufoffset)) {	    logger.fatal(" Bug (2) <" + request.tag + ">");	    logger.fatal("request.staticBuffer " + request.staticBuffer);	    System.exit(1);	  }          if (request.staticBuffer.position() !=                           request.staticBuffer.limit()) {            logger.fatal(" Bug (3) <" + request.tag + ">");            logger.fatal("request.staticBuffer " + request.staticBuffer);            System.exit(1);           }        } //error condition       }//end while writing.    } //end writing static section.    //stop = System.nanoTime() ;     //intv = stop - strt ;    //strt = stop;    //logger.debug("isend_packing_time_route1 <"+intv/1000);    //strt = System.nanoTime() - strt ;    //logger.debug("isend_writing_time <"+intv/1000);    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("request.dBufSize <" + request.dBufSize + ">");    }    /* Writing the dynamic section of the buffer */    if (request.dynamicBuffer != null && request.dBufSize > 0) {      RawBuffer rawBuffer = BufferFactory.create(request.dBufSize) ;       ByteBuffer buffer = ((NIOBuffer) rawBuffer).getBuffer() ;      buffer.position(0);      buffer.limit(request.dBufSize) ;            buffer.put(request.dynamicBuffer, 0, request.dBufSize);      buffer.flip();      ww = 0;      w = 0;      while (buffer.hasRemaining()) {        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {          logger.debug("buffer (1)<" + buffer + ">");        }        if ( (w = socketChannel.write(buffer)) == -1) {          throw new ClosedChannelException();        }        ww += w;        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {          logger.debug("buffer (2)<" + buffer + ">");        }      } //end while.      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        if (buffer.hasRemaining()) {          logger.fatal(" Bug (4) <" + request.tag + ">");          logger.fatal("buffer " + buffer);          System.exit(1);        }        if (buffer.position() != (request.dBufSize)) {          logger.fatal("Bug (5) <" + request.tag + ">");          logger.fatal("buffer " + buffer);          System.exit(1);        }        if (buffer.position() != buffer.limit()) {          logger.fatal("Bug (6) <" + request.tag + ">");          logger.fatal("buffer " + buffer);          System.exit(1);        }      }      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.debug("written down bytes " + buffer.position());      }      BufferFactory.destroy(rawBuffer);    } //end writing dynamic section    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("--eagerSend finishes--");    }  } //end eagerSend    public mpjdev.Request peek() throws XDevException {     return completedList.remove() ;   }  static CompletedList completedList = new CompletedList() ;   static class CompletedList { 	      NIORequest front, back ; 	     int size ;     /**      * Remove request from any position in the completedList     */    synchronized void remove(NIORequest request) {       if(request.inCompletedList) {         if(front == back) {          front = null;          back = null;        } else if(front == request) {          front.prevCompleted.nextCompleted= front.nextCompleted;          front.nextCompleted.prevCompleted = front.prevCompleted;          front = front.prevCompleted;        } else if(back == request) {          back.prevCompleted.nextCompleted = back.nextCompleted;          back.nextCompleted.prevCompleted = back.prevCompleted;          back = back.nextCompleted;        } else {          request.prevCompleted.nextCompleted = request.nextCompleted;          request.nextCompleted.prevCompleted = request.prevCompleted;        }	request.inCompletedList = false;  	size-- ; 	System.out.println(" size "+size);       }    }    /**      * Remove request from the front of completedList     * Wait until a request is found     */    synchronized NIORequest remove() {       while(listEmpty()) {         try {	                 wait();  	}catch(Exception e) {}       }      NIORequest oldFront = null ;      oldFront = front ;       if(front == back) {         front = null ;	back  = null ;       }      else {        front.prevCompleted.nextCompleted = front.nextCompleted ;         front.nextCompleted.prevCompleted = front.prevCompleted ;	front = front.prevCompleted ;       }            oldFront.inCompletedList = false ;       size -- ;	System.out.println(" size "+size);       return oldFront ;      }    /**     * Add request at the front of completedList      */    synchronized void add(NIORequest request) {       if(listEmpty()) {        front = request;	back = request;	request.nextCompleted = request;	request.prevCompleted = request;      }      else {	front.nextCompleted.prevCompleted = request;	request.nextCompleted = front.nextCompleted ; 	front.nextCompleted = request ; 	request.prevCompleted = front ;         back = request ;       }      size++ ;	System.out.println(" size "+size);       request.inCompletedList = true ;       notify();     }    boolean listEmpty() {       return(front==null && back==null); 	        }  }		    /**    * iwaitany   public static mpjdev.Status iwaitany(mpjdev.Request[] requests) {     boolean found = false;    boolean inActive = true ;    mpjdev.Status completedStatus = null ;     // check if there is a valid request which could be peeked    for(int i=0 ; i< requests.length ; i++) {      if(requests[i] != null) {        inActive = false;      }    }    if(inActive) {      return null;    }    do {       for(int j=0 ; j <requests.length ; j++) {         if(requests[j] == null) {  	  continue; 		}	        completedStatus = requests[j].itest() ; 	        if(completedStatus == null) { 	  continue; 		}	        completedStatus = requests[j].iwait() ; 	completedStatus.index = j;         found = true ;            break ; 	      }     } while(!found);    return completedStatus ;   }  */  /**   * This method is the non-blocking recv method.   * @param buf The mpjbuf.Buffer object where the user wishes to receive   *            the actual message   * @param srcID The process id of the sending process   * @param tag The unique identifier of the message   * @return Status The status object containing the details of recv   * @throws MPJException If the buffer is null or the src is insane   * @throws IOException If some I/O error occurs   * @throws java.lang.IllegalArgumentException   */  public mpjdev.Status recv(mpjbuf.Buffer buf, ProcessID srcID,                            int tag, int context) throws XDevException {    //System.out.println("recv calling "+tag);    mpjdev.Status status = new mpjdev.Status(srcID.uuid(), tag, -1);    Request request = irecv(buf, srcID, tag, context, status);    //System.out.println("recv called "+tag);    return request.iwait();  }  /**   * Blocking receive method.   * @param buf The mpjbuf.Buffer objereceive the actual message   * @param srcID The process id  of the sending process   * @param tag The unique identifier of the message   * @param context An integer that provides "safe communication" universe   * @return mpjdev.Status The status object containing the details of recv   */  public Request irecv(mpjbuf.Buffer buf, ProcessID srcID, int tag,                       int context, mpjdev.Status status) throws XDevException {    UUID dstUUID = id.uuid();    UUID srcUUID = srcID.uuid();     if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info("---irecv---<" + tag + ">");      logger.debug("Looking whether this req has been posted or not");    }    //long strt = System.nanoTime()  ;    //long stop = 0L, intv = 0L;    try {      sem.acquire();    }catch(Exception e){      throw new XDevException(e); 	        }    NIORecvRequest request = null;    try {      request = arrQue.rem(context, srcUUID, tag);    }    catch (Exception e) {      throw new XDevException(e);     }    if (request != null) {      //stop = System.nanoTime() ;       //intv = stop - strt ;      //strt = stop;      //logger.debug("irecv_determing_its_posted <"+intv/1000);            /*       * some stuff is only known when the recv is posted ...so setting       * that kinda stuff in the next few lines       */      request.staticBuffer = ( (NIOBuffer) buf.getStaticBuffer()).getBuffer();      request.status = status;      //request.rank_source = srcID.rank();      if (request.srcUUID.equals(dstUUID)) {        sem.signal();        if(mpi.MPI.DEBUG && logger.isDebugEnabled()) { 	  logger.debug(" request.sendRequest.staticBuffer "+  		     	request.sendRequest.staticBuffer) ;  	  logger.debug(" request.sendRequest.bufoffset "+			request.sendRequest.bufoffset) ;	  logger.debug(" request.sBufSize "+request.sBufSize ); 	}        buf.copy(request.sendRequest.staticBuffer, 			request.sendRequest.bufoffset, request.sBufSize,                 0, request.sendRequest.dynamicBuffer,                 request.dBufSize);        //completedList.add(request);         //completedList.add(request.sendRequest);         request.setCompleted(true);        request.sendRequest.setCompleted(true);        return request;      }      else if ( ( (request.sBufSize + request.dBufSize) <= psl)               && request.commMode == STD_COMM_MODE) {        /*         * (Eager-Send),         * The message has already been copied to xdev buffad so we         * just need to copy it from the xdevBuffer to the user buffer         */        if (request.sBufSize > 0) {          ByteBuffer eagerBuffer = 		  ((NIOBuffer)request.eagerBuffer).getBuffer();		          request.staticBuffer.position(0);          request.staticBuffer.limit(request.sBufSize);	  eagerBuffer.limit(request.sBufSize) ;	  eagerBuffer.position(0) ;          request.staticBuffer.put(eagerBuffer);	  BufferFactory.destroy( request.eagerBuffer) ;        } //end if        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {          logger.debug("setting the buf size " + request.sBufSize);        }        buf.setSize(request.sBufSize);        if (request.dBufSize > 0) {          buf.setDynamicBuffer(request.dynamicBuffer);        }        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {          logger.debug("removed ");        }        //completedList.add( request );         request.notifyMe();        sem.signal();    //stop = System.nanoTime() ;     //intv = stop - strt ;    //strt = stop;        //logger.debug("irecv_copying_from_devM <"+intv/1000);        return request;      } //end eager-send looop.      else if ( ( ( (request.sBufSize + request.dBufSize) > psl) &&                 request.commMode == STD_COMM_MODE)               || (request.commMode == SYNC_COMM_MODE)) {        /*         * (Rendezous), writing the ctrl msg back, because         * (1) we have received the control message from the sender,         * (2)a matching receive is also posted         */        request.buffer = buf;        SocketChannel tc = worldReadableTable.get(request.srcUUID);        SocketChannel c = worldWritableTable.get(request.srcUUID);        recvMap.put(new Integer(request.recvCounter),request);	sem.signal();        CustomSemaphore wLock = writeLockTable.get(c);	try {          wLock.acquire();	}catch(Exception e){}	        rendezCtrlMsgR2S(c, request);        wLock.signal();        return request;      }    }    /*     * There is no matchin

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -