⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 flush.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.jgroups.protocols.pbcast;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.ArrayList;import java.util.Collection;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.TreeSet;import java.util.Vector;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Header;import org.jgroups.Message;import org.jgroups.TimeoutException;import org.jgroups.View;import org.jgroups.ViewId;import org.jgroups.stack.Protocol;import org.jgroups.util.Promise;import org.jgroups.util.Streamable;import org.jgroups.util.Util;/** * Flush, as it name implies, forces group members to flush their pending messages  * while blocking them to send any additional messages. The process of flushing  * acquiesces the group so that state transfer or a join can be done. It is also  * called stop-the-world model as nobody will be able to send messages while a  * flush is in process. *  * <p> * Flush is needed for: * <p> * (1) 	State transfer. When a member requests state transfer, the coordinator  * 		tells everyone to stop sending messages and waits for everyone's ack. Then it asks *  	the application for its state and ships it back to the requester. After the  *  	requester has received and set the state successfully, the coordinator tells  *  	everyone to resume sending messages. * <p>   * (2) 	View changes (e.g.a join). Before installing a new view V2, flushing would  * 		ensure that all messages *sent* in the current view V1 are indeed *delivered*  * 		in V1, rather than in V2 (in all non-faulty members). This is essentially  * 		Virtual Synchrony. *  *  *  * @author Vladimir Blagojevic * @version $Id$ * @since 2.4 */public class FLUSH extends Protocol{   public static final String NAME = "FLUSH";   private View currentView;   private Address localAddress;   /**    * Group member that requested FLUSH.    * For view intallations flush coordinator is the group coordinator    * For state transfer flush coordinator is the state requesting member    */   private Address flushCoordinator;   private Collection flushMembers;   private Set flushOkSet;   private Set flushCompletedSet;   private Set stopFlushOkSet;   private final Object sharedLock = new Object();   private final Object blockMutex = new Object();   /**    * Indicates if FLUSH.down() is currently blocking threads    */   private volatile boolean isBlockState = true;   /**    * Default timeout for a group member to be in <code>isBlockState</code>    */   private long timeout = 8000;   /**    * Default timeout started when <code>Event.BLOCK</code> is passed to    * application. Response <code>Event.BLOCK_OK</code> should be received by    * application within timeout.    */   private long block_timeout = 10000;   private Set suspected;   private volatile boolean receivedFirstView = false;   private volatile boolean receivedMoreThanOneView = false;   private long startFlushTime;   private long totalTimeInFlush;   private int numberOfFlushes;   private double averageFlushDuration;   private Promise flush_promise;   private Promise blockok_promise;   /**    * If true configures timeout in GMS and STATE_TRANFER using FLUSH timeout value    */   private boolean auto_flush_conf = true;   /**    * Indicated member of the group that return last from the FLUSH round (i.e after STOP_FLUSH_OK)    * In curent design, this member is joining member and state requesting member    */   private volatile boolean shouldReturnLastFromFlush = false;   public FLUSH()   {      super();      //tmp view to avoid NPE      currentView = new View(new ViewId(), new Vector());      flushOkSet = new TreeSet();      flushCompletedSet = new TreeSet();      stopFlushOkSet = new TreeSet();      flushMembers = new ArrayList();      suspected = new TreeSet();      flush_promise = new Promise();      blockok_promise = new Promise();   }   public String getName()   {      return NAME;   }   public boolean setProperties(Properties props)   {      super.setProperties(props);      timeout = Util.parseLong(props, "timeout", timeout);      block_timeout = Util.parseLong(props, "block_timeout", block_timeout);      auto_flush_conf = Util.parseBoolean(props, "auto_flush_conf", auto_flush_conf);      if (props.size() > 0)      {         log.error("the following properties are not recognized: " + props);         return false;      }      return true;   }   public void init() throws Exception   {      if(auto_flush_conf)      {         Map map = new HashMap();         map.put("flush_timeout", new Long(timeout));         passUp(new Event(Event.CONFIG, map));         passDown(new Event(Event.CONFIG, map));      }   }   public void start() throws Exception   {      Map map = new HashMap();      map.put("flush_supported", Boolean.TRUE);      passUp(new Event(Event.CONFIG, map));      passDown(new Event(Event.CONFIG, map));      receivedFirstView = false;      receivedMoreThanOneView = false;      isBlockState = true;      shouldReturnLastFromFlush = false;   }   public void stop()   {      synchronized (sharedLock)      {         currentView = new View(new ViewId(), new Vector());         flushCompletedSet.clear();         flushOkSet.clear();         stopFlushOkSet.clear();         flushMembers.clear();         suspected.clear();         flushCoordinator = null;      }   }   /* -------------------JMX attributes and operations --------------------- */   public double getAverageFlushDuration()   {      return averageFlushDuration;   }   public long getTotalTimeInFlush()   {      return totalTimeInFlush;   }   public int getNumberOfFlushes()   {      return numberOfFlushes;   }   public boolean startFlush(long timeout)   {      boolean successfulFlush = false;      down(new Event(Event.SUSPEND));      flush_promise.reset();      try      {         flush_promise.getResultWithTimeout(timeout);         successfulFlush = true;      }      catch (TimeoutException e)      {      }      return successfulFlush;   }   public void stopFlush()   {      down(new Event(Event.RESUME));   }   /* ------------------- end JMX attributes and operations --------------------- */   public void down(Event evt)   {      switch (evt.getType())      {         case Event.GET_STATE:         case Event.MSG :            boolean shouldSuspendByItself = false;            long start=0, stop=0;            synchronized (blockMutex)            {               while (isFlushRunning())               {                  if (log.isDebugEnabled())                     log.debug("FLUSH block at " + localAddress + " for " + (timeout <= 0? "ever" : timeout + "ms"));                  try                  {                     start=System.currentTimeMillis();                     if(timeout <= 0)                        blockMutex.wait();                     else                        blockMutex.wait(timeout);                     stop=System.currentTimeMillis();                     if (isFlushRunning())                     {                        isBlockState = false;                        shouldSuspendByItself = true;                     }                  }                  catch (InterruptedException e)                  {                  }               }            }            if(shouldSuspendByItself)            {               log.warn("unblocking FLUSH.down() at " + localAddress + " after timeout of " + (stop-start) + "ms");               passUp(new Event(Event.SUSPEND_OK));               passDown(new Event(Event.SUSPEND_OK));            }            break;         case Event.CONNECT:            boolean successfulBlock = sendBlockUpToChannel(block_timeout);            if (successfulBlock && log.isDebugEnabled())            {               log.debug("Blocking of channel " + localAddress + " completed successfully");            }            //member sending JOIN request returns last from FLUSH            shouldReturnLastFromFlush = true;            break;         case Event.SUSPEND :            onSuspend((View) evt.getArg());            return;         case Event.RESUME :            onResume();            return;         case Event.BLOCK_OK:            blockok_promise.setResult(Boolean.TRUE);            return;      }      passDown(evt);   }   public void up(Event evt)   {      Message msg = null;      switch (evt.getType())      {         case Event.MSG :            msg = (Message) evt.getArg();            FlushHeader fh = (FlushHeader) msg.removeHeader(getName());            if (fh != null)            {               if (fh.type == FlushHeader.START_FLUSH)               {                  boolean successfulBlock = sendBlockUpToChannel(block_timeout);                  if (successfulBlock && log.isDebugEnabled())                  {                     log.debug("Blocking of channel " + localAddress + " completed successfully");                  }                  onStartFlush(msg.getSrc(), fh);               }               else if (fh.type == FlushHeader.STOP_FLUSH)               {                  onStopFlush();               }               else if (isCurrentFlushMessage(fh))               {                  if (fh.type == FlushHeader.FLUSH_OK)                  {                     onFlushOk(msg.getSrc(), fh.viewID);                  }                  else if (fh.type == FlushHeader.STOP_FLUSH_OK)                  {                     onStopFlushOk(msg.getSrc(),fh.viewID);                  }                  else if (fh.type == FlushHeader.FLUSH_COMPLETED)                  {                     onFlushCompleted(msg.getSrc());                  }               }               else               {                  if (log.isDebugEnabled())                     log.debug(localAddress + " received outdated FLUSH message " + fh + ",ignoring it.");               }               return; //do not pass FLUSH msg up            }            break;         case Event.VIEW_CHANGE :            //if this is channel's first view and its the only member of the group then the            //goal is to pass BLOCK,VIEW,UNBLOCK to application space on the same thread as VIEW.            View newView = (View) evt.getArg();            boolean firstView = onViewChange(newView);            boolean singletonMember = newView.size()==1 && newView.containsMember(localAddress);            if(firstView && singletonMember){               passUp(evt);               synchronized (blockMutex)               {                  isBlockState = false;                  blockMutex.notifyAll();               }               if (log.isDebugEnabled())                  log.debug("At " + localAddress + " unblocking FLUSH.down() and sending UNBLOCK up");               shouldReturnLastFromFlush = false;               passUp(new Event(Event.UNBLOCK));               return;            }            break;         case Event.SET_LOCAL_ADDRESS :            localAddress = (Address) evt.getArg();            break;         case Event.SUSPECT :            onSuspect((Address) evt.getArg());            break;         case Event.SUSPEND :            onSuspend((View) evt.getArg());            return;         case Event.RESUME :            onResume();            return;         //state requesting member are returning last from flush round (i.e. JChannel.getState())         case Event.STATE_TRANSFER_INPUTSTREAM:         case Event.GET_STATE_OK:            shouldReturnLastFromFlush = true;            break;      }      passUp(evt);   }   public Vector providedDownServices()   {      Vector retval = new Vector(2);      retval.addElement(new Integer(Event.SUSPEND));      retval.addElement(new Integer(Event.RESUME));      return retval;   }      private boolean sendBlockUpToChannel(long btimeout)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -