fd_sock.java

来自「JGroups源码」· Java 代码 · 共 1,269 行 · 第 1/4 页

JAVA
1,269
字号
// $Id: FD_SOCK.java,v 1.51 2006/10/30 11:09:40 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.*;import java.io.*;import java.net.InetAddress;import java.net.ServerSocket;import java.net.Socket;import java.net.UnknownHostException;import java.util.*;import java.util.List;/** * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a * server socket and announces its address together with the server socket's address in a multicast. A * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity * will take place between 2 peers as long as they are alive (i.e. have their server sockets open). * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore * they won't be detected. * The FD_SOCK protocol will work for groups where members are on different hosts<p> * The costs involved are 2 additional threads: one that * monitors the client side of the socket connection (to monitor a peer) and another one that manages the * server socket. However, those threads will be idle as long as both peers are running. 
* @author Bela Ban May 29 2001 */public class FD_SOCK extends Protocol implements Runnable {    long                get_cache_timeout=3000;            // msecs to wait for the socket cache from the coordinator    static final long   get_cache_retry_timeout=500;       // msecs to wait until we retry getting the cache from coord    long                suspect_msg_interval=5000;         // (BroadcastTask): mcast SUSPECT every 5000 msecs    int                 num_tries=3;                       // attempts coord is solicited for socket cache until we give up    final Vector        members=new Vector(11);            // list of group members (updated on VIEW_CHANGE)    boolean             srv_sock_sent=false;               // has own socket been broadcast yet ?    final Vector        pingable_mbrs=new Vector(11);      // mbrs from which we select ping_dest. may be subset of 'members'    final Promise       get_cache_promise=new Promise();   // used for rendezvous on GET_CACHE and GET_CACHE_RSP    boolean             got_cache_from_coord=false;        // was cache already fetched ?    
Address             local_addr=null;                   // our own address    ServerSocket        srv_sock=null;                     // server socket to which another member connects to monitor me    InetAddress         bind_addr=null;                    // the NIC on which the ServerSocket should listen    String              group_name=null;                   // the name of the group (set on CONNECT, nulled on DISCONNECT)    /** @deprecated Use {@link bind_addr} instead */    InetAddress         srv_sock_bind_addr=null;           // the NIC on which the ServerSocket should listen    private ServerSocketHandler srv_sock_handler=null;             // accepts new connections on srv_sock    IpAddress           srv_sock_addr=null;                // pair of server_socket:port    Address             ping_dest=null;                    // address of the member we monitor    Socket              ping_sock=null;                    // socket to the member we monitor    InputStream         ping_input=null;                   // input stream of the socket to the member we monitor    Thread              pinger_thread=null;                // listens on ping_sock, suspects member if socket is closed    final Object        pinger_mutex=new Object();    final Hashtable     cache=new Hashtable(11);           // keys=Addresses, vals=IpAddresses (socket:port)    /** Start port for server socket (uses first available port starting at start_port). 
A value of 0 (default)     * picks a random port */    int                 start_port=0;    final Promise       ping_addr_promise=new Promise();   // to fetch the ping_addr for ping_dest    final Object        sock_mutex=new Object();           // for access to ping_sock, ping_input    TimeScheduler       timer=null;    private final BroadcastTask bcast_task=new BroadcastTask();    // to transmit SUSPECT message (until view change)    boolean             regular_sock_close=false;         // used by interruptPingerThread() when new ping_dest is computed    int                 num_suspect_events=0;    private static final int INTERRUPT =8;    private static final int NORMAL_TERMINATION=9;    private static final int ABNORMAL_TERMINATION=-1;    private static final String name="FD_SOCK";    BoundedList          suspect_history=new BoundedList(20);    /** whether to use KEEP_ALIVE on the ping socket or not */    private boolean      keep_alive=true;    private boolean      running=false;    public String getName() {        return name;    }    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}    public String getMembers() {return members != null? members.toString() : "null";}    public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}    public String getPingDest() {return ping_dest != null? 
ping_dest.toString() : "null";}    public int getNumSuspectEventsGenerated() {return num_suspect_events;}    public String printSuspectHistory() {        StringBuffer sb=new StringBuffer();        for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {            sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");        }        return sb.toString();    }    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("get_cache_timeout");        if(str != null) {            get_cache_timeout=Long.parseLong(str);            props.remove("get_cache_timeout");        }        str=props.getProperty("suspect_msg_interval");        if(str != null) {            suspect_msg_interval=Long.parseLong(str);            props.remove("suspect_msg_interval");        }        str=props.getProperty("num_tries");        if(str != null) {            num_tries=Integer.parseInt(str);            props.remove("num_tries");        }        str=props.getProperty("start_port");        if(str != null) {            start_port=Integer.parseInt(str);            props.remove("start_port");        }        str=props.getProperty("keep_alive");        if(str != null) {            keep_alive=new Boolean(str).booleanValue();            props.remove("keep_alive");        }        str=props.getProperty("srv_sock_bind_addr");        if(str != null) {            log.error("srv_sock_bind_addr is deprecated and will be ignored - use bind_addr instead");            props.remove("srv_sock_bind_addr");        }        boolean ignore_systemprops=Util.isBindAddressPropertyIgnored();        str=Util.getProperty(new String[]{Global.BIND_ADDR, Global.BIND_ADDR_OLD}, props, "bind_addr",                             ignore_systemprops, null);        if(str != null) {            try {                bind_addr=InetAddress.getByName(str);            }            catch(UnknownHostException unknown) {              
  log.error("(bind_addr): host " + str + " not known");                return false;            }            props.remove("bind_addr");        }        if(props.size() > 0) {            log.error("the following properties are not recognized: " + props);            return false;        }        return true;    }    public void init() throws Exception {        srv_sock_handler=new ServerSocketHandler();        timer=stack != null ? stack.timer : null;        if(timer == null)            throw new Exception("FD_SOCK.init(): timer == null");    }    public void start() throws Exception {        super.start();        running=true;    }    public void stop() {        running=false;        bcast_task.removeAll();        stopPingerThread();        stopServerSocket();    }    public void resetStats() {        super.resetStats();        num_suspect_events=0;        suspect_history.removeAll();    }    public void up(Event evt) {        Message msg;        FdHeader hdr;        switch(evt.getType()) {        case Event.SET_LOCAL_ADDRESS:            local_addr=(Address) evt.getArg();            break;        case Event.MSG:            msg=(Message) evt.getArg();            hdr=(FdHeader) msg.removeHeader(name);            if(hdr == null)                break;  // message did not originate from FD_SOCK layer, just pass up            switch(hdr.type) {            case FdHeader.SUSPECT:                if(hdr.mbrs != null) {                    if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr);                    for(int i=0; i < hdr.mbrs.size(); i++) {                        Address m=(Address)hdr.mbrs.elementAt(i);                        if(local_addr != null && m.equals(local_addr)) {                            if(warn)                                log.warn("I was suspected by " + msg.getSrc() + "; ignoring the SUSPECT message");                            continue;                        }                        passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i))); 
                       passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));                    }                }                else                    if(warn) log.warn("[SUSPECT]: hdr.mbrs == null");                break;                // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it            case FdHeader.WHO_HAS_SOCK:                if(local_addr != null && local_addr.equals(msg.getSrc()))                    return; // don't reply to WHO_HAS bcasts sent by me !                if(hdr.mbr == null) {                    if(log.isErrorEnabled()) log.error("hdr.mbr is null");                    return;                }                if(trace) log.trace("who-has-sock " + hdr.mbr);                // 1. Try my own address, maybe it's me whose socket is wanted                if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) {                    sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr);  // unicast message to msg.getSrc()                    return;                }                // 2. 
If I don't have it, maybe it is in the cache                if(cache.containsKey(hdr.mbr))                    sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr));  // ucast msg                break;                // Update the cache with the addr:sock_addr entry (if on the same host)            case FdHeader.I_HAVE_SOCK:                if(hdr.mbr == null || hdr.sock_addr == null) {                    if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");                    return;                }                // if(!cache.containsKey(hdr.mbr))                cache.put(hdr.mbr, hdr.sock_addr); // update the cache                if(trace) log.trace("i-have-sock: " + hdr.mbr + " --> " +                                                   hdr.sock_addr + " (cache is " + cache + ')');                if(ping_dest != null && hdr.mbr.equals(ping_dest))                    ping_addr_promise.setResult(hdr.sock_addr);                break;                // Return the cache to the sender of this message            case FdHeader.GET_CACHE:                if(hdr.mbr == null) {                    if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null");                    return;                }                hdr=new FdHeader(FdHeader.GET_CACHE_RSP);                hdr.cachedAddrs=(Hashtable) cache.clone();                msg=new Message(hdr.mbr, null, null);                msg.putHeader(name, hdr);                passDown(new Event(Event.MSG, msg));                break;            case FdHeader.GET_CACHE_RSP:                if(hdr.cachedAddrs == null) {                    if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null");                    return;                }                get_cache_promise.setResult(hdr.cachedAddrs);                break;            }            return;            case Event.CONFIG:                if(bind_addr == null) {                    Map 
config=(Map)evt.getArg();                    bind_addr=(InetAddress)config.get("bind_addr");                }                break;        }        passUp(evt);                                        // pass up to the layer above us    }    public void down(Event evt) {        Address mbr, tmp_ping_dest;        View v;        switch(evt.getType()) {            case Event.UNSUSPECT:                bcast_task.removeSuspectedMember((Address)evt.getArg());                break;            case Event.CONNECT:                passDown(evt);                group_name=(String)evt.getArg();                srv_sock=Util.createServerSocket(bind_addr, start_port); // grab a random unused port above 10000

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?