fd_sock.java
来自「JGRoups源码」· Java 代码 · 共 1,269 行 · 第 1/4 页
JAVA
1,269 行
// $Id: FD_SOCK.java,v 1.51 2006/10/30 11:09:40 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.*;import java.io.*;import java.net.InetAddress;import java.net.ServerSocket;import java.net.Socket;import java.net.UnknownHostException;import java.util.*;import java.util.List;/** * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a * server socket and announces its address together with the server socket's address in a multicast. A * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity * will take place between 2 peers as long as they are alive (i.e. have their server sockets open). * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore * they won't be detected. * The FD_SOCK protocol will work for groups where members are on different hosts<p> * The costs involved are 2 additional threads: one that * monitors the client side of the socket connection (to monitor a peer) and another one that manages the * server socket. However, those threads will be idle as long as both peers are running. * @author Bela Ban May 29 2001 */public class FD_SOCK extends Protocol implements Runnable { long get_cache_timeout=3000; // msecs to wait for the socket cache from the coordinator static final long get_cache_retry_timeout=500; // msecs to wait until we retry getting the cache from coord long suspect_msg_interval=5000; // (BroadcastTask): mcast SUSPECT every 5000 msecs int num_tries=3; // attempts coord is solicited for socket cache until we give up final Vector members=new Vector(11); // list of group members (updated on VIEW_CHANGE) boolean srv_sock_sent=false; // has own socket been broadcast yet ? final Vector pingable_mbrs=new Vector(11); // mbrs from which we select ping_dest. may be subset of 'members' final Promise get_cache_promise=new Promise(); // used for rendezvous on GET_CACHE and GET_CACHE_RSP boolean got_cache_from_coord=false; // was cache already fetched ? Address local_addr=null; // our own address ServerSocket srv_sock=null; // server socket to which another member connects to monitor me InetAddress bind_addr=null; // the NIC on which the ServerSocket should listen String group_name=null; // the name of the group (set on CONNECT, nulled on DISCONNECT) /** @deprecated Use {@link bind_addr} instead */ InetAddress srv_sock_bind_addr=null; // the NIC on which the ServerSocket should listen private ServerSocketHandler srv_sock_handler=null; // accepts new connections on srv_sock IpAddress srv_sock_addr=null; // pair of server_socket:port Address ping_dest=null; // address of the member we monitor Socket ping_sock=null; // socket to the member we monitor InputStream ping_input=null; // input stream of the socket to the member we monitor Thread pinger_thread=null; // listens on ping_sock, suspects member if socket is closed final Object pinger_mutex=new Object(); final Hashtable cache=new Hashtable(11); // keys=Addresses, vals=IpAddresses (socket:port) /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default) * picks a random port */ int start_port=0; final Promise ping_addr_promise=new Promise(); // to fetch the ping_addr for ping_dest final Object sock_mutex=new Object(); // for access to ping_sock, ping_input TimeScheduler timer=null; private final BroadcastTask bcast_task=new BroadcastTask(); // to transmit SUSPECT message (until view change) boolean regular_sock_close=false; // used by interruptPingerThread() when new ping_dest is computed int num_suspect_events=0; private static final int INTERRUPT =8; private static final int NORMAL_TERMINATION=9; private static final int ABNORMAL_TERMINATION=-1; private static final String name="FD_SOCK"; BoundedList suspect_history=new BoundedList(20); /** whether to use KEEP_ALIVE on the ping socket or not */ private boolean keep_alive=true; private boolean running=false; public String getName() { return name; } public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";} public String getMembers() {return members != null? members.toString() : "null";} public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";} public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";} public int getNumSuspectEventsGenerated() {return num_suspect_events;} public String printSuspectHistory() { StringBuffer sb=new StringBuffer(); for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) { sb.append(new Date()).append(": ").append(en.nextElement()).append("\n"); } return sb.toString(); } public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("get_cache_timeout"); if(str != null) { get_cache_timeout=Long.parseLong(str); props.remove("get_cache_timeout"); } str=props.getProperty("suspect_msg_interval"); if(str != null) { suspect_msg_interval=Long.parseLong(str); props.remove("suspect_msg_interval"); } str=props.getProperty("num_tries"); if(str != null) { num_tries=Integer.parseInt(str); props.remove("num_tries"); } str=props.getProperty("start_port"); if(str != null) { start_port=Integer.parseInt(str); props.remove("start_port"); } str=props.getProperty("keep_alive"); if(str != null) { keep_alive=new Boolean(str).booleanValue(); props.remove("keep_alive"); } str=props.getProperty("srv_sock_bind_addr"); if(str != null) { log.error("srv_sock_bind_addr is deprecated and will be ignored - use bind_addr instead"); props.remove("srv_sock_bind_addr"); } boolean ignore_systemprops=Util.isBindAddressPropertyIgnored(); str=Util.getProperty(new String[]{Global.BIND_ADDR, Global.BIND_ADDR_OLD}, props, "bind_addr", ignore_systemprops, null); if(str != null) { try { bind_addr=InetAddress.getByName(str); } catch(UnknownHostException unknown) { log.error("(bind_addr): host " + str + " not known"); return false; } props.remove("bind_addr"); } if(props.size() > 0) { log.error("the following properties are not recognized: " + props); return false; } return true; } public void init() throws Exception { srv_sock_handler=new ServerSocketHandler(); timer=stack != null ? stack.timer : null; if(timer == null) throw new Exception("FD_SOCK.init(): timer == null"); } public void start() throws Exception { super.start(); running=true; } public void stop() { running=false; bcast_task.removeAll(); stopPingerThread(); stopServerSocket(); } public void resetStats() { super.resetStats(); num_suspect_events=0; suspect_history.removeAll(); } public void up(Event evt) { Message msg; FdHeader hdr; switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: local_addr=(Address) evt.getArg(); break; case Event.MSG: msg=(Message) evt.getArg(); hdr=(FdHeader) msg.removeHeader(name); if(hdr == null) break; // message did not originate from FD_SOCK layer, just pass up switch(hdr.type) { case FdHeader.SUSPECT: if(hdr.mbrs != null) { if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr); for(int i=0; i < hdr.mbrs.size(); i++) { Address m=(Address)hdr.mbrs.elementAt(i); if(local_addr != null && m.equals(local_addr)) { if(warn) log.warn("I was suspected by " + msg.getSrc() + "; ignoring the SUSPECT message"); continue; } passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i))); passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i))); } } else if(warn) log.warn("[SUSPECT]: hdr.mbrs == null"); break; // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it case FdHeader.WHO_HAS_SOCK: if(local_addr != null && local_addr.equals(msg.getSrc())) return; // don't reply to WHO_HAS bcasts sent by me ! if(hdr.mbr == null) { if(log.isErrorEnabled()) log.error("hdr.mbr is null"); return; } if(trace) log.trace("who-has-sock " + hdr.mbr); // 1. Try my own address, maybe it's me whose socket is wanted if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) { sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr); // unicast message to msg.getSrc() return; } // 2. If I don't have it, maybe it is in the cache if(cache.containsKey(hdr.mbr)) sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr)); // ucast msg break; // Update the cache with the addr:sock_addr entry (if on the same host) case FdHeader.I_HAVE_SOCK: if(hdr.mbr == null || hdr.sock_addr == null) { if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null"); return; } // if(!cache.containsKey(hdr.mbr)) cache.put(hdr.mbr, hdr.sock_addr); // update the cache if(trace) log.trace("i-have-sock: " + hdr.mbr + " --> " + hdr.sock_addr + " (cache is " + cache + ')'); if(ping_dest != null && hdr.mbr.equals(ping_dest)) ping_addr_promise.setResult(hdr.sock_addr); break; // Return the cache to the sender of this message case FdHeader.GET_CACHE: if(hdr.mbr == null) { if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null"); return; } hdr=new FdHeader(FdHeader.GET_CACHE_RSP); hdr.cachedAddrs=(Hashtable) cache.clone(); msg=new Message(hdr.mbr, null, null); msg.putHeader(name, hdr); passDown(new Event(Event.MSG, msg)); break; case FdHeader.GET_CACHE_RSP: if(hdr.cachedAddrs == null) { if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null"); return; } get_cache_promise.setResult(hdr.cachedAddrs); break; } return; case Event.CONFIG: if(bind_addr == null) { Map config=(Map)evt.getArg(); bind_addr=(InetAddress)config.get("bind_addr"); } break; } passUp(evt); // pass up to the layer above us } public void down(Event evt) { Address mbr, tmp_ping_dest; View v; switch(evt.getType()) { case Event.UNSUSPECT: bcast_task.removeSuspectedMember((Address)evt.getArg()); break; case Event.CONNECT: passDown(evt); group_name=(String)evt.getArg(); srv_sock=Util.createServerSocket(bind_addr, start_port); // grab a random unused port above 10000
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?