📄 flush.java
字号:
package org.jgroups.protocols.pbcast;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.ArrayList;import java.util.Collection;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.TreeSet;import java.util.Vector;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Header;import org.jgroups.Message;import org.jgroups.TimeoutException;import org.jgroups.View;import org.jgroups.ViewId;import org.jgroups.stack.Protocol;import org.jgroups.util.Promise;import org.jgroups.util.Streamable;import org.jgroups.util.Util;/** * Flush, as it name implies, forces group members to flush their pending messages * while blocking them to send any additional messages. The process of flushing * acquiesces the group so that state transfer or a join can be done. It is also * called stop-the-world model as nobody will be able to send messages while a * flush is in process. * * <p> * Flush is needed for: * <p> * (1) State transfer. When a member requests state transfer, the coordinator * tells everyone to stop sending messages and waits for everyone's ack. Then it asks * the application for its state and ships it back to the requester. After the * requester has received and set the state successfully, the coordinator tells * everyone to resume sending messages. * <p> * (2) View changes (e.g.a join). Before installing a new view V2, flushing would * ensure that all messages *sent* in the current view V1 are indeed *delivered* * in V1, rather than in V2 (in all non-faulty members). This is essentially * Virtual Synchrony. * * * * @author Vladimir Blagojevic * @version $Id$ * @since 2.4 */public class FLUSH extends Protocol{ public static final String NAME = "FLUSH"; private View currentView; private Address localAddress; /** * Group member that requested FLUSH. * For view intallations flush coordinator is the group coordinator * For state transfer flush coordinator is the state requesting member */ private Address flushCoordinator; private Collection flushMembers; private Set flushOkSet; private Set flushCompletedSet; private Set stopFlushOkSet; private final Object sharedLock = new Object(); private final Object blockMutex = new Object(); /** * Indicates if FLUSH.down() is currently blocking threads */ private volatile boolean isBlockState = true; /** * Default timeout for a group member to be in <code>isBlockState</code> */ private long timeout = 8000; /** * Default timeout started when <code>Event.BLOCK</code> is passed to * application. Response <code>Event.BLOCK_OK</code> should be received by * application within timeout. */ private long block_timeout = 10000; private Set suspected; private volatile boolean receivedFirstView = false; private volatile boolean receivedMoreThanOneView = false; private long startFlushTime; private long totalTimeInFlush; private int numberOfFlushes; private double averageFlushDuration; private Promise flush_promise; private Promise blockok_promise; /** * If true configures timeout in GMS and STATE_TRANFER using FLUSH timeout value */ private boolean auto_flush_conf = true; /** * Indicated member of the group that return last from the FLUSH round (i.e after STOP_FLUSH_OK) * In curent design, this member is joining member and state requesting member */ private volatile boolean shouldReturnLastFromFlush = false; public FLUSH() { super(); //tmp view to avoid NPE currentView = new View(new ViewId(), new Vector()); flushOkSet = new TreeSet(); flushCompletedSet = new TreeSet(); stopFlushOkSet = new TreeSet(); flushMembers = new ArrayList(); suspected = new TreeSet(); flush_promise = new Promise(); blockok_promise = new Promise(); } public String getName() { return NAME; } public boolean setProperties(Properties props) { super.setProperties(props); timeout = Util.parseLong(props, "timeout", timeout); block_timeout = Util.parseLong(props, "block_timeout", block_timeout); auto_flush_conf = Util.parseBoolean(props, "auto_flush_conf", auto_flush_conf); if (props.size() > 0) { log.error("the following properties are not recognized: " + props); return false; } return true; } public void init() throws Exception { if(auto_flush_conf) { Map map = new HashMap(); map.put("flush_timeout", new Long(timeout)); passUp(new Event(Event.CONFIG, map)); passDown(new Event(Event.CONFIG, map)); } } public void start() throws Exception { Map map = new HashMap(); map.put("flush_supported", Boolean.TRUE); passUp(new Event(Event.CONFIG, map)); passDown(new Event(Event.CONFIG, map)); receivedFirstView = false; receivedMoreThanOneView = false; isBlockState = true; shouldReturnLastFromFlush = false; } public void stop() { synchronized (sharedLock) { currentView = new View(new ViewId(), new Vector()); flushCompletedSet.clear(); flushOkSet.clear(); stopFlushOkSet.clear(); flushMembers.clear(); suspected.clear(); flushCoordinator = null; } } /* -------------------JMX attributes and operations --------------------- */ public double getAverageFlushDuration() { return averageFlushDuration; } public long getTotalTimeInFlush() { return totalTimeInFlush; } public int getNumberOfFlushes() { return numberOfFlushes; } public boolean startFlush(long timeout) { boolean successfulFlush = false; down(new Event(Event.SUSPEND)); flush_promise.reset(); try { flush_promise.getResultWithTimeout(timeout); successfulFlush = true; } catch (TimeoutException e) { } return successfulFlush; } public void stopFlush() { down(new Event(Event.RESUME)); } /* ------------------- end JMX attributes and operations --------------------- */ public void down(Event evt) { switch (evt.getType()) { case Event.GET_STATE: case Event.MSG : boolean shouldSuspendByItself = false; long start=0, stop=0; synchronized (blockMutex) { while (isFlushRunning()) { if (log.isDebugEnabled()) log.debug("FLUSH block at " + localAddress + " for " + (timeout <= 0? "ever" : timeout + "ms")); try { start=System.currentTimeMillis(); if(timeout <= 0) blockMutex.wait(); else blockMutex.wait(timeout); stop=System.currentTimeMillis(); if (isFlushRunning()) { isBlockState = false; shouldSuspendByItself = true; } } catch (InterruptedException e) { } } } if(shouldSuspendByItself) { log.warn("unblocking FLUSH.down() at " + localAddress + " after timeout of " + (stop-start) + "ms"); passUp(new Event(Event.SUSPEND_OK)); passDown(new Event(Event.SUSPEND_OK)); } break; case Event.CONNECT: boolean successfulBlock = sendBlockUpToChannel(block_timeout); if (successfulBlock && log.isDebugEnabled()) { log.debug("Blocking of channel " + localAddress + " completed successfully"); } //member sending JOIN request returns last from FLUSH shouldReturnLastFromFlush = true; break; case Event.SUSPEND : onSuspend((View) evt.getArg()); return; case Event.RESUME : onResume(); return; case Event.BLOCK_OK: blockok_promise.setResult(Boolean.TRUE); return; } passDown(evt); } public void up(Event evt) { Message msg = null; switch (evt.getType()) { case Event.MSG : msg = (Message) evt.getArg(); FlushHeader fh = (FlushHeader) msg.removeHeader(getName()); if (fh != null) { if (fh.type == FlushHeader.START_FLUSH) { boolean successfulBlock = sendBlockUpToChannel(block_timeout); if (successfulBlock && log.isDebugEnabled()) { log.debug("Blocking of channel " + localAddress + " completed successfully"); } onStartFlush(msg.getSrc(), fh); } else if (fh.type == FlushHeader.STOP_FLUSH) { onStopFlush(); } else if (isCurrentFlushMessage(fh)) { if (fh.type == FlushHeader.FLUSH_OK) { onFlushOk(msg.getSrc(), fh.viewID); } else if (fh.type == FlushHeader.STOP_FLUSH_OK) { onStopFlushOk(msg.getSrc(),fh.viewID); } else if (fh.type == FlushHeader.FLUSH_COMPLETED) { onFlushCompleted(msg.getSrc()); } } else { if (log.isDebugEnabled()) log.debug(localAddress + " received outdated FLUSH message " + fh + ",ignoring it."); } return; //do not pass FLUSH msg up } break; case Event.VIEW_CHANGE : //if this is channel's first view and its the only member of the group then the //goal is to pass BLOCK,VIEW,UNBLOCK to application space on the same thread as VIEW. View newView = (View) evt.getArg(); boolean firstView = onViewChange(newView); boolean singletonMember = newView.size()==1 && newView.containsMember(localAddress); if(firstView && singletonMember){ passUp(evt); synchronized (blockMutex) { isBlockState = false; blockMutex.notifyAll(); } if (log.isDebugEnabled()) log.debug("At " + localAddress + " unblocking FLUSH.down() and sending UNBLOCK up"); shouldReturnLastFromFlush = false; passUp(new Event(Event.UNBLOCK)); return; } break; case Event.SET_LOCAL_ADDRESS : localAddress = (Address) evt.getArg(); break; case Event.SUSPECT : onSuspect((Address) evt.getArg()); break; case Event.SUSPEND : onSuspend((View) evt.getArg()); return; case Event.RESUME : onResume(); return; //state requesting member are returning last from flush round (i.e. JChannel.getState()) case Event.STATE_TRANSFER_INPUTSTREAM: case Event.GET_STATE_OK: shouldReturnLastFromFlush = true; break; } passUp(evt); } public Vector providedDownServices() { Vector retval = new Vector(2); retval.addElement(new Integer(Event.SUSPEND)); retval.addElement(new Integer(Event.RESUME)); return retval; } private boolean sendBlockUpToChannel(long btimeout)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -