⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 niodevice.java

📁 MPI for java for Distributed Programming
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      //logger.debug("req.sendCounter :" + req.sendCounter );    }    if (dstUUID.equals(srcUUID)) {      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.info("sender and receiver are same process ");      }      try {        sem.acquire();      }catch(Exception e){        e.printStackTrace() ;	            }      NIORecvRequest recvRequest = recvQueue.rem(context, srcUUID, tag);      NIOSendRequest sendRequest = new NIOSendRequest(tag, //NO_ACK_RECEIVED,          id(), dstID, buf, context, STD_COMM_MODE, -1);      if (recvRequest != null) {        sem.signal();        recvRequest.type = sendRequest.type;        recvRequest.numEls = sendRequest.numEls;        recvRequest.buffer.setSize(sendRequest.sBufSize);        recvRequest.sBufSize = sendRequest.sBufSize;        recvRequest.dBufSize = sendRequest.dBufSize;        /* copy the dynamic portion */        recvRequest.buffer.setDynamicBuffer(sendRequest.dynamicBuffer);        /* copy the static portion */        recvRequest.staticBuffer.limit(recvRequest.sBufSize);        recvRequest.staticBuffer.position(0);        sendRequest.staticBuffer.limit(recvRequest.sBufSize + 			sendRequest.bufoffset );        sendRequest.staticBuffer.position(sendRequest.bufoffset);        recvRequest.staticBuffer.put(sendRequest.staticBuffer);        recvRequest.staticBuffer.flip();        /* comms complete */        //completedList.add(sendRequest);         //completedList.add(recvRequest);         recvRequest.setCompleted(true);        sendRequest.setCompleted(true);        return sendRequest;      }      else {        recvRequest = new NIORecvRequest(id.uuid(), tag, false, 			context, sendRequest.sBufSize,			sendRequest.dBufSize,			sendRequest.commMode, null // (socketChannel)                        , sendRequest.numEls, sendRequest.type,			-1, -1, srcUUID);        recvRequest.sendRequest = sendRequest;        arrQue.add(recvRequest);        sem.signal();        return sendRequest;      }    }        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("isend with remote process connections");    }    NIOSendRequest req = new NIOSendRequest(tag, id(), dstID, buf, context,                                            STD_COMM_MODE,                                            sendCounter());    SocketChannel channel = worldWritableTable.get(dstUUID);    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("channel :" + channel);    }    if ( (req.sBufSize + req.dBufSize) <= psl) {      /* Eager-Send Procotol */      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.debug("get writeLock for this channel");      }      CustomSemaphore wLock = writeLockTable.get(channel);      try {        wLock.acquire();        eagerSend(req, channel);        wLock.signal();        //completedList.add( req );         req.notifyMe();      }      catch (Exception e) {        throw new XDevException(e);      }    }    /* Rendezvous Protocol */    else if ( (req.sBufSize + req.dBufSize) > psl) {      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.debug("rendezvous protocol.");        logger.debug(" get send-comms set lock ");      }      try {       sLock.acquire();      }catch(Exception e){      }            sendMap.put(new Integer(req.sendCounter), req);      sLock.signal();      CustomSemaphore wLock = writeLockTable.get(channel);      try {        wLock.acquire();        rendezCtrlMsgSend(req, channel);        wLock.signal();      }      catch (Exception e) {        throw new XDevException(e);      }    }    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info("---isend ends---<" + tag + ">");    }    return req;  } //end isend.  /**   * Non-blocking synchronous send.   */  public mpjdev.Request issend(mpjbuf.Buffer buf, ProcessID dstID, int tag,                               int context) throws XDevException {    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("---issend---");    }    NIOSendRequest req = null;    SocketChannel channel = null;    UUID dstUUID = dstID.uuid();    UUID srcUUID = id.uuid();    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info("---isend---<" + tag + ">");      logger.debug("sender :" + id.uuid());      logger.debug("receiver :" + dstUUID);      logger.debug("tag :" + tag);      //logger.debug("staticBufferSize :" + req.sBufSize );      //logger.debug("dynamicBufferSize :" + req.dBufSize );      //logger.debug("req.sendCounter :" + req.sendCounter );    }    if (dstUUID.equals(srcUUID)) {      if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {        logger.info("sender and receiver are same process ");      }      try {      sem.acquire();      }catch(Exception e){}      NIORecvRequest recvRequest = recvQueue.rem(context, srcUUID, tag);      NIOSendRequest sendRequest = new NIOSendRequest(tag,           id(), dstID, buf, context, SYNC_COMM_MODE, -1);      if (recvRequest != null) {        sem.signal();        recvRequest.type = sendRequest.type;        recvRequest.numEls = sendRequest.numEls;        recvRequest.buffer.setSize(sendRequest.sBufSize);        recvRequest.sBufSize = sendRequest.sBufSize;        recvRequest.dBufSize = sendRequest.dBufSize;        /* copy the dynamic portion */        recvRequest.buffer.setDynamicBuffer(sendRequest.dynamicBuffer);        /* copy the static portion */        recvRequest.staticBuffer.limit(recvRequest.sBufSize);        recvRequest.staticBuffer.position(0);        sendRequest.staticBuffer.limit(recvRequest.sBufSize + 			sendRequest.bufoffset );        sendRequest.staticBuffer.position(sendRequest.bufoffset);        recvRequest.staticBuffer.put(sendRequest.staticBuffer);        recvRequest.staticBuffer.flip();        /* comms complete */        //completedList.add(sendRequest);         //completedList.add(recvRequest);         recvRequest.setCompleted(true);        sendRequest.setCompleted(true);        return sendRequest;      }      else {        recvRequest = new NIORecvRequest(id.uuid(), tag, false,                                  context, sendRequest.sBufSize,                                 sendRequest.dBufSize,                                 sendRequest.commMode, null // (socketChannel)                                 , sendRequest.numEls, sendRequest.type,                                 -1, -1, srcUUID);        recvRequest.sendRequest = sendRequest;        arrQue.add(recvRequest);        sem.signal();        return sendRequest;      }    }            if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("isend with remote process connections");    }    req = new NIOSendRequest(tag, id(), dstID,                             buf, context, SYNC_COMM_MODE, sendCounter());    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("sender :" + id.uuid());      logger.debug("receiver :" + dstUUID);      logger.debug("tag :" + tag);      logger.debug("staticBufferSize :" + req.sBufSize);      logger.debug("dynamicBufferSize :" + req.dBufSize);      logger.debug("buffset :" + 0);      logger.debug("Rendezous(isend), calling rendezCtrlMsgSend");    }    channel = worldWritableTable.get(dstUUID);    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("channel (can never be null) " + channel);    }        try {      sLock.acquire();    }catch(Exception e){}    sendMap.put(new Integer(req.sendCounter), req);    sLock.signal();    CustomSemaphore wLock = writeLockTable.get(channel);    try {      wLock.acquire();      rendezCtrlMsgSend(req, channel);      wLock.signal();    }    catch (Exception e) {      throw new XDevException(e);    }    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.info("---issend ends---<" + tag + ">");    }    return req;  } //end issend  /**   * Blocking send method.   * @param buf The mpjbuf.Buffer object containing the data.   * @param dstID ProcessID of the destination   * @param tag The unique identifier of the message   * @param context An integer providing "safe universe" for messages.   * @throws MPJException If the buffer is null, dest process ID is insane.   * @throws java.nio.BufferOverflowException   * @throws ReadOnlyBufferException   * @throws IOException If some I/O error occurs   */  public void send(mpjbuf.Buffer buf, ProcessID dstID, int tag,                   int context) throws XDevException {    Request request = isend(buf, dstID, tag, context);        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("Calling request.iwait() in send method, it may not return");    }        request.iwait();    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("Called request.iwait() in sng this, means it returned");    }  }  /**   * Blocking synchronous send   */  public void ssend(mpjbuf.Buffer buf, ProcessID dstID, int tag,                    int context) throws XDevException {    Request request = issend(buf, dstID, tag, context);    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("Calling request.iwait() in send method, it may not return");    }    request.iwait();    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("Called request.iwait()eeing this, means it returned");    }  }  /**   * This method is used by the sender to send the control message to   * the receiver   */  private void rendezCtrlMsgSend(NIOSendRequest request,                                 SocketChannel socketChannel) throws Exception {    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("---rendezCtrlMsgSend---");      logger.debug(" request.tag " + request.tag);    }    request.staticBuffer.limit( request.bufoffset ) ;     request.staticBuffer.position( 0 );    request.staticBuffer.putInt(READY_TO_SEND);    request.staticBuffer.putLong(id().uuid().getMostSignificantBits());    request.staticBuffer.putLong(id().uuid().getLeastSignificantBits());    request.staticBuffer.putInt(request.tag);    request.staticBuffer.putInt(request.sBufSize);    request.staticBuffer.putInt(request.dBufSize);    request.staticBuffer.putInt(request.commMode);    request.staticBuffer.putInt(request.context);    request.staticBuffer.putInt(request.numEls);    request.staticBuffer.putInt(request.sendCounter);    request.staticBuffer.put( (byte) request.type.getCode());    request.staticBuffer.limit( request.bufoffset) ;     request.staticBuffer.position( 0 );    int w = 0;    int ww = 0;    while (request.staticBuffer.hasRemaining()) {      if ( (w = socketChannel.write(request.staticBuffer)) == -1) {        throw new ClosedChannelException();      }      ww += w;    }    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("---rendezCtrlMsgSend ENDS ---");    }  } //end rendezCtrlMsgSend  private void eagerSend(NIOSendRequest request,                         SocketChannel socketChannel) throws Exception {	      //long strt = System.nanoTime() ;    //long stop = 0L, intv = 0L;    int w = 0, ww = 0;    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("---eagerSend---");      logger.debug(" request.bufoffset "+request.bufoffset);     }    request.staticBuffer.limit( request.bufoffset ) ;     request.staticBuffer.position( 0 );    if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {      logger.debug("sendBuffer "+request.staticBuffer) ;    }    request.staticBuffer.putInt(READY_TO_SEND);    request.staticBuffer.putLong(id().uuid().getMostSignificantBits());    request.staticBuffer.putLong(id().uuid().getLeastSignificantBits());    request.staticBuffer.putInt(request.tag);    request.staticBuffer.putInt(request.sBufSize);    request.staticBuffer.putInt(request.dBufSize);    request.staticBuffer.putInt(request.commMode);    request.staticBuffer.putInt(request.context);    request.staticBuffer.putInt(request.numEls);    request.staticBuffer.putInt(request.sendCounter);    request.staticBuffer.put( (byte) request.type.getCode() );					    //stop = System.nanoTime() ;     //intv = stop - strt ;    //strt = stop;    //logger.debug("isend_packing_time_route1 <"+intv/1000);    /* Writing the static section of the buffer */    if (request.sBufSize > 0) {      request.staticBuffer.limit(request.sBufSize + request.bufoffset );      request.staticBuffer.position(0);      w = 0; ww = 0;      while (request.staticBuffer.hasRemaining()) {        if (mpi.MPI.DEBUG && logger.isDebugEnabled()) {          logger.debug("request.staticBuffer (1)<" +                       request.staticBuffer + "> w=" + w);        }	        if ( (w = socketChannel.write(request.staticBuffer)) == -1) {          throw new ClosedChannelException();        }	ww += w ;	if(mpi.MPI.DEBUG && logger.isDebugEnabled()) {	  logger.debug("request.staticBuffer (2)<"+request.staticBuffer +			  "> w=" + w);        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -