📄 gms.java
字号:
thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on
     which JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then
     discarded, because it won't be processed twice.
     */
    public void receiveUpEvent(Event evt) {
        switch(evt.getType()) {
            case Event.GET_DIGEST_OK:
                // publish the event's argument as the result of digest_promise
                digest_promise.setResult(evt.getArg());
                return; // don't pass further up
        }
        super.receiveUpEvent(evt);
    }


    /**
     * Handles an event travelling down the stack.
     * <ul>
     * <li>CONNECT: the event is passed down first, then <code>impl.join(local_addr)</code> is invoked and a
     *     CONNECT_OK event is passed up. A SecurityException thrown by the join is not propagated; it
     *     becomes the argument of the CONNECT_OK event. The event is not passed down a second time.</li>
     * <li>DISCONNECT: <code>impl.leave()</code> is invoked with the event's argument. Unless the current
     *     implementation is a CoordGmsImpl, DISCONNECT_OK is passed up and the state is reset via
     *     {@link #initState()}. The event is then passed down.</li>
     * <li>SUSPEND_OK: sets the result of <code>flush_promise</code> to TRUE; the event is not passed down.</li>
     * <li>CONFIG: if the configuration map holds a non-null <code>"flush_timeout"</code> entry, flushing is
     *     enabled and the timeout is taken from it. The event is then passed down.</li>
     * <li>Any other event is passed down unchanged.</li>
     * </ul>
     *
     * @param evt the event to handle
     */
    public void down(Event evt) {
        switch(evt.getType()) {

            case Event.CONNECT:
                Object arg=null;
                passDown(evt);
                if(local_addr == null) {
                    // only logged: the join below is still attempted with the null address
                    if(log.isFatalEnabled()) {
                        log.fatal("[CONNECT] local_addr is null");
                    }
                }
                try {
                    impl.join(local_addr);
                }
                catch(SecurityException e) {
                    arg=e; // reported to the layer above through CONNECT_OK
                }
                passUp(new Event(Event.CONNECT_OK, arg));
                return;  // don't pass down: was already passed down

            case Event.DISCONNECT:
                impl.leave((Address)evt.getArg());
                if(!(impl instanceof CoordGmsImpl)) {
                    passUp(new Event(Event.DISCONNECT_OK));
                    initState(); // in case connect() is called again
                }
                break;       // pass down

            case Event.SUSPEND_OK:
                flush_promise.setResult(Boolean.TRUE);
                return;

            case Event.CONFIG:
                Map config=(Map)evt.getArg();
                if(config != null && config.containsKey("flush_timeout")) {
                    Long ftimeout=(Long)config.get("flush_timeout");
                    // containsKey() is also true for a key mapped to null: only switch flushing on
                    // when there really is a timeout value, instead of failing half-way with an NPE
                    if(ftimeout != null) {
                        use_flush=true;
                        flush_timeout=ftimeout.longValue();
                    }
                }
                break;
        }

        passDown(evt);
    }


    /**
     * Sets up the protocol instance according to the configuration properties.
     * <p>
     * Recognized keys: <code>shun</code>, <code>merge_leader</code>, <code>print_local_addr</code>,
     * <code>join_timeout</code>, <code>join_retry_timeout</code>, <code>leave_timeout</code>,
     * <code>merge_timeout</code>, <code>digest_timeout</code>, <code>view_ack_collection_timeout</code>,
     * <code>resume_task_timeout</code>, <code>disable_initial_coord</code>,
     * <code>handle_concurrent_startup</code>, <code>num_prev_mbrs</code>, <code>use_flush</code>,
     * <code>flush_timeout</code>, <code>view_bundling</code> and <code>max_bundling_time</code>.
     * Each recognized key is removed from <code>props</code> once its value has been applied.
     *
     * @param props the configuration; modified in place
     * @return <code>false</code> (after logging an error) if unrecognized properties remain,
     *         <code>true</code> otherwise
     * @throws NumberFormatException if a numeric property cannot be parsed
     */
    public boolean setProperties(Properties props) {
        String str;

        // the result of the superclass call is not evaluated here
        super.setProperties(props);

        str=props.getProperty("shun");
        if(str != null) {
            shun=Boolean.valueOf(str).booleanValue();
            props.remove("shun");
        }

        str=props.getProperty("merge_leader");
        if(str != null) {
            merge_leader=Boolean.valueOf(str).booleanValue();
            props.remove("merge_leader");
        }

        str=props.getProperty("print_local_addr");
        if(str != null) {
            print_local_addr=Boolean.valueOf(str).booleanValue();
            props.remove("print_local_addr");
        }

        str=props.getProperty("join_timeout");           // time to wait for JOIN
        if(str != null) {
            join_timeout=Long.parseLong(str);
            props.remove("join_timeout");
        }

        str=props.getProperty("join_retry_timeout");     // time to wait between JOINs
        if(str != null) {
            join_retry_timeout=Long.parseLong(str);
            props.remove("join_retry_timeout");
        }

        str=props.getProperty("leave_timeout");           // time to wait until coord responds to LEAVE req.
        if(str != null) {
            leave_timeout=Long.parseLong(str);
            props.remove("leave_timeout");
        }

        str=props.getProperty("merge_timeout");           // time to wait for MERGE_RSPS from subgroup coordinators
        if(str != null) {
            merge_timeout=Long.parseLong(str);
            props.remove("merge_timeout");
        }

        str=props.getProperty("digest_timeout");          // time to wait for GET_DIGEST_OK from PBCAST
        if(str != null) {
            digest_timeout=Long.parseLong(str);
            props.remove("digest_timeout");
        }

        str=props.getProperty("view_ack_collection_timeout");
        if(str != null) {
            view_ack_collection_timeout=Long.parseLong(str);
            props.remove("view_ack_collection_timeout");
        }

        str=props.getProperty("resume_task_timeout");
        if(str != null) {
            resume_task_timeout=Long.parseLong(str);
            props.remove("resume_task_timeout");
        }

        str=props.getProperty("disable_initial_coord");
        if(str != null) {
            disable_initial_coord=Boolean.valueOf(str).booleanValue();
            props.remove("disable_initial_coord");
        }

        str=props.getProperty("handle_concurrent_startup");
        if(str != null) {
            handle_concurrent_startup=Boolean.valueOf(str).booleanValue();
            props.remove("handle_concurrent_startup");
        }

        str=props.getProperty("num_prev_mbrs");
        if(str != null) {
            num_prev_mbrs=Integer.parseInt(str);
            props.remove("num_prev_mbrs");
        }

        str=props.getProperty("use_flush");
        if(str != null) {
            use_flush=Boolean.valueOf(str).booleanValue();
            props.remove("use_flush");
        }

        str=props.getProperty("flush_timeout");
        if(str != null) {
            flush_timeout=Long.parseLong(str);
            props.remove("flush_timeout");
        }

        str=props.getProperty("view_bundling");
        if(str != null) {
            view_bundling=Boolean.valueOf(str).booleanValue();
            props.remove("view_bundling");
        }

        str=props.getProperty("max_bundling_time");
        if(str != null) {
            max_bundling_time=Long.parseLong(str);
            props.remove("max_bundling_time");
        }

        // whatever is left over was not consumed above
        if(props.size() > 0) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }


    /* ------------------------------- Private Methods --------------------------------- */

    /**
     * Resets the membership state: switches to the client role via <code>becomeClient()</code> and
     * forgets the current view and view id.
     */
    final void initState() {
        becomeClient();
        view_id=null;
        view=null;
    }

    /* --------------------------- End of Private Methods ------------------------------- */


    /**
     * Header attached to group membership messages. The <code>type</code> determines which of the
     * optional fields are meaningful; see the comments on the individual fields.
     */
    public static class GmsHeader extends Header implements Streamable {
        public static final byte JOIN_REQ=1;
        public static final byte JOIN_RSP=2;
        public static final byte LEAVE_REQ=3;
        public static final byte LEAVE_RSP=4;
        public static final byte VIEW=5;
        public static final byte MERGE_REQ=6;
        public static final byte MERGE_RSP=7;
        public static final byte INSTALL_MERGE_VIEW=8;
        public static final byte CANCEL_MERGE=9;
        public static final byte VIEW_ACK=10;

        byte type=0;
        View view=null;               // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
        Address mbr=null;             // used when type=JOIN_REQ or LEAVE_REQ
        JoinRsp join_rsp=null;        // used when type=JOIN_RSP
        Digest my_digest=null;        // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
        ViewId merge_id=null;         // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
        boolean merge_rejected=false; // used when type=MERGE_RSP


        public GmsHeader() {
        } // used for Externalization

        public GmsHeader(byte type) {
            this.type=type;
        }

        /** Used for VIEW header */
        public GmsHeader(byte type, View view) {
            this.type=type;
            this.view=view;
        }

        /** Used for JOIN_REQ or LEAVE_REQ header */
        public GmsHeader(byte type, Address mbr) {
            this.type=type;
            this.mbr=mbr;
        }

        /** Used for JOIN_RSP header */
        public GmsHeader(byte type, JoinRsp join_rsp) {
            this.type=type;
            this.join_rsp=join_rsp;
        }

        /** @return the header type, one of the byte constants declared in this class */
        public byte getType() {
            return type;
        }

        /**
         * Returns the member carried by this header. The misspelled name is kept so that existing
         * callers continue to compile; {@link #getMember()} returns the same value.
         *
         * @return the member address, possibly <code>null</code>
         */
        public Address getMemeber() {
            return mbr;
        }

        /**
         * Correctly spelled equivalent of {@link #getMemeber()}.
         *
         * @return the member address, possibly <code>null</code>
         */
        public Address getMember() {
            return mbr;
        }

        /**
         * Returns <code>GmsHeader[TYPE]</code> followed by the fields relevant for the header type.
         */
        public String toString() {
            StringBuffer sb=new StringBuffer("GmsHeader");
            sb.append('[').append(type2String(type)).append(']');
            switch(type) {
                case JOIN_REQ:
                case LEAVE_REQ:
                    sb.append(": mbr=").append(mbr);
                    break;

                case JOIN_RSP:
                    sb.append(": join_rsp=").append(join_rsp);
                    break;

                case LEAVE_RSP:
                    break;

                case VIEW:
                case VIEW_ACK:
                    sb.append(": view=").append(view);
                    break;

                case MERGE_REQ:
                    sb.append(": merge_id=").append(merge_id);
                    break;

                case MERGE_RSP:
                    sb.append(": view=").append(view);
                    sb.append(", digest=").append(my_digest);
                    sb.append(", merge_rejected=").append(merge_rejected);
                    sb.append(", merge_id=").append(merge_id);
                    break;

                case INSTALL_MERGE_VIEW:
                    sb.append(": view=").append(view);
                    sb.append(", digest=").append(my_digest);
                    break;

                case CANCEL_MERGE:
                    sb.append(", <merge cancelled>, merge_id=").append(merge_id);
                    break;
            }
            return sb.toString();
        }


        /**
         * Maps a header type constant to its symbolic name.
         *
         * @param type one of the type constants of this class
         * @return the constant's name, or <code>"&lt;unknown&gt;"</code> for any other value
         */
        public static String type2String(int type) {
            switch(type) {
                case JOIN_REQ: return "JOIN_REQ";
                case JOIN_RSP: return "JOIN_RSP";
                case LEAVE_REQ: return "LEAVE_REQ";
                case LEAVE_RSP: return "LEAVE_RSP";
                case VIEW: return "VIEW";
                case MERGE_REQ: return "MERGE_REQ";
                case MERGE_RSP: return "MERGE_RSP";
                case INSTALL_MERGE_VIEW: return "INSTALL_MERGE_VIEW";
                case CANCEL_MERGE: return "CANCEL_MERGE";
                case VIEW_ACK: return "VIEW_ACK";
                default: return "<unknown>";
            }
        }


        /**
         * Writes all fields in a fixed order; {@link #readExternal(ObjectInput)} reads them back in
         * the same order.
         */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            out.writeObject(view);
            out.writeObject(mbr);
            out.writeObject(join_rsp);
            out.writeObject(my_digest);
            out.writeObject(merge_id);
            out.writeBoolean(merge_rejected);
        }


        /** Reads the fields in the order in which {@link #writeExternal(ObjectOutput)} wrote them. */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readByte();
            view=(View)in.readObject();
            mbr=(Address)in.readObject();
            join_rsp=(JoinRsp)in.readObject();
            my_digest=(Digest)in.readObject();
            merge_id=(ViewId)in.readObject();
            merge_rejected=in.readBoolean();
        }


        /**
         * Streams all fields in a fixed order. A boolean written ahead of the view records whether
         * the view is a MergeView, so that {@link #readFrom(DataInputStream)} can pick the class to
         * read it back as.
         */
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            boolean isMergeView=view != null && view instanceof MergeView;
            out.writeBoolean(isMergeView);
            Util.writeStreamable(view, out);
            Util.writeAddress(mbr, out);
            Util.writeStreamable(join_rsp, out);
            Util.writeStreamable(my_digest, out);
            Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId
            out.writeBoolean(merge_rejected);
        }


        /** Reads the fields in the order in which {@link #writeTo(DataOutputStream)} wrote them. */
        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            type=in.readByte();
            boolean isMergeView=in.readBoolean();
            if(isMergeView) {
                view=(View)Util.readStreamable(MergeView.class, in);
            }
            else {
                view=(View)Util.readStreamable(View.class, in);
            }
            mbr=Util.readAddress(in);
            join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in);
            my_digest=(Digest)Util.readStreamable(Digest.class, in);
            merge_id=(ViewId)Util.readStreamable(ViewId.class, in);
            merge_rejected=in.readBoolean();
        }


        /**
         * Computes the size of this header from its individual parts. The parts are accounted for
         * in the order used by {@link #writeTo(DataOutputStream)}; the per-field "presence" bytes
         * follow the comments of the original implementation.
         *
         * @return the computed size
         */
        public long size() {
            long retval=Global.BYTE_SIZE * 2; // type + merge_rejected

            retval+=Global.BYTE_SIZE; // presence view
            retval+=Global.BYTE_SIZE; // MergeView or View
            if(view != null) {
                retval+=view.serializedSize();
            }

            retval+=Util.size(mbr);

            retval+=Global.BYTE_SIZE; // presence of join_rsp
            if(join_rsp != null) {
                retval+=join_rsp.serializedSize();
            }

            retval+=Global.BYTE_SIZE; // presence for my_digest
            if(my_digest != null) {
                retval+=my_digest.serializedSize();
            }

            retval+=Global.BYTE_SIZE; // presence for merge_id
            if(merge_id != null) {
                retval+=merge_id.serializedSize();
            }
            return retval;
        }

    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -