📄 streaming_state_transfer.java
字号:
package org.jgroups.protocols.pbcast;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.ObjectInput;import java.io.ObjectInputStream;import java.io.ObjectOutput;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.net.UnknownHostException;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.Vector;import org.jgroups.Address;import org.jgroups.Channel;import org.jgroups.Event;import org.jgroups.Global;import org.jgroups.Header;import org.jgroups.Message;import org.jgroups.TimeoutException;import org.jgroups.View;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.Promise;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;/** * <code>STREAMING_STATE_TRANSFER</code>, as its name implies, allows a streaming * state transfer between two channel instances. * * <p> * * Major advantage of this approach is that transfering application state to a * joining member of a group does not entail loading of the complete application * state into memory. Application state, for example, might be located entirely * on some form of disk based storage. The default <code>STATE_TRANSFER</code> * requires this state to be loaded entirely into memory before being transferred * to a group member while <code>STREAMING_STATE_TRANSFER</code> does not. 
* Thus <code>STREAMING_STATE_TRANSFER</code> protocol is able to transfer
 * application state that is very large (>1Gb) without a likelihood of such transfer
 * resulting in OutOfMemoryException.
 *
 * <p>
 *
 * Channel instance can be configured with either <code>STREAMING_STATE_TRANSFER</code>
 * or <code>STATE_TRANSFER</code> but not both protocols at the same time.
 *
 * <p>
 *
 * In order to process streaming state transfer an application has to implement
 * <code>ExtendedMessageListener</code> if it is using channel in a push style
 * mode or it has to process <code>StreamingSetStateEvent</code> and
 * <code>StreamingGetStateEvent</code> if it is using channel in a pull style mode.
 *
 *
 * @author Vladimir Blagojevic
 * @see org.jgroups.ExtendedMessageListener
 * @see org.jgroups.StreamingGetStateEvent
 * @see org.jgroups.StreamingSetStateEvent
 * @see org.jgroups.protocols.pbcast.STATE_TRANSFER
 * @since 2.4
 *
 * @version $Id$
 *
 */
public class STREAMING_STATE_TRANSFER extends Protocol {

    // Address of this member, set via Event.SET_LOCAL_ADDRESS in up()
    Address local_addr = null;

    // Current group membership; updated on view changes
    final Vector members = new Vector();

    // Pending state requests, guarded by synchronized(state_requesters)
    final Map state_requesters = new HashMap();

    /** set to true while waiting for a STATE_RSP */
    boolean waiting_for_state_response = false;

    // Digest received via GET_DIGEST_STATE_OK; only needed when FLUSH is not used
    Digest digest = null;

    final HashMap map = new HashMap(); // to store configuration information

    // --- statistics, reset by resetStats() ---
    int num_state_reqs = 0;
    long num_bytes_sent = 0;
    double avg_state_size = 0;

    final static String NAME = "STREAMING_STATE_TRANSFER";

    // Address/port the state-provider ServerSocket binds to ("bind_addr"/"start_port" properties)
    private InetAddress bind_addr;
    private int bind_port = 0;

    // Accepts incoming state-requester connections and hands them to a thread pool;
    // created lazily in respondToStateRequester(), stopped in stop()
    private StateProviderThreadSpawner spawner;

    // Thread-pool tuning ("max_pool", "pool_thread_keep_alive" properties)
    private int max_pool = 5;
    private long pool_thread_keep_alive;

    // Send/receive buffer size for the state-transfer sockets ("socket_buffer_size" property)
    private int socket_buffer_size = 8 * 1024;

    // Whether a dedicated thread reads the state stream ("use_reading_thread" property)
    private boolean use_reading_thread;

    // Fulfilled on SUSPEND_OK when FLUSH is in use
    // NOTE(review): stray extra ';' below kept verbatim (legal but redundant)
    private Promise flush_promise = new Promise();;

    // True when the stack runs the FLUSH protocol; can also be enabled via a CONFIG event
    private volatile boolean use_flush;
    private long flush_timeout = 4000;

    private final Object poolLock = new Object();
    private int threadCounter;

    /** Returns the protocol name used as the message-header key. */
    public final String getName() {
        return NAME;
    }

    /** Number of state requests served so far (statistics). */
    public int getNumberOfStateRequests() {
        return num_state_reqs;
    }

    /** Total number of state bytes sent so far (statistics). */
    public long getNumberOfStateBytesSent() {
        return num_bytes_sent;
    }

    /** Average size of the states transferred so far (statistics). */
    public double getAverageStateSize() {
        return avg_state_size;
    }

    /** Events this protocol requires from the layer below (digest handling). */
    public Vector requiredDownServices() {
        Vector retval = new Vector();
        retval.addElement(new Integer(Event.GET_DIGEST_STATE));
        retval.addElement(new Integer(Event.SET_DIGEST));
        return retval;
    }

    /** Events this protocol requires from the layer above; SUSPEND/RESUME only when FLUSH is used. */
    public Vector requiredUpServices() {
        Vector retval = new Vector();
        if (use_flush) {
            retval.addElement(new Integer(Event.SUSPEND));
            retval.addElement(new Integer(Event.RESUME));
        }
        return retval;
    }

    /** Resets the statistics counters. */
    public void resetStats() {
        super.resetStats();
        num_state_reqs = 0;
        num_bytes_sent = 0;
        avg_state_size = 0;
    }

    /**
     * Parses this protocol's configuration properties.
     *
     * @return false (configuration error) when bind_addr cannot be resolved
     *         or unrecognized properties remain, true otherwise
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        use_flush = Util.parseBoolean(props, "use_flush", false);
        flush_timeout = Util.parseLong(props, "flush_timeout", flush_timeout);
        try {
            bind_addr = Util.parseBindAddress(props, "bind_addr");
        } catch (UnknownHostException e) {
            log.error("(bind_addr): host " + e.getLocalizedMessage() + " not known");
            return false;
        }
        bind_port = Util.parseInt(props, "start_port", 0);
        socket_buffer_size = Util.parseInt(props, "socket_buffer_size", 8 * 1024); //8K
        max_pool = Util.parseInt(props, "max_pool", 5);
        pool_thread_keep_alive = Util.parseLong(props, "pool_thread_keep_alive", 1000 * 30); //30 sec
        use_reading_thread = Util.parseBoolean(props, "use_reading_thread", false);
        if (props.size() > 0) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    /** Records in {@code map} that this stack supports state transfer (announced via CONFIG in start()). */
    public void init() throws Exception {
        map.put("state_transfer", Boolean.TRUE);
        map.put("protocol_class", getClass().getName());
    }

    /** Announces state-transfer capability to the layers above. */
    public void start() throws Exception {
        passUp(new Event(Event.CONFIG, map));
    }

    /** Stops the state-provider spawner (if running) and clears the pending-response flag. */
    public void stop() {
        super.stop();
        waiting_for_state_response = false;
        if (spawner != null) {
            spawner.stop();
        }
    }

    /**
     * Handles events coming up the stack: records the local address and view changes,
     * reacts to the digest reply, and dispatches STREAMING_STATE_TRANSFER headers
     * found on messages. Events not consumed here are passed further up.
     */
    public void up(Event evt) {
        switch (evt.getType()) {
            case Event.BECOME_SERVER :
                break;
            case Event.SET_LOCAL_ADDRESS :
                local_addr = (Address) evt.getArg();
                break;
            case Event.TMP_VIEW :
            case Event.VIEW_CHANGE :
                handleViewChange((View) evt.getArg());
                break;
            case Event.GET_DIGEST_STATE_OK :
                // digest arrived: remember it, then serve the queued state requesters
                synchronized (state_requesters) {
                    digest = (Digest) evt.getArg();
                    if (log.isDebugEnabled())
                        log.debug("GET_DIGEST_STATE_OK: digest is " + digest);
                }
                respondToStateRequester();
                return;
            case Event.MSG :
                Message msg = (Message) evt.getArg();
                StateHeader hdr = (StateHeader) msg.removeHeader(getName());
                if (hdr != null) {
                    // message addressed to this protocol: dispatch on header type and consume it
                    switch (hdr.type) {
                        case StateHeader.STATE_REQ :
                            handleStateReq(hdr);
                            break;
                        case StateHeader.STATE_RSP :
                            handleStateRsp(hdr);
                            break;
                        case StateHeader.STATE_REMOVE_REQUESTER :
                            removeFromStateRequesters(hdr.sender, hdr.state_id);
                            break;
                        default :
                            if (log.isErrorEnabled())
                                log.error("type " + hdr.type + " not known in StateHeader");
                            break;
                    }
                    return;
                }
                break;
            case Event.CONFIG :
                // pick up bind_addr from a transport CONFIG event unless explicitly configured
                Map config = (Map) evt.getArg();
                if (bind_addr == null && (config != null && config.containsKey("bind_addr"))) {
                    bind_addr = (InetAddress) config.get("bind_addr");
                    if (log.isDebugEnabled())
                        log.debug("using bind_addr from CONFIG event " + bind_addr);
                }
                break;
        }
        passUp(evt);
    }

    /**
     * Handles events going down the stack, most importantly GET_STATE: determines the
     * state provider (coordinator or explicit target), optionally starts a flush,
     * suspends the STABLE protocol, and sends a STATE_REQ to the provider.
     * Events not consumed here are passed further down.
     */
    public void down(Event evt) {
        Address target;
        StateTransferInfo info;
        switch (evt.getType()) {
            case Event.TMP_VIEW :
            case Event.VIEW_CHANGE :
                handleViewChange((View) evt.getArg());
                break;
            case Event.GET_STATE :
                info = (StateTransferInfo) evt.getArg();
                if (info.target == null) {
                    target = determineCoordinator();
                } else {
                    target = info.target;
                    if (target.equals(local_addr)) {
                        // cannot request state from ourselves
                        if (log.isErrorEnabled())
                            log.error("GET_STATE: cannot fetch state from myself !");
                        target = null;
                    }
                }
                if (target == null) {
                    // we are the first member: there is no state to fetch
                    if (log.isDebugEnabled())
                        log.debug("GET_STATE: first member (no state)");
                    passUp(new Event(Event.GET_STATE_OK, new StateTransferInfo()));
                } else {
                    if (use_flush) {
                        startFlush(flush_timeout);
                    }
                    Message state_req = new Message(target, null, null);
                    state_req.putHeader(NAME, new StateHeader(StateHeader.STATE_REQ, local_addr, info.state_id));
                    if (log.isDebugEnabled())
                        log.debug("GET_STATE: asking " + target + " for state");
                    // suspend sending and handling of message garbage collection gossip messages,
                    // fixes bugs #943480 and #938584). Wake up when state has been received
                    if (log.isDebugEnabled())
                        log.debug("passing down a SUSPEND_STABLE event");
                    passDown(new Event(Event.SUSPEND_STABLE, new Long(info.timeout)));
                    waiting_for_state_response = true;
                    passDown(new Event(Event.MSG, state_req));
                }
                return; // don't pass down any further !
            case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED :
                if (use_flush) {
                    stopFlush();
                }
                if (log.isDebugEnabled())
                    log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received");
                //resume sending and handling of message garbage collection gossip messages,
                // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
                // collection protocol (e.g. STABLE)
                if (log.isDebugEnabled())
                    log.debug("passing down a RESUME_STABLE event");
                passDown(new Event(Event.RESUME_STABLE));
                return;
            case Event.SUSPEND_OK :
                // flush round acknowledged: release anyone waiting on the promise
                if (use_flush) {
                    flush_promise.setResult(Boolean.TRUE);
                }
                break;
            case Event.CONFIG :
                // FLUSH protocol present in the stack announces itself via flush_timeout
                Map config = (Map) evt.getArg();
                if (config != null && config.containsKey("flush_timeout")) {
                    Long ftimeout = (Long) config.get("flush_timeout");
                    use_flush = true;
                    flush_timeout = ftimeout.longValue();
                }
                break;
        }
        passDown(evt); // pass on to the layer below us
    }

    /* --------------------------- Private Methods -------------------------------- */

    /**
     * When FLUSH is used we do not need to pass digests between members
     *
     * see JGroups/doc/design/PArtialStateTransfer.txt
     * see JGroups/doc/design/FLUSH.txt
     *
     * @return true if use of digests is required, false otherwise
     */
    private boolean isDigestNeeded() {
        return !use_flush;
    }

    /**
     * Serves the queued state requesters: lazily creates the provider ServerSocket and
     * its spawner thread, then (under the state_requesters lock) responds to requesters.
     */
    private void respondToStateRequester() {
        // setup the plumbing if needed
        if (spawner == null) {
            ServerSocket serverSocket = Util.createServerSocket(bind_addr, bind_port);
            spawner = new StateProviderThreadSpawner(setupThreadPool(), serverSocket);
            new Thread(Util.getGlobalThreadGroup(), spawner, "StateProviderThreadSpawner").start();
        }
        synchronized (state_requesters) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -