
📄 niodevice.java

📁 MPI for java for Distributed Programming
💻 JAVA
📖 Page 1 of 5
/*
 The MIT License

 Copyright (c) 2005 - 2007
   1. Distributed Systems Group, University of Portsmouth (2005)
   2. Aamir Shafi (2005 - 2007)
   3. Bryan Carpenter (2005 - 2007)
   4. Mark Baker (2005 - 2007)

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
*/

/*
 * File         : NIODevice.java
 * Author       : Aamir Shafi, Bryan Carpenter
 * Created      : Thu Apr  9 12:22:15 BST 2004
 * Revision     : $Revision: 1.28 $
 * Updated      : $Date: 2006/10/20 17:24:47 $
 */

package xdev.niodev;

import java.nio.channels.*;
import java.nio.*;
import java.net.*;
import java.util.*;
import mpjbuf.*;
import mpjdev.*;
import java.util.concurrent.Semaphore;
import xdev.*;
import java.io.IOException;
import mpi.ProcTree;
import org.apache.log4j.Logger;

/**
 * <p>
 * This class is an implementation of <i>xdev</i> based on the Java New
 * I/O package.
 * </p>
 *
 * <h3> Overview </h3>
 *   <p>
 *   Java New I/O adds non-blocking I/O to the Java language, and this device
 *   uses it extensively to provide MPI functionality. Instead
 *   of directly using {@link java.net.Socket java.net.Socket}, niodev
 *   uses {@link java.nio.channels.SocketChannel
 *   java.nio.channels.SocketChannel}. This device, along with the
 *   request classes {@link xdev.niodev.NIORequest xdev.niodev.NIORequest},
 *   {@link xdev.niodev.NIOSendRequest xdev.niodev.NIOSendRequest}, and
 *   {@link xdev.niodev.NIORecvRequest xdev.niodev.NIORecvRequest}, forms
 *   the basis of the communication functionality.
 *   </p>
 *
 * <h3> Initialization </h3>
 *   <p>
 *   niodev reads a configuration file, which may be placed in a
 *   local/shared file system or be accessible through an HTTP server.
 *   The device reads this configuration file and tries to find the
 *   IP@PORT@RANK entry. The basis for this search is the rank provided
 *   to the device by the runtime infrastructure. Once this entry is
 *   located, the device knows which ports to start the
 *   {@link ServerSocketChannel ServerSocketChannel} on. Once the server
 *   socket channels are started at this port and port+1, they are
 *   registered with the selector to accept connections. Every time a
 *   client socket connects to such a server socket channel, an OP_ACCEPT
 *   event is generated. After starting these server sockets, a process
 *   connects to every process with a rank lower than its own. This
 *   essentially means that if there are four processes, then Process 0
 *   will start two server sockets, and Process 1 will start two server
 *   sockets and then connect to the server sockets of Process 0.
 *   Similarly, after starting their own two server sockets, Processes 2
 *   and 3 will connect to Processes 0&1, and Processes 0&1&2,
 *   respectively.
 *   </p>
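 *   <p>
 *   As a rough sketch only (the {@code startServerSockets} and
 *   {@code connectToRank} helpers below are hypothetical, not methods of
 *   this device), the connection ordering amounts to: each process accepts
 *   connections from higher ranks and actively connects to lower ranks, so
 *   exactly one pair of channels is established between every pair of
 *   processes.
 *   </p>
 *   <pre>{@code
 *   startServerSockets(port, port + 1);        // accept side, handled by
 *                                              // the selector thread
 *   for (int peer = 0; peer < rank; peer++) {
 *     connectToRank(peer);                     // connect side, runs in
 *   }                                          // the user thread
 *   }</pre>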
 *   <p>
 *   Every time niodev accepts or connects, it puts the {@link
 *   java.nio.channels.SocketChannel java.nio.channels.SocketChannel} into
 *   an instance of {@link java.util.Vector java.util.Vector} --
 *   writableChannels (for writing messages) or readableChannels (for
 *   reading messages) -- depending on the server socket port. Note that
 *   accepting a client request is done in the selector thread, while
 *   connecting to a server socket is done in the user thread. This may
 *   result in concurrent access to writableChannels and readableChannels,
 *   so access to these must be synchronized. Once all-to-all connectivity
 *   has been achieved, which means [writableChannels.size() == N-1] and
 *   [readableChannels.size() == N-1], each process needs to send
 *   information such as its rank and UUID to every other process.
 *   These ranks are the ones read from the configuration file provided
 *   by the MPJ runtime infrastructure. Once all the processes
 *   have exchanged this information, niodev has worldWritableTable and
 *   worldReadableTable, which are instances of {@link java.util.Hashtable
 *   java.util.Hashtable}.
 *   These two hashtables contain UUIDs as keys and SocketChannels as
 *   values. Note that the channels in 'worldWritableTable' are in
 *   blocking mode and are only used for writing messages. For niodev,
 *   we have decided to keep different channels for reading and writing.
 *   The reason is that we want to use non-blocking reads and
 *   blocking writes. Non-blocking writes could hurt the thread-safety of
 *   niodev, or result in very complex code.
 *   These hashtables are later used in the send/recv methods to obtain
 *   the reference to a SocketChannel, using the UUID of each process as
 *   the key. These UUIDs are contained within the
 *   {@link xdev.ProcessID xdev.ProcessID} objects. Again, while exchanging
 *   information, access to worldWritableTable and worldReadableTable must
 *   be synchronized. Normally, the user thread sends all the information
 *   and then waits for the selector thread to receive similar messages
 *   from all the other processes. When the selector thread reads a
 *   message, it first looks at the first four bytes, and based on this
 *   header information adds the information received to the appropriate
 *   hashtable. The possible header values are
 *   INIT_MSG_HEADER_DATA_CHANNEL and INIT_MSG_HEADER_CONTROL_CHANNEL.
 *   Once all of this is done, niodev has been initialized.
 *   </p>
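 *   <p>
 *   A minimal sketch of that dispatch, assuming the init message of a
 *   {@code channel} has just been read into {@code initMsgBuffer} (the
 *   variable names are illustrative, and which table each header selects
 *   is shown only by way of example):
 *   </p>
 *   <pre>{@code
 *   int header = initMsgBuffer.getInt(0);          // first four bytes
 *   if (header == INIT_MSG_HEADER_DATA_CHANNEL) {
 *     worldReadableTable.put(peerUUID, channel);   // e.g. data channel
 *   } else if (header == INIT_MSG_HEADER_CONTROL_CHANNEL) {
 *     worldWritableTable.put(peerUUID, channel);   // e.g. control channel
 *   }
 *   }</pre>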
 *
 * <h3> Modes of Send </h3>
 *   <p>
 *     The <a href="http://www.mpi-forum.org"> MPI specifications </a>
 *     define four modes of send operation: standard mode, buffered mode,
 *     ready mode, and synchronous mode.
 *     <i> xdev </i> supports two modes of send -- standard and
 *     synchronous. Ready send is similar to the standard mode of send,
 *     and buffered mode is supported at a higher level along with
 *     the MPJ buffering API.
 *   </p>
 *   <h4> Standard Mode of Send </h4>
 *     <p>
 *     The standard mode of send uses two communication protocols. The
 *     first is the 'Eager-Send Protocol' and the second is the
 *     'Rendezvous Protocol'.
 *     </p>
 *     <h5> Eager-Send Protocol </h5>
 *       <p>
 *       niodev uses the eager-send protocol to communicate small messages.
 *       The rationale behind using this communication protocol is to
 *       minimize the latency of small messages. This protocol assumes
 *       that the receiver has buffer space to store the messages in case
 *       the matching recv is not posted. The eager-send protocol is used
 *       for messages of size less than or equal to 128K bytes.
 *       </p>
 *       <img src="../../res/eagersend.png"/>
 *     <h5> Rendezvous Protocol </h5>
 *       <p>
 *       niodev uses the rendezvous protocol to communicate large messages.
 *       Before communicating a large message, there is an exchange of
 *       control messages to make sure that a matching recv is posted.
 *       This is necessary to avoid additional copying to a temporary
 *       xdev buffer.
 *       </p>
 *       <img src="../../res/rendezvous.png"/>
 *   <h4> Synchronous Mode of Send </h4>
 *       <p>
 *       The synchronous mode of send uses the rendezvous protocol
 *       described above for communication.
 *       </p>
 *       <img src="../../res/syncmode.png" />
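 *   <p>
 *   In sketch form, the choice between the two protocols of the standard
 *   mode comes down to the message size (the constant name and the
 *   {@code eagerSend}/{@code rendezvousSend} helpers below are
 *   illustrative, not this device's actual identifiers):
 *   </p>
 *   <pre>{@code
 *   static final int EAGER_SEND_LIMIT = 128 * 1024;  // 128K bytes
 *
 *   if (messageSizeInBytes <= EAGER_SEND_LIMIT) {
 *     eagerSend(buffer);       // push the data immediately; the receiver
 *                              // buffers it if no matching recv is posted
 *   } else {
 *     rendezvousSend(buffer);  // exchange control messages first, so the
 *                              // data lands directly in the user buffer
 *   }
 *   }</pre>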
 *
 * <h3> User and Selector Threads </h3>
 *    <p>
 *    During the initialization of xdev, xdev.NIODevice.init(...) creates
 *    a selector thread which is first used to accept connections. Once
 *    all-to-all connectivity has been achieved, the channels (both
 *    control and data) register with the selector for the READ_EVENT.
 *    This essentially means that whenever a channel receives some data,
 *    it generates an OP_READ event, which signals that there is some data
 *    to read on this channel. Thus, the selector thread is normally used
 *    for reading data from the channels. Also, when there is a short
 *    write -- suppose a thread is trying to write a 10K message and only
 *    succeeds in writing 5K bytes -- the channel registers with the
 *    selector for the OP_WRITE event and comes back later to finish
 *    writing the message into the SocketChannel.
 *    </p>
 *    <p>
 *      The user thread is invoked when the isend/issend/send/ssend/
 *      recv/irecv methods are called. <i> xdev </i> also attempts to
 *      support multi-threaded use, which means there may be multiple
 *      user threads making calls to these (non-)blocking send/recv
 *      methods.
 *    </p>
 *    <p>
 *      This poses a great programming challenge, because the user threads
 *      and the selector thread must synchronize before accessing the
 *      send/recv queues that contain pending messages waiting for data to
 *      be written to, or read from, a channel.
 *    </p>
 * <h3> Send and Recv Queues </h3>
 *     <p>
 *       Recv requests that have been posted but not yet matched are kept
 *       in recvQueue; messages that arrive before a matching recv has
 *       been posted are kept in an arrival queue (ArrvQueue). Matching
 *       between the two is done on the (context, source UUID, tag)
 *       triple.
 *     </p>
 * <h3> Same Process Communications </h3>
 *     <p>
 *       There is a special case, when a process is trying to send and
 *       recv a message to itself. In this case, the message is just
 *       copied from the sender buffer into the receiver buffer. The
 *       complexity comes in when wild-cards like ANY_SOURCE are used.
 *     </p>
 */
public class NIODevice
    extends Device {

  int index, root, extent, places;

  ProcTree procTree;

  long nextSequenceNum = 1L;

  /*
   * This semaphore is used to hold a lock on the send communication-sets
   */
  CustomSemaphore sLock = new CustomSemaphore(1);

  /*
   * This semaphore is used to hold a lock while reading data from
   * the SocketChannel
   */
  CustomSemaphore sem = new CustomSemaphore(1);

  /*
   * For the rendezvous protocol, the selector thread receives the ACK
   * messages and a new thread is started that actually sends the
   * messages. The selector thread receives the message in a ByteBuffer
   * which is read by the rendezSend thread. This semaphore is used to
   * synchronize access to the buffer.
   */
  CustomSemaphore buffer_sem = new CustomSemaphore(1);

  static Logger logger = Logger.getLogger("mpj");

  Vector<SocketChannel> writableChannels = new Vector<SocketChannel>();
  Vector<SocketChannel> readableChannels = new Vector<SocketChannel>();

  Hashtable<UUID, SocketChannel> worldWritableTable =
      new Hashtable<UUID, SocketChannel>();
  Hashtable<UUID, SocketChannel> worldReadableTable =
      new Hashtable<UUID, SocketChannel>();

  Hashtable<SocketChannel, CustomSemaphore> writeLockTable =
      new Hashtable<SocketChannel, CustomSemaphore>();

  //private static final boolean DEBUG = false ;
  //static final boolean DEBUG = true ;

  InetAddress localaddr = null;

  Selector selector = null;

  volatile boolean selectorFlag = true;

  private HashMap<Integer, NIOSendRequest> sendMap =
      new HashMap<Integer, NIOSendRequest>();

  private int sendCounter = 0;

  private int recvCounter = 0;

  HashMap<Integer, NIORecvRequest> recvMap =
      new HashMap<Integer, NIORecvRequest>();

  class RecvQueue {

    private HashMap<Key, NIORecvRequest> map =
        new HashMap<Key, NIORecvRequest>();

    private NIORecvRequest get(Key key) {
      return map.get(key);
    }

    private void add(Key key, NIORecvRequest recv) {
      NIORecvRequest head = map.get(key);
      if (head == null) {
        recv.recvNext = recv;
        recv.recvPrev = recv;
        map.put(key, recv);
      }
      else {
        NIORecvRequest last = head.recvPrev;
        last.recvNext = recv;
        head.recvPrev = recv;
        recv.recvPrev = last;
        recv.recvNext = head;
      }
    }

    private void rem(Key key, NIORecvRequest recv) {
      NIORecvRequest head = map.get(key);
      if (recv == head) {
        if (recv.recvNext == recv) {
          map.remove(key);
        }
        else {
          NIORecvRequest next = recv.recvNext;
          NIORecvRequest last = recv.recvPrev;
          last.recvNext = next;
          next.recvPrev = last;
          map.put(key, next);
        }
      }
      else {
        NIORecvRequest next = recv.recvNext;
        NIORecvRequest prev = recv.recvPrev;
        prev.recvNext = next;
        next.recvPrev = prev;
      }
    }

    void add(NIORecvRequest request) {
      request.recvKey = new NIODevice.Key(request.context, request.srcUUID,
                                          request.tag);
      add(request.recvKey, request);
    }

    NIORecvRequest rem(int context, UUID srcUUID, int tag) {
      Key[] keys = new NIODevice.Key[] {
          new NIODevice.Key(context, srcUUID, tag),
          new NIODevice.Key(context, srcUUID, xdev.Device.ANY_TAG),
          new NIODevice.Key(context, xdev.Device.ANY_SRC.uuid(), tag),
          new NIODevice.Key(context, xdev.Device.ANY_SRC.uuid(),
                            xdev.Device.ANY_TAG)
      };
      NIORecvRequest matchingRecv = null;
      long minSequenceNum = Long.MAX_VALUE;
      for (int i = 0; i < keys.length; i++) {
        NIORecvRequest recv = get(keys[i]);
        if (recv != null && recv.sequenceNum < minSequenceNum) {
          minSequenceNum = recv.sequenceNum;
          matchingRecv = recv;
        }
      }
      if (matchingRecv != null) {
        rem(matchingRecv.recvKey, matchingRecv);
      }
      return matchingRecv;
    }
  }

  RecvQueue recvQueue = new RecvQueue();
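
  /*
   * Illustration only -- this snippet is not part of the original file.
   * The four-key lookup in RecvQueue.rem means an incoming message with
   * (context, src, tag) can match a posted recv with the exact source and
   * tag, with ANY_TAG, with ANY_SRC, or with both wildcards. Among the
   * candidates, the recv with the smallest sequence number -- the one
   * posted earliest -- wins, which preserves posting order:
   *
   *   recvQueue.add(exactRecv);     // posted first: (ctx, src, tag)
   *   recvQueue.add(wildcardRecv);  // posted second: (ctx, ANY_SRC, ANY_TAG)
   *   NIORecvRequest r = recvQueue.rem(ctx, src, tag);
   *   // r == exactRecv, because it has the smaller sequence number
   */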
  class ArrvQueue {

    private HashMap<Key, NIORecvRequest> map =
        new HashMap<Key, NIORecvRequest>();

    NIORecvRequest rem(int context, UUID srcUUID, int tag) {
      Key key = new Key(context, srcUUID, tag);
      NIORecvRequest matchingSend = get(key);
      if (matchingSend != null) {
        Key[] keys = matchingSend.arrKeys;
        for (int i = 0; i < keys.length; i++) {
          rem(i, keys[i], matchingSend);
        }
      }
      return matchingSend;
    }

    private NIORecvRequest get(Key key) {
      return map.get(key);
    }

    private void add(int i, Key key, NIORecvRequest send) {
      NIORecvRequest head = map.get(key);
      if (head == null) {
        send.arrNext[i] = send;
        send.arrPrev[i] = send;
        map.put(key, send);
      }
      else {
        NIORecvRequest last = head.arrPrev[i];
        last.arrNext[i] = send;
        head.arrPrev[i] = send;
        send.arrPrev[i] = last;
        send.arrNext[i] = head;
      }
    }

    private void rem(int i, Key key, NIORecvRequest send) {
      NIORecvRequest head = map.get(key);
      if (send == head) {
        if (send.arrNext[i] == send) {
          map.remove(key);
        }
        else {
          NIORecvRequest next = send.arrNext[i];
          NIORecvRequest last = send.arrPrev[i];
          last.arrNext[i] = next;
          next.arrPrev[i] = last;
          map.put(key, next);
        }
      }
      else {
