📄 niodevice.java
字号:
// Link the neighbours of 'send' in list slot i directly to each other, dropping 'send' from that chain.
        NIORecvRequest next = send.arrNext[i];
        NIORecvRequest prev = send.arrPrev[i];
        prev.arrNext[i] = next;
        next.arrPrev[i] = prev;
      }
    }

    /**
     * Looks up the entry stored under the exact (context, srcUUID, tag)
     * triple.
     *
     * @param context context component of the lookup key
     * @param srcUUID source component of the lookup key
     * @param tag tag component of the lookup key
     * @return whatever get(key) yields for the key built from the arguments
     */
    NIORecvRequest check(int context, UUID srcUUID, int tag) {
      Key key = new Key(context, srcUUID, tag);
      return get(key);
    }

    /**
     * Files a request under four keys and stores those keys in
     * request.arrKeys. All four share the request's context; they differ in
     * source and tag:
     * slot 0 - request source, request tag;
     * slot 1 - request source, ANY_TAG;
     * slot 2 - ANY_SRC, request tag;
     * slot 3 - ANY_SRC, ANY_TAG.
     * Each key is then passed, with its slot index, to add(int, Key, request).
     *
     * @param request the request to register; its arrKeys field is overwritten
     */
    void add(NIORecvRequest request) {
      request.arrKeys = new NIODevice.Key[] {
        new NIODevice.Key(request.context, request.srcUUID, request.tag),
        new NIODevice.Key(request.context, request.srcUUID,
                          xdev.Device.ANY_TAG),
        new NIODevice.Key(request.context, xdev.Device.ANY_SRC.uuid(),
                          request.tag),
        new NIODevice.Key(request.context, xdev.Device.ANY_SRC.uuid(),
                          xdev.Device.ANY_TAG)
      };

      for (int i = 0; i < request.arrKeys.length; i++) {
        add(i, request.arrKeys[i], request);
      }
    }
  }

  /**
   * Lookup key made of a context, a source UUID and a tag. Two keys are equal
   * when all three components are equal, and the hash code is derived from
   * the same three components.
   */
  class Key {
    private int context, tag;
    private UUID srcUUID;

    Key(int context, UUID srcUUID, int tag) {
      this.context = context;
      this.srcUUID = srcUUID;
      this.tag = tag;
    }

    /** Combines tag, context and the source UUID's hash into one value. */
    @Override
    public int hashCode() {
      return tag + context * 5 + srcUUID.hashCode() * 17;
    }

    /** Returns true only for another Key with equal context, source and tag. */
    @Override
    public boolean equals(Object obj) {
      if (obj instanceof Key) {
        Key other = (Key) obj;
        return (other.context == context) &&
            (srcUUID.equals(other.srcUUID)) && (other.tag == tag);
      }
      return false;
    }
  }

  ArrvQueue arrQue = new ArrvQueue();

  /*
   * Name of machine where this xdev process is running
   */
  String localHostName = null;

  /* Server Socket Channel */
  ServerSocketChannel writableServerChannel = null;
  ServerSocketChannel readableServerChannel = null;

  ByteBuffer rcb = ByteBuffer.allocate(45);
  ByteBuffer rendezBuffer = ByteBuffer.allocate(8);
  ByteBuffer rendez_send_buffer = ByteBuffer.allocate(17);
  ByteBuffer wcb = ByteBuffer.allocate(49);
  ByteBuffer e_wcb = ByteBuffer.allocate(49);
  static ByteBuffer _wcb = ByteBuffer.allocate(21); //eendezCtrlMsgR2S ...
  // The declaration below must start on its own line; otherwise the
  // end-of-line comment above turns it into comment text.
  ByteBuffer s_wcb = ByteBuffer.allocate(20); //rendezCtrlMsgR2S ...
/* Threads for two selectors */ Thread selectorThreadStarter = null; int psl = 0, nprocs = 0, rank = 0, size = 0, my_server_port = 0; ProcessID[] pids = null; ProcessID id = null; /* * This integer is used as the header to send initial control messages */ private final int INIT_MSG_HEADER_DATA_CHANNEL = -21; private final int INIT_MSG_HEADER_CTRL_CHANNEL = -20; private final int RENDEZ_CTRL_MSG_LENGTH = 4; private final int ACK_LENGTH = 17; private final int CTRL_MSG_LENGTH = 45; int SEND_OVERHEAD = CTRL_MSG_LENGTH + 4 ; int RECV_OVERHEAD = 0; private final int STD_COMM_MODE = 3; private final int SYNC_COMM_MODE = 2; private final boolean NO_ACK_RECEIVED = false; private final boolean REQ_NOT_COMPLETED = false; private final boolean RECV_POSTED = true; private final int READY_TO_SEND = -24; private static final int ACK_HEADER = -23; private final int RENDEZ_HEADER = -22; private final int SEND_ACK_TO_SENDER = -80; private final int RECV_IN_USER_MEMORY = -81; private final int RECV_IN_DEV_MEMORY = -82; private final int MORE_TO_WRITE = -83; private final int MORE_TO_READ = -84; private String mpjHomeDir = null; SocketChannel msgReceivedFrom; //what is this doing here? boolean finished = false; public NIODevice() { this.deviceName = "niodev"; } /** * Initializes niodev. * @param args Arguments to NIODevice. * @return ProcessID[] An array of ProcessIDs. */ public ProcessID[] init(String args[]) throws XDevException { /* * * The init method reads names/ports/ranks from a config file. It finds * its own entry in the config file (by comparing ranks), and creates * a server socket at the port specified for that entry. Also, it creates * another server socket at (portspecified+1). It connects to server * sockets (not one, two server sockets) of processes with rank higher * than its own. * * At the end of this process, each process is connected to every * other process with two socketChannels. 
The reason for two * channels is that every process has writable and reable channel. * The writable channel is in blocking mode, whereas, the readable * channel is in non-blocking mode. In terms of datastructures, * 'writableChannels' (Vector) contains all writable channels, and * 'readableChannels' (Vector) contains all readable channels for * every process. The next step is that each process send its own rank, * ProcessID to all the other processes. At the end of this, each process * knows about all the peers and have ProcessID (key), SocketChannel (val) * in 'worldWritableTable' and 'worldReadableTable'. * * As the name suggests, worldWritableTable is used for writing messages * into channels, and worldReadableTable is used for receiving. The * selector-thread would generate events for worldReadableTable * SocketChannels whereas, the ones (SocketChannels) in * worldWritableTable have nothing to do with selector thread as they * are in blocking mode. * */ if (args.length < 3) { throw new XDevException("Usage: " + "java NIODevice <myrank> <conf_file> <device_name>"+ "conf_file can be, ../conf/xdev.conf <Local>"+ "OR http://holly.dsg.port.ac.uk:15000/xdev.conf <Remote>"); } rank = Integer.parseInt(args[0]); UUID myuuid = UUID.randomUUID(); id = new ProcessID(myuuid); //, rank); Map<String,String> map = System.getenv() ; mpjHomeDir = map.get("MPJ_HOME"); try { localaddr = InetAddress.getLocalHost(); localHostName = localaddr.getHostName(); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("--init method of niodev is called--"); logger.info("Address: " + localaddr); logger.info("Name :" + localHostName); logger.info("rank :" + rank); } } catch (UnknownHostException unkhe) { throw new XDevException(unkhe); } ConfigReader reader = null; try { reader = new ConfigReader(args[1]); nprocs = (new Integer(reader.readNoOfProc())).intValue(); psl = (new Integer(reader.readIntAsString())).intValue(); if(psl < 12) { logger.debug("lowest possible psl is 12 bytes"); psl 
= 12; } } catch (Exception config_error) { throw new XDevException(config_error); } pids = new ProcessID[nprocs]; if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.info("total processes:<" + nprocs); logger.info("protocolSwitchLimit :<" + psl); } String[] nodeList = new String[nprocs]; int[] pList = new int[nprocs]; int[] rankList = new int[nprocs]; int count = 0; while (count < nprocs) { String line = null; try { line = reader.readLine(); } catch (IOException ioe) { throw new XDevException(ioe); } if (line == null || line.equals("") || line.equals("#")) { continue; } line = line.trim(); StringTokenizer tokenizer = new StringTokenizer(line, "@"); nodeList[count] = tokenizer.nextToken(); pList[count] = (new Integer(tokenizer.nextToken())).intValue(); rankList[count] = (new Integer(tokenizer.nextToken())).intValue(); count++; } reader.close(); /* Open the selector */ try { selector = Selector.open(); } catch (IOException ioe) { throw new XDevException(ioe); } /* Create server socket */ SocketChannel[] rChannels = new SocketChannel[nodeList.length - 1]; try { writableServerChannel = ServerSocketChannel.open(); writableServerChannel.configureBlocking(false); writableServerChannel.socket().bind(new InetSocketAddress(pList[rank])); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("created writableServerChannel on port " + pList[rank]); } writableServerChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException ioe) { throw new XDevException(ioe); } my_server_port = pList[rank]; /* Create control server socket */ SocketChannel[] wChannels = new SocketChannel[nodeList.length - 1]; try { readableServerChannel = ServerSocketChannel.open(); readableServerChannel.configureBlocking(false); readableServerChannel.socket().bind( new InetSocketAddress( (pList[rank] + 1))); readableServerChannel.register(selector, SelectionKey.OP_ACCEPT); if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("created readableServerChannel on port " + (pList[rank] 
+ 1)); } } catch (IOException ioe) { throw new XDevException(ioe); } /* This is connection-code for data-channels. */ boolean connected = false; int temp = 0, index = 0; /* * This while loop is connecting to server sockets of other * peers. If there are 4 processes, process 0 will not connect * to any process, process 1 will connect to process 0, process * 2 will connect to pro 0&1, and process 3 will connect to pro * 0&1&2 */ while (temp < nprocs - 1) { if (rank == rankList[temp]) { temp++; continue; } if (rankList[temp] < rank) { while (!connected) { try { rChannels[index] = SocketChannel.open(); rChannels[index].configureBlocking(true); } catch (Exception e) { throw new XDevException(e); } if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("Connecting to " + nodeList[temp] + "@" + pList[temp]); } try { connected = rChannels[index].connect( new InetSocketAddress(nodeList[temp], pList[temp])); } catch (AlreadyConnectedException ace) { throw new XDevException(ace); } catch (ConnectionPendingException cpe) { throw new XDevException(cpe); } catch (ClosedChannelException cce) { throw new XDevException(cce); } catch (UnresolvedAddressException uae) { throw new XDevException(uae); } catch (UnsupportedAddressTypeException uate) { throw new XDevException(uate); } catch (SecurityException se) { throw new XDevException(se); } catch (IOException ioe) { // this is continuing coz process 1 alwayz connect to process 0 // server socket. If process 0 is not up, then this exception connected = false; if (mpi.MPI.DEBUG && logger.isDebugEnabled()) { logger.debug("connecting error ->" + ioe.getMessage()); } continue; } try { rChannels[index].configureBlocking(false); rChannels[index].register(selector, SelectionKey.OP_READ); rChannels[index].socket().setTcpNoDelay(true); //these are useful if running MPJ on gigabit ethernet. 
rChannels[index].socket().setSendBufferSize(524288); rChannels[index].socket().setReceiveBufferSize(524288); } catch (Exception e) { throw new XDevException(e); } synchronized (readableChannels) { readableChannels.add(rChannels[index]); if (readableChannels.size() == nprocs - 1) { readableChannels.notify(); } } //end synch connected = true; } //end while connected = false; } //end if index++; temp++; } //end while /* This is connection-code for control-channels. */ connected = false; temp = 0; index = 0; /* * This while loop is connecting to server sockets of other * peers. If there are 4 processes, process 0 will not connect * to any process, process 1 will connect to process 0, process * 2 will connect to pro 0&1, and process 3 will connect to pro * 0&1&2
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -