tp.java

来自「JGRoups源码」· Java 代码 · 共 1,778 行 · 第 1/5 页

JAVA
1,778
字号
package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.stack.IpAddress;import org.jgroups.util.*;import org.jgroups.util.List;import org.jgroups.util.Queue;import java.io.DataInputStream;import java.io.IOException;import java.net.*;import java.text.NumberFormat;import java.util.*;/** * Generic transport - specific implementations should extend this abstract class. * Features which are provided to the subclasses include * <ul> * <li>version checking * <li>marshalling and unmarshalling * <li>message bundling (handling single messages, and message lists) * <li>incoming packet handler * <li>loopback * </ul> * A subclass has to override * <ul> * <li>{@link #sendToAllMembers(byte[], int, int)} * <li>{@link #sendToSingleMember(org.jgroups.Address, byte[], int, int)} * <li>{@link #init()} * <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves * (e.g., created their sockets). * <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves * <li>{@link #destroy()} * </ul> * The create() or start() method has to create a local address.<br> * The {@link #receive(Address, Address, byte[], int, int)} method must * be called by subclasses when a unicast or multicast message has been received. * @author Bela Ban * @version $Id: TP.java,v 1.77 2006/10/24 13:20:25 belaban Exp $ */public abstract class TP extends Protocol {    /** The address (host and port) of this member */    protected Address         local_addr=null;    /** The name of the group to which this member is connected */    protected String          channel_name=null;    /** The interface (NIC) which should be used by this transport */    protected InetAddress     bind_addr=null;    /** Overrides bind_addr, -Djgroups.bind_addr and -Dbind.address: let's the OS return the local host address */    boolean         use_local_host=false;    /** If true, the transport should use all available interfaces to receive multicast messages     * @deprecated  Use {@link receive_on_all_interfaces} instead */    boolean         bind_to_all_interfaces=false;    /** If true, the transport should use all available interfaces to receive multicast messages */    boolean         receive_on_all_interfaces=false;    /** List<NetworkInterface> of interfaces to receive multicasts on. The multicast receive socket will listen     * on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g.     * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once.     * If this property is set, it override receive_on_all_interfaces.     */    java.util.List  receive_interfaces=null;    /** If true, the transport should use all available interfaces to send multicast messages. This means     * the same multicast message is sent N times, so use with care */    boolean         send_on_all_interfaces=false;    /** List<NetworkInterface> of interfaces to send multicasts on. The multicast send socket will send the     * same multicast message on all of these interfaces. This is a comma-separated list of IP addresses or     * interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded.     * If this property is set, it override send_on_all_interfaces.     */    java.util.List  send_interfaces=null;    /** The port to which the transport binds. 0 means to bind to any (ephemeral) port */    int             bind_port=0;    int				port_range=1; // 27-6-2003 bgooren, Only try one port by default    /** The members of this group (updated when a member joins or leaves) */    final protected Vector    members=new Vector(11);    protected View            view=null;    /** Pre-allocated byte stream. Used for marshalling messages. Will grow as needed */    final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);    final ExposedBufferedOutputStream  buf_out_stream=new ExposedBufferedOutputStream(out_stream, 1024);    final ExposedDataOutputStream      dos=new ExposedDataOutputStream(buf_out_stream);    final ExposedByteArrayInputStream  in_stream=new ExposedByteArrayInputStream(new byte[]{'0'});    final ExposedBufferedInputStream   buf_in_stream=new ExposedBufferedInputStream(in_stream);    final DataInputStream              dis=new DataInputStream(buf_in_stream);    /** If true, messages sent to self are treated specially: unicast messages are     * looped back immediately, multicast messages get a local copy first and -     * when the real copy arrives - it will be discarded. Useful for Window     * media (non)sense */    boolean         loopback=false;    /** Discard packets with a different version. Usually minor version differences are okay. Setting this property     * to true means that we expect the exact same version on all incoming packets */    boolean         discard_incompatible_packets=false;    /** Sometimes receivers are overloaded (they have to handle de-serialization etc).     * Packet handler is a separate thread taking care of de-serialization, receiver     * thread(s) simply put packet in queue and return immediately. Setting this to     * true adds one more thread */    boolean         use_incoming_packet_handler=true;    /** Used by packet handler to store incoming DatagramPackets */    Queue           incoming_packet_queue=null;    /** Dequeues DatagramPackets from packet_queue, unmarshalls them and     * calls <tt>handleIncomingUdpPacket()</tt> */    IncomingPacketHandler   incoming_packet_handler=null;    /** Used by packet handler to store incoming Messages */    Queue                  incoming_msg_queue=null;    IncomingMessageHandler incoming_msg_handler;    /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this     * value uses an additional thread */    boolean               use_outgoing_packet_handler=false;    /** Used by packet handler to store outgoing DatagramPackets */    BoundedLinkedQueue    outgoing_queue=null;    /** max number of elements in the bounded outgoing_queue */    int                   outgoing_queue_max_size=2000;    OutgoingPacketHandler outgoing_packet_handler=null;    /** If set it will be added to <tt>local_addr</tt>. Used to implement     * for example transport independent addresses */    byte[]          additional_data=null;    /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller        than the largest datagram packet size in case of UDP */    int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();    /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or     * max_bundle_timeout has been exceeded (whichever occurs faster)     */    long max_bundle_timeout=20;    /** Enabled bundling of smaller messages into bigger ones */    boolean enable_bundling=false;    private Bundler    bundler=null;    protected TimeScheduler      timer=null;    private DiagnosticsHandler diag_handler=null;    boolean enable_diagnostics=true;    String diagnostics_addr="224.0.0.75";    int    diagnostics_port=7500;    /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds     * an entry with key=S and value= sender's IP address and port.     */    HashMap addr_translation_table=new HashMap();    boolean use_addr_translation=false;    TpHeader header;    final String name=getName();    static final byte LIST      = 1;  // we have a list of messages rather than a single message when set    static final byte MULTICAST = 2;  // message is a multicast (versus a unicast) message when set    long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;    static  NumberFormat f;    static {        f=NumberFormat.getNumberInstance();        f.setGroupingUsed(false);        f.setMaximumFractionDigits(2);    }    /**     * Creates the TP protocol, and initializes the     * state variables, does however not start any sockets or threads.     */    protected TP() {    }    /**     * debug only     */    public String toString() {        return name + "(local address: " + local_addr + ')';    }    public void resetStats() {        num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;    }    public long getNumMessagesSent()     {return num_msgs_sent;}    public long getNumMessagesReceived() {return num_msgs_received;}    public long getNumBytesSent()        {return num_bytes_sent;}    public long getNumBytesReceived()    {return num_bytes_received;}    public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";}    public void setBindAddress(String bind_addr) throws UnknownHostException {        this.bind_addr=InetAddress.getByName(bind_addr);    }    /** @deprecated Use {@link #isReceiveOnAllInterfaces()} instead */    public boolean getBindToAllInterfaces() {return receive_on_all_interfaces;}    public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;}    public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}    public java.util.List getReceiveInterfaces() {return receive_interfaces;}    public boolean isSendOnAllInterfaces() {return send_on_all_interfaces;}    public java.util.List getSendInterfaces() {return send_interfaces;}    public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;}    public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;}    public boolean isEnableBundling() {return enable_bundling;}    public void setEnableBundling(boolean flag) {enable_bundling=flag;}    public int getMaxBundleSize() {return max_bundle_size;}    public void setMaxBundleSize(int size) {max_bundle_size=size;}    public long getMaxBundleTimeout() {return max_bundle_timeout;}    public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;}    public int getOutgoingQueueSize() {return outgoing_queue != null? outgoing_queue.size() : 0;}    public int getIncomingQueueSize() {return incoming_packet_queue != null? incoming_packet_queue.size() : 0;}    public Address getLocalAddress() {return local_addr;}    public String getChannelName() {return channel_name;}    public boolean isLoopback() {return loopback;}    public void setLoopback(boolean b) {loopback=b;}    public boolean isUseIncomingPacketHandler() {return use_incoming_packet_handler;}    public boolean isUseOutgoingPacketHandler() {return use_outgoing_packet_handler;}    public int getOutgoingQueueMaxSize() {return outgoing_queue != null? outgoing_queue_max_size : 0;}    public void setOutgoingQueueMaxSize(int new_size) {        if(outgoing_queue != null) {            outgoing_queue.setCapacity(new_size);            outgoing_queue_max_size=new_size;        }    }    public Map dumpStats() {        Map retval=super.dumpStats();        if(retval == null)            retval=new HashMap();        retval.put("num_msgs_sent", new Long(num_msgs_sent));        retval.put("num_msgs_received", new Long(num_msgs_received));        retval.put("num_bytes_sent", new Long(num_bytes_sent));        retval.put("num_bytes_received", new Long(num_bytes_received));        return retval;    }    /**     * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N     * messages, one for each member     * @param data The data to be sent. This is not a copy, so don't modify it     * @param offset     * @param length     * @throws Exception     */    public abstract void sendToAllMembers(byte[] data, int offset, int length) throws Exception;    /**     * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N     * messages, one for each member     * @param dest Must be a non-null unicast address     * @param data The data to be sent. This is not a copy, so don't modify it     * @param offset     * @param length     * @throws Exception     */    public abstract void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception;    public abstract String getInfo();    public abstract void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast);    public abstract void postUnmarshallingList(Message msg, Address dest, boolean multicast);    private StringBuffer _getInfo() {        StringBuffer sb=new StringBuffer();        sb.append(local_addr).append(" (").append(channel_name).append(") ").append("\n");        sb.append("local_addr=").append(local_addr).append("\n");        sb.append("group_name=").append(channel_name).append("\n");        sb.append("version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n");        sb.append("view: ").append(view).append('\n');        sb.append(getInfo());        return sb;    }    private void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String request) {        try {            StringTokenizer tok=new StringTokenizer(request);            String req=tok.nextToken();            StringBuffer info=new StringBuffer("n/a");            if(req.trim().toLowerCase().startsWith("query")) {                ArrayList l=new ArrayList(tok.countTokens());                while(tok.hasMoreTokens())                    l.add(tok.nextToken().trim().toLowerCase());                info=_getInfo();                if(l.contains("jmx")) {                    Channel ch=stack.getChannel();                    if(ch != null) {                        Map m=ch.dumpStats();                        StringBuffer sb=new StringBuffer();                        sb.append("stats:\n");                        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {                            sb.append(it.next()).append("\n");                        }                        info.append(sb);                    }                }                if(l.contains("props")) {                    String p=stack.printProtocolSpecAsXML();                    info.append("\nprops:\n").append(p);                }            }            byte[] diag_rsp=info.toString().getBytes();            if(log.isDebugEnabled())                log.debug("sending diag response to " + sender);            sendResponse(sock, sender, diag_rsp);        }        catch(Throwable t) {            if(log.isErrorEnabled())                log.error("failed sending diag rsp to " + sender, t);        }    }    private static void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException {        DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender);        sock.send(p);    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?