📄 state_transfer.java
字号:
* * @return true if use of digests is required, false otherwise */ private boolean isDigestNeeded(){ return !use_flush; } private void requestApplicationStates() { synchronized(state_requesters) { Set appl_ids=new HashSet(state_requesters.keySet()); String id; for(Iterator it=appl_ids.iterator(); it.hasNext();) { id=(String)it.next(); StateTransferInfo info=new StateTransferInfo(null, id, 0L, null); passUp(new Event(Event.GET_APPLSTATE, info)); } } } /** Return the first element of members which is not me. Otherwise return null. */ private Address determineCoordinator() { Address ret=null; synchronized(members) { if(members != null && members.size() > 1) { for(int i=0; i < members.size(); i++) if(!local_addr.equals(members.elementAt(i))) return (Address)members.elementAt(i); } } return ret; } private void handleViewChange(View v) { Address old_coord; Vector new_members=v.getMembers(); boolean send_up_null_state_rsp=false; synchronized(members) { old_coord=(Address)(members.size() > 0? members.firstElement() : null); members.clear(); members.addAll(new_members); // this handles the case where a coord dies during a state transfer; prevents clients from hanging forever // Note this only takes a coordinator crash into account, a getState(target, timeout), where target is not // null is not handled ! (Usually we get the state from the coordinator) // http://jira.jboss.com/jira/browse/JGRP-148 if(waiting_for_state_response && old_coord != null && !members.contains(old_coord)) { send_up_null_state_rsp=true; } } if(send_up_null_state_rsp) { if(warn) log.warn("discovered that the state provider (" + old_coord + ") crashed; will return null state to application"); StateHeader hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, null, null); handleStateRsp(hdr, null); // sends up null GET_STATE_OK } } /** * If a state transfer is in progress, we don't need to send a GET_APPLSTATE event to the application, but * instead we just add the sender to the requester list so it will receive the same state when done. If not, * we add the sender to the requester list and send a GET_APPLSTATE event up. */ private void handleStateReq(StateHeader hdr) { Object sender=hdr.sender; if(sender == null) { if(log.isErrorEnabled()) log.error("sender is null !"); return; } String id=hdr.state_id; // id could be null, which means get the entire state synchronized(state_requesters) { boolean empty=state_requesters.size() == 0; Set requesters=(Set)state_requesters.get(id); if(requesters == null) { requesters=new HashSet(); state_requesters.put(id, requesters); } requesters.add(sender); if(!isDigestNeeded()) { // state transfer is in progress, digest was already requested requestApplicationStates(); } else if(empty){ digest=null; if(log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE"); passDown(new Event(Event.GET_DIGEST_STATE)); } } } /** Set the digest and the send the state up to the application */ void handleStateRsp(StateHeader hdr, byte[] state) { Address sender=hdr.sender; Digest tmp_digest=hdr.my_digest; String id=hdr.state_id; waiting_for_state_response=false; if(isDigestNeeded()){ if(tmp_digest == null) { if(warn) log.warn("digest received from " + sender + " is null, skipping setting digest !"); } else passDown(new Event(Event.SET_DIGEST, tmp_digest)); // set the digest (e.g. in NAKACK) } stop=System.currentTimeMillis(); // resume sending and handling of message garbage collection gossip messages, // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage // collection protocol (e.g. STABLE) if(log.isDebugEnabled()) log.debug("passing down a RESUME_STABLE event"); passDown(new Event(Event.RESUME_STABLE)); if(state == null) { if(warn) log.warn("state received from " + sender + " is null, will return null state to application"); } else log.debug("received state, size=" + state.length + " bytes. Time=" + (stop-start) + " milliseconds"); StateTransferInfo info=new StateTransferInfo(null, id, 0L, state); passUp(new Event(Event.GET_STATE_OK, info)); } private boolean startFlush(long timeout) { boolean successfulFlush=false; passUp(new Event(Event.SUSPEND)); try { flush_promise.reset(); flush_promise.getResultWithTimeout(timeout); successfulFlush=true; } catch(TimeoutException e) { log.warn("Initiator of flush and state requesting member " + local_addr + " timed out waiting for flush responses after " + flush_timeout + " msec"); } return successfulFlush; } private void stopFlush() { passUp(new Event(Event.RESUME)); } /* ------------------------ End of Private Methods ------------------------------ */ /** * Wraps data for a state request/response. Note that for a state response the actual state will <em>not</em * be stored in the header itself, but in the message's buffer. * */ public static class StateHeader extends Header implements Streamable { public static final byte STATE_REQ=1; public static final byte STATE_RSP=2; long id=0; // state transfer ID (to separate multiple state transfers at the same time) byte type=0; Address sender; // sender of state STATE_REQ or STATE_RSP Digest my_digest=null; // digest of sender (if type is STATE_RSP) String state_id=null; // for partial state transfer public StateHeader() { // for externalization } public StateHeader(byte type, Address sender, long id, Digest digest) { this.type=type; this.sender=sender; this.id=id; this.my_digest=digest; } public StateHeader(byte type, Address sender, long id, Digest digest, String state_id) { this.type=type; this.sender=sender; this.id=id; this.my_digest=digest; this.state_id=state_id; } public int getType() { return type; } public Digest getDigest() { return my_digest; } public String getStateId() { return state_id; } public boolean equals(Object o) { StateHeader other; if(sender != null && o != null) { if(!(o instanceof StateHeader)) return false; other=(StateHeader)o; return sender.equals(other.sender) && id == other.id; } return false; } public int hashCode() { if(sender != null) return sender.hashCode() + (int)id; else return (int)id; } public String toString() { StringBuffer sb=new StringBuffer(); sb.append("type=").append(type2Str(type)); if(sender != null) sb.append(", sender=").append(sender).append(" id=").append(id); if(my_digest != null) sb.append(", digest=").append(my_digest); if(state_id != null) sb.append(", state_id=").append(state_id); return sb.toString(); } static String type2Str(int t) { switch(t) { case STATE_REQ: return "STATE_REQ"; case STATE_RSP: return "STATE_RSP"; default: return "<unknown>"; } } public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(sender); out.writeLong(id); out.writeByte(type); out.writeObject(my_digest); out.writeUTF(state_id); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { sender=(Address)in.readObject(); id=in.readLong(); type=in.readByte(); my_digest=(Digest)in.readObject(); state_id=in.readUTF(); } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); out.writeLong(id); Util.writeAddress(sender, out); Util.writeStreamable(my_digest, out); Util.writeString(state_id, out); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readByte(); id=in.readLong(); sender=Util.readAddress(in); my_digest=(Digest)Util.readStreamable(Digest.class, in); state_id=Util.readString(in); } public long size() { long retval=Global.LONG_SIZE + Global.BYTE_SIZE; // id and type retval+=Util.size(sender); retval+=Global.BYTE_SIZE; // presence byte for my_digest if(my_digest != null) retval+=my_digest.serializedSize(); retval+=Global.BYTE_SIZE; // presence byte for state_id if(state_id != null) retval+=state_id.length() +2; return retval; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -