// $Id: UDP.java.txt,v 1.1 2005/06/24 11:19:28 belaban Exp $

package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.List;
import org.jgroups.util.*;
import org.jgroups.util.Queue;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will
 * be multicast (to all group members), whereas point-to-point messages
 * (msg.dest != null) will be unicast to a single member. Uses a multicast and
 * a unicast socket.<p>
 * The following properties are read by the UDP protocol:<p>
 * param mcast_addr - the multicast address to use; default is 228.8.8.8<br>
 * param mcast_port - (int) the port that the multicast is sent on; default is 7600<br>
 * param ip_mcast - (boolean) flag whether to use IP multicast; default is true<br>
 * param ip_ttl - Set the default time-to-live for multicast packets sent out on this
 * socket; default is 64<br>
 * param use_packet_handler - If set, the mcast and ucast receiver threads just put
 * the datagram's payload (a byte buffer) into a queue, from where a separate thread
 * will dequeue and handle them (unmarshal and pass up). This frees the receiver
 * threads from having to do message unmarshalling; this time can now be spent
 * receiving packets. If you have lots of retransmissions because of network
 * input buffer overflow, consider setting this property to true (default is false).
 * @author Bela Ban
 */
public class UDP extends Protocol implements Runnable {

    /** Socket used for
     * <ol>
     * <li>sending unicast packets and
     * <li>receiving unicast packets
     * </ol>
     * The address of this socket will be our local address (<tt>local_addr</tt>) */
    DatagramSocket  sock=null;

    /**
     * BoundedList<Integer> of the last 100 ports used.
     * This is to avoid reusing a port for DatagramSocket
     */
    private static BoundedList last_ports_used=null;

    /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.
     * If bind_port is null, then this option will be ignored */
    int             num_last_ports=100;

    /** IP multicast socket for <em>receiving</em> multicast packets */
    MulticastSocket mcast_recv_sock=null;

    /** IP multicast socket for <em>sending</em> multicast packets */
    MulticastSocket mcast_send_sock=null;

    /**
     * Traffic class for sending unicast and multicast datagrams.
     * Valid values are (check {@link DatagramSocket#setTrafficClass(int)} for details):
     * <UL>
     * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI>
     * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE>, <b>decimal 4</b></LI>
     * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI>
     * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal 16</b></LI>
     * </UL>
     */
    int             tos=0; // valid values: 2, 4, 8, 16

    /** The address (host and port) of this member */
    IpAddress       local_addr=null;

    /** The name of the group to which this member is connected */
    String          channel_name=null;

    UdpHeader       udp_hdr=null;

    /** The multicast address (mcast address and port) this member uses */
    IpAddress       mcast_addr=null;

    /** The interface (NIC) to which the unicast and multicast sockets bind */
    InetAddress     bind_addr=null;

    /** Bind the receiver multicast socket to all available interfaces (requires JDK 1.4) */
    boolean         bind_to_all_interfaces=false;

    /** The port to which the unicast receiver socket binds.
     * 0 means to bind to any (ephemeral) port */
    int             bind_port=0;
    int             port_range=1; // 27-6-2003 bgooren, Only try one port by default

    /** The multicast address used for sending and receiving packets */
    String          mcast_addr_name="228.8.8.8";

    /** The multicast port used for sending and receiving packets */
    int             mcast_port=7600;

    /** The multicast receiver thread */
    Thread          mcast_receiver=null;

    /** The unicast receiver thread */
    UcastReceiver   ucast_receiver=null;

    /** Whether to enable IP multicasting. If false, multiple unicast datagram
     * packets are sent rather than one multicast packet */
    boolean         ip_mcast=true;

    /** The time-to-live (TTL) for multicast datagram packets */
    int             ip_ttl=64;

    /** The members of this group (updated when a member joins or leaves) */
    final Vector    members=new Vector(11);

    /** Pre-allocated byte stream. Used for serializing datagram packets. Will grow as needed */
    final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024);

    /** Send buffer size of the multicast datagram socket */
    int             mcast_send_buf_size=32000;

    /** Receive buffer size of the multicast datagram socket */
    int             mcast_recv_buf_size=64000;

    /** Send buffer size of the unicast datagram socket */
    int             ucast_send_buf_size=32000;

    /** Receive buffer size of the unicast datagram socket */
    int             ucast_recv_buf_size=64000;

    /** If true, messages sent to self are treated specially: unicast messages are
     * looped back immediately, multicast messages get a local copy first and -
     * when the real copy arrives - it will be discarded. Useful for Window
     * media (non)sense */
    boolean         loopback=true;

    /** Discard packets with a different version. Usually minor version differences are okay.
     * Setting this property to true means that we expect the exact same version on all incoming packets */
    boolean         discard_incompatible_packets=false;

    /** Sometimes receivers are overloaded (they have to handle de-serialization etc).
     * Packet handler is a separate thread taking care of de-serialization, receiver
     * thread(s) simply put packet in queue and return immediately. Setting this to
     * true adds one more thread */
    boolean         use_incoming_packet_handler=false;

    /** Used by packet handler to store incoming DatagramPackets */
    Queue           incoming_queue=null;

    /** Dequeues DatagramPackets from incoming_queue, unmarshalls them and
     * calls <tt>handleIncomingUdpPacket()</tt> */
    IncomingPacketHandler   incoming_packet_handler=null;

    /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
     * value uses an additional thread */
    boolean         use_outgoing_packet_handler=false;

    /** Used by packet handler to store outgoing DatagramPackets */
    Queue           outgoing_queue=null;

    OutgoingPacketHandler outgoing_packet_handler=null;

    /** If set it will be added to <tt>local_addr</tt>. Used to implement
     * for example transport independent addresses */
    byte[]          additional_data=null;

    /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
     * than the largest UDP datagram packet size */
    int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic();

    /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
     * max_bundle_timeout has been exceeded (whichever occurs faster)
     */
    long max_bundle_timeout=20;

    /** Enables bundling of smaller messages into bigger ones */
    boolean enable_bundling=false;

    /** Used by BundlingOutgoingPacketHandler */
    TimeScheduler timer=null;

    /** HashMap<Address, Address>. Keys=senders, values=destinations.
     * For each incoming message M with sender S, adds
     * an entry with key=S and value=sender's IP address and port.
     */
    HashMap addr_translation_table=new HashMap();

    boolean use_addr_translation=false;

    /** The name of this protocol */
    static final String    name="UDP";

    static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address";

    /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However,
     * for multiple addresses on a Windows loopback device, this doesn't work
     * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same
     * value for all members of the same group. Default is true, for performance reasons */
    boolean null_src_addresses=true;

    long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;

    /**
     * public constructor. creates the UDP protocol, and initializes the
     * state variables, does however not start any sockets or threads
     */
    public UDP() {
        ;
    }

    /**
     * debug only
     */
    public String toString() {
        return "UDP(local address: " + local_addr + ')';
    }

    public void resetStats() {
        num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;
    }

    private BoundedList getLastPortsUsed() {
        if(last_ports_used == null)
            last_ports_used=new BoundedList(num_last_ports);
        return last_ports_used;
    }

    public long getNumMessagesSent()     {return num_msgs_sent;}
    public long getNumMessagesReceived() {return num_msgs_received;}
    public long getNumBytesSent()        {return num_bytes_sent;}
    public long getNumBytesReceived()    {return num_bytes_received;}
    public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";}
    public void setBindAddress(String bind_addr) throws UnknownHostException {
        this.bind_addr=InetAddress.getByName(bind_addr);
    }
    public boolean getBindToAllInterfaces() {return bind_to_all_interfaces;}
    public void setBindToAllInterfaces(boolean flag) {this.bind_to_all_interfaces=flag;}
    public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;}
    public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;}
    public boolean isEnableBundling() {return enable_bundling;}
    public void setEnableBundling(boolean flag) {enable_bundling=flag;}
    public int getMaxBundleSize() {return max_bundle_size;}
    public void setMaxBundleSize(int size) {max_bundle_size=size;}
    public long getMaxBundleTimeout() {return max_bundle_timeout;}
    public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;}

    /* ----------------------- Receiving of MCAST UDP packets ------------------------ */

    public void run() {
        DatagramPacket  packet;
        byte            receive_buf[]=new byte[65535];
        int             len, sender_port;
        byte[]          tmp, data;
        InetAddress     sender_addr;

        // moved out of loop to avoid excessive object creations (bela March 8 2001)
        packet=new DatagramPacket(receive_buf, receive_buf.length);

        while(mcast_receiver != null && mcast_recv_sock != null) {
            try {
                packet.setData(receive_buf, 0, receive_buf.length);
                mcast_recv_sock.receive(packet);
                sender_addr=packet.getAddress();
                sender_port=packet.getPort();
                len=packet.getLength();
                data=packet.getData();
                if(len == 4) {  // received a diagnostics probe
                    if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') {
                        handleDiagnosticProbe(sender_addr, sender_port);
                        continue;
                    }
                }

                if(log.isTraceEnabled()){
                    StringBuffer sb=new StringBuffer("received (mcast) ");
                    sb.append(len).append(" bytes from ").append(sender_addr).append(':');
                    sb.append(sender_port).append(" (size=").append(len).append(" bytes)");
                    log.trace(sb.toString());
                }

                if(len > receive_buf.length) {
                    if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " +
                                             "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
                                             "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);
                }

                if(use_incoming_packet_handler) {
                    tmp=new byte[len];
                    System.arraycopy(data, 0, tmp, 0, len);
                    incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp));
                }
                else
                    handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data);
            }
            catch(SocketException sock_ex) {
                 if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex);
                break;
            }
            catch(InterruptedIOException io_ex) { // thread was interrupted
                ; // go back to top of loop, where we will terminate loop
            }
            catch(Throwable ex) {
                if(log.isErrorEnabled())
                    log.error("failure in multicast receive()", ex);
                Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
            }
        }
        if(log.isDebugEnabled()) log.debug("multicast thread terminated");
    }

    private void handleDiagnosticProbe(InetAddress sender, int port) {
        try {
            byte[] diag_rsp=getDiagResponse().getBytes();
            DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender, port);
            if(log.isDebugEnabled()) log.debug("sending diag response to " + sender + ':' + port);
            sock.send(rsp);
        }
        catch(Throwable t) {
            if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ':' + port +
                                                       ", exception=" + t);
        }
    }

    private String getDiagResponse() {
        StringBuffer sb=new StringBuffer();
        sb.append(local_addr).append(" (").append(channel_name).append(')');
        sb.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n");
        sb.append("Version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n");
        sb.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n');
        sb.append("members: ").append(members).append('\n');
        return sb.toString();
    }

    /* ------------------------------------------------------------------------------- */

    /*------------------------------ Protocol interface ------------------------------ */

    public String getName() {
        return name;
    }

    public void init() throws Exception {
        if(use_incoming_packet_handler) {
