⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coordgmsimpl.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                    }                    catch(Exception ex) {                    }                }                if(log.isDebugEnabled())                    log.debug("num_rsps_expected=" + num_rsps_expected + ", actual responses=" + merge_rsps.size());                if(merge_rsps.size() >= num_rsps_expected)                    break;                curr_time=System.currentTimeMillis();            }            stop=System.currentTimeMillis();            if(trace)                log.trace("collected " + merge_rsps.size() + " merge response(s) in " + (stop-start) + "ms");        }    }    /**     * Generates a unique merge id by taking the local address and the current time     */    private ViewId generateMergeId() {        return new ViewId(gms.local_addr, System.currentTimeMillis());        // we're (ab)using ViewId as a merge id    }    /**     * Merge all MergeData. All MergeData elements should be disjunct (both views and digests). However,     * this method is prepared to resolve duplicate entries (for the same member). Resolution strategy for     * views is to merge only 1 of the duplicate members. Resolution strategy for digests is to take the higher     * seqnos for duplicate digests.<p>     * After merging all members into a Membership and subsequent sorting, the first member of the sorted membership     * will be the new coordinator.     * @param v A list of MergeData items. Elements with merge_rejected=true were removed before. Is guaranteed     *          not to be null and to contain at least 1 member.     */    private MergeData consolidateMergeData(Vector v) {        MergeData ret;        MergeData tmp_data;        long logical_time=0; // for new_vid        ViewId new_vid, tmp_vid;        MergeView new_view;        View tmp_view;        Membership new_mbrs=new Membership();        int num_mbrs;        Digest new_digest;        Address new_coord;        Vector subgroups=new Vector(11);        // contains a list of Views, each View is a subgroup        for(int i=0; i < v.size(); i++) {            tmp_data=(MergeData)v.elementAt(i);            if(log.isDebugEnabled()) log.debug("merge data is " + tmp_data);            tmp_view=tmp_data.getView();            if(tmp_view != null) {                tmp_vid=tmp_view.getVid();                if(tmp_vid != null) {                    // compute the new view id (max of all vids +1)                    logical_time=Math.max(logical_time, tmp_vid.getId());                }                // merge all membership lists into one (prevent duplicates)                new_mbrs.add(tmp_view.getMembers());                subgroups.addElement(tmp_view.clone());            }        }        // the new coordinator is the first member of the consolidated & sorted membership list        new_mbrs.sort();        num_mbrs=new_mbrs.size();        new_coord=num_mbrs > 0? (Address)new_mbrs.elementAt(0) : null;        if(new_coord == null) {            if(log.isErrorEnabled()) log.error("new_coord == null");            return null;        }        // should be the highest view ID seen up to now plus 1        new_vid=new ViewId(new_coord, logical_time + 1);        // determine the new view        new_view=new MergeView(new_vid, new_mbrs.getMembers(), subgroups);        if(log.isDebugEnabled()) log.debug("new merged view will be " + new_view);        // determine the new digest        new_digest=consolidateDigests(v, num_mbrs);        if(new_digest == null) {            if(log.isErrorEnabled()) log.error("digest could not be consolidated");            return null;        }        if(log.isDebugEnabled()) log.debug("consolidated digest=" + new_digest);        ret=new MergeData(gms.local_addr, new_view, new_digest);        return ret;    }    /**     * Merge all digests into one. For each sender, the new value is min(low_seqno), max(high_seqno),     * max(high_seqno_seen)     */    private Digest consolidateDigests(Vector v, int num_mbrs) {        MergeData data;        Digest tmp_digest, retval=new Digest(num_mbrs);        for(int i=0; i < v.size(); i++) {            data=(MergeData)v.elementAt(i);            tmp_digest=data.getDigest();            if(tmp_digest == null) {                if(log.isErrorEnabled()) log.error("tmp_digest == null; skipping");                continue;            }            retval.merge(tmp_digest);        }        return retval;    }    /**     * Sends the new view and digest to all subgroup coordinors in coords. Each coord will in turn     * <ol>     * <li>cast the new view and digest to all the members of its subgroup (MergeView)     * <li>on reception of the view, if it is a MergeView, each member will set the digest and install     *     the new view     * </ol>     */    private void sendMergeView(Vector coords, MergeData combined_merge_data) {        Message msg;        GMS.GmsHeader hdr;        Address coord;        View v;        Digest d;        if(coords == null || combined_merge_data == null)            return;        v=combined_merge_data.view;        d=combined_merge_data.digest;        if(v == null || d == null) {            if(log.isErrorEnabled()) log.error("view or digest is null, cannot send consolidated merge view/digest");            return;        }        if(trace)            log.trace("sending merge view " + v.getVid() + " to coordinators " + coords);        for(int i=0; i < coords.size(); i++) {            coord=(Address)coords.elementAt(i);            msg=new Message(coord, null, null);            hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW);            hdr.view=v;            hdr.my_digest=d;            hdr.merge_id=merge_id;            msg.putHeader(gms.getName(), hdr);            gms.passDown(new Event(Event.MSG, msg));        }    }    /**     * Send back a response containing view and digest to sender     */    private void sendMergeResponse(Address sender, View view, Digest digest) {        Message msg=new Message(sender, null, null);        GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);        hdr.merge_id=merge_id;        hdr.view=view;        hdr.my_digest=digest;        msg.putHeader(gms.getName(), hdr);        if(log.isDebugEnabled()) log.debug("response=" + hdr);        gms.passDown(new Event(Event.MSG, msg));    }    private void sendMergeCancelledMessage(Vector coords, ViewId merge_id) {        Message msg;        GMS.GmsHeader hdr;        Address coord;        if(coords == null || merge_id == null) {            if(log.isErrorEnabled()) log.error("coords or merge_id == null");            return;        }        for(int i=0; i < coords.size(); i++) {            coord=(Address)coords.elementAt(i);            msg=new Message(coord, null, null);            hdr=new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE);            hdr.merge_id=merge_id;            msg.putHeader(gms.getName(), hdr);            gms.passDown(new Event(Event.MSG, msg));        }    }    /** Removed rejected merge requests from merge_rsps and coords */    private void removeRejectedMergeRequests(Vector coords) {        MergeData data;        for(Iterator it=merge_rsps.iterator(); it.hasNext();) {            data=(MergeData)it.next();            if(data.merge_rejected) {                if(data.getSender() != null && coords != null)                    coords.removeElement(data.getSender());                it.remove();                if(log.isDebugEnabled()) log.debug("removed element " + data);            }        }    }    /* --------------------------------------- End of Private methods ------------------------------------- */    /**     * Starts the merge protocol (only run by the merge leader). Essentially sends a MERGE_REQ to all     * coordinators of all subgroups found. Each coord receives its digest and view and returns it.     * The leader then computes the digest and view for the new group from the return values. Finally, it     * sends this merged view/digest to all subgroup coordinators; each coordinator will install it in their     * subgroup.     */    private class MergeTask implements Runnable {        Thread t=null;        Vector coords=null; // list of subgroup coordinators to be contacted        public void start(Vector coords) {            if(t == null || !t.isAlive()) {                this.coords=(Vector)(coords != null? coords.clone() : null);                t=new Thread(this, "MergeTask");                t.setDaemon(true);                t.start();            }        }        public void stop() {            Thread tmp=t;            if(isRunning()) {                t=null;                tmp.interrupt();            }            t=null;            coords=null;        }        public boolean isRunning() {            return t != null && t.isAlive();        }        /**         * Runs the merge protocol as a leader         */        public void run() {            MergeData combined_merge_data;            if(merging == true) {                if(warn) log.warn("merge is already in progress, terminating");                return;            }            if(log.isDebugEnabled()) log.debug("merge task started, coordinators are " + this.coords);            try {                /* 1. Generate a merge_id that uniquely identifies the merge in progress */                setMergeId(generateMergeId());                /* 2. Fetch the current Views/Digests from all subgroup coordinators */                getMergeDataFromSubgroupCoordinators(coords, gms.merge_timeout);                /* 3. Remove rejected MergeData elements from merge_rsp and coords (so we'll send the new view only                   to members who accepted the merge request) */                removeRejectedMergeRequests(coords);                if(merge_rsps.size() <= 1) {                    if(warn)                        log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge");                    sendMergeCancelledMessage(coords, merge_id);                    return;                }                /* 4. Combine all views and digests into 1 View/1 Digest */                combined_merge_data=consolidateMergeData(merge_rsps);                if(combined_merge_data == null) {                    if(log.isErrorEnabled()) log.error("combined_merge_data == null");                    sendMergeCancelledMessage(coords, merge_id);                    return;                }                /* 5. Don't allow JOINs or LEAVEs until we are done with the merge. Suspend() will clear the                      view handler queue, so no requests beyond this current MERGE request will be processed */                gms.getViewHandler().suspend(merge_id);                /* 6. Send the new View/Digest to all coordinators (including myself). On reception, they will                   install the digest and view in all of their subgroup members */                sendMergeView(coords, combined_merge_data);            }            catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("exception while merging", ex);            }            finally {                sendMergeCancelledMessage(coords, merge_id);                stopMergeCanceller(); // this is probably not necessary                merging=false;                merge_leader=null;                if(log.isDebugEnabled()) log.debug("merge task terminated");                t=null;            }        }    }    private class MergeCanceller implements TimeScheduler.Task {        private Object my_merge_id=null;        private long timeout;        private boolean cancelled=false;        MergeCanceller(Object my_merge_id, long timeout) {            this.my_merge_id=my_merge_id;            this.timeout=timeout;        }        public boolean cancelled() {            return cancelled;        }        public void cancel() {            cancelled=true;        }        public long nextInterval() {            return timeout;        }        public void run() {            if(merge_id != null && my_merge_id.equals(merge_id)) {                if(trace)                    log.trace("cancelling merge due to timer timeout (" + timeout + " ms)");                cancelMerge();                cancelled=true;            }            else {                if(trace)                    log.trace("timer kicked in after " + timeout + " ms, but no (or different) merge was in progress: " +                              "merge_id=" + merge_id + ", my_merge_id=" + my_merge_id);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -