clientgmsimpl.java
来自「JGRoups源码」· Java 代码 · 共 304 行
JAVA
304 行
// $Id: ClientGmsImpl.java,v 1.12 2006/01/06 12:23:04 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.View;import org.jgroups.ViewId;import org.jgroups.blocks.GroupRequest;import org.jgroups.blocks.MethodCall;import org.jgroups.util.Util;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;/** * Client part of GMS. Whenever a new member wants to join a group, it starts in the CLIENT role. * No multicasts to the group will be received and processed until the member has been joined and * turned into a SERVER (either coordinator or participant, mostly just participant). This class * only implements <code>Join</code> (called by clients who want to join a certain group, and * <code>ViewChange</code> which is called by the coordinator that was contacted by this client, to * tell the client what its initial membership is. * * @author Bela Ban * @version $Revision: 1.12 $ */public class ClientGmsImpl extends GmsImpl { final Vector initial_mbrs=new Vector(7); final Object view_installation_mutex=new Object(); boolean joined=false; public ClientGmsImpl(GMS g) { gms=g; } public void init() { initial_mbrs.removeAllElements(); joined=false; } /** * Will generate a CONNECT_OK event. Determines the coordinator and sends a unicast * join() message to it. If successful, we wait for a ViewChange (can time out). * If view change is received, impl is changed to an instance of ParticipantGmsImpl. * Otherwise, we continue trying to send join() messages to the coordinator, * until we succeed (or there is no member in the group. In this case, we create * our own singleton group). * <p>When GMS.disable_initial_coord is set to true, then we won't become coordinator on receiving an initial * membership of 0, but instead will retry (forever) until we get an initial membership of > 0. * * @param mbr Our own address (assigned through SET_LOCAL_ADDRESS) */ public void join(Address mbr) { Address coord; Event view_evt; while(!joined) { findInitialMembers(); if(joined) { if(log.isInfoEnabled()) log.info("joined successfully"); return; } if(initial_mbrs.size() == 0) { if(gms.disable_initial_coord) { if(log.isInfoEnabled()) log.info("received an initial membership of 0, but " + "cannot become coordinator (disable_initial_coord=" + gms.disable_initial_coord + "), will retry fetching the initial membership"); continue; } joined=true; gms.view_id=new ViewId(mbr); // create singleton view with mbr as only member gms.mbrs.add(mbr); view_evt=new Event(Event.VIEW_CHANGE, GMS.makeView(gms.mbrs.getMembers(), gms.view_id)); gms.passDown(view_evt); gms.passUp(view_evt); gms.becomeCoordinator(); gms.passUp(new Event(Event.BECOME_SERVER)); gms.passDown(new Event(Event.BECOME_SERVER)); if(log.isInfoEnabled()) log.info("created group (first member)"); break; } coord=determineCoord(initial_mbrs); if(coord == null) { if(warn) log.warn("could not determine coordinator from responses " + initial_mbrs); continue; } synchronized(view_installation_mutex) { try { if(log.isInfoEnabled()) log.info("sending handleJoin() to " + coord); MethodCall call=new MethodCall("handleJoin", new Object[]{mbr}, new Class[]{Address.class}); gms.callRemoteMethod(coord, call, GroupRequest.GET_NONE, 0); view_installation_mutex.wait(gms.join_timeout); // wait for view -> handleView() } catch(Exception e) { if(log.isErrorEnabled()) log.error("exception is " + e); continue; } } // end synchronized if(joined) { if(log.isInfoEnabled()) log.info("joined successfully"); return; // --> SUCCESS } else { if(log.isInfoEnabled()) log.info("failed, retrying"); Util.sleep(gms.join_retry_timeout); } } // end while } public void leave(Address mbr) { wrongMethod("leave"); } public void suspect(Address mbr) { // wrongMethod("suspect"); } public void merge(Vector other_coords) { wrongMethod("merge"); } public boolean handleJoin(Address mbr) { wrongMethod("handleJoin"); return false; } /** * Returns false. Clients don't handle leave() requests */ public void handleLeave(Address mbr, boolean suspected) { wrongMethod("handleLeave"); } /** * Install the first view in which we are a member. This is essentially a confirmation * of our JOIN request (see join() above). */ public void handleViewChange(ViewId new_view, Vector mems) { if(gms.local_addr != null && mems != null && mems.contains(gms.local_addr)) { synchronized(view_installation_mutex) { // wait until JOIN is sent (above) joined=true; view_installation_mutex.notifyAll(); gms.installView(new_view, mems); gms.becomeParticipant(); gms.passUp(new Event(Event.BECOME_SERVER)); gms.passDown(new Event(Event.BECOME_SERVER)); } synchronized(initial_mbrs) { // in case findInitialMembers() is still running: initial_mbrs.notifyAll(); // this will unblock it } } else if(warn) log.warn("am not member of " + mems + ", will not install view"); } /** * Returns immediately. Clients don't handle merge() requests */ public View handleMerge(ViewId other_view, Vector other_members) { wrongMethod("handleMerge"); return null; } /** * Returns immediately. Clients don't handle suspect() requests */ public void handleSuspect(Address mbr) { wrongMethod("handleSuspect"); } public boolean handleUpEvent(Event evt) { Vector tmp; switch(evt.getType()) { case Event.FIND_INITIAL_MBRS_OK: tmp=(Vector)evt.getArg(); synchronized(initial_mbrs) { if(tmp != null && tmp.size() > 0) for(int i=0; i < tmp.size(); i++) initial_mbrs.addElement(tmp.elementAt(i)); initial_mbrs.notifyAll(); } return false; // don't pass up the stack } return true; } /* --------------------------- Private Methods ------------------------------------ */ /** * Pings initial members. Removes self before returning vector of initial members. * Uses IP multicast or gossiping, depending on parameters. */ void findInitialMembers() { PingRsp ping_rsp; synchronized(initial_mbrs) { initial_mbrs.removeAllElements(); gms.passDown(Event.FIND_INITIAL_MBRS_EVT); if(initial_mbrs.size() == 0) { try { initial_mbrs.wait(); } catch(Exception e) { } } for(int i=0; i < initial_mbrs.size(); i++) { ping_rsp=(PingRsp)initial_mbrs.elementAt(i); if(ping_rsp.own_addr != null && gms.local_addr != null && ping_rsp.own_addr.equals(gms.local_addr)) { initial_mbrs.removeElementAt(i); break; } } } } /** * The coordinator is determined by a majority vote. If there are an equal number of votes for * more than 1 candidate, we determine the winner randomly. */ Address determineCoord(Vector mbrs) { PingRsp mbr; Hashtable votes; int count, most_votes; Address winner=null, tmp; if(mbrs == null || mbrs.size() < 1) return null; votes=new Hashtable(5); // count *all* the votes (unlike the 2000 election) for(int i=0; i < mbrs.size(); i++) { mbr=(PingRsp)mbrs.elementAt(i); if(mbr.coord_addr != null) { if(!votes.containsKey(mbr.coord_addr)) votes.put(mbr.coord_addr, new Integer(1)); else { count=((Integer)votes.get(mbr.coord_addr)).intValue(); votes.put(mbr.coord_addr, new Integer(count + 1)); } } } if(votes.size() > 1) if(warn) log.warn("there was more than 1 candidate for coordinator: " + votes); else if(log.isInfoEnabled()) log.info("election results: " + votes); // determine who got the most votes most_votes=0; for(Enumeration e=votes.keys(); e.hasMoreElements();) { tmp=(Address)e.nextElement(); count=((Integer)votes.get(tmp)).intValue(); if(count > most_votes) { winner=tmp;// fixed July 15 2003 (patch submitted by Darren Hobbs, patch-id=771418) most_votes=count; } } votes.clear(); return winner; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?