fd_sock.java
来自「JGRoups源码」· Java 代码 · 共 1,269 行 · 第 1/4 页
JAVA
1,269 行
srv_sock_addr=new IpAddress(bind_addr, srv_sock.getLocalPort()); startServerSocket(); break; case Event.DISCONNECT: group_name=null; String tmp, prefix=Global.THREAD_PREFIX; int index; tmp=srv_sock_handler != null? srv_sock_handler.getName() : null; if(tmp != null) { index=tmp.indexOf(prefix); if(index > -1) { tmp=tmp.substring(0, index); srv_sock_handler.setName(tmp); } } synchronized(pinger_mutex) { tmp=pinger_thread != null? pinger_thread.getName() : null; if(tmp != null) { index=tmp.indexOf(prefix); if(index > -1) { tmp=tmp.substring(0, index); pinger_thread.setName(tmp); } } } stopServerSocket(); break; case Event.VIEW_CHANGE: v=(View) evt.getArg(); Vector new_mbrs=v.getMembers(); passDown(evt); synchronized(this) { members.removeAllElements(); members.addAll(new_mbrs); bcast_task.adjustSuspectedMembers(members); pingable_mbrs.removeAllElements(); pingable_mbrs.addAll(members); if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members); // 1. Get the addr:pid cache from the coordinator (only if not already fetched) if(!got_cache_from_coord) { getCacheFromCoordinator(); got_cache_from_coord=true; } // 2. Broadcast my own addr:sock to all members so they can update their cache if(!srv_sock_sent) { if(srv_sock_addr != null) { sendIHaveSockMessage(null, // send to all members local_addr, srv_sock_addr); srv_sock_sent=true; } else if(warn) log.warn("(VIEW_CHANGE): srv_sock_addr == null"); } // 3. 
Remove all entries in 'cache' which are not in the new membership for(Enumeration e=cache.keys(); e.hasMoreElements();) { mbr=(Address) e.nextElement(); if(!members.contains(mbr)) cache.remove(mbr); } if(members.size() > 1) { synchronized(pinger_mutex) { if(pinger_thread != null && pinger_thread.isAlive()) { tmp_ping_dest=determinePingDest(); if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) { interruptPingerThread(); // allows the thread to use the new socket } } else startPingerThread(); // only starts if not yet running } } else { ping_dest=null; stopPingerThread(); } } break; default: passDown(evt); break; } } /** * Runs as long as there are 2 or more members. Determines the member to be monitored and fetches its * server socket address (if n/a, sends a message to obtain it). Then creates a client socket and listens on * it until the connection breaks. If it breaks, emits a SUSPECT message. If the connection is closed regularly, * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless * there are fewer than 2 members). 
*/ public void run() { Address tmp_ping_dest; IpAddress ping_addr; int max_fetch_tries=10; // number of times a socket address is to be requested before giving up if(trace) log.trace("pinger_thread started"); // +++ remove while(pinger_thread != null && Thread.currentThread().equals(pinger_thread) && running) { tmp_ping_dest=determinePingDest(); // gets the neighbor to our right if(log.isDebugEnabled()) log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs); if(tmp_ping_dest == null) { ping_dest=null; synchronized(pinger_mutex) { pinger_thread=null; } break; } ping_dest=tmp_ping_dest; ping_addr=fetchPingAddress(ping_dest); if(ping_addr == null) { if(!running) break; if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying"); if(--max_fetch_tries <= 0) break; Util.sleep(2000); continue; } if(!setupPingSocket(ping_addr)) { // covers use cases #7 and #8 in ManualTests.txt if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest); broadcastSuspectMessage(ping_dest); pingable_mbrs.removeElement(ping_dest); continue; } if(log.isDebugEnabled()) log.debug("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache); // at this point ping_input must be non-null, otherwise setupPingSocket() would have thrown an exception try { if(ping_input != null) { int c=ping_input.read(); switch(c) { case NORMAL_TERMINATION: if(log.isDebugEnabled()) log.debug("peer closed socket normally"); synchronized(pinger_mutex) { pinger_thread=null; } break; case ABNORMAL_TERMINATION: handleSocketClose(null); break; default: break; } } } catch(IOException ex) { // we got here when the peer closed the socket --> suspect peer and then continue handleSocketClose(ex); } catch(Throwable catch_all_the_rest) { log.error("exception", catch_all_the_rest); } } if(log.isDebugEnabled()) log.debug("pinger thread terminated"); synchronized(pinger_mutex) { 
pinger_thread=null; } } /* ----------------------------------- Private Methods -------------------------------------- */ void handleSocketClose(Exception ex) { teardownPingSocket(); // make sure we have no leftovers if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread()) if(log.isDebugEnabled()) log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')'); broadcastSuspectMessage(ping_dest); pingable_mbrs.removeElement(ping_dest); } else { if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset"); regular_sock_close=false; } } /** * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired */ void startPingerThread() { running=true; if(pinger_thread == null) { pinger_thread=new Thread(Util.getGlobalThreadGroup(), this, "FD_SOCK Ping thread"); pinger_thread.setDaemon(true); pinger_thread.start(); if(group_name != null) { String tmp, prefix=Global.THREAD_PREFIX; tmp=pinger_thread.getName(); if(tmp != null && tmp.indexOf(prefix) == -1) { tmp+=prefix + group_name + ")"; pinger_thread.setName(tmp); } } } } void stopPingerThread() { running=false; synchronized(pinger_mutex) { if(pinger_thread != null && pinger_thread.isAlive()) { regular_sock_close=true; pinger_thread=null; sendPingTermination(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246) teardownPingSocket(); ping_addr_promise.setResult(null); } } } // PATCH: send something so the connection handler can exit synchronized void sendPingTermination() { sendPingSignal(NORMAL_TERMINATION); } void sendPingInterrupt() { sendPingSignal(INTERRUPT); } synchronized void sendPingSignal(int signal) { if(ping_sock != null) { try { OutputStream out=ping_sock.getOutputStream(); if(out != null) { out.write(signal); out.flush(); } } catch(Throwable t) { if(trace) log.trace("problem sending signal " + signalToString(signal), t); } } } /** * 
Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
 * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
 * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).
 * <p>
 * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
 * code portable and we don't have to check for OSs.
 * <p>
 * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
 *
 * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
 */
void interruptPingerThread() {
    if(pinger_thread != null && pinger_thread.isAlive()) {
        // flag the close as deliberate so handleSocketClose() does not suspect the peer
        regular_sock_close=true;
        sendPingInterrupt(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
        teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
    }
}


/**
 * Starts the server socket handler (if there is one) and, when a group name is known, appends it to
 * the handler's name unless the name already carries the thread prefix.
 */
void startServerSocket() {
    if(srv_sock_handler != null) {
        srv_sock_handler.start(); // won't start if already running
        if(group_name != null) {
            String tmp, prefix=Global.THREAD_PREFIX;
            tmp=srv_sock_handler.getName();
            if(tmp != null && tmp.indexOf(prefix) == -1) {
                tmp+=prefix + group_name + ")";
                srv_sock_handler.setName(tmp);
            }
        }
    }
}


/** Stops the server socket handler, if there is one. */
void stopServerSocket() {
    if(srv_sock_handler != null)
        srv_sock_handler.stop();
}


/**
 * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input.
 * <p>
 * The socket is built in a local variable and only published to ping_sock / ping_input once it is
 * connected and fully configured. If anything fails after the connection was established, the
 * half-initialized socket is closed here instead of being left open, and the cause is logged at
 * debug level rather than silently dropped.
 *
 * @param dest address of the peer's server socket; a null value is logged as an error and rejected
 * @return true if the socket was created and ping_sock and ping_input were assigned, false otherwise
 */
boolean setupPingSocket(IpAddress dest) {
    synchronized(sock_mutex) {
        if(dest == null) {
            if(log.isErrorEnabled()) log.error("destination address is null");
            return false;
        }
        Socket sock=null;
        try {
            sock=new Socket(dest.getIpAddress(), dest.getPort());
            sock.setSoLinger(true, 1);
            sock.setKeepAlive(keep_alive);
            // getInputStream() may still throw, so publish the fields only after it succeeded
            ping_input=sock.getInputStream();
            ping_sock=sock;
            return true;
        }
        catch(Throwable ex) {
            if(log.isDebugEnabled()) log.debug("failed setting up ping socket to " + dest + ": " + ex);
            if(sock != null) {
                try {
                    sock.close(); // do not leak a connected but unusable socket
                }
                catch(IOException ignored) {
                    // best effort: the socket is being discarded anyway
                }
            }
            return false;
        }
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?