fd_sock.java

来自「JGRoups源码」· Java 代码 · 共 1,269 行 · 第 1/4 页

JAVA
1,269
字号
                srv_sock_addr=new IpAddress(bind_addr, srv_sock.getLocalPort());                startServerSocket();                break;            case Event.DISCONNECT:                group_name=null;                String tmp, prefix=Global.THREAD_PREFIX;                int index;                tmp=srv_sock_handler != null? srv_sock_handler.getName() : null;                if(tmp != null) {                    index=tmp.indexOf(prefix);                    if(index > -1) {                        tmp=tmp.substring(0, index);                        srv_sock_handler.setName(tmp);                    }                }                synchronized(pinger_mutex) {                    tmp=pinger_thread != null? pinger_thread.getName() : null;                    if(tmp != null) {                        index=tmp.indexOf(prefix);                        if(index > -1) {                            tmp=tmp.substring(0, index);                            pinger_thread.setName(tmp);                        }                    }                }                stopServerSocket();                break;            case Event.VIEW_CHANGE:                v=(View) evt.getArg();                Vector new_mbrs=v.getMembers();                passDown(evt);                synchronized(this) {                    members.removeAllElements();                    members.addAll(new_mbrs);                    bcast_task.adjustSuspectedMembers(members);                    pingable_mbrs.removeAllElements();                    pingable_mbrs.addAll(members);                    if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);                    // 1. Get the addr:pid cache from the coordinator (only if not already fetched)                    if(!got_cache_from_coord) {                        getCacheFromCoordinator();                        got_cache_from_coord=true;                    }                    // 2. 
Broadcast my own addr:sock to all members so they can update their cache                    if(!srv_sock_sent) {                        if(srv_sock_addr != null) {                            sendIHaveSockMessage(null, // send to all members                                    local_addr,                                    srv_sock_addr);                            srv_sock_sent=true;                        }                        else                            if(warn) log.warn("(VIEW_CHANGE): srv_sock_addr == null");                    }                    // 3. Remove all entries in 'cache' which are not in the new membership                    for(Enumeration e=cache.keys(); e.hasMoreElements();) {                        mbr=(Address) e.nextElement();                        if(!members.contains(mbr))                            cache.remove(mbr);                    }                    if(members.size() > 1) {                        synchronized(pinger_mutex) {                            if(pinger_thread != null && pinger_thread.isAlive()) {                                tmp_ping_dest=determinePingDest();                                if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) {                                    interruptPingerThread(); // allows the thread to use the new socket                                }                            }                            else                                startPingerThread(); // only starts if not yet running                        }                    }                    else {                        ping_dest=null;                        stopPingerThread();                    }                }                break;            default:                passDown(evt);                break;        }    }    /**     * Runs as long as there are 2 members and more. 
Determines the member to be monitored and fetches its
     * server socket address (if n/a, sends a message to obtain it). Then creates a client socket and listens on
     * it until the connection breaks. If it breaks, emits a SUSPECT message. If the connection is closed regularly,
     * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
     * there are fewer than 2 members).
     */
    public void run() {
        Address tmp_ping_dest;
        IpAddress ping_addr;
        // NOTE(review): this counter is declared outside the loop and only ever decremented, so the budget of
        // 10 failed fetches is shared across all ping destinations for the lifetime of this thread.
        int max_fetch_tries=10;  // number of times a socket address is to be requested before giving up

        if(trace) log.trace("pinger_thread started"); // +++ remove

        // Keep going only while this thread is still the registered pinger thread and 'running' is set
        while(pinger_thread != null && Thread.currentThread().equals(pinger_thread) && running) {
            tmp_ping_dest=determinePingDest(); // gets the neighbor to our right
            if(log.isDebugEnabled())
                log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs);
            if(tmp_ping_dest == null) {
                // nobody to monitor: unregister this thread and leave the loop
                ping_dest=null;
                synchronized(pinger_mutex) {
                    pinger_thread=null;
                }
                break;
            }
            ping_dest=tmp_ping_dest;
            ping_addr=fetchPingAddress(ping_dest);
            if(ping_addr == null) {
                if(!running)
                    break;
                if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying");
                if(--max_fetch_tries <= 0)
                    break;
                Util.sleep(2000);
                continue;
            }

            if(!setupPingSocket(ping_addr)) {
                // covers use cases #7 and #8 in ManualTests.txt
                if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest);
                broadcastSuspectMessage(ping_dest);
                pingable_mbrs.removeElement(ping_dest);
                continue;
            }

            if(log.isDebugEnabled()) log.debug("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache);

            // at this point ping_input is expected to be non-null, otherwise setupPingSocket() would have returned false
            try {
                if(ping_input != null) {
                    // read a single value from the monitored member's socket and dispatch on it
                    int c=ping_input.read();
                    switch(c) {
                        case NORMAL_TERMINATION:
                            if(log.isDebugEnabled())
                                log.debug("peer closed socket normally");
                            // clearing pinger_thread makes the loop condition fail on the next iteration
                            synchronized(pinger_mutex) {
                                pinger_thread=null;
                            }
                            break;
                        case ABNORMAL_TERMINATION:
                            handleSocketClose(null);
                            break;
                        default:
                            // any other value: nothing to do here, go around the loop again
                            break;
                    }
                }
            }
            catch(IOException ex) {  // we got here when the peer closed the socket --> suspect peer and then continue
                handleSocketClose(ex);
            }
            catch(Throwable catch_all_the_rest) {
                log.error("exception", catch_all_the_rest);
            }
        }
        if(log.isDebugEnabled()) log.debug("pinger thread terminated");
        synchronized(pinger_mutex) {
            pinger_thread=null;
        }
    }


    /* ----------------------------------- Private Methods -------------------------------------- */


    /**
     * Reacts to the ping socket having been closed. Always tears down the ping socket first. If the close was
     * not flagged as regular (regular_sock_close is false), ping_dest is suspected and removed from
     * pingable_mbrs; otherwise the flag is simply reset to false.
     *
     * @param ex the exception that signalled the close, or null if the close was detected without one
     *           (logged as "eof")
     */
    void handleSocketClose(Exception ex) {
        teardownPingSocket();     // make sure we have no leftovers
        if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
            if(log.isDebugEnabled())
                log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')');
            broadcastSuspectMessage(ping_dest);
            pingable_mbrs.removeElement(ping_dest);
        }
        else {
            if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset");
            regular_sock_close=false;
        }
    }


    /**
     * Sets the running flag and, if no pinger thread is registered, creates and starts a daemon thread running
     * this object. When group_name is set, the thread name gets the suffix
     * Global.THREAD_PREFIX + group_name + ")" unless it already contains the prefix.
     * <p>
     * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
     */
    void startPingerThread() {
        running=true;
        if(pinger_thread == null) {
            pinger_thread=new Thread(Util.getGlobalThreadGroup(), this, "FD_SOCK Ping thread");
            pinger_thread.setDaemon(true);
            pinger_thread.start();
            if(group_name != null) {
                String tmp, prefix=Global.THREAD_PREFIX;
                tmp=pinger_thread.getName();
                if(tmp != null && tmp.indexOf(prefix) == -1) {
                    tmp+=prefix + group_name + ")";
                    pinger_thread.setName(tmp);
                }
            }
        }
    }

    /**
     * Clears the running flag and, under pinger_mutex, shuts down a live pinger thread: the upcoming socket
     * close is flagged as regular, pinger_thread is cleared, a NORMAL_TERMINATION signal is sent, the ping
     * socket is torn down and the result of ping_addr_promise is set to null.
     */
    void stopPingerThread() {
        running=false;
        synchronized(pinger_mutex) {
            if(pinger_thread != null && pinger_thread.isAlive()) {
                regular_sock_close=true; // handleSocketClose() must not suspect the peer for this close
                pinger_thread=null;
                sendPingTermination(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
                teardownPingSocket();
                // presumably releases a caller blocked waiting for the ping address - TODO confirm
                ping_addr_promise.setResult(null);
            }
        }
    }

    // PATCH: send something so the connection handler can exit
    synchronized void sendPingTermination() {
        sendPingSignal(NORMAL_TERMINATION);
    }

    /** Sends the INTERRUPT signal over the ping socket (see sendPingSignal()). */
    void sendPingInterrupt() {
        sendPingSignal(INTERRUPT);
    }

    /**
     * Writes <code>signal</code> to the output stream of ping_sock and flushes it. Does nothing if ping_sock
     * is null. This is best effort: any Throwable is only logged at trace level and never propagated.
     *
     * @param signal the value to write (callers in this class pass NORMAL_TERMINATION or INTERRUPT)
     */
    synchronized void sendPingSignal(int signal) {
        if(ping_sock != null) {
            try {
                OutputStream out=ping_sock.getOutputStream();
                if(out != null) {
                    out.write(signal);
                    out.flush();
                }
            }
            catch(Throwable t) {
                if(trace)
                    log.trace("problem sending signal " + signalToString(signal), t);
            }
        }
    }

    /**
     * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
     * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
     * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
     * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
     * code portable and we don't have to check for OSs.<p/>
     * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
     * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
     */
    void interruptPingerThread() {
        if(pinger_thread != null && pinger_thread.isAlive()) {
            regular_sock_close=true; // tells handleSocketClose() not to suspect the peer for this close
            sendPingInterrupt();  // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
            teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
        }
    }

    /**
     * Starts srv_sock_handler if it exists. When group_name is set, the handler's name gets the suffix
     * Global.THREAD_PREFIX + group_name + ")" unless it already contains the prefix.
     */
    void startServerSocket() {
        if(srv_sock_handler != null) {
            srv_sock_handler.start(); // won't start if already running
            if(group_name != null) {
                String tmp, prefix=Global.THREAD_PREFIX;
                tmp=srv_sock_handler.getName();
                if(tmp != null && tmp.indexOf(prefix) == -1) {
                    tmp+=prefix + group_name + ")";
                    srv_sock_handler.setName(tmp);
                }
            }
        }
    }

    /** Stops srv_sock_handler if it exists. */
    void stopServerSocket() {
        if(srv_sock_handler != null)
            srv_sock_handler.stop();
    }


    /**
     * Creates a socket to <code>dest</code>, and assigns it to ping_sock.
Also assigns ping_input     */    boolean setupPingSocket(IpAddress dest) {        synchronized(sock_mutex) {            if(dest == null) {                if(log.isErrorEnabled()) log.error("destination address is null");                return false;            }            try {                ping_sock=new Socket(dest.getIpAddress(), dest.getPort());                ping_sock.setSoLinger(true, 1);                ping_sock.setKeepAlive(keep_alive);                ping_input=ping_sock.getInputStream();                return true;            }            catch(Throwable ex) {                return false;            }        }    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?