⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 trawler.java

📁 source code for fishnet
💻 JAVA
字号:
import java.net.ServerSocket;import java.net.Socket;import java.net.InetAddress;import java.util.HashMap;import java.util.Iterator;import java.io.IOException;import java.io.PrintWriter;import java.io.BufferedReader;import java.io.InputStreamReader;import java.io.FileNotFoundException;import java.lang.NumberFormatException;import java.lang.Integer;/** * <pre>    * Manages the emulated nodes.  * It works by listening for new TCP connections. Each new connecting node sends the UDP port * it is using for listening to peers. Also multiple emulated nodes might be running on the same machine * so this lets us disambiguate them. * The Trawler replies with the fishnet address that the emulated node should use, as well as the current  * neighbor list for that node as <fishnetAddress ipAddress udpPort> pairs. * The trawler updates this list as it changes. * * Usage: java Trawler <port to listen on> [topo file] *        *        Topo file is the topology file. It is an optional argument. By default all nodes will be neighbors. * </pre>    */public class Trawler {    private static Trawler trawler = null;    private ServerSocket socket;    private HashMap emulatedNodes;    private TrawlerCommandsParser parser;    /**     * Static method to get an instance of trawler.      * It will return null if the trawler has not been initialized with the port and topofile first     * @return An instance of this Trawler class     */    public static Trawler GetInstance() {	return trawler;    }    /**     * Static method to get an instance of trawler.      * @param port The port the trawler should listen for incoming connections on     * @return An instance of this Trawler class     * @throws IOException If an I/O error occurs when opening the socket     */    public static Trawler GetInstance(int port) throws IOException {	if(trawler == null) {	    trawler = new Trawler(port);	    	}	return trawler;    }    /*     * the next several routines track changes to the topology, and update     * the emulated nodes to reflect those changes.  Note these are changes     * to the *topology*, not the set of live emulated nodes.     */    /**     * An edge was removed. Notify both ends if they are alive other wise ignore. They will find out themselves anyway     * @param fishAddrA Address of one node of the edge     * @param fishAddrB Address of the other node of the edge     */    public void failEdge(int fishAddrA, int fishAddrB) {		EmulatedNode nodeA = this.getEmulatedNode(fishAddrA);	EmulatedNode nodeB = this.getEmulatedNode(fishAddrB);		if(nodeA != null && nodeB != null) {	    nodeA.removeEdge(fishAddrB);	    nodeB.removeEdge(fishAddrA);	}					      }    public void startEdge(int fishAddrA, int fishAddrB) {	EmulatedNode nodeA = this.getEmulatedNode(fishAddrA);	EmulatedNode nodeB = this.getEmulatedNode(fishAddrB);		if(nodeA != null && nodeB != null) {	    nodeA.putEdge(nodeB);	    nodeB.putEdge(nodeA);	}					      }    /**     * A node was removed from the topology.  We just remove the node's edges;     * the user has to kill the emulated node directly.     * NOTE: this must be called *BEFORE* the change to the topology!     * @param fishAddr Address of the node to be failed     */    public void failNode(int fishAddr) {	EmulatedNode node = this.getEmulatedNode(fishAddr);	if(node != null) {	    node.reset();	    this.removeAsNeighbor(node);	}    }    /**     * A node rejoined the topology, so we add back in all of its edges.     * @param fishAddr Address of node that is restarting     */    public void restartNode(int fishAddr) {	this.updateNeighbors(fishAddr);    }    /**     * <pre>               * Wait for a node to connect, then:     *	find out if any nodes have left while we were waiting     *		(and if so, update their neighbors so they stop sending them packets)     *	find a free fishnet Address to assign to the new node, and send it to them     *		(by writing to the node's TCP socket)     *	find out which UDP port they are listening to     *		(by reading from the node's TCP socket)     *	tell the node about all its neighbors' IP addresses and port #'s     *	tell all their neighbors with their IP address and port #     *	loop     * </pre>        * @param topofile Name of the topology filename. If it is null then all nodes are neighbors by default     * @throws FileNotFoundException If the filename could not be found     * @throws Exception Any other exception that might occur while the Trawler is running     */    public void start(String topofile) throws FileNotFoundException, Exception {		long deferParsingTill = -1;	if(topofile == null) {	    Topology.GetInstance(true);	}else {	    deferParsingTill = this.parser.parseFile(topofile, Utility.fishTime());	}		System.out.println("Trawler awaiting fish...");	Socket nodeSocket;	PrintWriter out;	while((nodeSocket = this.socket.accept()) != null) {	    try {		this.checkNodesQuit();				long now = Utility.fishTime();		if(deferParsingTill > -1 && deferParsingTill < now) {		    // Need to complete parsing topology file		    deferParsingTill = this.parser.parseRemainder(now);		}				BufferedReader in = new BufferedReader( new InputStreamReader( nodeSocket.getInputStream() ));		out = new PrintWriter(nodeSocket.getOutputStream(), true);				InetAddress ipAddress = nodeSocket.getInetAddress();		int port = Integer.parseInt(in.readLine());				if(port < 1024 || this.portConflict(ipAddress, port)) {		    System.err.println("Trawler: Illegal port: " + port);		    out.println(Packet.BROADCAST_ADDRESS);		    out.close();		    nodeSocket.close();		}else {		    // find a fishnet address to assign to the new node		    int fishAddr = this.freeFishAddr();		    		    if(fishAddr == -1) {			System.err.println("Trawler: out of addresses");			out.println(Packet.BROADCAST_ADDRESS);			out.close();			nodeSocket.close();		    		    }else {			//Disable Nagle			nodeSocket.setTcpNoDelay(true);			System.out.println("Got port " + port + ": assigning addr: " + fishAddr);			out.println(fishAddr);			this.emulatedNodes.put(new Integer(fishAddr), 					       new EmulatedNode(nodeSocket, out, fishAddr, ipAddress, port));			this.updateNeighbors(fishAddr);		    }		}	    }catch(NumberFormatException e) {		System.err.println("Msg received from node is not a port number. Socket: " + nodeSocket);	    }catch(IOException e) {		System.err.println("IOException occured while trying to creade new node. Exception: " + e);	    }catch(Exception e) {		System.err.println("Exception occured while trying to creade new node. Exception Stack Trace: ");		e.printStackTrace();			    }	}	        }    /**     * An emulated node has quit so notify its neighbors     * @param dyingNode The node that has quit     */    public void remove(EmulatedNode dyingNode) {	System.err.println("Removing node " + dyingNode.getFishAddr());	try {	    Iterator iter = this.emulatedNodes.values().iterator();	    while(iter.hasNext()) {		EmulatedNode node = (EmulatedNode)iter.next();		if(node.getFishAddr() == dyingNode.getFishAddr()) {		    iter.remove();		}else { 		    removeNeighbors(dyingNode, node);		}	    }	}catch(Exception e) {	    System.err.println("Exception occured while to remove emulated node: " + dyingNode.getFishAddr() + 			       " Exception: " + e);	}    }    /**     * Stop the Trawler     */    public void exit() {	System.out.println("Trawler exiting...");	System.exit(0);    }        private void removeAsNeighbor(EmulatedNode dyingNode) {	Iterator iter = this.emulatedNodes.values().iterator();	while(iter.hasNext()) {	    EmulatedNode node = (EmulatedNode)iter.next();	    removeNeighbors(dyingNode, node);	}    }    private void removeNeighbors(EmulatedNode dyingNode, EmulatedNode node) {	if( (Topology.GetInstance().getLiveEdge(node.getFishAddr(), dyingNode.getFishAddr()) != null) &&	    node.isAlive() ) {	    	    node.removeEdge(dyingNode.getFishAddr());	}    }    private void updateNeighbors(int fishAddr) {	EmulatedNode startingNode = this.getEmulatedNode(fishAddr);	if(startingNode != null) {	    Iterator iter = this.emulatedNodes.values().iterator();	    while(iter.hasNext()) {		EmulatedNode node = (EmulatedNode)iter.next();				if( node.getFishAddr() != fishAddr  && 		    (Topology.GetInstance().getLiveEdge(node.getFishAddr(), startingNode.getFishAddr()) != null) &&		    node.isAlive() ) {		    		    node.putEdge(startingNode);		    startingNode.putEdge(node);		}	    }	}    }        private void checkNodesQuit() {	try {	    Iterator iter = this.emulatedNodes.values().iterator();	    while(iter.hasNext()) {		EmulatedNode node = (EmulatedNode)iter.next();		if(!node.isAlive()) {		    node.close();		    iter.remove();		    this.removeAsNeighbor(node);		}	    }	}catch(Exception e) {	    System.err.println("Exception occured while to remove emulated node. Exception: " + e);	}    }         // returns -1 if no fish address is available    private int freeFishAddr() {	for(int i = 0; i < Packet.BROADCAST_ADDRESS; i++) {	    if(!this.emulatedNodes.containsKey(new Integer(i))) {		return i;	    }	}	return -1;    }    private boolean portConflict(InetAddress ipAddress, int port) {	Iterator iter = this.emulatedNodes.values().iterator();	while(iter.hasNext()) {	    EmulatedNode node = (EmulatedNode)iter.next();	    if(ipAddress.equals(node.getIPAddress()) && port == node.getPort()) {		return true;	    }	}	return false;    }    // return null if addr not in hash    private EmulatedNode getEmulatedNode(int fishAddr) {	Integer addr = new Integer(fishAddr);	EmulatedNode node = null;	if(this.emulatedNodes.containsKey(addr)) {	    node = (EmulatedNode)this.emulatedNodes.get(addr);	}	return node;    }    private Trawler(int port) throws IOException {	this.socket = new ServerSocket(port);	this.emulatedNodes = new HashMap();	this.parser = new TrawlerCommandsParser();    }    /**     * Entry point to start Trawler     */    public static void main(String[] args) {	if(args.length < 1) {	    System.err.println("Missing arguments");	    usage();	    return;	}		try {	    int port = Integer.parseInt(args[0]);	    String topofile = null;	    	    if(args.length > 1) {		topofile = args[1];	  	    }	    Trawler.GetInstance(port);	    Trawler.GetInstance().start(topofile);	    	}catch(FileNotFoundException e) {	    System.err.println("Incorrect topo file name given to Trawler. Exception: " + e);	}catch(IOException e) {	    System.err.println("Invalid port given to Trawler. Exception: " + e);		}catch(NumberFormatException e) {	    System.err.println("First argument must be the port number, an integer. Exception: " + e);	}catch(Exception e) {	    System.err.println("Exception occured in Trawler!! Exception: " + e);	}    }    private static void usage() {	System.out.println("Usage: java Trawler <port to listen on> [topo file]\n\n" +        			   "Topo file is the topology file. It is an optional argument.\n" +			   "By default all nodes will be neighbors.");    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -