⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 streaming_state_transfer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
package org.jgroups.protocols.pbcast;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.ObjectInput;import java.io.ObjectInputStream;import java.io.ObjectOutput;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.net.UnknownHostException;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.Vector;import org.jgroups.Address;import org.jgroups.Channel;import org.jgroups.Event;import org.jgroups.Global;import org.jgroups.Header;import org.jgroups.Message;import org.jgroups.TimeoutException;import org.jgroups.View;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.Promise;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;/** * <code>STREAMING_STATE_TRANSFER</code>, as its name implies, allows a streaming  * state transfer between two channel instances.  *  * <p> *  * Major advantage of this approach is that transfering application state to a  * joining member of a group does not entail loading of the complete application  * state into memory. Application state, for example, might be located entirely  * on some form of disk based storage. The default <code>STATE_TRANSFER</code>  * requires this state to be loaded entirely into memory before being transferred  * to a group member while <code>STREAMING_STATE_TRANSFER</code> does not.  * Thus <code>STREAMING_STATE_TRANSFER</code> protocol is able to transfer  * application state that is very large (>1Gb) without a likelihood of such transfer  * resulting in OutOfMemoryException. *  * <p> *  * Channel instance can be configured with either <code>STREAMING_STATE_TRANSFER</code>  * or <code>STATE_TRANSFER</code> but not both protocols at the same time.  *  * <p> *  * In order to process streaming state transfer an application has to implement  * <code>ExtendedMessageListener</code> if it is using channel in a push style  * mode or it has to process <code>StreamingSetStateEvent</code> and  * <code>StreamingGetStateEvent</code> if it is using channel in a pull style mode.     *  *  * @author Vladimir Blagojevic * @see org.jgroups.ExtendedMessageListener * @see org.jgroups.StreamingGetStateEvent * @see org.jgroups.StreamingSetStateEvent * @see org.jgroups.protocols.pbcast.STATE_TRANSFER * @since 2.4 *  * @version $Id$ *  */public class STREAMING_STATE_TRANSFER extends Protocol{   Address local_addr = null;   final Vector members = new Vector();   final Map state_requesters = new HashMap();   /** set to true while waiting for a STATE_RSP */   boolean waiting_for_state_response = false;   Digest digest = null;   final HashMap map = new HashMap(); // to store configuration information   int num_state_reqs = 0;   long num_bytes_sent = 0;   double avg_state_size = 0;   final static String NAME = "STREAMING_STATE_TRANSFER";   private InetAddress bind_addr;   private int bind_port = 0;   private StateProviderThreadSpawner spawner;   private int max_pool = 5;   private long pool_thread_keep_alive;   private int socket_buffer_size = 8 * 1024;   private boolean use_reading_thread;   private Promise flush_promise = new Promise();;   private volatile boolean use_flush;   private long flush_timeout = 4000;   private final Object poolLock = new Object();   private int threadCounter;   public final String getName()   {      return NAME;   }   public int getNumberOfStateRequests()   {      return num_state_reqs;   }   public long getNumberOfStateBytesSent()   {      return num_bytes_sent;   }   public double getAverageStateSize()   {      return avg_state_size;   }   public Vector requiredDownServices()   {      Vector retval = new Vector();      retval.addElement(new Integer(Event.GET_DIGEST_STATE));      retval.addElement(new Integer(Event.SET_DIGEST));      return retval;   }   public Vector requiredUpServices()   {      Vector retval = new Vector();      if (use_flush)      {         retval.addElement(new Integer(Event.SUSPEND));         retval.addElement(new Integer(Event.RESUME));      }      return retval;   }   public void resetStats()   {      super.resetStats();      num_state_reqs = 0;      num_bytes_sent = 0;      avg_state_size = 0;   }   public boolean setProperties(Properties props)   {      super.setProperties(props);      use_flush = Util.parseBoolean(props, "use_flush", false);                  flush_timeout = Util.parseLong(props, "flush_timeout", flush_timeout);            try      {         bind_addr = Util.parseBindAddress(props, "bind_addr");      }      catch (UnknownHostException e)      {         log.error("(bind_addr): host " + e.getLocalizedMessage() + " not known");         return false;      }      bind_port = Util.parseInt(props, "start_port", 0);      socket_buffer_size = Util.parseInt(props, "socket_buffer_size", 8 * 1024); //8K      max_pool = Util.parseInt(props, "max_pool", 5);      pool_thread_keep_alive = Util.parseLong(props, "pool_thread_keep_alive", 1000 * 30); //30 sec      use_reading_thread = Util.parseBoolean(props, "use_reading_thread", false);      if (props.size() > 0)      {         log.error("the following properties are not recognized: " + props);         return false;      }      return true;   }   public void init() throws Exception   {      map.put("state_transfer", Boolean.TRUE);      map.put("protocol_class", getClass().getName());   }   public void start() throws Exception   {      passUp(new Event(Event.CONFIG, map));   }   public void stop()   {      super.stop();      waiting_for_state_response = false;      if (spawner != null)      {         spawner.stop();      }   }   public void up(Event evt)   {      switch (evt.getType())      {         case Event.BECOME_SERVER :            break;         case Event.SET_LOCAL_ADDRESS :            local_addr = (Address) evt.getArg();            break;         case Event.TMP_VIEW :         case Event.VIEW_CHANGE :            handleViewChange((View) evt.getArg());            break;         case Event.GET_DIGEST_STATE_OK :            synchronized (state_requesters)            {               digest = (Digest) evt.getArg();               if (log.isDebugEnabled())                  log.debug("GET_DIGEST_STATE_OK: digest is " + digest);            }            respondToStateRequester();            return;         case Event.MSG :            Message msg = (Message) evt.getArg();            StateHeader hdr = (StateHeader) msg.removeHeader(getName());            if (hdr != null)            {               switch (hdr.type)               {                  case StateHeader.STATE_REQ :                     handleStateReq(hdr);                     break;                  case StateHeader.STATE_RSP :                     handleStateRsp(hdr);                     break;                  case StateHeader.STATE_REMOVE_REQUESTER :                     removeFromStateRequesters(hdr.sender, hdr.state_id);                     break;                  default :                     if (log.isErrorEnabled())                        log.error("type " + hdr.type + " not known in StateHeader");                     break;               }               return;            }            break;         case Event.CONFIG :            Map config = (Map) evt.getArg();            if (bind_addr == null && (config != null && config.containsKey("bind_addr")))            {                              bind_addr = (InetAddress) config.get("bind_addr");               if (log.isDebugEnabled())                  log.debug("using bind_addr from CONFIG event " + bind_addr);            }                break;      }      passUp(evt);   }   public void down(Event evt)   {      Address target;      StateTransferInfo info;      switch (evt.getType())      {         case Event.TMP_VIEW :         case Event.VIEW_CHANGE :            handleViewChange((View) evt.getArg());            break;         case Event.GET_STATE :            info = (StateTransferInfo) evt.getArg();            if (info.target == null)            {               target = determineCoordinator();            }            else            {               target = info.target;               if (target.equals(local_addr))               {                  if (log.isErrorEnabled())                     log.error("GET_STATE: cannot fetch state from myself !");                  target = null;               }            }            if (target == null)            {               if (log.isDebugEnabled())                  log.debug("GET_STATE: first member (no state)");               passUp(new Event(Event.GET_STATE_OK, new StateTransferInfo()));            }            else            {               if (use_flush)               {                  startFlush(flush_timeout);               }               Message state_req = new Message(target, null, null);               state_req.putHeader(NAME, new StateHeader(StateHeader.STATE_REQ, local_addr, info.state_id));               if (log.isDebugEnabled())                  log.debug("GET_STATE: asking " + target + " for state");               // suspend sending and handling of mesage garbage collection gossip messages,               // fixes bugs #943480 and #938584). Wake up when state has been received               if (log.isDebugEnabled())                  log.debug("passing down a SUSPEND_STABLE event");               passDown(new Event(Event.SUSPEND_STABLE, new Long(info.timeout)));               waiting_for_state_response = true;                            passDown(new Event(Event.MSG, state_req));            }            return; // don't pass down any further !         case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED :            if (use_flush)            {               stopFlush();            }            if (log.isDebugEnabled())               log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received");            //resume sending and handling of message garbage collection gossip messages,            // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage            // collection protocol (e.g. STABLE)            if (log.isDebugEnabled())               log.debug("passing down a RESUME_STABLE event");            passDown(new Event(Event.RESUME_STABLE));            return;         case Event.SUSPEND_OK :            if (use_flush)            {               flush_promise.setResult(Boolean.TRUE);            }            break;         case Event.CONFIG :            Map config = (Map) evt.getArg();                       if(config != null && config.containsKey("flush_timeout"))            {               Long ftimeout = (Long) config.get("flush_timeout");               use_flush = true;                            flush_timeout = ftimeout.longValue();                                         }            break;                     }      passDown(evt); // pass on to the layer below us   }   /* --------------------------- Private Methods -------------------------------- */   /**    * When FLUSH is used we do not need to pass digests between members    *    * see JGroups/doc/design/PArtialStateTransfer.txt    * see JGroups/doc/design/FLUSH.txt    *    * @return true if use of digests is required, false otherwise    */   private boolean isDigestNeeded()   {      return !use_flush;   }   private void respondToStateRequester()   {      // setup the plumbing if needed      if (spawner == null)      {         ServerSocket serverSocket = Util.createServerSocket(bind_addr, bind_port);         spawner = new StateProviderThreadSpawner(setupThreadPool(), serverSocket);         new Thread(Util.getGlobalThreadGroup(), spawner, "StateProviderThreadSpawner").start();      }      synchronized (state_requesters)      {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -