⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gossiprouter.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            }
            else {
                // The group already has a member map: look for an existing entry under this address.
                AddressEntry tmp=(AddressEntry)mbrs.get(logical_addr);
                if(tmp != null) { // already present
                    if(update_only) {
                        // Refresh the existing entry's timestamp and keep it; the new entry is not stored.
                        tmp.update();
                        return;
                    }
                    // Replace mode: release the old entry's thread, stream and socket before overwriting it.
                    tmp.destroy();
                }
                mbrs.put(logical_addr, entry);
            }
        }
    }

    /**
     * Removes and destroys the entry registered for {@code logical_addr} in group
     * {@code groupname}. Does nothing if the group or the member is not present.
     * The group's member map is looked up while holding the {@code routingTable} lock;
     * the entry is destroyed and removed while holding the member map's own lock.
     *
     * @param groupname    name of the group whose member map is searched
     * @param logical_addr key of the entry to remove
     */
    private void removeEntry(String groupname, Address logical_addr) {
        Map val;
        synchronized(routingTable) {
            val=(Map)routingTable.get(groupname);
            if(val == null)
                return;
        }
        synchronized(val) {
            AddressEntry entry=(AddressEntry)val.get(logical_addr);
            if(entry != null) {
                entry.destroy();
                val.remove(logical_addr);
            }
        }
    }

    /**
     * Looks up the entry registered for {@code logical_addr} in group {@code group_name}.
     *
     * NOTE(review): the member map is read here under the routingTable lock only, whereas
     * removeEntry() mutates it under the member map's own lock - confirm that this
     * cannot race.
     *
     * @param group_name   name of the group; may be null
     * @param logical_addr key of the entry; may be null
     * @return the entry, or null if either argument is null or no such group/member exists
     */
    private AddressEntry findAddressEntry(String group_name, Address logical_addr) {
        if(group_name == null || logical_addr == null)
            return null;
        synchronized(routingTable) {
            Map val=(Map)routingTable.get(group_name);
            if(val == null)
                return null;
            return (AddressEntry)val.get(logical_addr);
        }
    }

    /**
     * Adds a new member to the group in the gossip table or renews the
     * membership where that is the case.
* @since 2.2.1      */     private void addGossipEntry(String groupname, Address logical_addr, AddressEntry e) {         addEntry(groupname, logical_addr, e, true);     }     private void removeGossipEntry(String groupname, Address mbr) {         removeEntry(groupname, mbr);     }    private void sendToAllMembersInGroup(String groupname, byte[] msg, Address sender) {        Map val;        val=(Map)routingTable.get(groupname);        if(val == null || val.size() == 0)            return;        Map.Entry tmp;        AddressEntry entry;        synchronized(val) {            for(Iterator i=val.entrySet().iterator(); i.hasNext();) {                tmp=(Map.Entry)i.next();                entry=(GossipRouter.AddressEntry)tmp.getValue();                DataOutputStream dos=entry.output;                if(dos != null) {                    // send only to 'connected' members                    try {                        sendToMember(null, dos, msg, sender);                    }                    catch(Exception e) {                        if(log.isWarnEnabled()) log.warn("cannot send to " + entry.logical_addr + ": " + e.getMessage());                        entry.destroy(); // this closes the socket                        i.remove();                    }                }            }        }    }    /**     * @throws IOException     */    private void sendToMember(Address dest, DataOutputStream out, byte[] msg, Address sender) throws IOException {        if(out == null)            return;        if(discard_loopbacks && dest != null && dest.equals(sender)) {            return;        }        synchronized(out) {            Util.writeAddress(dest, out);            out.writeInt(msg.length);            out.write(msg, 0, msg.length);        }    }    /**     * Class used to store Addresses in both routing and gossip tables.     * If it is used for routing, sock and output have valid values, otherwise     * they're null and only the timestamp counts.     
*/    class AddressEntry {        Address logical_addr=null, physical_addr=null;        Socket sock=null;        DataOutputStream output=null;        long timestamp=0;        final SocketThread thread;        /**         * AddressEntry for a 'gossip' membership.         */        public AddressEntry(Address addr) {            this(addr, null, null, null, null);        }        public AddressEntry(Address logical_addr, Address physical_addr, Socket sock, SocketThread thread, DataOutputStream output) {            this.logical_addr=logical_addr;            this.physical_addr=physical_addr;            this.sock=sock;            this.thread=thread;            this.output=output;            this.timestamp=System.currentTimeMillis();        }        void destroy() {            if(thread != null) {                thread.finish();            }            Util.close(output);            output=null;            Util.close(sock);            sock=null;            timestamp=0;        }        public void update() {            timestamp=System.currentTimeMillis();        }        public boolean equals(Object other) {            return logical_addr.equals(((AddressEntry)other).logical_addr);        }        public String toString() {            StringBuffer sb=new StringBuffer("logical addr=");            sb.append(logical_addr).append(" (").append(physical_addr).append(")");            //if(sock != null) {              //  sb.append(", sock=");                //sb.append(sock);            //}            if(timestamp > 0) {                long diff=System.currentTimeMillis() - timestamp;                sb.append(", ").append(diff).append(" ms old");            }            return sb.toString();        }    }    private static int threadCounter=0;    /**     * A SocketThread manages one connection to a client. Its main task is message routing.     
*/    class SocketThread extends Thread {        private volatile boolean active=true;        Socket sock=null;        DataInputStream input=null;        Address logical_addr=null;        String group_name=null;        public SocketThread(Socket sock, DataInputStream ois, String group_name, Address logical_addr) {            super(Util.getGlobalThreadGroup(), "SocketThread " + (threadCounter++));            this.sock=sock;            input=ois;            this.group_name=group_name;            this.logical_addr=logical_addr;        }        void closeSocket() {            Util.close(input);            Util.close(sock);        }        void finish() {            active=false;        }        public void run() {            byte[] buf;            int len;            Address dst_addr=null;            String gname;            while(active) {                try {                    // 1. Group name is first                    gname=input.readUTF();                    // 2. Second is the destination address                    dst_addr=Util.readAddress(input);                    // 3. Then the length of the byte buffer representing the message                    len=input.readInt();                    if(len == 0) {                        if(log.isWarnEnabled()) log.warn("received null message");                        continue;                    }                    // 4. 
Finally the message itself, as a byte buffer                    buf=new byte[len];                    input.readFully(buf, 0, buf.length);  // message                }                catch(Exception io_ex) {                    if(log.isTraceEnabled())                        log.trace(sock.getInetAddress().getHostName() + ':' + sock.getPort() +                                " closed connection; removing it from routing table");                    removeEntry(group_name, logical_addr); // will close socket                    return;                }                try {                    route(dst_addr, gname, buf, logical_addr);                }                catch(Exception e) {                    if(log.isErrorEnabled()) log.error("failed routing request to " + dst_addr, e);                    break;                }            }            closeSocket();        }    }    public static void main(String[] args) throws Exception {        String arg;        int port=12001;        long expiry=GossipRouter.EXPIRY_TIME;        long timeout=GossipRouter.GOSSIP_REQUEST_TIMEOUT;        long routingTimeout=GossipRouter.ROUTING_CLIENT_REPLY_TIMEOUT;        GossipRouter router=null;        String bind_addr=null;        for(int i=0; i < args.length; i++) {            arg=args[i];            if("-port".equals(arg)) {                port=Integer.parseInt(args[++i]);                continue;            }            if("-bindaddress".equals(arg) || "-bind_addr".equals(arg)) {                bind_addr=args[++i];                continue;            }            if("-expiry".equals(arg)) {                expiry=Long.parseLong(args[++i]);                continue;            }            if("-timeout".equals(arg)) {                timeout=Long.parseLong(args[++i]);                continue;            }            if("-rtimeout".equals(arg)) {                routingTimeout=Long.parseLong(args[++i]);                continue;            }            help();            return;        }  
      System.out.println("GossipRouter is starting...");        try {            ClassConfigurator.getInstance(true);            router=new GossipRouter(port, bind_addr, expiry, timeout, routingTimeout);            router.start();        }        catch(Exception e) {            System.err.println(e);        }    }    static void help() {        System.out.println();        System.out.println("GossipRouter [-port <port>] [-bind_addr <address>] [options]");        System.out.println("Options: ");        System.out.println("        -expiry <msecs>   - Time until a gossip cache entry expires.");        System.out.println("        -timeout <msecs>  - Number of millisecs the router waits to receive");        System.out.println("                            a gossip request after connection was established;");        System.out.println("                            upon expiration, the router initiates the routing");        System.out.println("                            protocol on the connection.");    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -