jgroupsmember.java
来自「JGRoups源码」· Java 代码 · 共 468 行 · 第 1/2 页
JAVA
468 行
package org.jgroups.tests;
/**
* @author Bela Ban
* @version $Id$
*/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.util.Util;
import java.util.Iterator;
import java.util.Vector;
/**
* Usage: JGroupsMember <role> <protocolConfigXmlFileName> <message> <numberOfRepeats>
* role = client or server. A client will connect and close, and a server will connect and
* stay on the channel forever.
* protocolConfigXmlFileName = the path name to the JGroups protocol stack configuration
* XML file.
* message = any message, use quote if blank spaces are embeded. A special message "stop"
* from a client role would shutdown the server.
* numberOfRepeats = number of time to repeat the sending of the message. Applicable only
* if role is client.
*/
public class JGroupsMember implements RequestHandler, MembershipListener, MessageListener {
private Log logger;
private String role;
private String protocolConfigXmlFile;
private String messageContents=null;
private int numOfRepeats=1;
private Channel channel=null;
private String groupName="win2kschouProto";
private MessageDispatcher disp=null;
private long jtimeout=30000; // miliseconds
private long startTime;
private Address serverAddress=null;
private final Object serverAddressMutex=new Object();
private final Object serverEnds=new Object();
public JGroupsMember(String role, String protocolConfigXmlFile, String message,
int numOfRepeats) {
this.role=role;
this.protocolConfigXmlFile=protocolConfigXmlFile;
this.messageContents=message;
this.numOfRepeats=(numOfRepeats > 0? numOfRepeats : 1);
this.logger=LogFactory.getLog(getClass());
Runtime.getRuntime().addShutdownHook(new Thread("Shutdown") {
public void run() {
if(null != channel) {
logger.info("[" + getLocalAddress() +
"] shutdown and close channel...");
channel.close();
}
else
logger.info("[" + getLocalAddress() +
"] shutdown");
}
});
}
public boolean init() {
boolean initComplete=false;
String addr=null;
try {
// create a channel
logger.debug("Creating a channel...");
channel=new JChannel(protocolConfigXmlFile);
}
catch(NullPointerException nx) {
logger.error("Missing required JGroups protocol stack configuration XML file.");
return initComplete=false;
}
catch(ChannelException cx) {
logger.error("Failed to init JGroups protocal stack when creating Channel." +
" Error: ", cx);
return initComplete=false;
}
try {
logger.debug("Creating a MessageDispatcher using the channel...");
// MessageDispatcher(Channel, MessageListener, MembershipListener, RequestHandler)
disp=new MessageDispatcher(channel, this, this, this);
try {
// connect the channel to the group
logger.debug("Connecting the channel to the group \"" + groupName + "\"");
channel.connect(groupName);
}
catch(ChannelClosedException csx) {
logger.error("Attempt to connect using a closed channel.", csx);
return initComplete=false;
}
catch(ChannelException cex) {
logger.error("Failed to start JGroups protocal stack. Error: ", cex);
return initComplete=false;
}
if(!channel.isConnected()) {
logger.error("Failed to connect to the group.");
return initComplete=false;
}
addr=getLocalAddress();
logger.debug("The channel is connected using the address " + addr);
try {
// request for the state of the group, i.e. the server address
while(null == serverAddress) {
if(logger.isDebugEnabled())
logger.debug("[" + addr + "] Requesting for group State from the" +
" coordinator using timeout " + jtimeout + "ms...");
startTime=System.currentTimeMillis();
if(!channel.getState(null, jtimeout)) {
if((System.currentTimeMillis() - startTime) < jtimeout) {
if(logger.isDebugEnabled())
logger.debug("[" + addr + "] unable to get group state, may be" +
" because this is the first member joining the" +
" group.");
}
else {
if(logger.isWarnEnabled())
logger.warn("[" + addr + "] unable to get group state after " +
jtimeout + "ms timeout.");
}
}
if(role.equals("server"))
break;
}
}
catch(ChannelNotConnectedException cnex) {
if(logger.isErrorEnabled())
logger.error("[" + addr + "] failed to get group state," +
" ChannelNotConnectedException: " + cnex.getMessage());
return initComplete=false;
}
catch(ChannelClosedException ccex) {
if(logger.isErrorEnabled())
logger.error("[" + addr + "] failed to get group state, the channel is" +
" already closed and cannot be reused -" +
" ChannelClosedException: " + ccex.getMessage());
return initComplete=false;
}
Message msg=new Message(null, null, role);
logger.debug("[" + addr + "] casting my role (" + role + ") to the group...");
disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // timeout 0 has no effect
// because of GET_NONE
return initComplete=true;
}
finally {
if(null != channel && !initComplete) {
channel.close();
channel=null;
}
}
}
public void run() {
String addr=getLocalAddress();
if(role.equals("server"))
runServer();
else
runClient(messageContents, numOfRepeats);
if(null != channel) {
logger.info("[" + addr + "] closing channel...");
channel.close();
channel=null;
logger.info("[" + addr + "] exits___ __ _ .. .");
}
}
public void runServer() {
String addr=getLocalAddress();
synchronized(serverEnds) {
try {
logger.debug("[" + addr + "] server main thread goes into wait...");
serverEnds.wait(0);
}
catch(InterruptedException ix) {
logger.info("[" + addr + "] server main thread is waken up.");
}
}
logger.info("[" + addr + "] runServer() stops.");
}
public void runClient(String clientMessage, int repeats) {
String addr=getLocalAddress();
for(int i=0; i < repeats; i++) {
Message msg=new Message(serverAddress, null, clientMessage);
if(logger.isDebugEnabled())
logger.debug("[" + addr + "] sending Message \"" + clientMessage + "\" to server at" +
serverAddress);
try {
Object reply=disp.sendMessage(msg, GroupRequest.GET_FIRST, 3000);
if(null != reply) {
if(logger.isDebugEnabled())
logger.debug("[" + addr + "] received reply \"" + reply.toString() + "\"");
}
else
logger.debug("[" + addr + "] received null reply");
}
catch(TimeoutException tex) {
logger.error("[" + addr + "] timeout exception when sending Message \"" +
clientMessage + "\"; Exception: " + tex.getMessage());
break;
}
catch(SuspectedException sex) {
logger.error("[" + addr + "] suspected exception when sending Message \"" +
clientMessage + "\"; Exception: " + sex.getMessage());
break;
}
}
}
/**
* A MembershipListener callback. It is called by the JGroups to notify this member to
* stop sending messages to the group.
*/
public void block() {
if(logger.isDebugEnabled())
logger.debug("[" + getLocalAddress() + "] MembershipListener.block() is called" +
" notifying this member to stop sending messages...");
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?