⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 state_transfer.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	 *	 * @return true if use of digests is required, false otherwise	 */	private boolean isDigestNeeded(){		return !use_flush;	}    private void requestApplicationStates() {        synchronized(state_requesters) {            Set appl_ids=new HashSet(state_requesters.keySet());            String id;            for(Iterator it=appl_ids.iterator(); it.hasNext();) {                id=(String)it.next();                StateTransferInfo info=new StateTransferInfo(null, id, 0L, null);                passUp(new Event(Event.GET_APPLSTATE, info));            }        }    }    /** Return the first element of members which is not me. Otherwise return null. */    private Address determineCoordinator() {        Address ret=null;        synchronized(members) {            if(members != null && members.size() > 1) {                for(int i=0; i < members.size(); i++)                    if(!local_addr.equals(members.elementAt(i)))                        return (Address)members.elementAt(i);            }        }        return ret;    }    private void handleViewChange(View v) {        Address old_coord;        Vector new_members=v.getMembers();        boolean send_up_null_state_rsp=false;        synchronized(members) {            old_coord=(Address)(members.size() > 0? members.firstElement() : null);            members.clear();            members.addAll(new_members);            // this handles the case where a coord dies during a state transfer; prevents clients from hanging forever            // Note this only takes a coordinator crash into account, a getState(target, timeout), where target is not            // null is not handled ! (Usually we get the state from the coordinator)            // http://jira.jboss.com/jira/browse/JGRP-148            if(waiting_for_state_response && old_coord != null && !members.contains(old_coord)) {                send_up_null_state_rsp=true;            }        }        if(send_up_null_state_rsp) {            if(warn)                log.warn("discovered that the state provider (" + old_coord + ") crashed; will return null state to application");            StateHeader hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, null, null);            handleStateRsp(hdr, null); // sends up null GET_STATE_OK        }    }    /**     * If a state transfer is in progress, we don't need to send a GET_APPLSTATE event to the application, but     * instead we just add the sender to the requester list so it will receive the same state when done. If not,     * we add the sender to the requester list and send a GET_APPLSTATE event up.     */    private void handleStateReq(StateHeader hdr) {        Object sender=hdr.sender;        if(sender == null) {            if(log.isErrorEnabled()) log.error("sender is null !");            return;        }        String id=hdr.state_id; // id could be null, which means get the entire state        synchronized(state_requesters) {            boolean empty=state_requesters.size() == 0;            Set requesters=(Set)state_requesters.get(id);            if(requesters == null) {                requesters=new HashSet();                state_requesters.put(id, requesters);            }            requesters.add(sender);            if(!isDigestNeeded()) { // state transfer is in progress, digest was already requested                requestApplicationStates();            }            else if(empty){                digest=null;                if(log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE");                passDown(new Event(Event.GET_DIGEST_STATE));            }        }    }    /** Set the digest and the send the state up to the application */    void handleStateRsp(StateHeader hdr, byte[] state) {        Address sender=hdr.sender;        Digest tmp_digest=hdr.my_digest;        String id=hdr.state_id;        waiting_for_state_response=false;        if(isDigestNeeded()){	        if(tmp_digest == null) {	            if(warn)	                log.warn("digest received from " + sender + " is null, skipping setting digest !");	        }	        else	            passDown(new Event(Event.SET_DIGEST, tmp_digest)); // set the digest (e.g. in NAKACK)        }        stop=System.currentTimeMillis();        // resume sending and handling of message garbage collection gossip messages,        // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage        // collection protocol (e.g. STABLE)        if(log.isDebugEnabled())            log.debug("passing down a RESUME_STABLE event");        passDown(new Event(Event.RESUME_STABLE));        if(state == null) {            if(warn)                log.warn("state received from " + sender + " is null, will return null state to application");        }        else            log.debug("received state, size=" + state.length + " bytes. Time=" + (stop-start) + " milliseconds");        StateTransferInfo info=new StateTransferInfo(null, id, 0L, state);        passUp(new Event(Event.GET_STATE_OK, info));    }    private boolean startFlush(long timeout) {        boolean successfulFlush=false;        passUp(new Event(Event.SUSPEND));        try {            flush_promise.reset();            flush_promise.getResultWithTimeout(timeout);            successfulFlush=true;        }        catch(TimeoutException e) {           log.warn("Initiator of flush and state requesting member " + local_addr                 + " timed out waiting for flush responses after "                  + flush_timeout + " msec");        }        return successfulFlush;    }    private void stopFlush() {        passUp(new Event(Event.RESUME));    }    /* ------------------------ End of Private Methods ------------------------------ */    /**     * Wraps data for a state request/response. Note that for a state response the actual state will <em>not</em     * be stored in the header itself, but in the message's buffer.     *     */    public static class StateHeader extends Header implements Streamable {        public static final byte STATE_REQ=1;        public static final byte STATE_RSP=2;        long    id=0;               // state transfer ID (to separate multiple state transfers at the same time)        byte    type=0;        Address sender;             // sender of state STATE_REQ or STATE_RSP        Digest  my_digest=null;     // digest of sender (if type is STATE_RSP)        String  state_id=null;      // for partial state transfer        public StateHeader() {  // for externalization        }        public StateHeader(byte type, Address sender, long id, Digest digest) {            this.type=type;            this.sender=sender;            this.id=id;            this.my_digest=digest;        }        public StateHeader(byte type, Address sender, long id, Digest digest, String state_id) {            this.type=type;            this.sender=sender;            this.id=id;            this.my_digest=digest;            this.state_id=state_id;        }        public int getType() {            return type;        }        public Digest getDigest() {            return my_digest;        }        public String getStateId() {            return state_id;        }        public boolean equals(Object o) {            StateHeader other;            if(sender != null && o != null) {                if(!(o instanceof StateHeader))                    return false;                other=(StateHeader)o;                return sender.equals(other.sender) && id == other.id;            }            return false;        }        public int hashCode() {            if(sender != null)                return sender.hashCode() + (int)id;            else                return (int)id;        }        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append("type=").append(type2Str(type));            if(sender != null) sb.append(", sender=").append(sender).append(" id=").append(id);            if(my_digest != null) sb.append(", digest=").append(my_digest);            if(state_id != null)                sb.append(", state_id=").append(state_id);            return sb.toString();        }        static String type2Str(int t) {            switch(t) {                case STATE_REQ:                    return "STATE_REQ";                case STATE_RSP:                    return "STATE_RSP";                default:                    return "<unknown>";            }        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeObject(sender);            out.writeLong(id);            out.writeByte(type);            out.writeObject(my_digest);            out.writeUTF(state_id);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            sender=(Address)in.readObject();            id=in.readLong();            type=in.readByte();            my_digest=(Digest)in.readObject();            state_id=in.readUTF();        }        public void writeTo(DataOutputStream out) throws IOException {            out.writeByte(type);            out.writeLong(id);            Util.writeAddress(sender, out);            Util.writeStreamable(my_digest, out);            Util.writeString(state_id, out);        }        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {            type=in.readByte();            id=in.readLong();            sender=Util.readAddress(in);            my_digest=(Digest)Util.readStreamable(Digest.class, in);            state_id=Util.readString(in);        }        public long size() {            long retval=Global.LONG_SIZE + Global.BYTE_SIZE; // id and type            retval+=Util.size(sender);            retval+=Global.BYTE_SIZE; // presence byte for my_digest            if(my_digest != null)                retval+=my_digest.serializedSize();            retval+=Global.BYTE_SIZE; // presence byte for state_id            if(state_id != null)                retval+=state_id.length() +2;            return retval;        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -