⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gms.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
     thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
     JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
     it won't be processed twice.
     */
    public void receiveUpEvent(Event evt) {
        switch(evt.getType()) {
            case Event.GET_DIGEST_OK:
                digest_promise.setResult(evt.getArg());
                return; // don't pass further up
        }
        super.receiveUpEvent(evt);
    }

    /**
     * Handles an event travelling down the stack.
     * <ul>
     * <li>CONNECT: passes the event down first, then calls <code>impl.join(local_addr)</code> and passes a
     *     CONNECT_OK event up. A SecurityException thrown by the join becomes the argument of that
     *     CONNECT_OK event; otherwise the argument is null. The event is not passed down a second time.</li>
     * <li>DISCONNECT: calls <code>impl.leave()</code> with the event argument; unless the current
     *     implementation is a CoordGmsImpl, DISCONNECT_OK is passed up and the state is reset via
     *     {@link #initState()}. The event is then passed down.</li>
     * <li>SUSPEND_OK: sets <code>flush_promise</code> to Boolean.TRUE and consumes the event.</li>
     * <li>CONFIG: if the configuration map carries a numeric "flush_timeout" value, enables
     *     <code>use_flush</code> and stores the timeout. The event is then passed down.</li>
     * </ul>
     * All other events are passed down unchanged.
     *
     * @param evt the event to handle
     */
    public void down(Event evt) {
        switch(evt.getType()) {

            case Event.CONNECT:
                Object arg=null;
                passDown(evt);
                if(local_addr == null)
                    if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null");
                try {
                    impl.join(local_addr);
                }
                catch(SecurityException e) {
                    arg=e; // handed to the layer above as the CONNECT_OK argument
                }
                passUp(new Event(Event.CONNECT_OK, arg));
                return;                              // don't pass down: was already passed down

            case Event.DISCONNECT:
                impl.leave((Address)evt.getArg());
                if(!(impl instanceof CoordGmsImpl)) {
                    passUp(new Event(Event.DISCONNECT_OK));
                    initState(); // in case connect() is called again
                }
                break;       // pass down

            case Event.SUSPEND_OK:
                flush_promise.setResult(Boolean.TRUE);
                return;

            case Event.CONFIG:
                Map config=(Map)evt.getArg();
                if(config != null) {
                    // Accept any Number (not only Long) and tolerate a missing or null value, so that a
                    // malformed entry cannot abort the event before it is passed down.
                    Object ftimeout=config.get("flush_timeout");
                    if(ftimeout instanceof Number) {
                        use_flush=true;
                        flush_timeout=((Number)ftimeout).longValue();
                    }
                    else if(ftimeout != null) {
                        log.error("[CONFIG] flush_timeout is not a number: " + ftimeout);
                    }
                }
                break;
        }
        passDown(evt);
    }

    /**
     * Setup the Protocol instance according to the configuration string.
     * <p>
     * Calls <code>super.setProperties(props)</code> first (its return value is not inspected), then reads
     * each option known to this protocol from <code>props</code> and removes it. Numeric options are parsed
     * with <code>Long.parseLong</code> / <code>Integer.parseInt</code>, so a malformed number results in a
     * NumberFormatException.
     *
     * @param props the properties to consume; every recognized key is removed from it
     * @return true if no properties are left over, false (after logging an error) if unrecognized
     *         properties remain
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);
        str=props.getProperty("shun");
        if(str != null) {
            shun=Boolean.valueOf(str).booleanValue();
            props.remove("shun");
        }

        str=props.getProperty("merge_leader");
        if(str != null) {
            merge_leader=Boolean.valueOf(str).booleanValue();
            props.remove("merge_leader");
        }

        str=props.getProperty("print_local_addr");
        if(str != null) {
            print_local_addr=Boolean.valueOf(str).booleanValue();
            props.remove("print_local_addr");
        }

        str=props.getProperty("join_timeout");           // time to wait for JOIN
        if(str != null) {
            join_timeout=Long.parseLong(str);
            props.remove("join_timeout");
        }

        str=props.getProperty("join_retry_timeout");     // time to wait between JOINs
        if(str != null) {
            join_retry_timeout=Long.parseLong(str);
            props.remove("join_retry_timeout");
        }

        str=props.getProperty("leave_timeout");           // time to wait until coord responds to LEAVE req.
        if(str != null) {
            leave_timeout=Long.parseLong(str);
            props.remove("leave_timeout");
        }

        str=props.getProperty("merge_timeout");           // time to wait for MERGE_RSPS from subgroup coordinators
        if(str != null) {
            merge_timeout=Long.parseLong(str);
            props.remove("merge_timeout");
        }

        str=props.getProperty("digest_timeout");          // time to wait for GET_DIGEST_OK from PBCAST
        if(str != null) {
            digest_timeout=Long.parseLong(str);
            props.remove("digest_timeout");
        }

        str=props.getProperty("view_ack_collection_timeout");
        if(str != null) {
            view_ack_collection_timeout=Long.parseLong(str);
            props.remove("view_ack_collection_timeout");
        }

        str=props.getProperty("resume_task_timeout");
        if(str != null) {
            resume_task_timeout=Long.parseLong(str);
            props.remove("resume_task_timeout");
        }

        str=props.getProperty("disable_initial_coord");
        if(str != null) {
            disable_initial_coord=Boolean.valueOf(str).booleanValue();
            props.remove("disable_initial_coord");
        }

        str=props.getProperty("handle_concurrent_startup");
        if(str != null) {
            handle_concurrent_startup=Boolean.valueOf(str).booleanValue();
            props.remove("handle_concurrent_startup");
        }

        str=props.getProperty("num_prev_mbrs");
        if(str != null) {
            num_prev_mbrs=Integer.parseInt(str);
            props.remove("num_prev_mbrs");
        }

        str=props.getProperty("use_flush");
        if(str != null) {
            use_flush=Boolean.valueOf(str).booleanValue();
            props.remove("use_flush");
        }

        str=props.getProperty("flush_timeout");
        if(str != null) {
            flush_timeout=Long.parseLong(str);
            props.remove("flush_timeout");
        }

        str=props.getProperty("view_bundling");
        if(str != null) {
            view_bundling=Boolean.valueOf(str).booleanValue();
            props.remove("view_bundling");
        }

        str=props.getProperty("max_bundling_time");
        if(str != null) {
            max_bundling_time=Long.parseLong(str);
            props.remove("max_bundling_time");
        }

        // Anything still left in props was not consumed above and is reported as a configuration error.
        if(props.size() > 0) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    /* ------------------------------- Private Methods --------------------------------- */

    /** Calls becomeClient() and clears the current view and view id. */
    final void initState() {
        becomeClient();
        view_id=null;
        view=null;
    }

    /* --------------------------- End of Private Methods ------------------------------- */

    /**
     * Header carrying the membership message type and the fields that go with it. Which fields are
     * meaningful depends on <code>type</code>; see the comments on the individual fields.
     * Supports both the writeExternal/readExternal and the writeTo/readFrom serialization forms.
     */
    public static class GmsHeader extends Header implements Streamable {
        public static final byte JOIN_REQ=1;
        public static final byte JOIN_RSP=2;
        public static final byte LEAVE_REQ=3;
        public static final byte LEAVE_RSP=4;
        public static final byte VIEW=5;
        public static final byte MERGE_REQ=6;
        public static final byte MERGE_RSP=7;
        public static final byte INSTALL_MERGE_VIEW=8;
        public static final byte CANCEL_MERGE=9;
        public static final byte VIEW_ACK=10;

        byte type=0;
        View view=null;               // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
        Address mbr=null;             // used when type=JOIN_REQ or LEAVE_REQ
        JoinRsp join_rsp=null;        // used when type=JOIN_RSP
        Digest my_digest=null;        // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
        ViewId merge_id=null;         // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
        boolean merge_rejected=false; // used when type=MERGE_RSP

        public GmsHeader() {
        } // used for Externalization

        public GmsHeader(byte type) {
            this.type=type;
        }

        /** Used for VIEW header */
        public GmsHeader(byte type, View view) {
            this.type=type;
            this.view=view;
        }

        /** Used for JOIN_REQ or LEAVE_REQ header */
        public GmsHeader(byte type, Address mbr) {
            this.type=type;
            this.mbr=mbr;
        }

        /** Used for JOIN_RSP header */
        public GmsHeader(byte type, JoinRsp join_rsp) {
            this.type=type;
            this.join_rsp=join_rsp;
        }

        /** Returns the header type (one of the byte constants of this class). */
        public byte getType() {
            return type;
        }

        /** Returns the member address stored in this header; may be null. */
        public Address getMember() {
            return mbr;
        }

        /** Misspelled accessor kept for existing callers; returns the same value as {@link #getMember()}. */
        public Address getMemeber() {
            return mbr;
        }

        /** Returns "GmsHeader[TYPE]" followed by the fields that are relevant for the header type. */
        public String toString() {
            StringBuffer sb=new StringBuffer("GmsHeader");
            sb.append('[').append(type2String(type)).append(']');
            switch(type) {
                case JOIN_REQ:
                    sb.append(": mbr=").append(mbr);
                    break;

                case JOIN_RSP:
                    sb.append(": join_rsp=").append(join_rsp);
                    break;

                case LEAVE_REQ:
                    sb.append(": mbr=").append(mbr);
                    break;

                case LEAVE_RSP:
                    break;

                case VIEW:
                case VIEW_ACK:
                    sb.append(": view=").append(view);
                    break;

                case MERGE_REQ:
                    sb.append(": merge_id=").append(merge_id);
                    break;

                case MERGE_RSP:
                    sb.append(": view=").append(view).append(", digest=").append(my_digest)
                            .append(", merge_rejected=").append(merge_rejected)
                            .append(", merge_id=").append(merge_id);
                    break;

                case INSTALL_MERGE_VIEW:
                    sb.append(": view=").append(view).append(", digest=").append(my_digest);
                    break;

                case CANCEL_MERGE:
                    sb.append(", <merge cancelled>, merge_id=").append(merge_id);
                    break;
            }
            return sb.toString();
        }

        /** Returns the name of the given header type constant, or "&lt;unknown&gt;" for any other value. */
        public static String type2String(int type) {
            switch(type) {
                case JOIN_REQ: return "JOIN_REQ";
                case JOIN_RSP: return "JOIN_RSP";
                case LEAVE_REQ: return "LEAVE_REQ";
                case LEAVE_RSP: return "LEAVE_RSP";
                case VIEW: return "VIEW";
                case MERGE_REQ: return "MERGE_REQ";
                case MERGE_RSP: return "MERGE_RSP";
                case INSTALL_MERGE_VIEW: return "INSTALL_MERGE_VIEW";
                case CANCEL_MERGE: return "CANCEL_MERGE";
                case VIEW_ACK: return "VIEW_ACK";
                default: return "<unknown>";
            }
        }

        /** Writes all fields in a fixed order; {@link #readExternal(ObjectInput)} reads them back in the same order. */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            out.writeObject(view);
            out.writeObject(mbr);
            out.writeObject(join_rsp);
            out.writeObject(my_digest);
            out.writeObject(merge_id);
            out.writeBoolean(merge_rejected);
        }

        /** Reads all fields in the order in which {@link #writeExternal(ObjectOutput)} wrote them. */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readByte();
            view=(View)in.readObject();
            mbr=(Address)in.readObject();
            join_rsp=(JoinRsp)in.readObject();
            my_digest=(Digest)in.readObject();
            merge_id=(ViewId)in.readObject();
            merge_rejected=in.readBoolean();
        }

        /**
         * Writes all fields to the stream. A boolean written ahead of the view records whether it is a
         * MergeView, so that {@link #readFrom(DataInputStream)} knows which class to read.
         */
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            boolean isMergeView=view instanceof MergeView; // instanceof is already false for null
            out.writeBoolean(isMergeView);
            Util.writeStreamable(view, out);
            Util.writeAddress(mbr, out);
            Util.writeStreamable(join_rsp, out);
            Util.writeStreamable(my_digest, out);
            Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId
            out.writeBoolean(merge_rejected);
        }

        /** Reads all fields in the order in which {@link #writeTo(DataOutputStream)} wrote them. */
        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            type=in.readByte();
            boolean isMergeView=in.readBoolean();
            if(isMergeView)
                view=(View)Util.readStreamable(MergeView.class, in);
            else
                view=(View)Util.readStreamable(View.class, in);
            mbr=Util.readAddress(in);
            join_rsp=(JoinRsp)Util.readStreamable(JoinRsp.class, in);
            my_digest=(Digest)Util.readStreamable(Digest.class, in);
            merge_id=(ViewId)Util.readStreamable(ViewId.class, in);
            merge_rejected=in.readBoolean();
        }

        /**
         * Adds up the size of this header: one byte each for the type, the merge_rejected flag, the
         * MergeView/View marker and the presence markers, plus the size of the member address and the
         * serialized size of every non-null field.
         */
        public long size() {
            long retval=Global.BYTE_SIZE *2; // type + merge_rejected

            retval+=Global.BYTE_SIZE; // presence view
            retval+=Global.BYTE_SIZE; // MergeView or View
            if(view != null)
                retval+=view.serializedSize();

            retval+=Util.size(mbr);

            retval+=Global.BYTE_SIZE; // presence of join_rsp
            if(join_rsp != null)
                retval+=join_rsp.serializedSize();

            retval+=Global.BYTE_SIZE; // presence for my_digest
            if(my_digest != null)
                retval+=my_digest.serializedSize();

            retval+=Global.BYTE_SIZE; // presence for merge_id
            if(merge_id != null)
                retval+=merge_id.serializedSize();
            return retval;
        }
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -