⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 state_transfer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.stack.StateTransferInfo;import org.jgroups.util.Promise;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * New STATE_TRANSFER protocol based on PBCAST. Compared to the one in ./protocols, it doesn't * need a QUEUE layer above it. A state request is sent to a chosen member (coordinator if * null). That member makes a copy D of its current digest and asks the application for a copy of * its current state S. Then the member returns both S and D to the requester. The requester * first sets its digest to D and then returns the state to the application. * @author Bela Ban * @version $Id: STATE_TRANSFER.java,v 1.44 2006/09/29 21:48:17 bstansberry Exp $ */public class STATE_TRANSFER extends Protocol {    Address        local_addr=null;    final Vector   members=new Vector();    long           state_id=1;  // used to differentiate between state transfers (not currently used)    // final Set      state_requesters=new HashSet(); // requesters of state (usually just 1, could be more)    /** Map<String,Set> of state requesters. Keys are state IDs, values are Sets of Addresses (one for each requester) */    final Map      state_requesters=new HashMap();    /** set to true while waiting for a STATE_RSP */    boolean        waiting_for_state_response=false;    Digest         digest=null;    final HashMap  map=new HashMap(); // to store configuration information    long           start, stop; // to measure state transfer time    int            num_state_reqs=0;    long           num_bytes_sent=0;    double         avg_state_size=0;    final static   String name="STATE_TRANSFER";    boolean        use_flush=false;    long           flush_timeout=4000;    Promise        flush_promise;    /** All protocol names have to be unique ! */    public String getName() {        return name;    }    public int getNumberOfStateRequests() {return num_state_reqs;}    public long getNumberOfStateBytesSent() {return num_bytes_sent;}    public double getAverageStateSize() {return avg_state_size;}    public Vector requiredDownServices() {        Vector retval=new Vector();        retval.addElement(new Integer(Event.GET_DIGEST_STATE));        retval.addElement(new Integer(Event.SET_DIGEST));        return retval;    }    public Vector requiredUpServices() {        Vector retval=new Vector();        if(use_flush) {            retval.addElement(new Integer(Event.SUSPEND));            retval.addElement(new Integer(Event.RESUME));        }        return retval;    }    public void resetStats() {        super.resetStats();        num_state_reqs=0;        num_bytes_sent=0;        avg_state_size=0;    }    public boolean setProperties(Properties props) {        super.setProperties(props);        use_flush=Util.parseBoolean(props, "use_flush", false);                flush_promise=new Promise();                flush_timeout = Util.parseLong(props, "flush_timeout", flush_timeout);               if(props.size() > 0) {            log.error("the following properties are not recognized: " + props);            return false;        }        return true;    }    public void init() throws Exception {        map.put("state_transfer", Boolean.TRUE);        map.put("protocol_class", getClass().getName());    }    public void start() throws Exception {        passUp(new Event(Event.CONFIG, map));    }    public void stop() {        super.stop();        waiting_for_state_response=false;    }    public void up(Event evt) {        Message     msg;        StateHeader hdr;        switch(evt.getType()) {        case Event.BECOME_SERVER:            break;        case Event.SET_LOCAL_ADDRESS:            local_addr=(Address)evt.getArg();            break;        case Event.TMP_VIEW:        case Event.VIEW_CHANGE:            handleViewChange((View)evt.getArg());            break;        case Event.GET_DIGEST_STATE_OK:            synchronized(state_requesters) {                digest=(Digest)evt.getArg();                if(log.isDebugEnabled())                    log.debug("GET_DIGEST_STATE_OK: digest is " + digest + "\npassUp(GET_APPLSTATE)");                requestApplicationStates();            }            return;        case Event.MSG:            msg=(Message)evt.getArg();            if(!(msg.getHeader(name) instanceof StateHeader))                break;            hdr=(StateHeader)msg.removeHeader(name);            switch(hdr.type) {            case StateHeader.STATE_REQ:                handleStateReq(hdr);                break;            case StateHeader.STATE_RSP:                handleStateRsp(hdr, msg.getBuffer());                if(use_flush) {            		stopFlush();            	}                break;            default:                if(log.isErrorEnabled()) log.error("type " + hdr.type + " not known in StateHeader");                break;            }            return;        }        passUp(evt);    }    public void down(Event evt) {        byte[] state;        Address target, requester;        StateTransferInfo info;        StateHeader hdr;        switch(evt.getType()) {            case Event.TMP_VIEW:            case Event.VIEW_CHANGE:                handleViewChange((View)evt.getArg());                break;            // generated by JChannel.getState(). currently, getting the state from more than 1 mbr is not implemented            case Event.GET_STATE:                info=(StateTransferInfo)evt.getArg();                if(info.target == null) {                    target=determineCoordinator();                }                else {                    target=info.target;                    if(target.equals(local_addr)) {                        if(log.isErrorEnabled()) log.error("GET_STATE: cannot fetch state from myself !");                        target=null;                    }                }                if(target == null) {                    if(log.isDebugEnabled()) log.debug("GET_STATE: first member (no state)");                    passUp(new Event(Event.GET_STATE_OK, new StateTransferInfo()));                }                else {                    if(use_flush) {                        startFlush(flush_timeout);                    }                    Message state_req=new Message(target, null, null);                    state_req.putHeader(name, new StateHeader(StateHeader.STATE_REQ, local_addr, state_id++, null, info.state_id));                    if(log.isDebugEnabled()) log.debug("GET_STATE: asking " + target + " for state");                    // suspend sending and handling of mesage garbage collection gossip messages,                    // fixes bugs #943480 and #938584). Wake up when state has been received                    if(log.isDebugEnabled())                        log.debug("passing down a SUSPEND_STABLE event");                    passDown(new Event(Event.SUSPEND_STABLE, new Long(info.timeout)));                    waiting_for_state_response=true;                    start=System.currentTimeMillis();                    passDown(new Event(Event.MSG, state_req));                }                return;                 // don't pass down any further !            case Event.GET_APPLSTATE_OK:                info=(StateTransferInfo)evt.getArg();                state=info.state;                String id=info.state_id;                synchronized(state_requesters) {                    if(state_requesters.size() == 0) {                        if(warn)                            log.warn("GET_APPLSTATE_OK: received application state, but there are no requesters !");                        return;                    }                    if(isDigestNeeded()){	                    if(digest == null) {	                        if(warn) log.warn("GET_APPLSTATE_OK: received application state, but there is no digest !");                        }	                    else {	                        digest=digest.copy();                        }                    }                    if(stats) {                        num_state_reqs++;                        if(state != null)                            num_bytes_sent+=state.length;                        avg_state_size=num_bytes_sent / num_state_reqs;                    }                    Set requesters=(Set)state_requesters.get(id);                    if(requesters == null || requesters.size() == 0) {                        log.warn("received state for id=" + id + ", but there are no requesters for this ID");                    }                    else {                        for(Iterator it=requesters.iterator(); it.hasNext();) {                            requester=(Address)it.next();                            final Message state_rsp=new Message(requester, null, state);                            hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, digest, id);                            state_rsp.putHeader(name, hdr);                            if(trace)                                log.trace("sending state for ID=" + id + " to " + requester + " (" + state.length + " bytes)");                            // This has to be done in a separate thread, so we don't block on FC                            // (see http://jira.jboss.com/jira/browse/JGRP-225 for details). This will be reverted once                            // we have the threadless stack  (http://jira.jboss.com/jira/browse/JGRP-181)                            // and out-of-band messages (http://jira.jboss.com/jira/browse/JGRP-205)                            new Thread() {                                public void run() {                                   passDown(new Event(Event.MSG, state_rsp));                                }                            }.start();                            // passDown(new Event(Event.MSG, state_rsp));                        }                        state_requesters.remove(id);                    }                }                return;             // don't pass down any further !            case Event.SUSPEND_OK:            	if(use_flush) {            		flush_promise.setResult(Boolean.TRUE);            	}            	break;                            case Event.CONFIG :               Map config = (Map) evt.getArg();                              if(config != null && config.containsKey("flush_timeout")){                  Long ftimeout = (Long) config.get("flush_timeout");                  use_flush = true;                                    flush_timeout = ftimeout.longValue();                                              }               break;             }        passDown(evt);              // pass on to the layer below us    }    /* --------------------------- Private Methods -------------------------------- */    /**	 * When FLUSH is used we do not need to pass digests between members	 *	 * see JGroups/doc/design/PartialStateTransfer.txt	 * see JGroups/doc/design/FLUSH.txt

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -