
CoordGmsImpl.java

JGroups source code
Java
Page 1 of 3
            d.add(mbr, 0, 0); // ... and add the new member. its first seqno will be 1
            v=gms.getNextView(new_mbrs, null, null);
            if(log.isDebugEnabled()) log.debug("joined member " + mbr + ", view is " + v);
            join_rsp=new JoinRsp(v, d);

            // 2. Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
            //    in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
            // Check NAKACK's TMP_VIEW handling for details
            if(join_rsp.getView() != null)
                gms.passDown(new Event(Event.TMP_VIEW, join_rsp.getView()));

            Vector tmp_mbrs=join_rsp.getView() != null? new Vector(join_rsp.getView().getMembers()) : null;
            if(gms.use_flush) {
                // 3a. FLUSH protocol is in use. First we FLUSH current members. Then we send a
                // view to a joining member and we will wait for his ACK together with view
                // ACKs from current members (castViewChangeWithDest). After all ACKs have been
                // collected, FLUSH is stopped (below in finally clause) and thus members are
                // allowed to send messages again.
                gms.startFlush(join_rsp.getView());
                sendJoinResponse(join_rsp, mbr);
                gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs);
            }
            else {
                // 3b. Broadcast the new view
                // We'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout),
                // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in
                // view V2 which may get dropped by existing members because they're still in view V1.
                // (http://jira.jboss.com/jira/browse/JGRP-235)
                if(tmp_mbrs != null)
                    tmp_mbrs.remove(mbr); // exclude the newly joined member from VIEW_ACKs
                gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs);

                // 4. Return result to client
                sendJoinResponse(join_rsp, mbr);
            }
        }
        finally {
            if(gms.use_flush)
                gms.stopFlush();
            gms.passDown(new Event(Event.RESUME_STABLE));
        }
    }*/
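    // NOTE: handleJoin() above and handleLeave() below are kept commented out; their logic
    // appears to have been folded into handleMembershipChange() further down, which processes
    // joins, leaves and suspicions in one batch.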
    /**
     * Exclude <code>mbr</code> from the membership. If <code>suspected</code> is true, then
     * this member crashed and therefore is forced to leave, otherwise it is leaving voluntarily.
     */
    /*private void handleLeave(Address mbr, boolean suspected) {
        View new_view=null;
        Vector v=new Vector(1);
        v.addElement(mbr); // contains either leaving mbrs or suspected mbrs
        if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
        if(!gms.members.contains(mbr)) {
            if(trace) log.trace("mbr " + mbr + " is not a member !");
            return;
        }
        if(gms.view_id == null) {
            // we're probably not the coord anymore (we just left ourselves), let someone else do it
            // (client will retry when it doesn't get a response)
            if(log.isDebugEnabled())
                log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving +
                        "); the new coordinator will handle the leave request");
            return;
        }
        try {
            sendLeaveResponse(mbr); // send an ack to the leaving member
            if(suspected)
                new_view=gms.getNextView(null, null, v);
            else
                new_view=gms.getNextView(null, v, null);
            if(gms.use_flush) {
                gms.startFlush(new_view);
            }
            gms.castViewChange(new_view, null);
        }
        finally {
            if(gms.use_flush) {
                gms.stopFlush();
            }
        }
        if(leaving) {
            gms.passUp(new Event(Event.DISCONNECT_OK));
            gms.initState(); // in case connect() is called again
        }
    }*/
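    /**
     * Processes pending joins, leaves and suspicions in a single view change: computes the next
     * view, extends the current digest with a (member, 0, 0) entry for each joiner so that its
     * first seqno is 1, and multicasts the new view (under FLUSH if enabled). STABLE garbage
     * collection is suspended while a JoinRsp is outstanding and resumed in the finally clause.
     */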
    public void handleMembershipChange(Collection new_mbrs, Collection leaving_mbrs, Collection suspected_mbrs) {
        if(new_mbrs       == null) new_mbrs=new LinkedHashSet(0);
        if(suspected_mbrs == null) suspected_mbrs=new LinkedHashSet(0);
        if(leaving_mbrs   == null) leaving_mbrs=new LinkedHashSet(0);
        boolean joining_mbrs=!new_mbrs.isEmpty();

        new_mbrs.remove(gms.local_addr); // remove myself - cannot join myself (already joined)

        if(gms.view_id == null) {
            // we're probably not the coord anymore (we just left ourselves), let someone else do it
            // (client will retry when it doesn't get a response)
            if(log.isDebugEnabled())
                log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving +
                        "); the new coordinator will handle the leave request");
            return;
        }

        Vector current_members=gms.members.getMembers();
        leaving_mbrs.retainAll(current_members); // remove all elements of leaving_mbrs which are not current members
        if(suspected_mbrs.remove(gms.local_addr)) {
            if(warn) log.warn("I am the coord and I'm being suspected -- will probably leave shortly");
        }
        suspected_mbrs.retainAll(current_members); // remove all elements of suspected_mbrs which are not current members

        // for the members that have already joined, return the current digest and membership
        for(Iterator it=new_mbrs.iterator(); it.hasNext();) {
            Address mbr=(Address)it.next();
            if(gms.members.contains(mbr)) { // already joined: return current digest and membership
                if(warn)
                    log.warn(mbr + " already present; returning existing view " + gms.view);
                JoinRsp join_rsp=new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest());
                sendJoinResponse(join_rsp, mbr);
                it.remove();
            }
        }

        if(new_mbrs.isEmpty() && leaving_mbrs.isEmpty() && suspected_mbrs.isEmpty()) {
            if(trace)
                log.trace("found no members to add or remove, will not create new view");
            return;
        }

        JoinRsp join_rsp=null;
        try {
            View new_view=gms.getNextView(new_mbrs, leaving_mbrs, suspected_mbrs);
            if(log.isDebugEnabled())
                log.debug("new=" + new_mbrs + ", suspected=" + suspected_mbrs + ", leaving=" + leaving_mbrs +
                        ", new view: " + new_view);

            // We cannot garbage collect while a new member is joining *if* we're the only member.
            // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those
            // in the digest returned to the client, so the client will *not* be able to ask for retransmission
            // of those messages if he misses them
            if(joining_mbrs) {
                gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT));
                Digest d=null, tmp=gms.getDigest(); // get existing digest
                if(tmp == null)
                    log.error("received null digest from GET_DIGEST: will cause JOIN to fail");
                else {
                    // create a new digest, which contains the new members
                    d=new Digest(tmp.size() + new_mbrs.size());
                    d.add(tmp); // add the existing digest to the new one
                    for(Iterator i=new_mbrs.iterator(); i.hasNext();)
                        d.add((Address)i.next(), 0, 0); // ... and add the new members. their first seqno will be 1
                }
                join_rsp=new JoinRsp(new_view, d);
            }

            sendLeaveResponses(leaving_mbrs); // no-op if no leaving members

            // Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
            // in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
            // Check NAKACK's TMP_VIEW handling for details
            if(new_view != null)
                gms.passDown(new Event(Event.TMP_VIEW, new_view));

            Vector tmp_mbrs=new_view != null? new Vector(new_view.getMembers()) : null;
            if(gms.use_flush) {
                // First we flush current members. Then we send a view to all joining members and we wait for their ACKs
                // together with ACKs from current members. After all ACKs have been collected, FLUSH is stopped
                // (below in finally clause) and members are allowed to send messages again
                gms.startFlush(new_view);
                sendJoinResponses(join_rsp, new_mbrs); // might be a no-op if no joining members
                gms.castViewChangeWithDest(new_view, null, tmp_mbrs);
            }
            else {
                if(tmp_mbrs != null) // exclude the newly joined members from VIEW_ACKs
                    tmp_mbrs.removeAll(new_mbrs);
                // Broadcast the new view
                // We'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout),
                // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in
                // view V2 which may get dropped by existing members because they're still in view V1.
                // (http://jira.jboss.com/jira/browse/JGRP-235)
                gms.castViewChangeWithDest(new_view, null, tmp_mbrs);
                sendJoinResponses(join_rsp, new_mbrs); // Return result to newly joined clients (if there are any)
            }
        }
        finally {
            if(joining_mbrs)
                gms.passDown(new Event(Event.RESUME_STABLE));
            if(gms.use_flush)
                gms.stopFlush();
            if(leaving) {
                gms.passUp(new Event(Event.DISCONNECT_OK));
                gms.initState(); // in case connect() is called again
            }
        }
    }

    /**
     * Called by the GMS when a VIEW is received.
     * @param new_view The view to be installed
     * @param digest   If view is a MergeView, digest contains the seqno digest of all members and has to
     *                 be set by GMS
     */
    public void handleViewChange(View new_view, Digest digest) {
        Vector mbrs=new_view.getMembers();
        if(log.isDebugEnabled()) {
            if(digest != null)
                log.debug("view=" + new_view + ", digest=" + digest);
            else
                log.debug("view=" + new_view);
        }
        if(leaving && !mbrs.contains(gms.local_addr))
            return;
        gms.installView(new_view, digest);
    }

    public void handleExit() {
        cancelMerge();
    }

    public void stop() {
        super.stop(); // sets leaving=false
        stopMergeTask();
    }

    /* ------------------------------------------ Private methods ----------------------------------------- */

    void startMergeTask(Vector coords) {
        synchronized(merge_task) {
            merge_task.start(coords);
        }
    }

    void stopMergeTask() {
        synchronized(merge_task) {
            merge_task.stop();
        }
    }

    private void sendJoinResponses(JoinRsp rsp, Collection c) {
        if(c != null && rsp != null) {
            for(Iterator it=c.iterator(); it.hasNext();) {
                sendJoinResponse(rsp, (Address)it.next());
            }
        }
    }

    private void sendJoinResponse(JoinRsp rsp, Address dest) {
        Message m=new Message(dest, null, null);
        GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, rsp);
        m.putHeader(gms.getName(), hdr);
        gms.passDown(new Event(Event.MSG, m));
    }

    private void sendLeaveResponses(Collection c) {
        for(Iterator i=c.iterator(); i.hasNext();) {
            Message msg=new Message((Address)i.next(), null, null); // send an ack to the leaving member
            GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP);
            msg.putHeader(gms.getName(), hdr);
            gms.passDown(new Event(Event.MSG, msg));
        }
    }
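    // The merge subprotocol below collects MergeData from all subgroup coordinators. Note that
    // merge_rsps doubles as the monitor object: this thread waits on it with a timeout, and the
    // MERGE_RSP handler is presumably expected to add each response and notify it.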
    /**
     * Sends a MERGE_REQ to all coords and populates a list of MergeData (in merge_rsps). Returns after coords.size()
     * responses have been received, or timeout msecs have elapsed (whichever is first).<p>
     * If a subgroup coordinator rejects the MERGE_REQ (e.g. because of participation in a different merge),
     * <em>that member will be removed from coords !</em>
     * @param coords  A list of Addresses of subgroup coordinators (including myself)
     * @param timeout Max number of msecs to wait for the merge responses from the subgroup coords
     */
    private void getMergeDataFromSubgroupCoordinators(Vector coords, long timeout) {
        Message msg;
        GMS.GmsHeader hdr;
        long curr_time, time_to_wait, end_time, start, stop;
        int num_rsps_expected;

        if(coords == null || coords.size() <= 1) {
            if(log.isErrorEnabled()) log.error("coords == null or size <= 1");
            return;
        }
        start=System.currentTimeMillis();
        MergeData tmp;
        synchronized(merge_rsps) {
            merge_rsps.removeAllElements();
            if(log.isDebugEnabled()) log.debug("sending MERGE_REQ to " + coords);
            Address coord;
            for(int i=0; i < coords.size(); i++) {
                coord=(Address)coords.elementAt(i);
                if(gms.local_addr != null && gms.local_addr.equals(coord)) {
                    tmp=getMergeResponse(gms.local_addr, merge_id);
                    if(tmp != null)
                        merge_rsps.add(tmp);
                    continue;
                }
                // this allows UNICAST to remove coord from previous_members in case of a merge
                gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, coord));
                msg=new Message(coord, null, null);
                hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ);
                hdr.mbr=gms.local_addr;
                hdr.merge_id=merge_id;
                msg.putHeader(gms.getName(), hdr);
                gms.passDown(new Event(Event.MSG, msg));
            }
            // wait until num_rsps_expected >= num_rsps or timeout elapsed
            num_rsps_expected=coords.size();
            curr_time=System.currentTimeMillis();
            end_time=curr_time + timeout;
            while(end_time > curr_time) {
                time_to_wait=end_time - curr_time;
                if(log.isDebugEnabled()) log.debug("waiting " + time_to_wait + " msecs for merge responses");
                if(merge_rsps.size() < num_rsps_expected) {
                    try {
                        merge_rsps.wait(time_to_wait);
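The listing breaks off here at the page boundary (page 1 of 3). For orientation, the join and leave flows this coordinator implements are driven from the application side through JGroups' channel API. Below is a minimal client-side sketch, assuming the JGroups 2.x-era API this source belongs to; the stack configuration file "udp.xml" and the cluster name "demo-cluster" are placeholder values:

    import org.jgroups.JChannel;
    import org.jgroups.ReceiverAdapter;
    import org.jgroups.View;

    public class JoinDemo {
        public static void main(String[] args) throws Exception {
            JChannel ch=new JChannel("udp.xml"); // placeholder protocol stack configuration
            ch.setReceiver(new ReceiverAdapter() {
                public void viewAccepted(View new_view) {
                    // invoked when the coordinator's view change (installed via handleViewChange) arrives
                    System.out.println("new view: " + new_view);
                }
            });
            ch.connect("demo-cluster"); // sends the JOIN that the coordinator answers with a JoinRsp
            // ... send and receive messages here ...
            ch.close();                 // sends the LEAVE processed by handleMembershipChange
        }
    }

Roughly: connect() issues the JOIN request that ends up batched into handleMembershipChange() on the coordinator, and close() produces the corresponding LEAVE; viewAccepted() fires on every member when the resulting view is installed.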
