📄 experserver.java
字号:
/*
 * Created on May 1, 2005
 *
 * TODO To change the template for this generated file go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
import java.net.*;
import java.io.*;
import java.awt.*;
import javax.swing.*;
import java.awt.event.*;
import java.util.*;

/* ExperServer automates experiments on the testbed used to study the Aqueduct protocol.
 * It makes socket connections with a special shell called aqshell in order to collect
 * data about sensor nodes.
 *
 * Besides the comments in the rest of this file, the following sources are relevant:
 *   src/apps/deluge_test/aqshell.c - the Aqueduct shell that this program connects to
 *   src/mos/net/aqueduct_shell.c   - runs on the sensor node, sending data to aqshell and receiving commands
 *   src/mos/net/aqueduct_shell.h   - common header file for aqshell.c and aqueduct_shell.c
 *
 * Everything lives in this one file as inner classes rather than in separate files,
 * because the program started out as a quick-and-dirty hack.
 *
 * The code uses generic classes and varargs, so JDK 1.5 is needed to compile it.
 */
public class ExperServer
{
    static final int TESTBED_SIZE = 25;

    /* Aqueduct shell command codes. The same values are defined in
     * src/mos/net/include/aqueduct_shell.h; see that file for a description of each command.
     */
    static final byte AQSHELL_MESSAGE            = (byte) '#';
    static final byte AQSHELL_START              = (byte) '0';
    static final byte AQSHELL_NEWVERSION         = (byte) '1';
    static final byte AQSHELL_COMPLETEPAGE       = (byte) '2';
    static final byte AQSHELL_COMPLETEPROG       = (byte) '3';
    static final byte AQSHELL_CLEARSTATS         = (byte) '4';
    static final byte AQSHELL_STARTSTATS         = (byte) '5';
    static final byte AQSHELL_STOPSTATS          = (byte) '6';
    static final byte AQSHELL_SAVESTATS          = (byte) '7';
    static final byte AQSHELL_GETSTATS           = (byte) '8';
    static final byte AQSHELL_GETSTATS_REPLY     = (byte) '9';
    static final byte AQSHELL_SETVERSION         = (byte) 'A';
    static final byte AQSHELL_SETVERSION_REPLY   = (byte) 'B';
    static final byte AQSHELL_SUMMARY            = (byte) 'C';
    static final byte AQSHELL_PROFILE            = (byte) 'D';
    static final byte AQSHELL_REQUEST            = (byte) 'E';
    static final byte AQSHELL_DATA               = (byte) 'F';
    static final byte AQSHELL_GETID              = (byte) 'G';
    static final byte AQSHELL_GETID_REPLY        = (byte) 'H';
    static final byte AQSHELL_SUMMARY_SEND       = (byte) 'I';
    static final byte AQSHELL_PROFILE_SEND       = (byte) 'J';
    static final byte AQSHELL_REQUEST_SEND       = (byte) 'K';
    static final byte AQSHELL_DATA_SEND          = (byte) 'L';
    static final byte AQSHELL_CLOSE              = (byte) 'M';
    static final byte AQSHELL_SETLOG             = (byte) 'N';
    static final byte AQSHELL_NOOP               = (byte) 'O';
    static final byte AQSHELL_GETVERSION         = (byte) 'P';
    static final byte AQSHELL_SETIMAGESIZE       = (byte) 'Q';
    static final byte AQSHELL_SETCACHESIZE       = (byte) 'R';
    static final byte AQSHELL_ALLQUIET           = (byte) 'S';
    static final byte AQSHELL_NOTQUIET           = (byte) 'T';
    static final byte AQSHELL_STOPUPDATE         = (byte) 'W';
    static final byte AQSHELL_CACHEHIT           = (byte) 'X';
    static final byte AQSHELL_CACHEMISS          = (byte) 'Y';
    static final byte AQSHELL_CACHEHITFORWARD    = (byte) 'Z';
    static final byte AQSHELL_CACHEHITOLDFORWARD = (byte) 'a';

    /* These bits control how a command packet is handled.
     * Acknowledging nodes copy these bits into their replies and set AQSHELL_F_ACK.
*/ static final byte AQSHELL_F_ACK = 0x01; // The acknowledging shell or node will set this bit when replying static final byte AQSHELL_F_PLEASEACK = 0x02; // The shell or node that receives this command should acknowledge static final byte AQSHELL_F_RESEND = 0x04; // Unused static final byte AQSHELL_F_FORWARDREPLY = 0x08; // This bit tells the shell to forward a node's ACK to ExperServer static final byte AQSHELL_F_SHELLCTL = 0x10; // This packet is a command for the shell static final byte AQSHELL_F_NODECTL = 0x20; // This packet is a command that should be forwarded to the node static final String HANDSHAKE = "aqshell Handshake"; // On creating a new connection both parties exchange this string static final int TEST_PORT = 58799; // ExperServer listens for connections on this TCP port static final int SERIAL_PORT = 58800; // Unused static final int USB_PORT = 58900; // The aqshell process monitoring /dev/ttyUSB0 will listen on TCP port USB_PORT, // aqshell monitoring /dev/ttyUSB1 will listen on USB_PORT+1, etc. static final byte SYNC_BYTE_1 = (byte)'#'; // All packets between aqshell and ExperServer start with these three bytes static final byte SYNC_BYTE_2 = (byte)'3'; static final byte SYNC_BYTE_3 = (byte)'C'; ControlPanel cp; // The GUI NodeSquare[] nodes; // GUIs for each node ArrayList<ShellConnection> connections = new ArrayList<ShellConnection>(); // Open connections to aqshell processes HashSet<Integer> ports = new HashSet<Integer>(); // Ports we are connected to ShellConnection seedNode; // Source for code updates Experiment expers = new Experiment(); // Runs experiments boolean logging = false; // We are logging. See src/apps/deluge_test/unlog.py. PrintWriter logStream = null; // Stream we are logging to. String logName = null; // Name of the log file. /* Use this like C printf. */ synchronized void log(String format, Object... 
args) { if (logging) { logStream.printf(format, args); logStream.flush(); } } /* Log packet counts in a format understood by unlog.py */ synchronized void logStats(ShellConnection sc) { int[] tot = new int[5]; log("Stats %d\n", sc.ui.node); log(" Node Summary Profile Request Data Total\n"); log("----- ------- ------- ------- ---- -----\n"); for (int r=0; r<sc.stats.length; r++) { int t = sc.stats[r][0] + sc.stats[r][1] + sc.stats[r][2] + sc.stats[r][3]; log("%5d %7d %7d %7d %4d %5d\n", r, sc.stats[r][0], sc.stats[r][1], sc.stats[r][2], sc.stats[r][3], t); tot[0] += sc.stats[r][0]; tot[1] += sc.stats[r][1]; tot[2] += sc.stats[r][2]; tot[3] += sc.stats[r][3]; tot[4] += t; } log("----- ------- ------- ------- ---- -----\n"); log("Total %7d %7d %7d %4d %5d\n", tot[0], tot[1], tot[2], tot[3], tot[4]); } /* Set whether we are logging. */ synchronized void setLogging(boolean logging) { this.logging = logging; if (!logging) logStream.flush(); } /* Set the stream we are logging to. */ synchronized void setLogStream(PrintWriter stream) { logStream = stream; } /* This function iterates over all the USB-to-serial convertors on the system and connects to the aqshell process. */ void findShells() { final String usbstart = "ttyUSB"; // Find all the USB-to-serial convertors File dev = new File("/dev"); String[] usbs = dev.list(new FilenameFilter() { public boolean accept(File dir, String name) { return name.startsWith(usbstart); } }); // No convertors found if (usbs == null) return; for (int i=0; i<usbs.length; i++) { try { System.out.println("Found USB device "+usbs[i]); // Figure out the port aqshell is listening on int port = USB_PORT + Integer.parseInt(usbs[i].substring(usbstart.length())); // Are we already connected? 
if (ports.contains(port)) { System.out.println("Already connected to node on "+usbs[i]); continue; } Socket s = new Socket("localhost", port); ShellConnection sc = new ShellConnection(s); // Test for a good handshake if (sc.doHandshake(false)) { System.out.println("Connected to "+s.getInetAddress().getHostName()+" on port "+port); // sc.key contains the port number aqshell was listening on // Don't try this port again ports.add(sc.key); // Add to our list of all connections connections.add(sc); // Start listening sc.start(); } } catch (NumberFormatException e) { e.printStackTrace(); } catch (ConnectException e) { // This is expected if no aqshell is listening on the port System.err.println(e.getMessage()); } catch (IOException e) { e.printStackTrace(); } } } /* Main ExperServer thread */ void run() { // Show the GUI cp = new ControlPanel(); JFrame f = new JFrame(); f.getContentPane().add(cp); f.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); f.pack(); f.setVisible(true); try { // Look for existing aqshells that we can connect to findShells(); ServerSocket ss = new ServerSocket(TEST_PORT); // Loop indefinitely and listen for aqshells that want to connect to us while (true) { Socket s = ss.accept(); try { ShellConnection sc = new ShellConnection(s); if (sc.doHandshake(true)) { System.out.println("Connected to "+s.getInetAddress().getHostName()+" on port "+TEST_PORT); // sc.key contains the port number aqshell would have been listening on if we had connected to it in findShells() // Don't try this port again ports.add(sc.key); // Add to our list of all connections connections.add(sc); // Start listening sc.start(); } } catch (IOException ioe) { ioe.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { // Start the server ExperServer es = new ExperServer(); es.run(); } /* Bounds check and return a node's GUI if the ID is good. 
*/ NodeSquare getNodeUI(int node) { if (node<0 || node>=nodes.length) { System.err.println("Node index "+node+" out of range."); return null; } return nodes[node]; } /* This class manages the connection with one aqshell process. * There is one aqshell process per node. */ class ShellConnection implements Runnable { DataInputStream in; // For sending and receiving on the socket DataOutputStream out; Socket sock; // The socket connected to the aqshell int key; // The port number that the aqshell listens on when it isn't connected NodeSquare ui; // The GUI for this connection. It is null until we receive a valid node ID. // Every packet begins with these 3 bytes byte[] syncbytes = { SYNC_BYTE_1, SYNC_BYTE_2, SYNC_BYTE_3 }; // Packet for sending AqShellPacket spkt = new AqShellPacket(); int[][] stats = new int[32][4]; // Aqueduct packet counts gathered by aqshell long versionTime; // Time when this node was given a new version number long finishTime; // Time when this node completed receiving the new version int lastAck; // Last command that was acknowledged int version; // Version number of node int size; // Size of image on node boolean allquiet = true; // Haven't heard any requests or data from this node for 30 seconds boolean isTarget; // This node is interested in updates ShellConnection(Socket s) throws IOException { in = new DataInputStream(new BufferedInputStream(s.getInputStream())); out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); sock = s; } /* Handshake to initiate a connection between aqshell and ExperServer * isServer - is set to true if this is the process that got the socket from an accept() call * This process will go first in the handshaking * TODO Actually, it's not necessary to take turns, both could send right away, then both could listen */ boolean doHandshake(boolean isServer) { try { if (isServer) { // Server sends handshake string first out.writeUTF(HANDSHAKE); out.flush(); } else { // We're not the server so the 'key' 
is the port that we connected to // See findShells() to see how this value is used key = sock.getPort(); } // Receive and verify the handshake from the other end String hs = in.readUTF(); if (!hs.equals(HANDSHAKE)) return false; if (!isServer) { // Client sends handshake string second out.writeUTF(HANDSHAKE); out.flush(); } else { // If we're the server we have no idea what port the aqshell at the other end was listening on, // so he sends it to us key = in.readInt(); } System.out.println("Good handshake."); return true; } catch (IOException e) { e.printStackTrace(); } return false; } /* Launch this connection's listening thread. */ void start() { new Thread(this).start(); } /* This thread listens for packets from aqshell */ public void run() { // Packet for receiving AqShellPacket rpkt = new AqShellPacket(); try { // Ping the node so it will reply with its ID and version information sendNodeCommand(AQSHELL_GETVERSION); while (!sock.isClosed()) { // Look for the beginning of a packet // TODO Since TCP is reliable, I suppose this is unnecessary if (in.readUnsignedByte() != SYNC_BYTE_1) continue; if (in.readUnsignedByte() != SYNC_BYTE_2) continue; if (in.readUnsignedByte() != SYNC_BYTE_3) continue; rpkt.recv(in); // Read the packet from the socket // If this packet came from a node, check its address and find the corresponding GUI if ((rpkt.flags & AQSHELL_F_SHELLCTL)==0 && (ui == null || ui.node != rpkt.id)) { NodeSquare ns = getNodeUI(rpkt.id); if (ns != null) { ui = ns; ui.setConnection(this); } else { // We got a bad node address, print out the packet for debugging rpkt.dump(); } } switch (rpkt.command) { case AQSHELL_CLEARSTATS: case AQSHELL_STARTSTATS: case AQSHELL_STOPSTATS: case AQSHELL_SAVESTATS: case AQSHELL_GETID: case AQSHELL_MESSAGE: case AQSHELL_START: case AQSHELL_SUMMARY: case AQSHELL_PROFILE:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -