⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gms.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
     * @param members     */    public void castViewChangeWithDest(View new_view, Digest digest, java.util.List members) {        Message   view_change_msg;        GmsHeader hdr;        long      start, stop;        ViewId    vid=new_view.getVid();        int       size=-1;        if(members == null || members.size() == 0)            members=new_view.getMembers();        if(log.isTraceEnabled())            log.trace("mcasting view {" + new_view + "} (" + new_view.size() + " mbrs)\n");        start=System.currentTimeMillis();        view_change_msg=new Message(); // bcast to all members        hdr=new GmsHeader(GmsHeader.VIEW, new_view);        hdr.my_digest=digest;        view_change_msg.putHeader(name, hdr);        ack_collector.reset(vid, members);        size=ack_collector.size();        passDown(new Event(Event.MSG, view_change_msg));        try {            ack_collector.waitForAllAcks(view_ack_collection_timeout);            stop=System.currentTimeMillis();            if(trace)                log.trace("received all ACKs (" + size + ") for " + vid + " in " + (stop-start) + "ms");        }        catch(TimeoutException e) {            log.warn("failed to collect all ACKs (" + size + ") for view " + new_view + " after " + view_ack_collection_timeout +                    "ms, missing ACKs from " + ack_collector.printMissing() + " (received=" + ack_collector.printReceived() +                    "), local_addr=" + local_addr);        }    }    /**     * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass     * of View), then digest will be non-null and has to be set before installing the view.     */    public void installView(View new_view, Digest digest) {        if(digest != null)            mergeDigest(digest);        installView(new_view);    }    /**     * Sets the new view and sends a VIEW_CHANGE event up and down the stack.     */    public void installView(View new_view) {        Address coord;        int rc;        ViewId vid=new_view.getVid();        Vector mbrs=new_view.getMembers();        if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view);        if(stats) {            num_views++;            prev_views.add(new_view);        }        ack_collector.handleView(new_view);        // Discards view with id lower than our own. Will be installed without check if first view        if(view_id != null) {            rc=vid.compareTo(view_id);            if(rc <= 0) {                if(log.isTraceEnabled() && rc < 0) // only scream if view is smaller, silently discard same views                    log.trace("[" + local_addr + "] received view < current view;" +                            " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');                return;            }        }        ltime=Math.max(vid.getId(), ltime);  // compute Lamport logical time        /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.        This ensures that messages sent in view V1 are only received by members of V1 */        if(checkSelfInclusion(mbrs) == false) {            // only shun if this member was previously part of the group. avoids problem where multiple            // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view            // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership            // bela Nov 20 2003            if(shun && local_addr != null && prev_members.contains(local_addr)) {                if(warn)                    log.warn("I (" + local_addr + ") am not a member of view " + new_view +                            ", shunning myself and leaving the group (prev_members are " + prev_members +                            ", current view is " + view + ")");                if(impl != null)                    impl.handleExit();                passUp(new Event(Event.EXIT));            }            else {                if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + "; discarding view");            }            return;        }        synchronized(members) {   // serialize access to views            // assign new_view to view_id            if(new_view instanceof MergeView)                view=new View(new_view.getVid(), new_view.getMembers());            else                view=new_view;            view_id=vid.copy();            // Set the membership. Take into account joining members            if(mbrs != null && mbrs.size() > 0) {                members.set(mbrs);                tmp_members.set(members);                joining.removeAll(mbrs);  // remove all members in mbrs from joining                // remove all elements from 'leaving' that are not in 'mbrs'                leaving.retainAll(mbrs);                tmp_members.add(joining);    // add members that haven't yet shown up in the membership                tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership                // add to prev_members                for(Iterator it=mbrs.iterator(); it.hasNext();) {                    Address addr=(Address)it.next();                    if(!prev_members.contains(addr))                        prev_members.add(addr);                }            }            // Send VIEW_CHANGE event up and down the stack:            Event view_event=new Event(Event.VIEW_CHANGE, new_view.clone());            // changed order of passing view up and down (http://jira.jboss.com/jira/browse/JGRP-347)            passUp(view_event);            passDown(view_event); // needed e.g. by failure detector or UDP            coord=determineCoordinator();            // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {            // changed on suggestion by yaronr and Nicolas Piedeloupe            if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) {                becomeCoordinator();            }            else {                if(haveCoordinatorRole() && !local_addr.equals(coord))                    becomeParticipant();            }        }    }    protected Address determineCoordinator() {        synchronized(members) {            return members != null && members.size() > 0? (Address)members.elementAt(0) : null;        }    }    /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */    protected boolean wouldBeNewCoordinator(Address potential_new_coord) {        Address new_coord;        if(potential_new_coord == null) return false;        synchronized(members) {            if(members.size() < 2) return false;            new_coord=(Address)members.elementAt(1);  // member at 2nd place            return new_coord != null && new_coord.equals(potential_new_coord);        }    }    /** Returns true if local_addr is member of mbrs, else false */    protected boolean checkSelfInclusion(Vector mbrs) {        Object mbr;        if(mbrs == null)            return false;        for(int i=0; i < mbrs.size(); i++) {            mbr=mbrs.elementAt(i);            if(mbr != null && local_addr.equals(mbr))                return true;        }        return false;    }    public View makeView(Vector mbrs) {        Address coord=null;        long id=0;        if(view_id != null) {            coord=view_id.getCoordAddress();            id=view_id.getId();        }        return new View(coord, id, mbrs);    }    public View makeView(Vector mbrs, ViewId vid) {        Address coord=null;        long id=0;        if(vid != null) {            coord=vid.getCoordAddress();            id=vid.getId();        }        return new View(coord, id, mbrs);    }    /** Send down a SET_DIGEST event */    public void setDigest(Digest d) {        passDown(new Event(Event.SET_DIGEST, d));    }    /** Send down a MERGE_DIGEST event */    public void mergeDigest(Digest d) {        passDown(new Event(Event.MERGE_DIGEST, d));    }    /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or     timeout, whichever occurs first */    public Digest getDigest() {        Digest ret=null;        synchronized(digest_mutex) {            digest_promise.reset();            passDown(Event.GET_DIGEST_EVT);            try {                ret=(Digest)digest_promise.getResultWithTimeout(digest_timeout);            }            catch(TimeoutException e) {                if(log.isErrorEnabled()) log.error("digest could not be fetched from below");            }            return ret;        }    }    void startFlush(View new_view) {                     if (log.isDebugEnabled()) {             log.debug("starting FLUSH, sending SUSPEND event");         }         passUp(new Event(Event.SUSPEND,new_view));         try {            flush_promise.getResultWithTimeout(flush_timeout);         }         catch(TimeoutException e) {            log.warn("Initiator of flush and group coordinator " + local_addr                  + " timed out waiting for flush responses after "                   + flush_timeout + " msec");         }           }    void stopFlush() {		if (log.isDebugEnabled()) {			log.debug("sending RESUME event");		}		passUp(new Event(Event.RESUME));	}    public void up(Event evt) {        Object obj;        Message msg;        GmsHeader hdr;        MergeData merge_data;        switch(evt.getType()) {            case Event.MSG:                msg=(Message)evt.getArg();                obj=msg.getHeader(name);                if(obj == null || !(obj instanceof GmsHeader))                    break;                hdr=(GmsHeader)msg.removeHeader(name);                switch(hdr.type) {                    case GmsHeader.JOIN_REQ:                        view_handler.add(new Request(Request.JOIN, hdr.mbr, false, null));                        break;                    case GmsHeader.JOIN_RSP:                        impl.handleJoinResponse(hdr.join_rsp);                        break;                    case GmsHeader.LEAVE_REQ:                        if(log.isDebugEnabled())                            log.debug("received LEAVE_REQ for " + hdr.mbr + " from " + msg.getSrc());                        if(hdr.mbr == null) {                            if(log.isErrorEnabled()) log.error("LEAVE_REQ's mbr field is null");                            return;                        }                        view_handler.add(new Request(Request.LEAVE, hdr.mbr, false, null));                        break;                    case GmsHeader.LEAVE_RSP:                        impl.handleLeaveResponse();                        break;                    case GmsHeader.VIEW:                        if(hdr.view == null) {                            if(log.isErrorEnabled()) log.error("[VIEW]: view == null");                            return;                        }                        // send VIEW_ACK to sender of view                        Address coord=msg.getSrc();                        Message view_ack=new Message(coord, null, null);                        GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK, hdr.view);                        view_ack.putHeader(name, tmphdr);                        if(trace)                            log.trace("sending VIEW_ACK to " + coord);                        passDown(new Event(Event.MSG, view_ack));                        impl.handleViewChange(hdr.view, hdr.my_digest);                        break;                    case GmsHeader.VIEW_ACK:                        Object sender=msg.getSrc();                        ack_collector.ack(sender);                        return; // don't pass further up                    case GmsHeader.MERGE_REQ:                        impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);                        break;                    case GmsHeader.MERGE_RSP:                        merge_data=new MergeData(msg.getSrc(), hdr.view, hdr.my_digest);                        merge_data.merge_rejected=hdr.merge_rejected;                        impl.handleMergeResponse(merge_data, hdr.merge_id);                        break;                    case GmsHeader.INSTALL_MERGE_VIEW:                        impl.handleMergeView(new MergeData(msg.getSrc(), hdr.view, hdr.my_digest), hdr.merge_id);                        break;                    case GmsHeader.CANCEL_MERGE:                        impl.handleMergeCancelled(hdr.merge_id);                        break;                    default:                        if(log.isErrorEnabled()) log.error("GmsHeader with type=" + hdr.type + " not known");                }                return;  // don't pass up            case Event.CONNECT_OK:     // sent by someone else, but WE are responsible for sending this !            case Event.DISCONNECT_OK:  // dito (e.g. sent by TP layer). Don't send up the stack                return;            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                if(print_local_addr) {                    System.out.println("\n-------------------------------------------------------\n" +                                       "GMS: address is " + local_addr +                                       "\n-------------------------------------------------------");                }                break;                               // pass up            case Event.SUSPECT:                Address suspected=(Address)evt.getArg();                view_handler.add(new Request(Request.SUSPECT, suspected, true, null));                ack_collector.suspect(suspected);                break;                               // pass up            case Event.UNSUSPECT:                impl.unsuspect((Address)evt.getArg());                return;                              // discard            case Event.MERGE:                view_handler.add(new Request(Request.MERGE, null, false, (Vector)evt.getArg()));                return;                              // don't pass up        }        if(impl.handleUpEvent(evt))            passUp(evt);    }    /**     This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs     to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer     responds with a GET_DIGEST_OK event.<p>     However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process     the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -