⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 coordgmsimpl.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
// $Id: CoordGmsImpl.java,v 1.52 2006/10/23 16:12:23 vlada Exp $package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.util.TimeScheduler;import java.util.*;/** * Coordinator role of the Group MemberShip (GMS) protocol. Accepts JOIN and LEAVE requests and emits view changes * accordingly. * @author Bela Ban */public class CoordGmsImpl extends GmsImpl {    private boolean          merging=false;    private final MergeTask  merge_task=new MergeTask();    private final Vector     merge_rsps=new Vector(11);    // for MERGE_REQ/MERGE_RSP correlation, contains MergeData elements    private ViewId           merge_id=null;    private Address          merge_leader=null;    private MergeCanceller   merge_canceller=null;    private final Object     merge_canceller_mutex=new Object();    /** the max time in ms to suspend message garbage collection */    private final Long       MAX_SUSPEND_TIMEOUT=new Long(30000);    public CoordGmsImpl(GMS g) {        super(g);    }    private void setMergeId(ViewId merge_id) {        this.merge_id=merge_id;        synchronized(merge_canceller_mutex) {            if(this.merge_id != null) {                stopMergeCanceller();                merge_canceller=new MergeCanceller(this.merge_id, gms.merge_timeout);                gms.timer.add(merge_canceller);            }            else { // merge completed                stopMergeCanceller();            }        }    }    private void stopMergeCanceller() {        synchronized(merge_canceller_mutex) {            if(merge_canceller != null) {                merge_canceller.cancel();                merge_canceller=null;            }        }    }    public void init() throws Exception {        super.init();        cancelMerge();    }    public void join(Address mbr) {        wrongMethod("join");    }    /** The coordinator itself wants to leave the group */    public void leave(Address mbr) {        if(mbr == null) {            if(log.isErrorEnabled()) log.error("member's address is null !");            return;        }        if(mbr.equals(gms.local_addr))            leaving=true;        gms.getViewHandler().add(new GMS.Request(GMS.Request.LEAVE, mbr, false, null));        gms.getViewHandler().stop(true); // wait until all requests have been processed, then close the queue and leave        gms.getViewHandler().waitUntilCompleted(gms.leave_timeout);    }    public void handleJoinResponse(JoinRsp join_rsp) {    }    public void handleLeaveResponse() {    }    public void suspect(Address mbr) {        if(mbr.equals(gms.local_addr)) {            if(warn) log.warn("I am the coord and I'm being am suspected -- will probably leave shortly");            return;        }        Collection emptyVector=new LinkedHashSet(0);        Collection suspected=new LinkedHashSet(1);        suspected.add(mbr);        handleMembershipChange(emptyVector, emptyVector, suspected);    }    public void unsuspect(Address mbr) {    }    /**     * Invoked upon receiving a MERGE event from the MERGE layer. Starts the merge protocol.     * See description of protocol in DESIGN.     * @param other_coords A list of coordinators (including myself) found by MERGE protocol     */    public void merge(Vector other_coords) {        Membership tmp;        if(merging) {            if(warn) log.warn("merge already in progress, discarded MERGE event (I am " + gms.local_addr + ")");            return;        }        merge_leader=null;        if(other_coords == null) {            if(warn) log.warn("list of other coordinators is null. Will not start merge.");            return;        }        if(other_coords.size() <= 1) {            if(log.isErrorEnabled())                log.error("number of coordinators found is " + other_coords.size() + "; will not perform merge");            return;        }        /* Establish deterministic order, so that coords can elect leader */        tmp=new Membership(other_coords);        tmp.sort();        merge_leader=(Address)tmp.elementAt(0);        // if(log.isDebugEnabled()) log.debug("coordinators in merge protocol are: " + tmp);        if(merge_leader.equals(gms.local_addr) || gms.merge_leader) {            if(trace)                log.trace("I (" + gms.local_addr + ") will be the leader. Starting the merge task");            startMergeTask(other_coords);        }        else {            if(trace) log.trace("I (" + gms.local_addr + ") am not the merge leader, " +                    "waiting for merge leader (" + merge_leader + ")to initiate merge");        }    }    /**     * Get the view and digest and send back both (MergeData) in the form of a MERGE_RSP to the sender.     * If a merge is already in progress, send back a MergeData with the merge_rejected field set to true.     */    public void handleMergeRequest(Address sender, ViewId merge_id) {        Digest digest;        View view;        if(sender == null) {            if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response");            return;        }        if(merging) {            if(log.isErrorEnabled()) log.error("merge already in progress");            sendMergeRejectedResponse(sender, merge_id);            return;        }        merging=true;        /* Clears the view handler queue and discards all JOIN/LEAVE/MERGE requests until after the MERGE  */        gms.getViewHandler().suspend(merge_id);        setMergeId(merge_id);        if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id);        digest=gms.getDigest();        view=new View(gms.view_id.copy(), gms.members.getMembers());        gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, sender));        sendMergeResponse(sender, view, digest);    }    private MergeData getMergeResponse(Address sender, ViewId merge_id) {        Digest         digest;        View           view;        MergeData      retval;        if(sender == null) {            if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response");            return null;        }        if(merging) {            if(log.isErrorEnabled()) log.error("merge already in progress");            retval=new MergeData(sender, null, null);            retval.merge_rejected=true;            return retval;        }        merging=true;        setMergeId(merge_id);        if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id);        try {            digest=gms.getDigest();            view=new View(gms.view_id.copy(), gms.members.getMembers());            retval=new MergeData(sender, view, digest);            retval.view=view;            retval.digest=digest;        }        catch(NullPointerException null_ex) {            return null;        }        return retval;    }    public void handleMergeResponse(MergeData data, ViewId merge_id) {        if(data == null) {            if(log.isErrorEnabled()) log.error("merge data is null");            return;        }        if(merge_id == null || this.merge_id == null) {            if(log.isErrorEnabled())                log.error("merge_id ("                    + merge_id                    + ") or this.merge_id ("                    + this.merge_id                    + ") is null (sender="                    + data.getSender()                    + ").");            return;        }        if(!this.merge_id.equals(merge_id)) {            if(log.isErrorEnabled()) log.error("this.merge_id ("                    + this.merge_id                    + ") is different from merge_id ("                    + merge_id                    + ')');            return;        }        synchronized(merge_rsps) {            if(!merge_rsps.contains(data)) {                merge_rsps.addElement(data);                merge_rsps.notifyAll();            }        }    }    /**     * If merge_id is not equal to this.merge_id then discard.     * Else cast the view/digest to all members of this group.     */    public void handleMergeView(MergeData data, ViewId merge_id) {        if(merge_id == null                || this.merge_id == null                || !this.merge_id.equals(merge_id)) {            if(log.isErrorEnabled()) log.error("merge_ids don't match (or are null); merge view discarded");            return;        }        java.util.List my_members=gms.view != null? gms.view.getMembers() : null;        // only send to our *current* members, if we have A and B being merged (we are B), then we would *not*        // receive a VIEW_ACK from A because A doesn't see us in the pre-merge view yet and discards the view        GMS.Request req=new GMS.Request(GMS.Request.VIEW);        req.view=data.view;        req.digest=data.digest;        req.target_members=my_members;        gms.getViewHandler().add(req, true, // at head so it is processed next                                 true);     // un-suspend the queue        merging=false;    }    public void handleMergeCancelled(ViewId merge_id) {        if(merge_id != null                && this.merge_id != null                && this.merge_id.equals(merge_id)) {            if(log.isDebugEnabled()) log.debug("merge was cancelled (merge_id=" + merge_id + ", local_addr=" +                    gms.local_addr +")");            setMergeId(null);            this.merge_leader=null;            merging=false;            gms.getViewHandler().resume(merge_id);        }    }    private void cancelMerge() {        Object tmp=merge_id;        if(merge_id != null && log.isDebugEnabled()) log.debug("cancelling merge (merge_id=" + merge_id + ')');        setMergeId(null);        this.merge_leader=null;        stopMergeTask();        merging=false;        synchronized(merge_rsps) {            merge_rsps.clear();        }        gms.getViewHandler().resume(tmp);    }    /**     * Computes the new view (including the newly joined member) and get the digest from PBCAST.     * Returns both in the form of a JoinRsp     */    /*private synchronized void handleJoin(Address mbr) {        View v;        Digest d, tmp;        JoinRsp join_rsp;        if(mbr == null) {            if(log.isErrorEnabled()) log.error("mbr is null");            return;        }        if(gms.local_addr.equals(mbr)) {            if(log.isErrorEnabled()) log.error("cannot join myself !");            return;        }        if(log.isDebugEnabled()) log.debug("mbr=" + mbr);        if(gms.members.contains(mbr)) { // already joined: return current digest and membership            if(log.isWarnEnabled())                log.warn(mbr + " already present; returning existing view " + gms.view);            join_rsp=new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest());            sendJoinResponse(join_rsp, mbr);            return;        }        try {            // we cannot garbage collect during joining a new member *if* we're the only member            // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those in the            // digest returned to the client, so the client will *not* be able to ask for retransmission of those            // messages if he misses them            gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT));            Vector new_mbrs=new Vector(1);            new_mbrs.addElement(mbr);            tmp=gms.getDigest(); // get existing digest            if(tmp == null) {                if(log.isErrorEnabled()) log.error("received null digest from GET_DIGEST: will cause JOIN to fail");                return;            }            d=new Digest(tmp.size() + 1); // create a new digest, which contains 1 more member            d.add(tmp); // add the existing digest to the new one

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -