📄 gossiprouter.java
字号:
// $Id: GossipRouter.java,v 1.22 2006/10/25 08:10:05 belaban Exp $package org.jgroups.stack;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.conf.ClassConfigurator;import org.jgroups.util.Util;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.net.InetAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.*;/** * Router for TCP based group communication (using layer TCP instead of UDP). * Instead of the TCP layer sending packets point-to-point to each other * member, it sends the packet to the router which - depending on the target * address - multicasts or unicasts it to the group / or single member.<p> * This class is especially interesting for applets which cannot directly make * connections (neither UDP nor TCP) to a host different from the one they were * loaded from. Therefore, an applet would create a normal channel plus * protocol stack, but the bottom layer would have to be the TCP layer which * sends all packets point-to-point (over a TCP connection) to the router, * which in turn forwards them to their end location(s) (also over TCP). A * centralized router would therefore have to be running on the host the applet * was loaded from.<p> * An alternative for running JGroups in an applet (IP multicast is not allowed * in applets as of 1.2), is to use point-to-point UDP communication via the * gossip server. 
However, then the applet has to be signed which involves * additional administrative effort on the part of the user.<p> * @author Bela Ban * @author Ovidiu Feodorov <ovidiuf@users.sourceforge.net> * @since 2.1.1 */public class GossipRouter { public static final byte CONNECT=1; // CONNECT(group, addr) --> local address public static final byte DISCONNECT=2; // DISCONNECT(group, addr) public static final byte REGISTER=3; // REGISTER(group, addr) public static final byte GOSSIP_GET=4; // GET(group) --> List<addr> (members) public static final byte ROUTER_GET=5; // GET(group) --> List<addr> (members) public static final byte GET_RSP=6; // GET_RSP(List<addr>) public static final byte UNREGISTER=7; // UNREGISTER(group, addr) public static final byte DUMP=8; // DUMP public static final byte SHUTDOWN=9; public static final int PORT=8980; public static final long EXPIRY_TIME=30000; public static final long GOSSIP_REQUEST_TIMEOUT=1000; public static final long ROUTING_CLIENT_REPLY_TIMEOUT=120000; private int port; private String bindAddressString; // time (in msecs) until a cached 'gossip' member entry expires private long expiryTime; // number of millisecs the main thread waits to receive a gossip request // after connection was established; upon expiration, the router initiates // the routing protocol on the connection. Don't set the interval too big, // otherwise the router will appear slow in answering routing requests. private long gossipRequestTimeout; // time (in ms) main thread waits for a router client to send the routing // request type and the group affiliation before it declares the request // failed. private long routingClientReplyTimeout; // HashMap<String, Map<Address,AddressEntry> >. Maintains associations between groups and their members. 
Keys=group // names, values = maps of logical address / AddressEntry associations private final Map routingTable=new HashMap(); private ServerSocket srvSock=null; private InetAddress bindAddress=null; private boolean up=true; /** whether to discard message sent to self */ private boolean discard_loopbacks=false; // the cache sweeper Timer timer=null; protected final Log log=LogFactory.getLog(this.getClass()); // // JMX INSTRUMENTATION - MANAGEMENT INTERFACE // public GossipRouter() { this(PORT); } public GossipRouter(int port) { this(port, null); } public GossipRouter(int port, String bindAddressString) { this(port, bindAddressString, EXPIRY_TIME); } public GossipRouter(int port, String bindAddressString, long expiryTime) { this(port, bindAddressString, expiryTime, GOSSIP_REQUEST_TIMEOUT, ROUTING_CLIENT_REPLY_TIMEOUT); } public GossipRouter(int port, String bindAddressString, long expiryTime, long gossipRequestTimeout, long routingClientReplyTimeout) { this.port=port; this.bindAddressString=bindAddressString; this.expiryTime=expiryTime; this.gossipRequestTimeout=gossipRequestTimeout; this.routingClientReplyTimeout=routingClientReplyTimeout; } // // MANAGED ATTRIBUTES // public void setPort(int port) { this.port=port; } public int getPort() { return port; } public void setBindAddress(String bindAddress) { bindAddressString=bindAddress; } public String getBindAddress() { return bindAddressString; } public void setExpiryTime(long expiryTime) { this.expiryTime=expiryTime; } public long getExpiryTime() { return expiryTime; } public void setGossipRequestTimeout(long gossipRequestTimeout) { this.gossipRequestTimeout=gossipRequestTimeout; } public long getGossipRequestTimeout() { return gossipRequestTimeout; } public void setRoutingClientReplyTimeout(long routingClientReplyTimeout) { this.routingClientReplyTimeout=routingClientReplyTimeout; } public long getRoutingClientReplyTimeout() { return routingClientReplyTimeout; } public boolean isStarted() { return srvSock != 
null; } public boolean isDiscardLoopbacks() { return discard_loopbacks; } public void setDiscardLoopbacks(boolean discard_loopbacks) { this.discard_loopbacks=discard_loopbacks; } public static String type2String(int type) { switch(type) { case CONNECT: return "CONNECT"; case DISCONNECT: return "DISCONNECT"; case REGISTER: return "REGISTER"; case GOSSIP_GET: return "GOSSIP_GET"; case ROUTER_GET: return "ROUTER_GET"; case GET_RSP: return "GET_RSP"; case UNREGISTER: return "UNREGISTER"; case DUMP: return "DUMP"; case SHUTDOWN: return "SHUTDOWN"; default: return "unknown"; } } // // JBoss MBean LIFECYCLE OPERATIONS // /** * JBoss MBean lifecycle operation. */ public void create() throws Exception { // not used } /** * JBoss MBean lifecycle operation. Called after create(). When this method * is called, the managed attributes have already been set.<br> * Brings the Router in fully functional state. */ public void start() throws Exception { if(srvSock != null) { throw new Exception("Router already started."); } if(bindAddressString != null) { bindAddress=InetAddress.getByName(bindAddressString); srvSock=new ServerSocket(port, 50, bindAddress); } else { srvSock=new ServerSocket(port, 50); } up=true; // start the main server thread new Thread(new Runnable() { public void run() { mainLoop(); cleanup(); } }, "GossipRouter").start(); // starts the cache sweeper as daemon thread, so we won't block on it // upon termination timer=new Timer(true); timer.schedule(new TimerTask() { public void run() { sweep(); } }, expiryTime, expiryTime); } /** * JBoss MBean lifecycle operation. The JMX agent allways calls this method * before destroy(). Close connections and frees resources. 
*/ public void stop() { up=false; if(srvSock == null) { if(log.isWarnEnabled()) log.warn("router already stopped"); return; } timer.cancel(); shutdown(); try { srvSock.close(); } catch(Exception e) { if(log.isErrorEnabled()) log.error("Failed to close server socket: " + e); } // exiting the mainLoop will clean the tables srvSock=null; if(log.isInfoEnabled()) log.info("router stopped"); } /** * JBoss MBean lifecycle operation. */ public void destroy() { // not used } // // ORDINARY OPERATIONS // public String dumpRoutingTable() { String label="routing"; StringBuffer sb=new StringBuffer(); synchronized(routingTable) { if(routingTable.size() == 0) { sb.append("empty "); sb.append(label); sb.append(" table"); } else { for(Iterator i=routingTable.keySet().iterator(); i.hasNext();) { String gname=(String)i.next(); sb.append("GROUP: '" + gname + "'\n"); Map map=(Map)routingTable.get(gname); if(map == null) { sb.append("\tnull list of addresses\n"); } else if(map.size() == 0) { sb.append("\tempty list of addresses\n"); } else { AddressEntry ae; for(Iterator j=map.values().iterator(); j.hasNext();) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -