udp.java

来自「JGRoups源码」· Java 代码 · 共 980 行 · 第 1/3 页

JAVA
980
字号
package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.Global;import org.jgroups.stack.IpAddress;import org.jgroups.util.BoundedList;import org.jgroups.util.Util;import java.io.IOException;import java.io.InterruptedIOException;import java.net.*;import java.util.*;/** * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will * be multicast (to all group members), whereas point-to-point messages * (msg.dest != null) will be unicast to a single member. Uses a multicast and * a unicast socket.<p> * The following properties are read by the UDP protocol: * <ul> * <li> param mcast_addr - the multicast address to use; default is 228.8.8.8. * <li> param mcast_port - (int) the port that the multicast is sent on; default is 7600 * <li> param ip_mcast - (boolean) flag whether to use IP multicast; default is true. * <li> param ip_ttl - the default time-to-live for multicast packets sent out on this * socket; default is 32. * <li> param use_packet_handler - boolean, defaults to false. * If set, the mcast and ucast receiver threads just put * the datagram's payload (a byte buffer) into a queue, from where a separate thread * will dequeue and handle them (unmarshal and pass up). This frees the receiver * threads from having to do message unmarshalling; this time can now be spent * receiving packets. If you have lots of retransmissions because of network * input buffer overflow, consider setting this property to true. * </ul> * @author Bela Ban * @version $Id: UDP.java,v 1.123 2006/10/23 05:52:47 belaban Exp $ */public class UDP extends TP implements Runnable {    /** Socket used for     * <ol>     * <li>sending unicast packets and     * <li>receiving unicast packets     * </ol>     * The address of this socket will be our local address (<tt>local_addr</tt>) */    DatagramSocket  sock=null;    /**     * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket     */    private static volatile BoundedList last_ports_used=null;    /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.     * If bind_port is > 0, then this option will be ignored */    int             num_last_ports=100;    /** IP multicast socket for <em>receiving</em> multicast packets */    MulticastSocket mcast_recv_sock=null;    /** IP multicast socket for <em>sending</em> multicast packets */    MulticastSocket mcast_send_sock=null;    /** If we have multiple mcast send sockets, e.g. send_interfaces or send_on_all_interfaces enabled */    MulticastSocket[] mcast_send_sockets=null;    /**     * Traffic class for sending unicast and multicast datagrams.     * Valid values are (check {@link DatagramSocket#setTrafficClass(int)} );  for details):     * <UL>     * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI>     * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE><, <b>decimal 4</b>/LI>     * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI>     * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal</b> 16</LI>     * </UL>     */    int             tos=8; // valid values: 2, 4, 8 (default), 16    /** The multicast address (mcast address and port) this member uses */    IpAddress       mcast_addr=null;    /** The multicast address used for sending and receiving packets */    String          mcast_addr_name="228.8.8.8";    /** The multicast port used for sending and receiving packets */    int             mcast_port=7600;    /** The multicast receiver thread */    Thread          mcast_receiver=null;    /** The unicast receiver thread */    UcastReceiver   ucast_receiver=null;    /** Whether to enable IP multicasting. If false, multiple unicast datagram     * packets are sent rather than one multicast packet */    boolean         ip_mcast=true;    /** The time-to-live (TTL) for multicast datagram packets */    int             ip_ttl=64;    /** Send buffer size of the multicast datagram socket */    int             mcast_send_buf_size=32000;    /** Receive buffer size of the multicast datagram socket */    int             mcast_recv_buf_size=64000;    /** Send buffer size of the unicast datagram socket */    int             ucast_send_buf_size=32000;    /** Receive buffer size of the unicast datagram socket */    int             ucast_recv_buf_size=64000;    /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However,     * for multiple addresses on a Windows loopback device, this doesn't work     * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same     * value for all members of the same group. Default is true, for performance reasons */    // private boolean null_src_addresses=true;    /**     * Creates the UDP protocol, and initializes the     * state variables, does however not start any sockets or threads.     */    public UDP() {    }    /**     * Setup the Protocol instance acording to the configuration string.     * The following properties are read by the UDP protocol:     * <ul>     * <li> param mcast_addr - the multicast address to use default is 228.8.8.8     * <li> param mcast_port - (int) the port that the multicast is sent on default is 7600     * <li> param ip_mcast - (boolean) flag whether to use IP multicast - default is true     * <li> param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32     * </ul>     * @return true if no other properties are left.     *         false if the properties still have data in them, ie ,     *         properties are left over and not handled by the protocol stack     */    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("num_last_ports");        if(str != null) {            num_last_ports=Integer.parseInt(str);            props.remove("num_last_ports");        }        str=Util.getProperty(new String[]{Global.UDP_MCAST_ADDR, "jboss.partition.udpGroup"}, props,                             "mcast_addr", false, "228.8.8.8");        if(str != null)            mcast_addr_name=str;        str=Util.getProperty(new String[]{Global.UDP_MCAST_PORT, "jboss.partition.udpPort"},                             props, "mcast_port", false, "7600");        if(str != null)            mcast_port=Integer.parseInt(str);        str=props.getProperty("ip_mcast");        if(str != null) {            ip_mcast=Boolean.valueOf(str).booleanValue();            props.remove("ip_mcast");        }        str=Util.getProperty(new String[]{Global.UDP_IP_TTL}, props, "ip_ttl", false, "64");        if(str != null) {            ip_ttl=Integer.parseInt(str);            props.remove("ip_ttl");        }        str=props.getProperty("tos");        if(str != null) {            tos=Integer.parseInt(str);            props.remove("tos");        }        str=props.getProperty("mcast_send_buf_size");        if(str != null) {            mcast_send_buf_size=Integer.parseInt(str);            props.remove("mcast_send_buf_size");        }        str=props.getProperty("mcast_recv_buf_size");        if(str != null) {            mcast_recv_buf_size=Integer.parseInt(str);            props.remove("mcast_recv_buf_size");        }        str=props.getProperty("ucast_send_buf_size");        if(str != null) {            ucast_send_buf_size=Integer.parseInt(str);            props.remove("ucast_send_buf_size");        }        str=props.getProperty("ucast_recv_buf_size");        if(str != null) {            ucast_recv_buf_size=Integer.parseInt(str);            props.remove("ucast_recv_buf_size");        }        str=props.getProperty("null_src_addresses");        if(str != null) {            // null_src_addresses=Boolean.valueOf(str).booleanValue();            props.remove("null_src_addresses");            log.error("null_src_addresses has been deprecated, property will be ignored");        }        if(props.size() > 0) {            log.error("the following properties are not recognized: " + props);            return false;        }        return true;    }    /* ----------------------- Receiving of MCAST UDP packets ------------------------ */    public void run() {        DatagramPacket  packet;        byte            receive_buf[]=new byte[65535];        int             offset, len, sender_port;        byte[]          data;        InetAddress     sender_addr;        Address         sender;        // moved out of loop to avoid excessive object creations (bela March 8 2001)        packet=new DatagramPacket(receive_buf, receive_buf.length);        while(mcast_receiver != null && mcast_recv_sock != null) {            try {                packet.setData(receive_buf, 0, receive_buf.length);                mcast_recv_sock.receive(packet);                sender_addr=packet.getAddress();                sender_port=packet.getPort();                offset=packet.getOffset();                len=packet.getLength();                data=packet.getData();                sender=new IpAddress(sender_addr, sender_port);                if(len > receive_buf.length) {                    if(log.isErrorEnabled())                        log.error("size of the received packet (" + len + ") is bigger than " +                                  "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +                                  "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);                }                receive(mcast_addr, sender, data, offset, len);            }            catch(SocketException sock_ex) {                 if(trace) log.trace("multicast socket is closed, exception=" + sock_ex);                break;            }            catch(InterruptedIOException io_ex) { // thread was interrupted            }            catch(Throwable ex) {                if(log.isErrorEnabled())                    log.error("failure in multicast receive()", ex);                Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)            }        }        if(log.isDebugEnabled()) log.debug("multicast thread terminated");    }    public String getInfo() {        StringBuffer sb=new StringBuffer();        sb.append("group_addr=").append(mcast_addr_name).append(':').append(mcast_port).append("\n");        return sb.toString();    }    public void sendToAllMembers(byte[] data, int offset, int length) throws Exception {        if(ip_mcast && mcast_addr != null) {            _send(mcast_addr.getIpAddress(), mcast_addr.getPort(), true, data, offset, length);        }        else {            ArrayList mbrs=new ArrayList(members);            IpAddress mbr;            for(Iterator it=mbrs.iterator(); it.hasNext();) {                mbr=(IpAddress)it.next();                _send(mbr.getIpAddress(), mbr.getPort(), false, data, offset, length);            }        }    }    public void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception {        _send(((IpAddress)dest).getIpAddress(), ((IpAddress)dest).getPort(), false, data, offset, length);    }    public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) {        msg.setDest(dest);    }    public void postUnmarshallingList(Message msg, Address dest, boolean multicast) {        msg.setDest(dest);    }    private void _send(InetAddress dest, int port, boolean mcast, byte[] data, int offset, int length) throws Exception {        DatagramPacket packet=new DatagramPacket(data, offset, length, dest, port);        try {            if(mcast) {                if(mcast_send_sock != null) {                    mcast_send_sock.send(packet);                }                else {                    if(mcast_send_sockets != null) {                        MulticastSocket s;                        for(int i=0; i < mcast_send_sockets.length; i++) {                            s=mcast_send_sockets[i];                            try {                                s.send(packet);                            }                            catch(Exception e) {                                log.error("failed sending packet on socket " + s);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?