📄 gossiprouter.java
字号:
} else { AddressEntry tmp=(AddressEntry)mbrs.get(logical_addr); if(tmp != null) { // already present if(update_only) { tmp.update(); return; } tmp.destroy(); } mbrs.put(logical_addr, entry); } } } private void removeEntry(String groupname, Address logical_addr) { Map val; synchronized(routingTable) { val=(Map)routingTable.get(groupname); if(val == null) return; } synchronized(val) { AddressEntry entry=(AddressEntry)val.get(logical_addr); if(entry != null) { entry.destroy(); val.remove(logical_addr); } } } /** * @return null if not found */ private AddressEntry findAddressEntry(String group_name, Address logical_addr) { if(group_name == null || logical_addr == null) return null; synchronized(routingTable) { Map val=(Map)routingTable.get(group_name); if(val == null) return null; return (AddressEntry)val.get(logical_addr); } } /** * Adds a new member to the group in the gossip table or renews the * membership where is the case. * @since 2.2.1 */ private void addGossipEntry(String groupname, Address logical_addr, AddressEntry e) { addEntry(groupname, logical_addr, e, true); } private void removeGossipEntry(String groupname, Address mbr) { removeEntry(groupname, mbr); } private void sendToAllMembersInGroup(String groupname, byte[] msg, Address sender) { Map val; val=(Map)routingTable.get(groupname); if(val == null || val.size() == 0) return; Map.Entry tmp; AddressEntry entry; synchronized(val) { for(Iterator i=val.entrySet().iterator(); i.hasNext();) { tmp=(Map.Entry)i.next(); entry=(GossipRouter.AddressEntry)tmp.getValue(); DataOutputStream dos=entry.output; if(dos != null) { // send only to 'connected' members try { sendToMember(null, dos, msg, sender); } catch(Exception e) { if(log.isWarnEnabled()) log.warn("cannot send to " + entry.logical_addr + ": " + e.getMessage()); entry.destroy(); // this closes the socket i.remove(); } } } } } /** * @throws IOException */ private void sendToMember(Address dest, DataOutputStream out, byte[] msg, Address sender) throws IOException { if(out == null) return; if(discard_loopbacks && dest != null && dest.equals(sender)) { return; } synchronized(out) { Util.writeAddress(dest, out); out.writeInt(msg.length); out.write(msg, 0, msg.length); } } /** * Class used to store Addresses in both routing and gossip tables. * If it is used for routing, sock and output have valid values, otherwise * they're null and only the timestamp counts. */ class AddressEntry { Address logical_addr=null, physical_addr=null; Socket sock=null; DataOutputStream output=null; long timestamp=0; final SocketThread thread; /** * AddressEntry for a 'gossip' membership. */ public AddressEntry(Address addr) { this(addr, null, null, null, null); } public AddressEntry(Address logical_addr, Address physical_addr, Socket sock, SocketThread thread, DataOutputStream output) { this.logical_addr=logical_addr; this.physical_addr=physical_addr; this.sock=sock; this.thread=thread; this.output=output; this.timestamp=System.currentTimeMillis(); } void destroy() { if(thread != null) { thread.finish(); } Util.close(output); output=null; Util.close(sock); sock=null; timestamp=0; } public void update() { timestamp=System.currentTimeMillis(); } public boolean equals(Object other) { return logical_addr.equals(((AddressEntry)other).logical_addr); } public String toString() { StringBuffer sb=new StringBuffer("logical addr="); sb.append(logical_addr).append(" (").append(physical_addr).append(")"); //if(sock != null) { // sb.append(", sock="); //sb.append(sock); //} if(timestamp > 0) { long diff=System.currentTimeMillis() - timestamp; sb.append(", ").append(diff).append(" ms old"); } return sb.toString(); } } private static int threadCounter=0; /** * A SocketThread manages one connection to a client. Its main task is message routing. */ class SocketThread extends Thread { private volatile boolean active=true; Socket sock=null; DataInputStream input=null; Address logical_addr=null; String group_name=null; public SocketThread(Socket sock, DataInputStream ois, String group_name, Address logical_addr) { super(Util.getGlobalThreadGroup(), "SocketThread " + (threadCounter++)); this.sock=sock; input=ois; this.group_name=group_name; this.logical_addr=logical_addr; } void closeSocket() { Util.close(input); Util.close(sock); } void finish() { active=false; } public void run() { byte[] buf; int len; Address dst_addr=null; String gname; while(active) { try { // 1. Group name is first gname=input.readUTF(); // 2. Second is the destination address dst_addr=Util.readAddress(input); // 3. Then the length of the byte buffer representing the message len=input.readInt(); if(len == 0) { if(log.isWarnEnabled()) log.warn("received null message"); continue; } // 4. Finally the message itself, as a byte buffer buf=new byte[len]; input.readFully(buf, 0, buf.length); // message } catch(Exception io_ex) { if(log.isTraceEnabled()) log.trace(sock.getInetAddress().getHostName() + ':' + sock.getPort() + " closed connection; removing it from routing table"); removeEntry(group_name, logical_addr); // will close socket return; } try { route(dst_addr, gname, buf, logical_addr); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed routing request to " + dst_addr, e); break; } } closeSocket(); } } public static void main(String[] args) throws Exception { String arg; int port=12001; long expiry=GossipRouter.EXPIRY_TIME; long timeout=GossipRouter.GOSSIP_REQUEST_TIMEOUT; long routingTimeout=GossipRouter.ROUTING_CLIENT_REPLY_TIMEOUT; GossipRouter router=null; String bind_addr=null; for(int i=0; i < args.length; i++) { arg=args[i]; if("-port".equals(arg)) { port=Integer.parseInt(args[++i]); continue; } if("-bindaddress".equals(arg) || "-bind_addr".equals(arg)) { bind_addr=args[++i]; continue; } if("-expiry".equals(arg)) { expiry=Long.parseLong(args[++i]); continue; } if("-timeout".equals(arg)) { timeout=Long.parseLong(args[++i]); continue; } if("-rtimeout".equals(arg)) { routingTimeout=Long.parseLong(args[++i]); continue; } help(); return; } System.out.println("GossipRouter is starting..."); try { ClassConfigurator.getInstance(true); router=new GossipRouter(port, bind_addr, expiry, timeout, routingTimeout); router.start(); } catch(Exception e) { System.err.println(e); } } static void help() { System.out.println(); System.out.println("GossipRouter [-port <port>] [-bind_addr <address>] [options]"); System.out.println("Options: "); System.out.println(" -expiry <msecs> - Time until a gossip cache entry expires."); System.out.println(" -timeout <msecs> - Number of millisecs the router waits to receive"); System.out.println(" a gossip request after connection was established;"); System.out.println(" upon expiration, the router initiates the routing"); System.out.println(" protocol on the connection."); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -