jgroupsmember.java

来自「JGRoups源码」· Java 代码 · 共 468 行 · 第 1/2 页

JAVA
468
字号
package org.jgroups.tests;

/**
 * @author Bela Ban
 * @version $Id$
 */

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.util.Util;

import java.util.Iterator;
import java.util.Vector;

/**
 * Usage: JGroupsMember <role> <protocolConfigXmlFileName> <message> <numberOfRepeats>
 * role = client or server. A client will connect and close, and a server will connect and
 * stay on the channel forever.
 * protocolConfigXmlFileName = the path name to the JGroups protocol stack configuration
 * XML file.
 * message = any message, use quote if blank spaces are embeded. A special message "stop"
 * from a client role would shutdown the server.
 * numberOfRepeats = number of time to repeat the sending of the message. Applicable only
 * if role is client.
 */
public class JGroupsMember implements RequestHandler, MembershipListener, MessageListener {
    private Log logger;
    private String role;
    private String protocolConfigXmlFile;
    private String messageContents=null;
    private int numOfRepeats=1;
    private Channel channel=null;
    private String groupName="win2kschouProto";
    private MessageDispatcher disp=null;
    private long jtimeout=30000; // miliseconds
    private long startTime;
    private Address serverAddress=null;
    private final Object serverAddressMutex=new Object();
    private final Object serverEnds=new Object();

    public JGroupsMember(String role, String protocolConfigXmlFile, String message,
                         int numOfRepeats) {
        this.role=role;
        this.protocolConfigXmlFile=protocolConfigXmlFile;
        this.messageContents=message;
        this.numOfRepeats=(numOfRepeats > 0? numOfRepeats : 1);
        this.logger=LogFactory.getLog(getClass());
        Runtime.getRuntime().addShutdownHook(new Thread("Shutdown") {
            public void run() {
                if(null != channel) {
                    logger.info("[" + getLocalAddress() +
                            "] shutdown and close channel...");
                    channel.close();
                }
                else
                    logger.info("[" + getLocalAddress() +
                            "] shutdown");
            }
        });

    }

    public boolean init() {
        boolean initComplete=false;
        String addr=null;

        try {
// create a channel
            logger.debug("Creating a channel...");
            channel=new JChannel(protocolConfigXmlFile);
        }
        catch(NullPointerException nx) {
            logger.error("Missing required JGroups protocol stack configuration XML file.");
            return initComplete=false;
        }
        catch(ChannelException cx) {
            logger.error("Failed to init JGroups protocal stack when creating Channel." +
                    " Error: ", cx);
            return initComplete=false;
        }

        try {
            logger.debug("Creating a MessageDispatcher using the channel...");
// MessageDispatcher(Channel, MessageListener, MembershipListener, RequestHandler)
            disp=new MessageDispatcher(channel, this, this, this);

            try {
// connect the channel to the group
                logger.debug("Connecting the channel to the group \"" + groupName + "\"");
                channel.connect(groupName);
            }
            catch(ChannelClosedException csx) {
                logger.error("Attempt to connect using a closed channel.", csx);
                return initComplete=false;
            }
            catch(ChannelException cex) {
                logger.error("Failed to start JGroups protocal stack. Error: ", cex);
                return initComplete=false;
            }

            if(!channel.isConnected()) {
                logger.error("Failed to connect to the group.");
                return initComplete=false;
            }

            addr=getLocalAddress();
            logger.debug("The channel is connected using the address " + addr);

            try {
// request for the state of the group, i.e. the server address
                while(null == serverAddress) {
                    if(logger.isDebugEnabled())
                        logger.debug("[" + addr + "] Requesting for group State from the" +
                                " coordinator using timeout " + jtimeout + "ms...");
                    startTime=System.currentTimeMillis();
                    if(!channel.getState(null, jtimeout)) {
                        if((System.currentTimeMillis() - startTime) < jtimeout) {
                            if(logger.isDebugEnabled())
                                logger.debug("[" + addr + "] unable to get group state, may be" +
                                        " because this is the first member joining the" +
                                        " group.");
                        }
                        else {
                            if(logger.isWarnEnabled())
                                logger.warn("[" + addr + "] unable to get group state after " +
                                        jtimeout + "ms timeout.");
                        }
                    }
                    if(role.equals("server"))
                        break;
                }
            }
            catch(ChannelNotConnectedException cnex) {
                if(logger.isErrorEnabled())
                    logger.error("[" + addr + "] failed to get group state," +
                            " ChannelNotConnectedException: " + cnex.getMessage());
                return initComplete=false;
            }
            catch(ChannelClosedException ccex) {
                if(logger.isErrorEnabled())
                    logger.error("[" + addr + "] failed to get group state, the channel is" +
                            " already closed and cannot be reused -" +
                            " ChannelClosedException: " + ccex.getMessage());
                return initComplete=false;
            }

            Message msg=new Message(null, null, role);
            logger.debug("[" + addr + "] casting my role (" + role + ") to the group...");
            disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // timeout 0 has no effect
// because of GET_NONE
            return initComplete=true;
        }
        finally {
            if(null != channel && !initComplete) {
                channel.close();
                channel=null;
            }
        }
    }

    public void run() {
        String addr=getLocalAddress();

        if(role.equals("server"))
            runServer();
        else
            runClient(messageContents, numOfRepeats);

        if(null != channel) {
            logger.info("[" + addr + "] closing channel...");
            channel.close();
            channel=null;
            logger.info("[" + addr + "] exits___ __ _ .. .");
        }
    }

    public void runServer() {
        String addr=getLocalAddress();

        synchronized(serverEnds) {
            try {
                logger.debug("[" + addr + "] server main thread goes into wait...");
                serverEnds.wait(0);
            }
            catch(InterruptedException ix) {
                logger.info("[" + addr + "] server main thread is waken up.");
            }
        }
        logger.info("[" + addr + "] runServer() stops.");
    }

    public void runClient(String clientMessage, int repeats) {
        String addr=getLocalAddress();
        for(int i=0; i < repeats; i++) {
            Message msg=new Message(serverAddress, null, clientMessage);
            if(logger.isDebugEnabled())
                logger.debug("[" + addr + "] sending Message \"" + clientMessage + "\" to server at" +
                        serverAddress);
            try {
                Object reply=disp.sendMessage(msg, GroupRequest.GET_FIRST, 3000);
                if(null != reply) {
                    if(logger.isDebugEnabled())
                        logger.debug("[" + addr + "] received reply \"" + reply.toString() + "\"");
                }
                else
                    logger.debug("[" + addr + "] received null reply");
            }
            catch(TimeoutException tex) {
                logger.error("[" + addr + "] timeout exception when sending Message \"" +
                        clientMessage + "\"; Exception: " + tex.getMessage());
                break;
            }
            catch(SuspectedException sex) {
                logger.error("[" + addr + "] suspected exception when sending Message \"" +
                        clientMessage + "\"; Exception: " + sex.getMessage());
                break;
            }
        }
    }

    /**
     * A MembershipListener callback. It is called by the JGroups to notify this member to
     * stop sending messages to the group.
     */
    public void block() {
        if(logger.isDebugEnabled())
            logger.debug("[" + getLocalAddress() + "] MembershipListener.block() is called" +
                    " notifying this member to stop sending messages...");
    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?