⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gossiprouter.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                            ae=(AddressEntry)j.next();                            sb.append('\t');                            sb.append(ae);                            sb.append('\n');                        }                    }                }            }        }        return sb.toString();    }    /**     * The main server loop. Runs on the JGroups Router Main Thread.     */    private void mainLoop() {        Socket sock=null;        DataInputStream input=null;        DataOutputStream output=null;        Address peer_addr=null, mbr, logical_addr;        if(bindAddress == null) {            bindAddress=srvSock.getInetAddress();        }        System.out.println("GossipRouter started at " + new Date() +                "\nListening on port " + port + " bound on address " + bindAddress + '\n');        GossipData req;        String group;        while(up && srvSock != null) {            try {                sock=srvSock.accept();                sock.setSoLinger(true, 500);                input=new DataInputStream(sock.getInputStream());                // if(log.isTraceEnabled())                   // log.trace("accepted connection from " + sock);                req=new GossipData();                req.readFrom(input);                switch(req.getType()) {                    case GossipRouter.REGISTER:                        mbr=req.getAddress();                        group=req.getGroup();                        if(log.isTraceEnabled())                            log.trace("REGISTER(" + group + ", " + mbr + ")");                        if(group == null || mbr == null) {                            if(log.isErrorEnabled()) log.error("group or member is null, cannot register member");                        }                        else                            addGossipEntry(group, mbr, new AddressEntry(mbr));                        Util.close(input);                        Util.close(sock);                        break;                    case GossipRouter.UNREGISTER:                        mbr=req.getAddress();                        group=req.getGroup();                        if(log.isTraceEnabled())                            log.trace("UNREGISTER(" + group + ", " + mbr + ")");                        if(group == null || mbr == null) {                            if(log.isErrorEnabled()) log.error("group or member is null, cannot unregister member");                        }                        else                            removeGossipEntry(group, mbr);                        Util.close(input);                        Util.close(output);                        Util.close(sock);                        break;                    case GossipRouter.GOSSIP_GET:                        group=req.getGroup();                        List mbrs=null;                        Map map;                        synchronized(routingTable) {                            map=(Map)routingTable.get(group);                            if(map != null) {                                mbrs=new LinkedList(map.keySet());                            }                        }                        if(log.isTraceEnabled())                            log.trace("GOSSIP_GET(" + group + ") --> " + mbrs);                        output=new DataOutputStream(sock.getOutputStream());                        GossipData rsp=new GossipData(GossipRouter.GET_RSP, group, null, mbrs);                        rsp.writeTo(output);                        Util.close(input);                        Util.close(output);                        Util.close(sock);                        break;                    case GossipRouter.ROUTER_GET:                        group=req.getGroup();                        output=new DataOutputStream(sock.getOutputStream());                        List ret=null;                        synchronized(routingTable) {                            map=(Map)routingTable.get(group);                            if(map != null) {                                ret=new LinkedList(map.keySet());                            }                            else                                ret=new LinkedList();                        }                        if(log.isTraceEnabled())                            log.trace("ROUTER_GET(" + group + ") --> " + ret);                        rsp=new GossipData(GossipRouter.GET_RSP, group, null, ret);                        rsp.writeTo(output);                        Util.close(input);                        Util.close(output);                        Util.close(sock);                        break;                    case GossipRouter.DUMP:                        output=new DataOutputStream(sock.getOutputStream());                        output.writeUTF(dumpRoutingTable());                        Util.close(input);                        Util.close(output);                        Util.close(sock);                        break;                    case GossipRouter.CONNECT:                        output=new DataOutputStream(sock.getOutputStream());                        peer_addr=new IpAddress(sock.getInetAddress(), sock.getPort());                        output=new DataOutputStream(sock.getOutputStream());                        logical_addr=req.getAddress();                        String group_name=req.getGroup();                        if(log.isTraceEnabled())                            log.trace("CONNECT(" + group_name + ", " + logical_addr + ")");                        SocketThread st=new SocketThread(sock, input, group_name, logical_addr);                        addEntry(group_name, logical_addr, new AddressEntry(logical_addr, peer_addr, sock, st, output));                        st.start();                        break;                    case GossipRouter.DISCONNECT:                        Address addr=req.getAddress();                        group_name=req.getGroup();                        removeEntry(group_name, addr);                        if(log.isTraceEnabled())                            log.trace("DISCONNECT(" + group_name + ", " + addr + ")");                        Util.close(input);                        Util.close(output);                        Util.close(sock);                        break;                    case GossipRouter.SHUTDOWN:                        if(log.isInfoEnabled()) log.info("router shutting down");                        Util.close(input);                        Util.close(output);                        Util.close(sock);                        up=false;                        break;                    default:                        if(log.isWarnEnabled())                            log.warn("received unkown gossip request (gossip=" + req + ')');                        break;                }            }            catch(Exception e) {                if(up)                    if(log.isErrorEnabled()) log.error("failure handling a client request", e);                Util.close(input);                Util.close(output);                Util.close(sock);            }        }    }    /**     * Cleans the routing tables while the Router is going down.     */    private void cleanup() {        // shutdown the routing threads and cleanup the tables        synchronized(routingTable) {            Map map;            for(Iterator i=routingTable.values().iterator(); i.hasNext();) {                map=(Map)i.next();                if(map != null) {                    for(Iterator j=map.values().iterator(); j.hasNext();) {                        AddressEntry e=(AddressEntry)j.next();                        e.destroy();                    }                }            }            routingTable.clear();        }    }    /**     * Connects to the ServerSocket and sends the shutdown header.     */    private void shutdown() {        Socket s=null;        DataOutputStream dos=null;        try {            s=new Socket(srvSock.getInetAddress(),srvSock.getLocalPort());            dos=new DataOutputStream(s.getOutputStream());            dos.writeInt(SHUTDOWN);            dos.writeUTF("");        }        catch(Exception e) {            if(log.isErrorEnabled()) log.error("shutdown failed: " + e);        }        finally {            Util.close(s);            Util.close(dos);        }    }    /**     * Removes expired gossip entries (entries older than EXPIRY_TIME msec).     * @since 2.2.1     */    private void sweep() {        long diff, currentTime=System.currentTimeMillis();        int num_entries_removed=0;        synchronized(routingTable) {            Map.Entry entry, entry2;            Map map;            AddressEntry ae;            for(Iterator it=routingTable.entrySet().iterator(); it.hasNext();) {                entry=(Map.Entry)it.next();                map=(Map)entry.getValue();                if(map == null || map.size() == 0) {                    it.remove();                    continue;                }                for(Iterator it2=map.entrySet().iterator(); it2.hasNext();) {                    entry2=(Map.Entry)it2.next();                    ae=(GossipRouter.AddressEntry)entry2.getValue();                    diff=currentTime - ae.timestamp;                    if(diff > expiryTime) {                        it2.remove();                        if(log.isTraceEnabled())                            log.trace("removed " + ae.logical_addr + " (" + diff + " msecs old)");                        num_entries_removed++;                    }                }            }        }        if(num_entries_removed > 0) {            if(log.isTraceEnabled()) log.trace("done (removed " + num_entries_removed + " entries)");        }    }    private void route(Address dest, String dest_group, byte[] msg, Address sender) {        //if(log.isTraceEnabled()) {          //  int len=msg != null? msg.length : 0;            //log.trace("routing request from " + sender + " for " + dest_group + " to " +              //      (dest == null? "ALL" : dest.toString()) + ", " + len + " bytes");        //}        if(dest == null) { // send to all members in group dest.getChannelName()            if(dest_group == null) {                if(log.isErrorEnabled()) log.error("both dest address and group are null");            }            else {                sendToAllMembersInGroup(dest_group, msg, sender);            }        }        else {            // send to destination address            AddressEntry ae=findAddressEntry(dest_group, dest);            if(ae == null) {                if(log.isTraceEnabled())                    log.trace("cannot find " + dest + " in the routing table, \nrouting table=\n" + dumpRoutingTable());                return;            }            if(ae.output == null) {                if(log.isErrorEnabled()) log.error(dest + " is associated with a null output stream");                return;            }            try {                sendToMember(dest, ae.output, msg, sender);            }            catch(Exception e) {                if(log.isErrorEnabled()) log.error("failed sending message to " + dest + ": " + e.getMessage());                removeEntry(dest_group, dest); // will close socket            }        }    }     private void addEntry(String groupname, Address logical_addr, AddressEntry entry) {         addEntry(groupname, logical_addr, entry, false);     }    /**     * Adds a new member to the routing group.     */    private void addEntry(String groupname, Address logical_addr, AddressEntry entry, boolean update_only) {        if(groupname == null || logical_addr == null) {            if(log.isErrorEnabled()) log.error("groupname or logical_addr was null, entry was not added");            return;        }        synchronized(routingTable) {            Map mbrs=(Map)routingTable.get(groupname);            if(mbrs == null) {                mbrs=new HashMap();                mbrs.put(logical_addr, entry);                routingTable.put(groupname, mbrs);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -