state_transfer.java
来自「JGRoups源码」· Java 代码 · 共 432 行
JAVA
432 行
// $Id: STATE_TRANSFER.java,v 1.21 2006/04/13 08:14:20 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.blocks.GroupRequest;import org.jgroups.blocks.RequestCorrelator;import org.jgroups.blocks.RequestHandler;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.Rsp;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.Serializable;import java.util.HashMap;import java.util.Properties;import java.util.Vector;class StateTransferRequest implements Serializable { static final int MAKE_COPY=1; // arg = originator of request static final int RETURN_STATE=2; // arg = orginator of request int type=0; final Object arg; private static final long serialVersionUID = -7734608266762273116L; StateTransferRequest(int type, Object arg) { this.type=type; this.arg=arg; } public int getType() { return type; } public Object getArg() { return arg; } public String toString() { return "[StateTransferRequest: type=" + type2Str(type) + ", arg=" + arg + ']'; } static String type2Str(int t) { switch(t) { case MAKE_COPY: return "MAKE_COPY"; case RETURN_STATE: return "RETURN_STATE"; default: return "<unknown>"; } }}/** * State transfer layer. Upon receiving a GET_STATE event from JChannel, a MAKE_COPY message is * sent to all members. When the originator receives MAKE_COPY, it queues all messages to the * channel. * When another member receives the message, it asks the JChannel to provide it with a copy of * the current state (GetStateEvent is received by application, returnState() sends state down the * stack). Then the current layer sends a unicast RETURN_STATE message to the coordinator, which * returns the cached copy. * When the state is received by the originator, the GET_STATE sender is unblocked with a * GET_STATE_OK event up the stack (unless it already timed out).<p> * Requires QUEUE layer on top. * * @author Bela Ban */public class STATE_TRANSFER extends Protocol implements RequestHandler { Address local_addr=null; final Vector members=new Vector(11); final Message m=null; boolean is_server=false; byte[] cached_state=null; final Object state_xfer_mutex=new Object(); // get state from appl (via channel). long timeout_get_appl_state=5000; long timeout_return_state=5000; RequestCorrelator corr=null; final Vector observers=new Vector(5); final HashMap map=new HashMap(7); /** * All protocol names have to be unique ! */ public String getName() { return "STATE_TRANSFER"; } public void init() throws Exception { map.put("state_transfer", Boolean.TRUE); map.put("protocol_class", getClass().getName()); } public void start() throws Exception { corr=new RequestCorrelator(getName(), this, this); passUp(new Event(Event.CONFIG, map)); } public void stop() { if(corr != null) { corr.stop(); corr=null; } } public boolean setProperties(Properties props) { String str; super.setProperties(props); // Milliseconds to wait for application to provide requested state, events are // STATE_TRANSFER up and STATE_TRANSFER_OK down str=props.getProperty("timeout_get_appl_state"); if(str != null) { timeout_get_appl_state=Long.parseLong(str); props.remove("timeout_get_appl_state"); } // Milliseconds to wait for 1 or all members to return its/their state. 0 means wait // forever. States are retrieved using GroupRequest/RequestCorrelator str=props.getProperty("timeout_return_state"); if(str != null) { timeout_return_state=Long.parseLong(str); props.remove("timeout_return_state"); } if(props.size() > 0) { log.error("STATE_TRANSFER.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public Vector requiredUpServices() { Vector ret=new Vector(2); ret.addElement(new Integer(Event.START_QUEUEING)); ret.addElement(new Integer(Event.STOP_QUEUEING)); return ret; } public void up(Event evt) { switch(evt.getType()) { case Event.BECOME_SERVER: is_server=true; break; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.TMP_VIEW: case Event.VIEW_CHANGE: Vector new_members=((View)evt.getArg()).getMembers(); synchronized(members) { members.removeAllElements(); if(new_members != null && new_members.size() > 0) for(int k=0; k < new_members.size(); k++) members.addElement(new_members.elementAt(k)); } break; } if(corr != null) corr.receive(evt); // will consume or pass up, depending on header else passUp(evt); } public void down(Event evt) { Object coord, state; Vector event_list; StateTransferInfo info; switch(evt.getType()) { case Event.TMP_VIEW: case Event.VIEW_CHANGE: Vector new_members=((View)evt.getArg()).getMembers(); synchronized(members) { members.removeAllElements(); if(new_members != null && new_members.size() > 0) for(int k=0; k < new_members.size(); k++) members.addElement(new_members.elementAt(k)); } break; case Event.GET_STATE: // generated by JChannel.getState() info=(StateTransferInfo)evt.getArg(); coord=determineCoordinator(); if(coord == null || coord.equals(local_addr)) { event_list=new Vector(1); event_list.addElement(new Event(Event.GET_STATE_OK, new StateTransferInfo())); passUp(new Event(Event.STOP_QUEUEING, event_list)); return; // don't pass down any further ! } try { sendMakeCopyMessage(); // multicast MAKE_COPY to all members (including me) state=getStateFromSingle(info.target); } catch(Throwable t) { if(log.isErrorEnabled()) log.error("failed sending state request", t); state=null; } /* Pass up the state to the application layer (insert into JChannel's event queue */ event_list=new Vector(1); event_list.addElement(new Event(Event.GET_STATE_OK, new StateTransferInfo(null, info.state_id, 0L, (byte[])state))); /* Now stop queueing */ passUp(new Event(Event.STOP_QUEUEING, event_list)); return; // don't pass down any further ! case Event.GET_APPLSTATE_OK: synchronized(state_xfer_mutex) { info=(StateTransferInfo)evt.getArg(); cached_state=info.state; state_xfer_mutex.notifyAll(); } return; // don't pass down any further ! } passDown(evt); // pass on to the layer below us } /* ---------------------- Interface RequestHandler -------------------------- */ public Object handle(Message msg) { StateTransferRequest req; try { req=(StateTransferRequest)msg.getObject(); switch(req.getType()) { case StateTransferRequest.MAKE_COPY: makeCopy(req.getArg()); return null; case StateTransferRequest.RETURN_STATE: if(is_server) return cached_state; else { if(warn) log.warn("RETURN_STATE: returning null" + "as I'm not yet an operational state server !"); return null; } default: if(log.isErrorEnabled()) log.error("type " + req.getType() + "is unknown in StateTransferRequest !"); return null; } } catch(Exception e) { if(log.isErrorEnabled()) log.error("exception is " + e); return null; } } /* ------------------- End of Interface RequestHandler ---------------------- */ byte[] getStateFromSingle(Address target) throws Throwable { Vector dests=new Vector(11); Message msg; StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr); RspList rsp_list; Rsp rsp; Address dest; GroupRequest req; int num_tries=0; try { msg=new Message(null, null, Util.objectToByteBuffer(r)); } catch(Exception e) { if(log.isErrorEnabled()) log.error("exception=" + e); return null; } while(members.size() > 1 && num_tries++ < 3) { // excluding myself dest=target != null? target : determineCoordinator(); if(dest == null) return null; msg.setDest(dest); dests.removeAllElements(); dests.addElement(dest); req=new GroupRequest(msg, corr, dests, GroupRequest.GET_FIRST, timeout_return_state, 0); req.execute(); rsp_list=req.getResults(); for(int i=0; i < rsp_list.size(); i++) { // get the first non-suspected result rsp=(Rsp)rsp_list.elementAt(i); if(rsp.wasReceived()) return (byte[])rsp.getValue(); } Util.sleep(1000); } return null; }// Vector getStateFromMany(Vector targets) {// Vector dests=new Vector(11);// Message msg;// StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr);// RspList rsp_list;// GroupRequest req;// int i;////// if(targets != null) {// for(i=0; i < targets.size(); i++)// if(!local_addr.equals(targets.elementAt(i)))// dests.addElement(targets.elementAt(i));// }// else {// for(i=0; i < members.size(); i++)// if(!local_addr.equals(members.elementAt(i)))// dests.addElement(members.elementAt(i));// }//// if(dests.size() == 0)// return null;//// msg=new Message();// try {// msg.setBuffer(Util.objectToByteBuffer(r));// }// catch(Exception e) {// }//// req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0);// req.execute();// rsp_list=req.getResults();// return rsp_list.getResults();// } void sendMakeCopyMessage() throws Throwable { GroupRequest req; Message msg=new Message(); StateTransferRequest r=new StateTransferRequest(StateTransferRequest.MAKE_COPY, local_addr); Vector dests=new Vector(11); for(int i=0; i < members.size(); i++) dests.addElement(members.elementAt(i)); if(dests.size() == 0) return; try { msg.setBuffer(Util.objectToByteBuffer(r)); } catch(Exception e) { } req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0); req.execute(); } /** * Return the first element of members which is not me. Otherwise return null. */ Address determineCoordinator() { Address ret=null; if(members != null && members.size() > 1) { for(int i=0; i < members.size(); i++) if(!local_addr.equals(members.elementAt(i))) return (Address)members.elementAt(i); } return ret; } /** * If server, ask application to send us a copy of its state (STATE_TRANSFER up, * STATE_TRANSFER down). If client, start queueing events. Queuing will be stopped when * state has been retrieved (or not) from single or all member(s). */ void makeCopy(Object sender) { if(sender.equals(local_addr)) { // was sent by us, has to start queueing passUp(new Event(Event.START_QUEUEING)); } else { // only retrieve state from appl when not in client state anymore if(is_server) { // get state from application and store it locally synchronized(state_xfer_mutex) { cached_state=null; StateTransferInfo info=new StateTransferInfo(local_addr); passUp(new Event(Event.GET_APPLSTATE, info)); if(cached_state == null) { try { state_xfer_mutex.wait(timeout_get_appl_state); // wait for STATE_TRANSFER_OK } catch(Exception e) { } } } } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?