📄 gossiprouter.java
字号:
ae=(AddressEntry)j.next(); sb.append('\t'); sb.append(ae); sb.append('\n'); } } } } } return sb.toString(); } /** * The main server loop. Runs on the JGroups Router Main Thread. */ private void mainLoop() { Socket sock=null; DataInputStream input=null; DataOutputStream output=null; Address peer_addr=null, mbr, logical_addr; if(bindAddress == null) { bindAddress=srvSock.getInetAddress(); } System.out.println("GossipRouter started at " + new Date() + "\nListening on port " + port + " bound on address " + bindAddress + '\n'); GossipData req; String group; while(up && srvSock != null) { try { sock=srvSock.accept(); sock.setSoLinger(true, 500); input=new DataInputStream(sock.getInputStream()); // if(log.isTraceEnabled()) // log.trace("accepted connection from " + sock); req=new GossipData(); req.readFrom(input); switch(req.getType()) { case GossipRouter.REGISTER: mbr=req.getAddress(); group=req.getGroup(); if(log.isTraceEnabled()) log.trace("REGISTER(" + group + ", " + mbr + ")"); if(group == null || mbr == null) { if(log.isErrorEnabled()) log.error("group or member is null, cannot register member"); } else addGossipEntry(group, mbr, new AddressEntry(mbr)); Util.close(input); Util.close(sock); break; case GossipRouter.UNREGISTER: mbr=req.getAddress(); group=req.getGroup(); if(log.isTraceEnabled()) log.trace("UNREGISTER(" + group + ", " + mbr + ")"); if(group == null || mbr == null) { if(log.isErrorEnabled()) log.error("group or member is null, cannot unregister member"); } else removeGossipEntry(group, mbr); Util.close(input); Util.close(output); Util.close(sock); break; case GossipRouter.GOSSIP_GET: group=req.getGroup(); List mbrs=null; Map map; synchronized(routingTable) { map=(Map)routingTable.get(group); if(map != null) { mbrs=new LinkedList(map.keySet()); } } if(log.isTraceEnabled()) log.trace("GOSSIP_GET(" + group + ") --> " + mbrs); output=new DataOutputStream(sock.getOutputStream()); GossipData rsp=new GossipData(GossipRouter.GET_RSP, group, null, mbrs); rsp.writeTo(output); Util.close(input); Util.close(output); Util.close(sock); break; case GossipRouter.ROUTER_GET: group=req.getGroup(); output=new DataOutputStream(sock.getOutputStream()); List ret=null; synchronized(routingTable) { map=(Map)routingTable.get(group); if(map != null) { ret=new LinkedList(map.keySet()); } else ret=new LinkedList(); } if(log.isTraceEnabled()) log.trace("ROUTER_GET(" + group + ") --> " + ret); rsp=new GossipData(GossipRouter.GET_RSP, group, null, ret); rsp.writeTo(output); Util.close(input); Util.close(output); Util.close(sock); break; case GossipRouter.DUMP: output=new DataOutputStream(sock.getOutputStream()); output.writeUTF(dumpRoutingTable()); Util.close(input); Util.close(output); Util.close(sock); break; case GossipRouter.CONNECT: output=new DataOutputStream(sock.getOutputStream()); peer_addr=new IpAddress(sock.getInetAddress(), sock.getPort()); output=new DataOutputStream(sock.getOutputStream()); logical_addr=req.getAddress(); String group_name=req.getGroup(); if(log.isTraceEnabled()) log.trace("CONNECT(" + group_name + ", " + logical_addr + ")"); SocketThread st=new SocketThread(sock, input, group_name, logical_addr); addEntry(group_name, logical_addr, new AddressEntry(logical_addr, peer_addr, sock, st, output)); st.start(); break; case GossipRouter.DISCONNECT: Address addr=req.getAddress(); group_name=req.getGroup(); removeEntry(group_name, addr); if(log.isTraceEnabled()) log.trace("DISCONNECT(" + group_name + ", " + addr + ")"); Util.close(input); Util.close(output); Util.close(sock); break; case GossipRouter.SHUTDOWN: if(log.isInfoEnabled()) log.info("router shutting down"); Util.close(input); Util.close(output); Util.close(sock); up=false; break; default: if(log.isWarnEnabled()) log.warn("received unkown gossip request (gossip=" + req + ')'); break; } } catch(Exception e) { if(up) if(log.isErrorEnabled()) log.error("failure handling a client request", e); Util.close(input); Util.close(output); Util.close(sock); } } } /** * Cleans the routing tables while the Router is going down. */ private void cleanup() { // shutdown the routing threads and cleanup the tables synchronized(routingTable) { Map map; for(Iterator i=routingTable.values().iterator(); i.hasNext();) { map=(Map)i.next(); if(map != null) { for(Iterator j=map.values().iterator(); j.hasNext();) { AddressEntry e=(AddressEntry)j.next(); e.destroy(); } } } routingTable.clear(); } } /** * Connects to the ServerSocket and sends the shutdown header. */ private void shutdown() { Socket s=null; DataOutputStream dos=null; try { s=new Socket(srvSock.getInetAddress(),srvSock.getLocalPort()); dos=new DataOutputStream(s.getOutputStream()); dos.writeInt(SHUTDOWN); dos.writeUTF(""); } catch(Exception e) { if(log.isErrorEnabled()) log.error("shutdown failed: " + e); } finally { Util.close(s); Util.close(dos); } } /** * Removes expired gossip entries (entries older than EXPIRY_TIME msec). * @since 2.2.1 */ private void sweep() { long diff, currentTime=System.currentTimeMillis(); int num_entries_removed=0; synchronized(routingTable) { Map.Entry entry, entry2; Map map; AddressEntry ae; for(Iterator it=routingTable.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); map=(Map)entry.getValue(); if(map == null || map.size() == 0) { it.remove(); continue; } for(Iterator it2=map.entrySet().iterator(); it2.hasNext();) { entry2=(Map.Entry)it2.next(); ae=(GossipRouter.AddressEntry)entry2.getValue(); diff=currentTime - ae.timestamp; if(diff > expiryTime) { it2.remove(); if(log.isTraceEnabled()) log.trace("removed " + ae.logical_addr + " (" + diff + " msecs old)"); num_entries_removed++; } } } } if(num_entries_removed > 0) { if(log.isTraceEnabled()) log.trace("done (removed " + num_entries_removed + " entries)"); } } private void route(Address dest, String dest_group, byte[] msg, Address sender) { //if(log.isTraceEnabled()) { // int len=msg != null? msg.length : 0; //log.trace("routing request from " + sender + " for " + dest_group + " to " + // (dest == null? "ALL" : dest.toString()) + ", " + len + " bytes"); //} if(dest == null) { // send to all members in group dest.getChannelName() if(dest_group == null) { if(log.isErrorEnabled()) log.error("both dest address and group are null"); } else { sendToAllMembersInGroup(dest_group, msg, sender); } } else { // send to destination address AddressEntry ae=findAddressEntry(dest_group, dest); if(ae == null) { if(log.isTraceEnabled()) log.trace("cannot find " + dest + " in the routing table, \nrouting table=\n" + dumpRoutingTable()); return; } if(ae.output == null) { if(log.isErrorEnabled()) log.error(dest + " is associated with a null output stream"); return; } try { sendToMember(dest, ae.output, msg, sender); } catch(Exception e) { if(log.isErrorEnabled()) log.error("failed sending message to " + dest + ": " + e.getMessage()); removeEntry(dest_group, dest); // will close socket } } } private void addEntry(String groupname, Address logical_addr, AddressEntry entry) { addEntry(groupname, logical_addr, entry, false); } /** * Adds a new member to the routing group. */ private void addEntry(String groupname, Address logical_addr, AddressEntry entry, boolean update_only) { if(groupname == null || logical_addr == null) { if(log.isErrorEnabled()) log.error("groupname or logical_addr was null, entry was not added"); return; } synchronized(routingTable) { Map mbrs=(Map)routingTable.get(groupname); if(mbrs == null) { mbrs=new HashMap(); mbrs.put(logical_addr, entry); routingTable.put(groupname, mbrs);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -