logicallink.java

来自「JGRoups源码」· Java 代码 · 共 333 行

JAVA
333
字号
// $Id: LogicalLink.java,v 1.5 2005/05/30 16:14:34 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.InetAddress;import java.util.Vector;/** * Implements a logical point-to-point link between 2 entities consisting of a number of physical links. * Traffic is routed over any of the physical link, according to policies. Examples are: send traffic * over all links, round-robin, use first link for 70% of traffic, other links for the remaining 30%. * * @author Bela Ban, June 2000 */public class LogicalLink implements Link.Receiver {    Receiver receiver=null;    final Vector links=new Vector();  // of Links    final int link_to_use=0;    Log log=LogFactory.getLog(getClass());    public class NoLinksAvailable extends Exception {        public String toString() {            return "LogicalLinks.NoLinksAvailable: there are no physical links available";        }    }    public class AllLinksDown extends Exception {        public String toString() {            return "LogicalLinks.AllLinksDown: all physical links are currently down";        }    }    public interface Receiver {        void receive(byte[] buf);        void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port);        void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port);        void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs);        void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port);    }    public LogicalLink(Receiver r) {        receiver=r;    }    public LogicalLink() {    }    public void addLink(String local_addr, int local_port, String remote_addr, int remote_port) {        Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, this);        if(links.contains(new_link))            log.error("LogicalLink.add(): link " + new_link + " is already present");        else            links.addElement(new_link);    }    public void addLink(String local_addr, int local_port, String remote_addr, int remote_port,                        long timeout, long hb_interval) {        Link new_link=new Link(local_addr, local_port, remote_addr, remote_port, timeout, hb_interval, this);        if(links.contains(new_link))            log.error("LogicalLink.add(): link " + new_link + " is already present");        else            links.addElement(new_link);    }    public void removeAllLinks() {        Link tmp;        for(int i=0; i < links.size(); i++) {            tmp=(Link)links.elementAt(i);            tmp.stop();        }        links.removeAllElements();    }    public Vector getLinks() {        return links;    }    public int numberOfLinks() {        return links.size();    }    public int numberOfEstablishedLinks() {        int n=0;        for(int i=0; i < links.size(); i++) {            if(((Link)links.elementAt(i)).established())                n++;        }        return n;    }    /**     * Start all links     */    public void start() {        Link tmp;        for(int i=0; i < links.size(); i++) {            tmp=(Link)links.elementAt(i);            try {                tmp.start();            }            catch(Exception ex) {                log.error("LogicalLink.start(): could not create physical link, reason: " + ex);            }        }    }    /**     * Stop all links     */    public void stop() {        Link tmp;        for(int i=0; i < links.size(); i++) {            tmp=(Link)links.elementAt(i);            tmp.stop();        }    }    /**     * Send a message to the other side     */    public boolean send(byte[] buf) throws AllLinksDown, NoLinksAvailable {        Link link;        int link_used=0;        if(buf == null || buf.length == 0) {            log.error("LogicalLink.send(): buf is null or empty");            return false;        }        if(links.size() == 0)            throw new NoLinksAvailable();        // current policy (make policies configurable later !): alternate between links.        // if failure, take first link that works        //  	link=(Link)links.elementAt(link_to_use);        //  	if(link.send(buf)) {        //  	    System.out.println("Send over link #" + link_to_use + ": " + link);        //  	    link_to_use=(link_to_use + 1) % links.size();        //  	    return true;        //  	}        //  	link_used=(link_to_use + 1) % links.size();        //  	while(link_used != link_to_use) {        //  	    link=(Link)links.elementAt(link_used);        //  	    if(link.send(buf)) {        //  		System.out.println("Send over link #" + link_used + ": " + link);        //  		link_to_use=(link_to_use + 1) % links.size();        //  		return true;        //  	    }        //  	    link_used=(link_used + 1) % links.size();        //  	}        // take first available link. use other links only if first is down. if we have smaller and bigger        // pipes, the bigger ones should be specified first (so we're using them first, and only when they        // are not available we use the smaller ones)        for(int i=0; i < links.size(); i++) {            link=(Link)links.elementAt(i);            if(link.established()) {                if(link.send(buf)) {                    System.out.println("Send over link #" + link_used + ": " + link);                    return true;                }            }        }        throw new AllLinksDown();    }    public void setReceiver(Receiver r) {        receiver=r;    }    /*-------- Interface Link.Receiver ---------*/    /**     * Receive a message from any of the physical links. That's why this and the next methods have to be     * synchronized     */    public synchronized void receive(byte[] buf) {        if(receiver != null)            receiver.receive(buf);    }    /**     * One of the physical links went down     */    public synchronized void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port) {        if(receiver != null)            receiver.linkDown(local, local_port, remote, remote_port);    }    /**     * One of the physical links came up     */    public synchronized void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port) {        if(receiver != null)            receiver.linkUp(local, local_port, remote, remote_port);    }    /**     * Missed one or more heartbeats. Link is not yet down, though     */    public synchronized void missedHeartbeat(InetAddress local, int local_port,                                             InetAddress remote, int remote_port, int num_missed_hbs) {        if(receiver != null)            receiver.missedHeartbeat(local, local_port, remote, remote_port, num_missed_hbs);    }    /**     * Heartbeat came back again (before link was taken down) after missing some heartbeats     */    public synchronized void receivedHeartbeatAgain(InetAddress local, int local_port,                                                    InetAddress remote, int remote_port) {        if(receiver != null)            receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);    }    private static class MyReceiver implements LogicalLink.Receiver {        public void receive(byte[] buf) {            System.out.println("<-- " + new String(buf));        }        /**         * All of the physical links are down --> logical link is down too         */        public synchronized void linkDown(InetAddress l, int lp, InetAddress r, int rp) {            System.out.println("** linkDown(): " + r + ':' + rp);        }        /**         * At least 1 physical links is up again         */        public synchronized void linkUp(InetAddress l, int lp, InetAddress r, int rp) {            System.out.println("** linkUp(): " + r + ':' + rp);        }        public synchronized void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) {            //System.out.println("missedHeartbeat(): " + r + ":" + rp);        }        public synchronized void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) {            //System.out.println("receivedHeartbeatAgain(): " + r + ":" + rp);        }    }    public static void main(String[] args) {        LogicalLink ll=new LogicalLink();        String local_host, remote_host;        int local_port, remote_port;        int i=0;        ll.setReceiver(new MyReceiver());        if(args.length % 4 != 0 || args.length == 0) {            System.err.println("\nLogicalLink <link+>\nwhere <link> is " +                               "<local host> <local port> <remote host> <remote port>\n");            return;        }        while(i < args.length) {            local_host=args[i++];            local_port=Integer.parseInt(args[i++]);            remote_host=args[i++];            remote_port=Integer.parseInt(args[i++]);            ll.addLink(local_host, local_port, remote_host, remote_port);        }        try {            ll.start();        }        catch(Exception e) {            System.err.println("LogicalLink.main(): " + e);        }        BufferedReader in=new BufferedReader(new InputStreamReader(System.in));        while(true) {            try {                System.out.print("> ");                System.out.flush();                String line=in.readLine();                ll.send(line.getBytes());            }            catch(Exception e) {                System.err.println(e);            }        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?