⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientgmsimpl.java

📁 JGRoups源码
💻 JAVA
字号:
// $Id: ClientGmsImpl.java,v 1.34 2006/08/03 07:53:12 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.protocols.PingRsp;import org.jgroups.util.Promise;import org.jgroups.util.Util;import java.util.*;/** * Client part of GMS. Whenever a new member wants to join a group, it starts in the CLIENT role. * No multicasts to the group will be received and processed until the member has been joined and * turned into a SERVER (either coordinator or participant, mostly just participant). This class * only implements <code>Join</code> (called by clients who want to join a certain group, and * <code>ViewChange</code> which is called by the coordinator that was contacted by this client, to * tell the client what its initial membership is. * @author Bela Ban * @version $Revision: 1.34 $ */public class ClientGmsImpl extends GmsImpl {    private final Vector  initial_mbrs=new Vector(11);    private boolean       initial_mbrs_received=false;    private final Promise join_promise=new Promise();    public ClientGmsImpl(GMS g) {        super(g);    }    public void init() throws Exception {        super.init();        synchronized(initial_mbrs) {            initial_mbrs.clear();            initial_mbrs_received=false;        }        join_promise.reset();    }    /**     * Joins this process to a group. Determines the coordinator and sends a unicast     * handleJoin() message to it. The coordinator returns a JoinRsp and then broadcasts the new view, which     * contains a message digest and the current membership (including the joiner). The joiner is then     * supposed to install the new view and the digest and starts accepting mcast messages. Previous     * mcast messages were discarded (this is done in PBCAST).<p>     * If successful, impl is changed to an instance of ParticipantGmsImpl.     * Otherwise, we continue trying to send join() messages to	the coordinator,     * until we succeed (or there is no member in the group. In this case, we create our own singleton group).     * <p>When GMS.disable_initial_coord is set to true, then we won't become coordinator on receiving an initial     * membership of 0, but instead will retry (forever) until we get an initial membership of > 0.     * @param mbr Our own address (assigned through SET_LOCAL_ADDRESS)     */    public void join(Address mbr) {        Address coord;        JoinRsp rsp;        Digest  tmp_digest;        View    tmp_view;        leaving=false;        join_promise.reset();        while(!leaving) {            findInitialMembers();            if(log.isDebugEnabled()) log.debug("initial_mbrs are " + initial_mbrs);            if(initial_mbrs.size() == 0) {                if(gms.disable_initial_coord) {                    if(trace)                        log.trace("received an initial membership of 0, but cannot become coordinator " +                                "(disable_initial_coord=true), will retry fetching the initial membership");                    continue;                }                if(log.isDebugEnabled())                    log.debug("no initial members discovered: creating group as first member");                becomeSingletonMember(mbr);                return;            }            coord=determineCoord(initial_mbrs);            if(coord == null) { // e.g. because we have all clients only                if(gms.handle_concurrent_startup == false) {                    if(trace)                        log.trace("handle_concurrent_startup is false; ignoring responses of initial clients");                    becomeSingletonMember(mbr);                    return;                }                if(trace)                    log.trace("could not determine coordinator from responses " + initial_mbrs);                // so the member to become singleton member (and thus coord) is the first of all clients                Set clients=new TreeSet(); // sorted                clients.add(mbr); // add myself again (was removed by findInitialMembers())                for(int i=0; i < initial_mbrs.size(); i++) {                    PingRsp pingRsp=(PingRsp)initial_mbrs.elementAt(i);                    Address client_addr=pingRsp.getAddress();                    if(client_addr != null)                        clients.add(client_addr);                }                if(trace)                    log.trace("clients to choose new coord from are: " + clients);                Address new_coord=(Address)clients.iterator().next();                if(new_coord.equals(mbr)) {                    if(trace)                        log.trace("I (" + mbr + ") am the first of the clients, will become coordinator");                    becomeSingletonMember(mbr);                    return;                }                else {                    if(trace)                        log.trace("I (" + mbr + ") am not the first of the clients, " +                                "waiting for another client to become coordinator");                    Util.sleep(500);                }                continue;            }            try {                if(log.isDebugEnabled())                    log.debug("sending handleJoin(" + mbr + ") to " + coord);                sendJoinMessage(coord, mbr);                rsp=(JoinRsp)join_promise.getResult(gms.join_timeout);                if(rsp == null) {                    if(warn) log.warn("join(" + mbr + ") sent to " + coord + " timed out, retrying");                }                else {                    // 1. check whether JOIN was rejected                    String failure=rsp.getFailReason();                    if(failure != null)                        throw new SecurityException(failure);                    // 2. Install digest                    tmp_digest=rsp.getDigest();                    tmp_view=rsp.getView();                    if(tmp_digest == null || tmp_view == null) {                        if(log.isErrorEnabled())                            log.error("JoinRsp has a null view or digest: view=" + tmp_view + ", digest=" +                                    tmp_digest + ", skipping it");                    }                    else {                        tmp_digest.incrementHighSeqno(coord); 	// see DESIGN for an explanantion                        gms.setDigest(tmp_digest);                        if(log.isDebugEnabled()) log.debug("[" + gms.local_addr + "]: JoinRsp=" + tmp_view +                                " [size=" + tmp_view.size() + "]\n\n");                        if(!installView(tmp_view)) {                            if(log.isErrorEnabled()) log.error("view installation failed, retrying to join group");                            continue;                        }                        // send VIEW_ACK to sender of view                        Message view_ack=new Message(coord, null, null);                        GMS.GmsHeader tmphdr=new GMS.GmsHeader(GMS.GmsHeader.VIEW_ACK, tmp_view);                        view_ack.putHeader(GMS.name, tmphdr);                        gms.passDown(new Event(Event.MSG, view_ack));                        gms.passUp(new Event(Event.BECOME_SERVER));                        gms.passDown(new Event(Event.BECOME_SERVER));                        return;                    }                }            }            catch(SecurityException security_ex) {                throw security_ex;            }            catch(Throwable e) {                if(log.isDebugEnabled()) log.debug("exception=" + e + ", retrying");            }            Util.sleep(gms.join_retry_timeout);        }    }    public void leave(Address mbr) {        leaving=true;        wrongMethod("leave");    }    public void handleJoinResponse(JoinRsp join_rsp) {        join_promise.setResult(join_rsp); // will wake up join() method    }    public void handleLeaveResponse() {    }    public void suspect(Address mbr) {    }    public void unsuspect(Address mbr) {    }    public void handleMembershipChange (Collection newMembers, Collection leavingMembers, Collection suspectedMembers) {    }    /**     * Does nothing. Discards all views while still client.     */    public synchronized void handleViewChange(View new_view, Digest digest) {        if(log.isTraceEnabled())            log.trace("view " + new_view.getVid() + " is discarded as we are not a participant");    }    /**     * Called by join(). Installs the view returned by calling Coord.handleJoin() and     * becomes coordinator.     */    private boolean installView(View new_view) {        Vector mems=new_view.getMembers();         if(log.isDebugEnabled()) log.debug("new_view=" + new_view);        if(gms.local_addr == null || mems == null || !mems.contains(gms.local_addr)) {            if(log.isErrorEnabled()) log.error("I (" + gms.local_addr +                                                       ") am not member of " + mems + ", will not install view");            return false;        }        gms.installView(new_view);        gms.becomeParticipant();        gms.passUp(new Event(Event.BECOME_SERVER));        gms.passDown(new Event(Event.BECOME_SERVER));        return true;    }    /** Returns immediately. Clients don't handle suspect() requests */    // public void handleSuspect(Address mbr) {    // }    public boolean handleUpEvent(Event evt) {        Vector tmp;        switch(evt.getType()) {            case Event.FIND_INITIAL_MBRS_OK:                tmp=(Vector)evt.getArg();                synchronized(initial_mbrs) {                    if(tmp != null && tmp.size() > 0) {                        initial_mbrs.addAll(tmp);                    }                    initial_mbrs_received=true;                    initial_mbrs.notifyAll();                }                return false;  // don't pass up the stack        }        return true;    }    /* --------------------------- Private Methods ------------------------------------ */    void sendJoinMessage(Address coord, Address mbr) {        Message msg;        GMS.GmsHeader hdr;        msg=new Message(coord, null, null);        hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_REQ, mbr);        msg.putHeader(gms.getName(), hdr);        gms.passDown(new Event(Event.MSG, msg));    }    /**     * Pings initial members. Removes self before returning vector of initial members.     * Uses IP multicast or gossiping, depending on parameters.     */    void findInitialMembers() {        PingRsp ping_rsp;        synchronized(initial_mbrs) {            initial_mbrs.removeAllElements();            initial_mbrs_received=false;            gms.passDown(new Event(Event.FIND_INITIAL_MBRS));            // the initial_mbrs_received flag is needed when passDown() is executed on the same thread, so when            // it returns, a response might actually have been received (even though the initial_mbrs might still be empty)            if(initial_mbrs_received == false) {                try {                    initial_mbrs.wait();                }                catch(Exception e) {                }            }            for(int i=0; i < initial_mbrs.size(); i++) {                ping_rsp=(PingRsp)initial_mbrs.elementAt(i);                if(ping_rsp.own_addr != null && gms.local_addr != null &&                        ping_rsp.own_addr.equals(gms.local_addr)) {                    initial_mbrs.removeElementAt(i);                    break;                }            }        }    }    /**     The coordinator is determined by a majority vote. If there are an equal number of votes for     more than 1 candidate, we determine the winner randomly.     */    private Address determineCoord(Vector mbrs) {        PingRsp mbr;        Hashtable votes;        int count, most_votes;        Address winner=null, tmp;        if(mbrs == null || mbrs.size() < 1)            return null;        votes=new Hashtable(5);        // count *all* the votes (unlike the 2000 election)        for(int i=0; i < mbrs.size(); i++) {            mbr=(PingRsp)mbrs.elementAt(i);            if(mbr.is_server && mbr.coord_addr != null) {                if(!votes.containsKey(mbr.coord_addr))                    votes.put(mbr.coord_addr, new Integer(1));                else {                    count=((Integer)votes.get(mbr.coord_addr)).intValue();                    votes.put(mbr.coord_addr, new Integer(count + 1));                }            }        }        if(votes.size() > 1) {            if(warn) log.warn("there was more than 1 candidate for coordinator: " + votes);        }        else {            if(log.isDebugEnabled()) log.debug("election results: " + votes);        }        // determine who got the most votes        most_votes=0;        for(Enumeration e=votes.keys(); e.hasMoreElements();) {            tmp=(Address)e.nextElement();            count=((Integer)votes.get(tmp)).intValue();            if(count > most_votes) {                winner=tmp;                // fixed July 15 2003 (patch submitted by Darren Hobbs, patch-id=771418)                most_votes=count;            }        }        votes.clear();        return winner;    }    void becomeSingletonMember(Address mbr) {        Digest initial_digest;        ViewId view_id;        Vector mbrs=new Vector(1);        // set the initial digest (since I'm the first member)        initial_digest=new Digest(1);             // 1 member (it's only me)        initial_digest.add(gms.local_addr, 0, 0); // initial seqno mcast by me will be 1 (highest seen +1)        gms.setDigest(initial_digest);        view_id=new ViewId(mbr);       // create singleton view with mbr as only member        mbrs.addElement(mbr);        gms.installView(new View(view_id, mbrs));        gms.becomeCoordinator(); // not really necessary - installView() should do it        gms.passUp(new Event(Event.BECOME_SERVER));        gms.passDown(new Event(Event.BECOME_SERVER));        if(log.isDebugEnabled()) log.debug("created group (first member). My view is " + gms.view_id +                                           ", impl is " + gms.getImpl().getClass().getName());    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -