jchannel.java

来自「JGRoups源码」· Java 代码 · 共 1,619 行 · 第 1/5 页

JAVA
1,619
字号
package org.jgroups;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.conf.ConfiguratorFactory;import org.jgroups.conf.ProtocolStackConfigurator;import org.jgroups.stack.ProtocolStack;import org.jgroups.stack.StateTransferInfo;import org.jgroups.stack.IpAddress;import org.jgroups.util.*;import org.w3c.dom.Element;import java.io.File;import java.io.InputStream;import java.io.OutputStream;import java.io.Serializable;import java.net.URL;import java.util.HashMap;import java.util.Map;import java.util.Vector;/** * JChannel is a pure Java implementation of Channel. * When a JChannel object is instantiated it automatically sets up the * protocol stack. * <p> * <B>Properties</B> * <P> * Properties are used to configure a channel, and are accepted in * several forms; the String form is described here. * A property string consists of a number of properties separated by * colons.  For example: * <p> * <pre>"&lt;prop1&gt;(arg1=val1):&lt;prop2&gt;(arg1=val1;arg2=val2):&lt;prop3&gt;:&lt;propn&gt;"</pre> * <p> * Each property relates directly to a protocol layer, which is * implemented as a Java class. When a protocol stack is to be created * based on the above property string, the first property becomes the * bottom-most layer, the second one will be placed on the first, etc.: * the stack is created from the bottom to the top, as the string is * parsed from left to right. Each property has to be the name of a * Java class that resides in the * {@link org.jgroups.protocols} package. * <p> * Note that only the base name has to be given, not the fully specified * class name (e.g., UDP instead of org.jgroups.protocols.UDP). * <p> * Each layer may have 0 or more arguments, which are specified as a * list of name/value pairs in parentheses directly after the property. * In the example above, the first protocol layer has 1 argument, * the second 2, the third none. When a layer is created, these * properties (if there are any) will be set in a layer by invoking * the layer's setProperties() method * <p> * As an example the property string below instructs JGroups to create * a JChannel with protocols UDP, PING, FD and GMS:<p> * <pre>"UDP(mcast_addr=228.10.9.8;mcast_port=5678):PING:FD:GMS"</pre> * <p> * The UDP protocol layer is at the bottom of the stack, and it * should use mcast address 228.10.9.8. and port 5678 rather than * the default IP multicast address and port. The only other argument * instructs FD to output debug information while executing. * Property UDP refers to a class {@link org.jgroups.protocols.UDP}, * which is subsequently loaded and an instance of which is created as protocol layer. * If any of these classes are not found, an exception will be thrown and * the construction of the stack will be aborted. * * @author Bela Ban * @version $Id: JChannel.java,v 1.106 2006/10/28 02:42:32 vlada Exp $ */public class JChannel extends Channel {    /**     * The default protocol stack used by the default constructor.     */    public static final String DEFAULT_PROTOCOL_STACK=            "UDP(down_thread=false;mcast_send_buf_size=640000;mcast_port=45566;discard_incompatible_packets=true;" +                    "ucast_recv_buf_size=20000000;mcast_addr=228.10.10.10;up_thread=false;loopback=false;" +                    "mcast_recv_buf_size=25000000;max_bundle_size=64000;max_bundle_timeout=30;" +                    "use_incoming_packet_handler=true;use_outgoing_packet_handler=false;" +                    "ucast_send_buf_size=640000;tos=16;enable_bundling=true;ip_ttl=2):" +            "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):" +            "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):" +            "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false):" +            "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):" +            "pbcast.NAKACK(max_xmit_size=60000;down_thread=false;use_mcast_xmit=false;gc_lag=0;" +                    "discard_delivered_msgs=true;up_thread=false;retransmit_timeout=100,200,300,600,1200,2400,4800):" +            "UNICAST(timeout=300,600,1200,2400,3600;down_thread=false;up_thread=false):" +            "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=50000;max_bytes=400000;down_thread=false;" +                    "up_thread=false):" +            "VIEW_SYNC(down_thread=false;avg_send_interval=60000;up_thread=false):" +            "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;" +                    "join_retry_timeout=2000;up_thread=false;shun=true):" +            "FC(max_credits=2000000;down_thread=false;up_thread=false;min_threshold=0.10):" +            "FRAG2(frag_size=60000;down_thread=false;up_thread=false):" +            "pbcast.STATE_TRANSFER(down_thread=false;up_thread=false)";    static final String FORCE_PROPS="force.properties";    /* the protocol stack configuration string */    private String props=null;    /*the address of this JChannel instance*/    private Address local_addr=null;    /*the channel (also know as group) name*/    private String cluster_name=null;  // group name    /*the latest view of the group membership*/    private View my_view=null;    /*the queue that is used to receive messages (events) from the protocol stack*/    private final Queue mq=new Queue();    /*the protocol stack, used to send and receive messages from the protocol stack*/    private ProtocolStack prot_stack=null;    /** Thread responsible for closing a channel and potentially reconnecting to it (e.g., when shunned). */    protected CloserThread closer=null;    /** To wait until a local address has been assigned */    private final Promise local_addr_promise=new Promise();    /** To wait until we have connected successfully */    private final Promise connect_promise=new Promise();    /** To wait until we have been disconnected from the channel */    private final Promise disconnect_promise=new Promise();    private final Promise state_promise=new Promise();    private final Promise flush_unblock_promise=new Promise();    private final Promise flush_promise=new Promise();    /** wait until we have a non-null local_addr */    private long LOCAL_ADDR_TIMEOUT=30000; //=Long.parseLong(System.getProperty("local_addr.timeout", "30000"));    /*if the states is fetched automatically, this is the default timeout, 5 secs*/    private static final long GET_STATE_DEFAULT_TIMEOUT=5000;    /*if FLUSH is used channel waits for UNBLOCK event, this is the default timeout, 10 secs*/    private static final long FLUSH_UNBLOCK_TIMEOUT=10000;    /*flag to indicate whether to receive blocks, if this is set to true, receive_views is set to true*/    private boolean receive_blocks=false;    /*flag to indicate whether to receive local messages     *if this is set to false, the JChannel will not receive messages sent by itself*/    private boolean receive_local_msgs=true;    /*flag to indicate whether the channel will reconnect (reopen) when the exit message is received*/    private boolean auto_reconnect=false;    /*flag t indicate whether the state is supposed to be retrieved after the channel is reconnected     *setting this to true, automatically forces auto_reconnect to true*/    private boolean auto_getstate=false;    /*channel connected flag*/    protected boolean connected=false;    /*channel closed flag*/    protected boolean closed=false;      // close() has been called, channel is unusable    /** True if a state transfer protocol is available, false otherwise */    private boolean state_transfer_supported=false; // set by CONFIG event from STATE_TRANSFER protocol    /** True if a flush protocol is available, false otherwise */    private volatile boolean flush_supported=false; // set by CONFIG event from FLUSH protocol    /** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be remove     * as soon as JGroups supports logical addresses     */    private byte[] additional_data=null;    protected final Log log=LogFactory.getLog(getClass());    /** Collect statistics */    protected boolean stats=true;    protected long sent_msgs=0, received_msgs=0, sent_bytes=0, received_bytes=0;    /** Used by subclass to create a JChannel without a protocol stack, don't use as application programmer */    protected JChannel(boolean no_op) {        ;    }    /**     * Constructs a <code>JChannel</code> instance with the protocol stack     * specified by the <code>DEFAULT_PROTOCOL_STACK</code> member.     *     * @throws ChannelException if problems occur during the initialization of     *                          the protocol stack.     */    public JChannel() throws ChannelException {        this(DEFAULT_PROTOCOL_STACK);    }    /**     * Constructs a <code>JChannel</code> instance with the protocol stack     * configuration contained by the specified file.     *     * @param properties a file containing a JGroups XML protocol stack     *                   configuration.     *     * @throws ChannelException if problems occur during the configuration or     *                          initialization of the protocol stack.     */    public JChannel(File properties) throws ChannelException {        this(ConfiguratorFactory.getStackConfigurator(properties));    }    /**     * Constructs a <code>JChannel</code> instance with the protocol stack     * configuration contained by the specified XML element.     *     * @param properties a XML element containing a JGroups XML protocol stack     *                   configuration.     *     * @throws ChannelException if problems occur during the configuration or     *                          initialization of the protocol stack.     */    public JChannel(Element properties) throws ChannelException {        this(ConfiguratorFactory.getStackConfigurator(properties));    }    /**     * Constructs a <code>JChannel</code> instance with the protocol stack     * configuration indicated by the specified URL.     *     * @param properties a URL pointing to a JGroups XML protocol stack     *                   configuration.     *     * @throws ChannelException if problems occur during the configuration or     *                          initialization of the protocol stack.     */    public JChannel(URL properties) throws ChannelException {        this(ConfiguratorFactory.getStackConfigurator(properties));    }    /**     * Constructs a <code>JChannel</code> instance with the protocol stack     * configuration based upon the specified properties parameter.     *     * @param properties an old style property string, a string representing a     *                   system resource containing a JGroups XML configuration,     *                   a string representing a URL pointing to a JGroups XML     *                   XML configuration, or a string representing a file name     *                   that contains a JGroups XML configuration.     *     * @throws ChannelException if problems occur during the configuration and     *                          initialization of the protocol stack.     */    public JChannel(String properties) throws ChannelException {        this(ConfiguratorFactory.getStackConfigurator(properties));    }    /**     * Constructs a <code>JChannel</code> instance with the protocol stack     * configuration contained by the protocol stack configurator parameter.     * <p>     * All of the public constructors of this class eventually delegate to this     * method.     *     * @param configurator a protocol stack configurator containing a JGroups     *                     protocol stack configuration.     *     * @throws ChannelException if problems occur during the initialization of     *                          the protocol stack.     */    protected JChannel(ProtocolStackConfigurator configurator) throws ChannelException {        init(configurator);    }    /**     * Creates a new JChannel with the protocol stack as defined in the properties     * parameter. an example of this parameter is<BR>     * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>     * Other examples can be found in the ./conf directory<BR>     * @param properties the protocol stack setup; if null, the default protocol stack will be used.     * 					 The properties can also be a java.net.URL object or a string that is a URL spec.     *                   The JChannel will validate any URL object and String object to see if they are a URL.     *                   In case of the parameter being a url, the JChannel will try to load the xml from there.     *                   In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the     *                   DOM tree with the element as its root element.     * @deprecated Use the constructors with specific parameter types instead.     */    public JChannel(Object properties) throws ChannelException {        if (properties == null)            properties = DEFAULT_PROTOCOL_STACK;        ProtocolStackConfigurator c=null;        try {            c=ConfiguratorFactory.getStackConfigurator(properties);        }        catch(Exception x) {            throw new ChannelException("unable to load protocol stack", x);        }        init(c);    }    /**     * Returns the protocol stack.     * Currently used by Debugger.     * Specific to JChannel, therefore     * not visible in Channel     */    public ProtocolStack getProtocolStack() {        return prot_stack;    }    protected Log getLog() {        return log;    }    /**     * returns the protocol stack configuration in string format.     * an example of this property is<BR>     * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"     */    public String getProperties() {        return props;    }    public boolean statsEnabled() {        return stats;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?