📄 simpletcpcluster.java
字号:
/**
* Use as base to handle start/stop/periodic Events from host. Currently
* only log the messages as trace level.
*
* @see org.apache.catalina.LifecycleListener#lifecycleEvent(org.apache.catalina.LifecycleEvent)
*/
public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
if (log.isTraceEnabled())
log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent.getType(), lifecycleEvent.getData()));
}
// ------------------------------------------------------ public
/**
* Prepare for the beginning of active use of the public methods of this
* component. This method should be called after <code>configure()</code>,
* and before any of the public methods of the component are utilized. <BR>
* Starts the cluster communication channel, this will connect with the
* other nodes in the cluster, and request the current session state to be
* transferred to this node.
*
* @exception IllegalStateException
* if this component has already been started
* @exception LifecycleException
* if this component detects a fatal error that prevents this
* component from being used
*/
public void start() throws LifecycleException {
if (started)
throw new LifecycleException(sm.getString("cluster.alreadyStarted"));
if (log.isInfoEnabled()) log.info("Cluster is about to start");
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, this);
try {
checkDefaults();
registerClusterValve();
channel.addMembershipListener(this);
channel.addChannelListener(this);
channel.start(channel.DEFAULT);
if (clusterDeployer != null) clusterDeployer.start();
this.started = true;
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(AFTER_START_EVENT, this);
} catch (Exception x) {
log.error("Unable to start cluster.", x);
throw new LifecycleException(x);
}
}
protected void checkDefaults() {
if ( clusterListeners.size() == 0 ) {
addClusterListener(new JvmRouteSessionIDBinderListener());
addClusterListener(new ClusterSessionListener());
}
if ( valves.size() == 0 ) {
addValve(new JvmRouteBinderValve());
addValve(new ReplicationValve());
}
if ( clusterDeployer != null ) clusterDeployer.setCluster(this);
if ( channel == null ) channel = new GroupChannel();
if ( channel instanceof GroupChannel && !((GroupChannel)channel).getInterceptors().hasNext()) {
channel.addInterceptor(new MessageDispatch15Interceptor());
channel.addInterceptor(new TcpFailureDetector());
}
}
/**
* register all cluster valve to host or engine
* @throws Exception
* @throws ClassNotFoundException
*/
protected void registerClusterValve() throws Exception {
if(container != null ) {
for (Iterator iter = valves.iterator(); iter.hasNext();) {
ClusterValve valve = (ClusterValve) iter.next();
if (log.isDebugEnabled())
log.debug("Invoking addValve on " + getContainer()
+ " with class=" + valve.getClass().getName());
if (valve != null) {
IntrospectionUtils.callMethodN(getContainer(), "addValve",
new Object[] { valve },
new Class[] { org.apache.catalina.Valve.class });
}
valve.setCluster(this);
}
}
}
/**
* unregister all cluster valve to host or engine
* @throws Exception
* @throws ClassNotFoundException
*/
protected void unregisterClusterValve() throws Exception {
for (Iterator iter = valves.iterator(); iter.hasNext();) {
ClusterValve valve = (ClusterValve) iter.next();
if (log.isDebugEnabled())
log.debug("Invoking removeValve on " + getContainer()
+ " with class=" + valve.getClass().getName());
if (valve != null) {
IntrospectionUtils.callMethodN(getContainer(), "removeValve",
new Object[] { valve }, new Class[] { org.apache.catalina.Valve.class });
}
valve.setCluster(this);
}
}
/**
* Gracefully terminate the active cluster component.<br/>
* This will disconnect the cluster communication channel, stop the
* listener and deregister the valves from host or engine.<br/><br/>
* <b>Note:</b><br/>The sub elements receiver, sender, membership,
* listener or valves are not removed. You can easily start the cluster again.
*
* @exception IllegalStateException
* if this component has not been started
* @exception LifecycleException
* if this component detects a fatal error that needs to be
* reported
*/
public void stop() throws LifecycleException {
if (!started)
throw new IllegalStateException(sm.getString("cluster.notStarted"));
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(BEFORE_STOP_EVENT, this);
if (clusterDeployer != null) clusterDeployer.stop();
this.managers.clear();
try {
if ( clusterDeployer != null ) clusterDeployer.setCluster(null);
channel.stop(Channel.DEFAULT);
channel.removeChannelListener(this);
channel.removeMembershipListener(this);
this.unregisterClusterValve();
} catch (Exception x) {
log.error("Unable to stop cluster valve.", x);
}
started = false;
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, this);
}
/**
* send message to all cluster members
* @param msg message to transfer
*
* @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage)
*/
public void send(ClusterMessage msg) {
send(msg, null);
}
/**
* send message to all cluster members same cluster domain
*
* @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage)
*/
public void sendClusterDomain(ClusterMessage msg) {
send(msg,null);
}
/**
* send a cluster message to one member
*
* @param msg message to transfer
* @param dest Receiver member
* @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage,
* org.apache.catalina.ha.Member)
*/
public void send(ClusterMessage msg, Member dest) {
try {
msg.setAddress(getLocalMember());
if (dest != null) {
if (!getLocalMember().equals(dest)) {
channel.send(new Member[] {dest}, msg,channelSendOptions);
} else
log.error("Unable to send message to local member " + msg);
} else {
channel.send(channel.getMembers(),msg,channelSendOptions);
}
} catch (Exception x) {
log.error("Unable to send message through cluster sender.", x);
}
}
/**
* New cluster member is registered
*
* @see org.apache.catalina.ha.MembershipListener#memberAdded(org.apache.catalina.ha.Member)
*/
public void memberAdded(Member member) {
try {
hasMembers = channel.hasMembers();
if (log.isInfoEnabled()) log.info("Replication member added:" + member);
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(BEFORE_MEMBERREGISTER_EVENT, member);
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(AFTER_MEMBERREGISTER_EVENT, member);
} catch (Exception x) {
log.error("Unable to connect to replication system.", x);
}
}
/**
* Cluster member is gone
*
* @see org.apache.catalina.ha.MembershipListener#memberDisappeared(org.apache.catalina.ha.Member)
*/
public void memberDisappeared(Member member) {
try {
hasMembers = channel.hasMembers();
if (log.isInfoEnabled()) log.info("Received member disappeared:" + member);
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(BEFORE_MEMBERUNREGISTER_EVENT, member);
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(AFTER_MEMBERUNREGISTER_EVENT, member);
} catch (Exception x) {
log.error("Unable remove cluster node from replication system.", x);
}
}
// --------------------------------------------------------- receiver
// messages
/**
* notify all listeners from receiving a new message is not ClusterMessage
* emitt Failure Event to LifecylceListener
*
* @param message
* receveived Message
*/
public boolean accept(Serializable msg, Member sender) {
return (msg instanceof ClusterMessage);
}
public void messageReceived(Serializable message, Member sender) {
ClusterMessage fwd = (ClusterMessage)message;
fwd.setAddress(sender);
messageReceived(fwd);
}
public void messageReceived(ClusterMessage message) {
long start = 0;
if (log.isDebugEnabled() && message != null)
log.debug("Assuming clocks are synched: Replication for "
+ message.getUniqueId() + " took="
+ (System.currentTimeMillis() - (message).getTimestamp())
+ " ms.");
//invoke all the listeners
boolean accepted = false;
if (message != null) {
for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {
ClusterListener listener = (ClusterListener) iter.next();
if (listener.accept(message)) {
accepted = true;
listener.messageReceived(message);
}
}
}
if (!accepted && log.isDebugEnabled()) {
if (notifyLifecycleListenerOnFailure) {
Member dest = message.getAddress();
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,
new SendMessageData(message, dest, null));
}
log.debug("Message " + message.toString() + " from type "
+ message.getClass().getName()
+ " transfered but no listener registered");
}
return;
}
// --------------------------------------------------------- Logger
public Log getLogger() {
return log;
}
// ------------------------------------------------------------- deprecated
/**
*
* @see org.apache.catalina.Cluster#setProtocol(java.lang.String)
*/
public void setProtocol(String protocol) {
}
/**
* @see org.apache.catalina.Cluster#getProtocol()
*/
public String getProtocol() {
return null;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -