📄 protocol.java
字号:
// $Id: Protocol.java,v 1.38 2006/04/05 05:33:09 belaban Exp $package org.jgroups.stack;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Event;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import org.jgroups.util.Util;import java.util.Map;import java.util.Properties;import java.util.Vector;class UpHandler extends Thread { private Queue mq=null; private Protocol handler=null; private ProtocolObserver observer=null; protected final Log log=LogFactory.getLog(this.getClass()); public UpHandler(Queue mq, Protocol handler, ProtocolObserver observer) { super(Util.getGlobalThreadGroup(), "UpHandler"); this.mq=mq; this.handler=handler; this.observer=observer; if(handler != null) setName("UpHandler (" + handler.getName() + ')'); else setName("UpHandler"); setDaemon(true); } public void setObserver(ProtocolObserver observer) { this.observer=observer; } /** Removes events from mq and calls handler.up(evt) */ public void run() { while(!mq.closed()) { try { Event evt=(Event)mq.remove(); if(evt == null) { if(log.isWarnEnabled()) log.warn("removed null event"); continue; } if(observer != null) { // call debugger hook (if installed) if(observer.up(evt, mq.size()) == false) { // false means discard event return; } } handler.up(evt); } catch(QueueClosedException queue_closed) { break; } catch(Throwable e) { if(log.isErrorEnabled()) log.error(getName() + " caught exception", e); } } }}class DownHandler extends Thread { private Queue mq=null; private Protocol handler=null; private ProtocolObserver observer=null; protected final Log log=LogFactory.getLog(this.getClass()); public DownHandler(Queue mq, Protocol handler, ProtocolObserver observer) { super(Util.getGlobalThreadGroup(), "DownHandler"); this.mq=mq; this.handler=handler; this.observer=observer; if(handler != null) setName("DownHandler (" + handler.getName() + ')'); else setName("DownHandler"); setDaemon(true); } public void setObserver(ProtocolObserver observer) { this.observer=observer; } /** Removes events from mq and calls handler.down(evt) */ public void run() { while(!mq.closed()) { try { Event evt=(Event)mq.remove(); if(evt == null) { if(log.isWarnEnabled()) log.warn("removed null event"); continue; } if(observer != null) { // call debugger hook (if installed) if(observer.down(evt, mq.size()) == false) { // false means discard event continue; } } int type=evt.getType(); if(type == Event.START || type == Event.STOP) { if(handler.handleSpecialDownEvent(evt) == false) continue; } handler.down(evt); } catch(QueueClosedException queue_closed) { break; } catch(Throwable e) { if(log.isErrorEnabled()) log.error(getName() + " caught exception", e); } } }}/** * The Protocol class provides a set of common services for protocol layers. Each layer has to * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>, * <code>Down</code> and <code>getName</code>. Layers are stacked in a certain order to form * a protocol stack. <a href=org.jgroups.Event.html>Events</a> are passed from lower * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom * will be passed to its higher layer as an Event. That layer will in turn pass the Event to * its layer and so on, until a layer handles the Message and sends a response or discards it, * the former resulting in another Event being passed down the stack.<p> * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is * received by a layer (calling the internal upcall <code>ReceiveUpEvent</code>), it is placed * in the up-queue where it will be retrieved by the up-handler thread which will invoke method * <code>Up</code> of the layer. The same applies for Events traveling down the stack. Handling * of the up-handler and down-handler threads and the 2 FIFO queues is donw by the Protocol * class, subclasses will almost never have to override this behavior.<p> * The important thing to bear in mind is that Events have to passed on between layers in FIFO * order which is guaranteed by the Protocol implementation and must be guranteed by subclasses * implementing their on Event queuing.<p> * <b>Note that each class implementing interface Protocol MUST provide an empty, public * constructor !</b> */public abstract class Protocol { protected final Properties props=new Properties(); protected Protocol up_prot=null, down_prot=null; protected ProtocolStack stack=null; protected final Queue up_queue=new Queue(); protected final Queue down_queue=new Queue(); protected UpHandler up_handler=null; protected int up_thread_prio=-1; protected DownHandler down_handler=null; protected int down_thread_prio=-1; protected ProtocolObserver observer=null; // hook for debugger private final static long THREAD_JOIN_TIMEOUT=1000; protected boolean down_thread=true; // determines whether the down_handler thread should be started protected boolean up_thread=true; // determines whether the up_handler thread should be started protected boolean stats=true; // determines whether to collect statistics (and expose them via JMX) protected final Log log=LogFactory.getLog(this.getClass()); protected boolean trace=log.isTraceEnabled(); protected boolean warn=log.isWarnEnabled(); /** * Configures the protocol initially. A configuration string consists of name=value * items, separated by a ';' (semicolon), e.g.:<pre> * "loopback=false;unicast_inport=4444" * </pre> */ public boolean setProperties(Properties props) { if(props != null) this.props.putAll(props); return true; } /** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then * calls setProperties(), which might invoke the setProperties() method of the actual protocol instance. */ public boolean setPropertiesInternal(Properties props) { this.props.putAll(props); String str=props.getProperty("down_thread"); if(str != null) { down_thread=Boolean.valueOf(str).booleanValue(); props.remove("down_thread"); } str=props.getProperty("down_thread_prio"); if(str != null) { down_thread_prio=Integer.parseInt(str); props.remove("down_thread_prio"); } str=props.getProperty("up_thread"); if(str != null) { up_thread=Boolean.valueOf(str).booleanValue(); props.remove("up_thread"); } str=props.getProperty("up_thread_prio"); if(str != null) { up_thread_prio=Integer.parseInt(str); props.remove("up_thread_prio"); } str=props.getProperty("stats"); if(str != null) { stats=Boolean.valueOf(str).booleanValue(); props.remove("stats"); } return setProperties(props); } public Properties getProperties() { return props; } public boolean isTrace() { return trace; } public void setTrace(boolean trace) { this.trace=trace; } public boolean isWarn() { return warn; } public void setWarn(boolean warn) { this.warn=warn; } public boolean upThreadEnabled() { return up_thread; } public boolean downThreadEnabled() { return down_thread; } public boolean statsEnabled() { return stats; } public void enableStats(boolean flag) { stats=flag; } public void resetStats() { ; } public String printStats() { return null; } public Map dumpStats() { return null; } public void setObserver(ProtocolObserver observer) { this.observer=observer; observer.setProtocol(this); if(up_handler != null) up_handler.setObserver(observer); if(down_handler != null) down_handler.setObserver(observer); } /** * Called after instance has been created (null constructor) and before protocol is started. * Properties are already set. Other protocols are not yet connected and events cannot yet be sent. * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the * ProtocolStack to fail, so the channel constructor will throw an exception */ public void init() throws Exception { } /** * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work. * Protocols are connected and queues are ready to receive events. * Will be called <em>from bottom to top</em>. This call will replace * the <b>START</b> and <b>START_OK</b> events. * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack * to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception */ public void start() throws Exception { } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -