📄 state_transfer.java
字号:
package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.Promise;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * New STATE_TRANSFER protocol based on PBCAST. Compared to the one in ./protocols, it doesn't * need a QUEUE layer above it. A state request is sent to a chosen member (coordinator if * null). That member makes a copy D of its current digest and asks the application for a copy of * its current state S. Then the member returns both S and D to the requester. The requester * first sets its digest to D and then returns the state to the application. * @author Bela Ban * @version $Id: STATE_TRANSFER.java,v 1.44 2006/09/29 21:48:17 bstansberry Exp $ */public class STATE_TRANSFER extends Protocol { Address local_addr=null; final Vector members=new Vector(); long state_id=1; // used to differentiate between state transfers (not currently used) // final Set state_requesters=new HashSet(); // requesters of state (usually just 1, could be more) /** Map<String,Set> of state requesters. Keys are state IDs, values are Sets of Addresses (one for each requester) */ final Map state_requesters=new HashMap(); /** set to true while waiting for a STATE_RSP */ boolean waiting_for_state_response=false; Digest digest=null; final HashMap map=new HashMap(); // to store configuration information long start, stop; // to measure state transfer time int num_state_reqs=0; long num_bytes_sent=0; double avg_state_size=0; final static String name="STATE_TRANSFER"; boolean use_flush=false; long flush_timeout=4000; Promise flush_promise; /** All protocol names have to be unique ! */ public String getName() { return name; } public int getNumberOfStateRequests() {return num_state_reqs;} public long getNumberOfStateBytesSent() {return num_bytes_sent;} public double getAverageStateSize() {return avg_state_size;} public Vector requiredDownServices() { Vector retval=new Vector(); retval.addElement(new Integer(Event.GET_DIGEST_STATE)); retval.addElement(new Integer(Event.SET_DIGEST)); return retval; } public Vector requiredUpServices() { Vector retval=new Vector(); if(use_flush) { retval.addElement(new Integer(Event.SUSPEND)); retval.addElement(new Integer(Event.RESUME)); } return retval; } public void resetStats() { super.resetStats(); num_state_reqs=0; num_bytes_sent=0; avg_state_size=0; } public boolean setProperties(Properties props) { super.setProperties(props); use_flush=Util.parseBoolean(props, "use_flush", false); flush_promise=new Promise(); flush_timeout = Util.parseLong(props, "flush_timeout", flush_timeout); if(props.size() > 0) { log.error("the following properties are not recognized: " + props); return false; } return true; } public void init() throws Exception { map.put("state_transfer", Boolean.TRUE); map.put("protocol_class", getClass().getName()); } public void start() throws Exception { passUp(new Event(Event.CONFIG, map)); } public void stop() { super.stop(); waiting_for_state_response=false; } public void up(Event evt) { Message msg; StateHeader hdr; switch(evt.getType()) { case Event.BECOME_SERVER: break; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.TMP_VIEW: case Event.VIEW_CHANGE: handleViewChange((View)evt.getArg()); break; case Event.GET_DIGEST_STATE_OK: synchronized(state_requesters) { digest=(Digest)evt.getArg(); if(log.isDebugEnabled()) log.debug("GET_DIGEST_STATE_OK: digest is " + digest + "\npassUp(GET_APPLSTATE)"); requestApplicationStates(); } return; case Event.MSG: msg=(Message)evt.getArg(); if(!(msg.getHeader(name) instanceof StateHeader)) break; hdr=(StateHeader)msg.removeHeader(name); switch(hdr.type) { case StateHeader.STATE_REQ: handleStateReq(hdr); break; case StateHeader.STATE_RSP: handleStateRsp(hdr, msg.getBuffer()); if(use_flush) { stopFlush(); } break; default: if(log.isErrorEnabled()) log.error("type " + hdr.type + " not known in StateHeader"); break; } return; } passUp(evt); } public void down(Event evt) { byte[] state; Address target, requester; StateTransferInfo info; StateHeader hdr; switch(evt.getType()) { case Event.TMP_VIEW: case Event.VIEW_CHANGE: handleViewChange((View)evt.getArg()); break; // generated by JChannel.getState(). currently, getting the state from more than 1 mbr is not implemented case Event.GET_STATE: info=(StateTransferInfo)evt.getArg(); if(info.target == null) { target=determineCoordinator(); } else { target=info.target; if(target.equals(local_addr)) { if(log.isErrorEnabled()) log.error("GET_STATE: cannot fetch state from myself !"); target=null; } } if(target == null) { if(log.isDebugEnabled()) log.debug("GET_STATE: first member (no state)"); passUp(new Event(Event.GET_STATE_OK, new StateTransferInfo())); } else { if(use_flush) { startFlush(flush_timeout); } Message state_req=new Message(target, null, null); state_req.putHeader(name, new StateHeader(StateHeader.STATE_REQ, local_addr, state_id++, null, info.state_id)); if(log.isDebugEnabled()) log.debug("GET_STATE: asking " + target + " for state"); // suspend sending and handling of mesage garbage collection gossip messages, // fixes bugs #943480 and #938584). Wake up when state has been received if(log.isDebugEnabled()) log.debug("passing down a SUSPEND_STABLE event"); passDown(new Event(Event.SUSPEND_STABLE, new Long(info.timeout))); waiting_for_state_response=true; start=System.currentTimeMillis(); passDown(new Event(Event.MSG, state_req)); } return; // don't pass down any further ! case Event.GET_APPLSTATE_OK: info=(StateTransferInfo)evt.getArg(); state=info.state; String id=info.state_id; synchronized(state_requesters) { if(state_requesters.size() == 0) { if(warn) log.warn("GET_APPLSTATE_OK: received application state, but there are no requesters !"); return; } if(isDigestNeeded()){ if(digest == null) { if(warn) log.warn("GET_APPLSTATE_OK: received application state, but there is no digest !"); } else { digest=digest.copy(); } } if(stats) { num_state_reqs++; if(state != null) num_bytes_sent+=state.length; avg_state_size=num_bytes_sent / num_state_reqs; } Set requesters=(Set)state_requesters.get(id); if(requesters == null || requesters.size() == 0) { log.warn("received state for id=" + id + ", but there are no requesters for this ID"); } else { for(Iterator it=requesters.iterator(); it.hasNext();) { requester=(Address)it.next(); final Message state_rsp=new Message(requester, null, state); hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, digest, id); state_rsp.putHeader(name, hdr); if(trace) log.trace("sending state for ID=" + id + " to " + requester + " (" + state.length + " bytes)"); // This has to be done in a separate thread, so we don't block on FC // (see http://jira.jboss.com/jira/browse/JGRP-225 for details). This will be reverted once // we have the threadless stack (http://jira.jboss.com/jira/browse/JGRP-181) // and out-of-band messages (http://jira.jboss.com/jira/browse/JGRP-205) new Thread() { public void run() { passDown(new Event(Event.MSG, state_rsp)); } }.start(); // passDown(new Event(Event.MSG, state_rsp)); } state_requesters.remove(id); } } return; // don't pass down any further ! case Event.SUSPEND_OK: if(use_flush) { flush_promise.setResult(Boolean.TRUE); } break; case Event.CONFIG : Map config = (Map) evt.getArg(); if(config != null && config.containsKey("flush_timeout")){ Long ftimeout = (Long) config.get("flush_timeout"); use_flush = true; flush_timeout = ftimeout.longValue(); } break; } passDown(evt); // pass on to the layer below us } /* --------------------------- Private Methods -------------------------------- */ /** * When FLUSH is used we do not need to pass digests between members * * see JGroups/doc/design/PartialStateTransfer.txt * see JGroups/doc/design/FLUSH.txt
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -