📄 groupchannel.java
字号:
/**
 * Adds a message dispatch interceptor to the stack when the interceptor that
 * follows the first interceptor is a <code>ChannelCoordinator</code>.<br>
 * The <code>MessageDispatch15Interceptor</code> is looked up by name and
 * instantiated reflectively; if it cannot be loaded or instantiated for any
 * reason, a <code>MessageDispatchInterceptor</code> is instantiated instead.
 * The preferred interceptor is instantiated only once and that instance is
 * the one added to the stack.
 * @throws ChannelException if no dispatch interceptor could be instantiated
 */
protected synchronized void setupDefaultStack() throws ChannelException {
    ChannelInterceptor first = getFirstInterceptor();
    if ( first == null || !(first.getNext() instanceof ChannelCoordinator) ) {
        // nothing to do unless the first interceptor sits directly on top of the coordinator
        return;
    }
    Object instance = null;
    try {
        Class preferred = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
                                        true,GroupChannel.class.getClassLoader());
        // keep the instance instead of creating a throw-away probe and a second object
        instance = preferred.newInstance();
    } catch ( Throwable x ) {
        // deliberately broad: linkage errors as well as exceptions select the fallback
        instance = null;
    }
    ChannelInterceptor interceptor = null;
    try {
        if ( instance == null ) {
            Class fallback = MessageDispatchInterceptor.class;
            instance = fallback.newInstance();
        }
        interceptor = (ChannelInterceptor) instance;
    } catch (Exception x) {
        throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x);
    }
    this.addInterceptor(interceptor);
}
/**
 * Validates the option flags used by the interceptors in the stack.<br>
 * Every interceptor with a non-zero option flag is compared against each
 * interceptor after it in the chain. Two flags are reported as conflicting
 * when the bits of one are entirely contained in the other, which includes
 * the case where both flags are equal.
 * @throws ChannelException listing every conflicting pair, if any was found
 */
protected void checkOptionFlags() throws ChannelException {
    StringBuffer report = new StringBuffer();
    for ( ChannelInterceptor outer = interceptors; outer != null; outer = outer.getNext() ) {
        int outerFlag = outer.getOptionFlag();
        if ( outerFlag == 0 ) {
            continue; // interceptors without a flag cannot conflict
        }
        for ( ChannelInterceptor inner = outer.getNext(); inner != null; inner = inner.getNext() ) {
            int innerFlag = inner.getOptionFlag();
            if ( innerFlag == 0 ) {
                continue;
            }
            int common = outerFlag & innerFlag;
            boolean overlaps = (common == outerFlag) || (common == innerFlag);
            if ( overlaps ) {
                report.append("[")
                      .append(outer.getClass().getName())
                      .append(":")
                      .append(outerFlag)
                      .append(" == ")
                      .append(inner.getClass().getName())
                      .append(":")
                      .append(innerFlag)
                      .append("] ");
            }
        }
    }
    if ( report.length() > 0 ) {
        throw new ChannelException("Interceptor option flag conflict: "+report.toString());
    }
}
/**
 * Starts the channel.<br>
 * Sets up the default interceptor stack, validates the option flags when the
 * option check is enabled, delegates to the parent implementation and finally
 * launches the internal heartbeat thread if heartbeats are enabled and no
 * heartbeat thread exists yet.
 * @param svc int - what service to start
 * @throws ChannelException
 * @see org.apache.catalina.tribes.Channel#start(int)
 */
public synchronized void start(int svc) throws ChannelException {
    setupDefaultStack();
    if ( optionCheck ) {
        checkOptionFlags();
    }
    super.start(svc);
    if ( !heartbeat || hbthread != null ) {
        // heartbeat disabled, or a heartbeat thread has already been created
        return;
    }
    hbthread = new HeartbeatThread(this,heartbeatSleeptime);
    hbthread.start();
}
/**
 * Stops the channel.<br>
 * An existing heartbeat thread is asked to stop and its reference is cleared
 * before the call is delegated to the parent implementation.
 * @param svc int - what service to stop
 * @throws ChannelException
 * @see org.apache.catalina.tribes.Channel#stop(int)
 */
public synchronized void stop(int svc) throws ChannelException {
    HeartbeatThread running = hbthread;
    if ( running != null ) {
        running.stopHeartbeat();
        hbthread = null;
    }
    super.stop(svc);
}
/**
 * Returns the first interceptor of the stack, which is useful as a starting
 * point for traversal. When no interceptor has been registered the
 * coordinator is returned instead.
 * @return ChannelInterceptor
 */
public ChannelInterceptor getFirstInterceptor() {
    return (interceptors != null) ? interceptors : coordinator;
}
/**
 * Returns the channel receiver component, as held by the coordinator.
 * @return ChannelReceiver
 */
public ChannelReceiver getChannelReceiver() {
    ChannelReceiver receiver = this.coordinator.getClusterReceiver();
    return receiver;
}
/**
 * Returns the channel sender component, as held by the coordinator.
 * @return ChannelSender
 */
public ChannelSender getChannelSender() {
    ChannelSender sender = this.coordinator.getClusterSender();
    return sender;
}
/**
 * Returns the membership service component, as held by the coordinator.
 * @return MembershipService
 */
public MembershipService getMembershipService() {
    MembershipService service = this.coordinator.getMembershipService();
    return service;
}
/**
 * Sets the channel receiver component by handing it to the coordinator.
 * @param clusterReceiver ChannelReceiver
 */
public void setChannelReceiver(ChannelReceiver clusterReceiver) {
    this.coordinator.setClusterReceiver(clusterReceiver);
}
/**
 * Sets the channel sender component by handing it to the coordinator.
 * @param clusterSender ChannelSender
 */
public void setChannelSender(ChannelSender clusterSender) {
    this.coordinator.setClusterSender(clusterSender);
}
/**
 * Sets the membership component by handing it to the coordinator.
 * @param membershipService MembershipService
 */
public void setMembershipService(MembershipService membershipService) {
    this.coordinator.setMembershipService(membershipService);
}
/**
 * Adds a membership listener to the channel.<br>
 * Membership listeners are uniquely identified using the equals(Object) method;
 * a listener that is already registered is silently ignored.
 * @param membershipListener MembershipListener
 */
public void addMembershipListener(MembershipListener membershipListener) {
    if ( this.membershipListeners.contains(membershipListener) ) {
        return; // already registered
    }
    this.membershipListeners.add(membershipListener);
}
/**
 * Removes a membership listener from the channel.<br>
 * Membership listeners are uniquely identified using the equals(Object) method.
 * @param membershipListener MembershipListener
 */
public void removeMembershipListener(MembershipListener membershipListener) {
    this.membershipListeners.remove(membershipListener);
}
/**
 * Adds a channel listener to the channel.<br>
 * Channel listeners are uniquely identified using the equals(Object) method.
 * @param channelListener ChannelListener
 * @throws IllegalArgumentException if the listener is already registered
 */
public void addChannelListener(ChannelListener channelListener) {
    if ( this.channelListeners.contains(channelListener) ) {
        throw new IllegalArgumentException("Listener already exists:"+channelListener+"["+channelListener.getClass().getName()+"]");
    }
    this.channelListeners.add(channelListener);
}
/**
 * Removes a channel listener from the channel.<br>
 * Channel listeners are uniquely identified using the equals(Object) method.
 * @param channelListener ChannelListener
 */
public void removeChannelListener(ChannelListener channelListener) {
    this.channelListeners.remove(channelListener);
}
/**
 * Returns an iterator over the interceptors in this stack.<br>
 * Iteration begins with the interceptor that follows this channel and ends
 * before the coordinator, so the coordinator itself is not returned.
 * @return Iterator
 */
public Iterator getInterceptors() {
    ChannelInterceptor head = this.getNext();
    ChannelInterceptor tail = this.coordinator;
    return new InterceptorIterator(head,tail);
}
/**
 * Enables/disables the option check.<br>
 * Setting this to true makes the GroupChannel perform a conflict check on
 * the interceptors when it is started: if two interceptors use conflicting
 * option flags, an error is thrown upon start.
 * @param optionCheck boolean - true to validate option flags on start
 */
public void setOptionCheck(boolean optionCheck) {
this.optionCheck = optionCheck;
}
/**
 * Configures the local heartbeat sleep time.<br>
 * Only used when <code>getHeartbeat()==true</code>. The value is handed to
 * the heartbeat thread when that thread is created during start.
 * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
 */
public void setHeartbeatSleeptime(long heartbeatSleeptime) {
this.heartbeatSleeptime = heartbeatSleeptime;
}
/**
 * Enables or disables the local heartbeat.<br>
 * If <code>setHeartbeat(true)</code> is invoked, the channel starts an internal
 * thread on start that invokes <code>Channel.heartbeat()</code> every
 * <code>getHeartbeatSleeptime()</code> milliseconds.
 * @param heartbeat boolean - true to run the internal heartbeat thread
 */
public void setHeartbeat(boolean heartbeat) {
this.heartbeat = heartbeat;
}
/**
 * Reports whether the option flag check is performed when the channel starts.
 * @see #setOptionCheck(boolean)
 * @return boolean
 */
public boolean getOptionCheck() {
    return this.optionCheck;
}
/**
 * Reports whether the internal heartbeat is enabled.
 * @see #setHeartbeat(boolean)
 * @return boolean
 */
public boolean getHeartbeat() {
    return this.heartbeat;
}
/**
 * Returns the time in milliseconds that the internal heartbeat sleeps
 * between invocations of <code>Channel.heartbeat()</code>.
 * @return long
 */
public long getHeartbeatSleeptime() {
    return this.heartbeatSleeptime;
}
/**
 *
 * <p>Title: Interceptor Iterator</p>
 *
 * <p>Description: An iterator that walks a chain of interceptors by following
 * <code>getNext()</code>, beginning at a start interceptor and stopping before
 * an end interceptor (the end interceptor itself is never returned).</p>
 *
 * <p>Note that <code>next()</code> returns <code>null</code> instead of throwing
 * once the iteration is exhausted, and that <code>remove()</code> does nothing.</p>
 *
 * @version 1.0
 */
public static class InterceptorIterator implements Iterator {
    /** Exclusive upper bound of the iteration. */
    private ChannelInterceptor limit;
    /** The interceptor that the next call to next() will return. */
    private ChannelInterceptor cursor;

    /**
     * @param start ChannelInterceptor - first interceptor to return
     * @param end ChannelInterceptor - interceptor at which iteration stops (exclusive)
     */
    public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {
        this.limit = end;
        this.cursor = start;
    }

    /**
     * @return boolean - true while the chain has not run out and the end
     * interceptor has not been reached
     */
    public boolean hasNext() {
        return !(cursor == null || cursor == limit);
    }

    /**
     * @return Object - the current interceptor, or null when nothing is left
     */
    public Object next() {
        if ( !hasNext() ) {
            return null;
        }
        ChannelInterceptor current = cursor;
        cursor = current.getNext();
        return current;
    }

    /**
     * Removal is not supported; the call is silently ignored.
     */
    public void remove() {
        //empty operation
    }
}
/**
 *
 * <p>Title: Internal heartbeat thread</p>
 *
 * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
 * is created. The thread is a minimum-priority daemon that repeatedly sleeps for
 * the configured time and then invokes <code>heartbeat()</code> on its channel,
 * until <code>stopHeartbeat()</code> is called.</p>
 *
 * @version 1.0
 */
public static class HeartbeatThread extends Thread {
    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(HeartbeatThread.class);
    /** Sequence number used to give every heartbeat thread a distinct name. */
    protected static int counter = 1;

    /**
     * @return int - the current sequence number; the counter is advanced afterwards
     */
    protected static synchronized int inc() {
        int current = counter;
        counter = current + 1;
        return current;
    }

    /** Loop flag; cleared by stopHeartbeat() to end the run loop. */
    protected boolean doRun = true;
    protected GroupChannel channel;
    protected long sleepTime;

    /**
     * @param channel GroupChannel - the channel whose heartbeat() is invoked
     * @param sleepTime long - time in milliseconds to sleep before each heartbeat
     */
    public HeartbeatThread(GroupChannel channel, long sleepTime) {
        super();
        this.setPriority(MIN_PRIORITY);
        this.setName("GroupChannel-Heartbeat-"+inc());
        this.setDaemon(true);
        this.channel = channel;
        this.sleepTime = sleepTime;
    }

    /**
     * Requests termination: clears the loop flag and interrupts the thread so
     * that a pending sleep ends early.
     */
    public void stopHeartbeat() {
        this.doRun = false;
        this.interrupt();
    }

    /**
     * Sleeps and sends a heartbeat for as long as the loop flag is set.
     * Failures other than interruption are logged and the loop continues.
     */
    public void run() {
        while ( this.doRun ) {
            try {
                Thread.sleep(this.sleepTime);
                this.channel.heartbeat();
            } catch ( InterruptedException interruption ) {
                // clear the interrupt status and let the loop re-check the flag
                Thread.interrupted();
            } catch ( Exception failure ) {
                log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",failure);
            }
        }
    }
}//HeartbeatThread
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -