📄 protocol.java
字号:
package com.huawei.comm.smap;
// $Id: Protocol.java,v 1.38 2006/04/05 05:33:09 belaban Exp $
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
class UpHandler
extends Thread
{
private Queue mq = null;
private Protocol handler = null;
private ProtocolObserver observer = null;
private Protocol prot = null;
protected final Log log = LogFactory.getLog(this.getClass());
public UpHandler(Protocol prot, Queue mq, Protocol handler,
ProtocolObserver observer)
{
super();
this.mq = mq;
this.handler = handler;
this.observer = observer;
this.prot = prot;
if (handler != null)
{
setName("UpHandler (" + handler.getName() + ')');
}
else
{
setName("UpHandler");
}
setDaemon(true);
}
public void setObserver(ProtocolObserver observer)
{
this.observer = observer;
}
/** Removes events from mq and calls handler.up(evt) */
public void run()
{
System.out.println("UpHandler-----线程启动");
while (!mq.closed())
{
try
{
Event evt = (Event) mq.remove();
if (evt == null)
{
if (log.isWarnEnabled())
{
log.warn("removed null event");
}
continue;
}
if (observer != null)
{ // call debugger hook (if installed)
if (observer.up(evt, mq.size()) == false)
{ // false means discard event
return;
}
}
int type = evt.getType();
if (type != Event.START && type != Event.STOP)
{
evt = prot.handleUpEvent(evt);
}
if (null != evt)
{
handler.up(evt);
System.out.println("smap UpHandler-队列数:"+mq.size());
}
}
catch (QueueClosedException queue_closed)
{
break;
}
catch (Throwable e)
{
if (log.isErrorEnabled())
{
log.error(getName() + " caught exception", e);
}
}
}
System.out.println("UpHandler-----线程exit");
}
}
class DownHandler
extends Thread
{
private Queue mq = null;
private Protocol handler = null;
private Protocol prot = null;
private ProtocolObserver observer = null;
protected final Log log = LogFactory.getLog(this.getClass());
public DownHandler(Protocol prot, Queue mq, Protocol handler,
ProtocolObserver observer)
{
super();
this.mq = mq;
this.handler = handler;
this.observer = observer;
this.prot = prot;
if (handler != null)
{
setName("DownHandler (" + handler.getName() + ')');
}
else
{
setName("DownHandler");
}
setDaemon(true);
}
public void setObserver(ProtocolObserver observer)
{
this.observer = observer;
}
/** Removes events from mq and calls handler.down(evt) */
public void run()
{
System.out.println("DownHandler-----线程启动");
while (!mq.closed())
{
try
{
Event evt = (Event) mq.remove();
if (evt == null)
{
if (log.isWarnEnabled())
{
log.warn("removed null event");
}
continue;
}
if (observer != null)
{ // call debugger hook (if installed)
if (observer.down(evt, mq.size()) == false)
{ // false means discard event
continue;
}
}
int type = evt.getType();
if (type == Event.START || type == Event.STOP)
{
if (handler.handleSpecialDownEvent(evt) == false)
{
continue;
}
}
else
{
evt = prot.handleDownEvent(evt);
}
if (null != evt)
{
handler.down(evt);
}
}
catch (QueueClosedException queue_closed)
{
break;
}
catch (Throwable e)
{
if (log.isErrorEnabled())
{
log.error(getName() + " caught exception", e);
}
}
}
System.out.println("DownHandler-----线程退出");
}
}
/**
* The Protocol class provides a set of common services for protocol layers. Each layer has to
* be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
* <code>Down</code> and <code>getName</code>. Layers are stacked in a certain order to form
* a protocol stack. <a href=org.jgroups.Event.html>Events</a> are passed from lower
* layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
* will be passed to its higher layer as an Event. That layer will in turn pass the Event to
* its layer and so on, until a layer handles the Message and sends a response or discards it,
* the former resulting in another Event being passed down the stack.<p>
* Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
* received by a layer (calling the internal upcall <code>ReceiveUpEvent</code>), it is placed
* in the up-queue where it will be retrieved by the up-handler thread which will invoke method
* <code>Up</code> of the layer. The same applies for Events traveling down the stack. Handling
* of the up-handler and down-handler threads and the 2 FIFO queues is donw by the Protocol
* class, subclasses will almost never have to override this behavior.<p>
* The important thing to bear in mind is that Events have to passed on between layers in FIFO
* order which is guaranteed by the Protocol implementation and must be guranteed by subclasses
* implementing their on Event queuing.<p>
* <b>Note that each class implementing interface Protocol MUST provide an empty, public
* constructor !</b>
*/
public abstract class Protocol
{
protected final Properties props = new Properties();
protected Protocol up_prot = null, down_prot = null;
protected ProtocolStack stack = null;
protected final Queue up_queue = new Queue();
protected final Queue down_queue = new Queue();
protected UpHandler up_handler = null;
protected int up_thread_prio = -1;
protected DownHandler down_handler = null;
protected int down_thread_prio = -1;
protected ProtocolObserver observer = null; // hook for debugger
private final static long THREAD_JOIN_TIMEOUT = 1000;
protected boolean down_thread = true; // determines whether the down_handler thread should be started
protected boolean up_thread = true; // determines whether the up_handler thread should be started
protected boolean stats = true; // determines whether to collect statistics (and expose them via JMX)
protected final Log log = LogFactory.getLog(this.getClass());
protected boolean trace = log.isTraceEnabled();
protected boolean warn = log.isWarnEnabled();
/**
* Configures the protocol initially. A configuration string consists of name=value
* items, separated by a ';' (semicolon), e.g.:<pre>
* "loopback=false;unicast_inport=4444"
* </pre>
*/
public boolean setProperties(Properties props)
{
if (props != null)
{
this.props.putAll(props);
}
return true;
}
/** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then
* calls setProperties(), which might invoke the setProperties() method of the actual protocol instance.
*/
public boolean setPropertiesInternal(Properties props)
{
this.props.putAll(props);
String str = props.getProperty("down_thread");
if (str != null)
{
down_thread = Boolean.valueOf(str).booleanValue();
props.remove("down_thread");
}
str = props.getProperty("down_thread_prio");
if (str != null)
{
down_thread_prio = Integer.parseInt(str);
props.remove("down_thread_prio");
}
str = props.getProperty("up_thread");
if (str != null)
{
up_thread = Boolean.valueOf(str).booleanValue();
props.remove("up_thread");
}
str = props.getProperty("up_thread_prio");
if (str != null)
{
up_thread_prio = Integer.parseInt(str);
props.remove("up_thread_prio");
}
str = props.getProperty("stats");
if (str != null)
{
stats = Boolean.valueOf(str).booleanValue();
props.remove("stats");
}
return setProperties(props);
}
public Properties getProperties()
{
return props;
}
public boolean isTrace()
{
return trace;
}
public void setTrace(boolean trace)
{
this.trace = trace;
}
public boolean isWarn()
{
return warn;
}
public void setWarn(boolean warn)
{
this.warn = warn;
}
public boolean upThreadEnabled()
{
return up_thread;
}
public boolean enableUpThread(boolean flag)
{
return up_thread = flag;
}
public boolean enableDownThread(boolean flag)
{
return down_thread = flag;
}
public boolean downThreadEnabled()
{
return down_thread;
}
public boolean statsEnabled()
{
return stats;
}
public void enableStats(boolean flag)
{
stats = flag;
}
public void resetStats()
{
;
}
public String printStats()
{
return null;
}
public Map dumpStats()
{
return null;
}
public void setObserver(ProtocolObserver observer)
{
this.observer = observer;
observer.setProtocol(this);
if (up_handler != null)
{
up_handler.setObserver(observer);
}
if (down_handler != null)
{
down_handler.setObserver(observer);
}
}
/**
* Called after instance has been created (null constructor) and before protocol is started.
* Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
* @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
* ProtocolStack to fail, so the channel constructor will throw an exception
*/
public void init()
throws Exception
{
}
/**
* This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
* Protocols are connected and queues are ready to receive events.
* Will be called <em>from bottom to top</em>. This call will replace
* the <b>START</b> and <b>START_OK</b> events.
* @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
* to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
*/
public void start()
throws Exception
{
}
/**
* This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
* Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
* neighbor protocol below is still working. This method will replace the
* <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
* when this method is called all messages in the down queue will have been flushed
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -