udp_nio.java.txt

来自「JGRoups源码」· 文本 代码 · 共 1,513 行 · 第 1/4 页

TXT
1,513
字号
package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.LogicalAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import org.jgroups.util.Util;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.*;import java.net.*;import java.util.*;/** * Multicast transport. Similar to UDP, but binds to multiple (or all) interfaces for sending and receiving * multicast and unicast traffic.<br/> * The list of interfaces can be set via a property (comma-delimited list of IP addresses or "all" for all * interfaces). Note that this class only works under JDK 1.4 and higher.<br/> * For each of the interfaces listed we create a Listener, which listens on the group multicast address and creates * a unicast datagram socket. The address of this member is determined at startup time, and is the host name plus * a timestamp (LogicalAddress). It does not change during the lifetime of the process. The LogicalAddress contains * a list of all unicast socket addresses to which we can send back unicast messages. When we send a message, the * Listener adds the sender's return address. When we receive a message, we add that address to our routing cache, which * contains logical addresses and physical addresses. When we need to send a unicast address, we first check whether * the logical address has a physical address associated with it in the cache. If so, we send a message to that address. * If not, we send the unicast message to <em>all</em> physical addresses contained in the LogicalAddress.<br/> * UDP_NIO guarantees that - in scenarios with multiple subnets and multi-homed machines - members do see each other. * There is some overhead in multicasting the same message on multiple interfaces, and potentially sending a unicast * on multiple interfaces as well, but the advantage is that we don't need stuff like bind_addr any longer. Plus, * the unicast routing caches should ensure that unicasts are only sent via 1 interface in almost all cases. *  * @author Bela Ban Oct 2003 * @version $Id: UDP_NIO.java.txt,v 1.1 2006/05/02 08:56:59 belaban Exp $ * @deprecated Use UDP instead with send_interfaces or receive_interfaces properties defined */public class UDP_NIO extends Protocol implements  Receiver {    static final String name="UDP_NIO";    /** Maintains a list of Connectors, one for each interface we're listening on */    ConnectorTable ct=null;    /** A List<String> of bind addresses, we create 1 Connector for each interface */    List bind_addrs=null;    /** The name of the group to which this member is connected */    String group_name=null;    /** The multicast address (mcast address and port) this member uses (default: 230.1.2.3:7500) */    InetSocketAddress mcast_addr=null;    /** The address of this member. Valid for the lifetime of the JVM in which this member runs */    LogicalAddress local_addr=new LogicalAddress(null, null);    /** Logical address without list of physical addresses */    LogicalAddress local_addr_canonical=local_addr.copy();    /** Pre-allocated byte stream. Used for serializing datagram packets */    ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535);    /**     * The port to which the unicast receiver socket binds.     * 0 means to bind to any (ephemeral) port     */    int local_bind_port=0;    int port_range=1; // 27-6-2003 bgooren, Only try one port by default    /**     * Whether to enable IP multicasting. If false, multiple unicast datagram     * packets are sent rather than one multicast packet     */    boolean ip_mcast=true;    /** The time-to-live (TTL) for multicast datagram packets */    int ip_ttl=32;    /** The members of this group (updated when a member joins or leaves) */    Vector members=new Vector();    /**     * Header to be added to all messages sent via this protocol. It is     * preallocated for efficiency     */    UdpHeader udp_hdr=null;    /** Send buffer size of the multicast datagram socket */    int mcast_send_buf_size=300000;    /** Receive buffer size of the multicast datagram socket */    int mcast_recv_buf_size=300000;    /** Send buffer size of the unicast datagram socket */    int ucast_send_buf_size=300000;    /** Receive buffer size of the unicast datagram socket */    int ucast_recv_buf_size=300000;    /**     * If true, messages sent to self are treated specially: unicast messages are     * looped back immediately, multicast messages get a local copy first and -     * when the real copy arrives - it will be discarded. Useful for Window     * media (non)sense     * @deprecated This is used by default now     */    boolean loopback=true; //todo: remove    /**     * Sometimes receivers are overloaded (they have to handle de-serialization etc).     * Packet handler is a separate thread taking care of de-serialization, receiver     * thread(s) simply put packet in queue and return immediately. Setting this to     * true adds one more thread     */    boolean use_packet_handler=false;    /** Used by packet handler to store incoming DatagramPackets */    Queue packet_queue=null;    /**     * If set it will be added to <tt>local_addr</tt>. Used to implement     * for example transport independent addresses     */    byte[] additional_data=null;    /**     * Dequeues DatagramPackets from packet_queue, unmarshalls them and     * calls <tt>handleIncomingUdpPacket()</tt>     */    PacketHandler packet_handler=null;    /** Number of bytes to allocate to receive a packet. Needs to be set to be higher than frag_size     * (handle CONFIG event)     */    static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000;  // todo: make settable and/or use CONFIG event    /**     * Creates the UDP_NIO protocol, and initializes the     * state variables, does however not start any sockets or threads.     */    public UDP_NIO() {    }    /**     * debug only     */    public String toString() {        return "Protocol UDP(local address: " + local_addr + ')';    }    public void receive(DatagramPacket packet) {        int           len=packet.getLength();        byte[]        data=packet.getData();        SocketAddress sender=packet.getSocketAddress();        if(len == 4) {  // received a diagnostics probe            if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {                handleDiagnosticProbe(sender);                return;            }        }        if(trace)            log.trace("received " + len + " bytes from " + sender);        if(use_packet_handler && packet_queue != null) {            byte[] tmp=new byte[len];            System.arraycopy(data, 0, tmp, 0, len);            try {                Object[] arr=new Object[]{tmp, sender};                packet_queue.add(arr);                return;            }            catch(QueueClosedException e) {                if(warn) log.warn("packet queue for packet handler thread is closed");                // pass through to handleIncomingPacket()            }        }        handleIncomingUdpPacket(data, sender);    }    /* ----------------------- Receiving of MCAST UDP packets ------------------------ *///    public void run() {//        DatagramPacket packet;//        byte receive_buf[]=new byte[65000];//        int len;//        byte[] tmp1, tmp2;////        // moved out of loop to avoid excessive object creations (bela March 8 2001)//        packet=new DatagramPacket(receive_buf, receive_buf.length);////        while(mcast_receiver != null && mcast_sock != null) {//            try {//                packet.setData(receive_buf, 0, receive_buf.length);//                mcast_sock.receive(packet);//                len=packet.getLength();//                if(len == 1 && packet.getData()[0] == 0) {//                    if(trace) if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "received dummy packet");//                    continue;//                }////                if(len == 4) {  // received a diagnostics probe//                    byte[] tmp=packet.getData();//                    if(tmp[0] == 'd' && tmp[1] == 'i' && tmp[2] == 'a' && tmp[3] == 'g') {//                        handleDiagnosticProbe(null, null);//                        continue;//                    }//                }////                if(trace)//                    if(log.isInfoEnabled()) log.info("UDP_NIO.receive()", "received (mcast) " + packet.getLength() + " bytes from " +//                            packet.getAddress() + ":" + packet.getPort() + " (size=" + len + " bytes)");//                if(len > receive_buf.length) {//                    if(log.isErrorEnabled()) log.error("UDP_NIO.run()", "size of the received packet (" + len + ") is bigger than " +//                            "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +//                            "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);//                }////                if(Version.compareTo(packet.getData()) == false) {//                    if(warn) log.warn("UDP_NIO.run()",//                            "packet from " + packet.getAddress() + ":" + packet.getPort() +//                            " has different version (" +//                            Version.printVersionId(packet.getData(), Version.version_id.length) +//                            ") from ours (" + Version.printVersionId(Version.version_id) +//                            "). This may cause problems");//                }////                if(use_packet_handler) {//                    tmp1=packet.getData();//                    tmp2=new byte[len];//                    System.arraycopy(tmp1, 0, tmp2, 0, len);//                    packet_queue.add(tmp2);//                } else//                    handleIncomingUdpPacket(packet.getData());//            } catch(SocketException sock_ex) {//                 if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "multicast socket is closed, exception=" + sock_ex);//                break;//            } catch(InterruptedIOException io_ex) { // thread was interrupted//                ; // go back to top of loop, where we will terminate loop//            } catch(Throwable ex) {//                if(log.isErrorEnabled()) log.error("UDP_NIO.run()", "exception=" + ex + ", stack trace=" + Util.printStackTrace(ex));//                Util.sleep(1000); // so we don't get into 100% cpu spinning (should NEVER happen !)//            }//        }//         if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "multicast thread terminated");//    }    void handleDiagnosticProbe(SocketAddress sender) {        try {            byte[] diag_rsp=getDiagResponse().getBytes();            DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender);                if(log.isInfoEnabled()) log.info("sending diag response to " + sender);            ct.send(rsp);        } catch(Throwable t) {            if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ", exception=" + t);        }    }    String getDiagResponse() {        StringBuffer sb=new StringBuffer();        sb.append(local_addr).append(" (").append(group_name).append(')');        sb.append(" [").append(mcast_addr).append("]\n");        sb.append("Version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n");        sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n');        sb.append("members: ").append(members).append('\n');        return sb.toString();    }    /* ------------------------------------------------------------------------------- */    /*------------------------------ Protocol interface ------------------------------ */    public String getName() {        return name;    }    public void init() throws Exception {        if(use_packet_handler) {            packet_queue=new Queue();            packet_handler=new PacketHandler();        }    }    /**     * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads     */    public void start() throws Exception {         if(log.isInfoEnabled()) log.info("creating sockets and starting threads");        if(ct == null) {            ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this);            for(Iterator it=bind_addrs.iterator(); it.hasNext();) {                String bind_addr=(String)it.next();                ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size,                        ucast_send_buf_size, ip_ttl, this);            }            // add physical addresses to local_addr            List physical_addrs=ct.getConnectorAddresses(); // must be non-null and size() >= 1            for(Iterator it=physical_addrs.iterator(); it.hasNext();) {                SocketAddress address=(SocketAddress)it.next();                local_addr.addPhysicalAddress(address);            }            if(additional_data != null)                local_addr.setAdditionalData(additional_data);            ct.start();            passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));            if(use_packet_handler)            packet_handler.start();        }    }    public void stop() {         if(log.isInfoEnabled()) log.info("closing sockets and stopping threads");        if(packet_handler != null)            packet_handler.stop();        if(ct != null) {            ct.stop();            ct=null;        }        local_addr.removeAllPhysicalAddresses();    }    /**     * Setup the Protocol instance acording to the configuration string.     * The following properties are being read by the UDP protocol:     * <ul>     * <li> param mcast_addr - the multicast address to use default is 224.0.0.200     * <li> param mcast_port - (int) the port that the multicast is sent on default is 7500     * <li> param ip_mcast - (boolean) flag whether to use IP multicast - default is true     * <li> param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32     * </ul>     * @return true if no other properties are left.     *         false if the properties still have data in them, ie ,     *         properties are left over and not handled by the protocol stack     */    public boolean setProperties(Properties props) {        String  str;        List    exclude_list=null;        String  mcast_addr_name="230.8.8.8";        int     mcast_port=7500;        super.setProperties(props);        str=props.getProperty("bind_addrs");        if(str != null) {            str=str.trim();            if("all".equals(str.toLowerCase())) {                try {                    bind_addrs=determineAllBindInterfaces();                }                catch(SocketException e) {                    e.printStackTrace();                    bind_addrs=null;                }            }            else {                bind_addrs=Util.parseCommaDelimitedStrings(str);            }            props.remove("bind_addrs");

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?