📄 udp.java.txt
字号:
// $Id: UDP.java.txt,v 1.1 2005/06/24 11:19:28 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.List;import org.jgroups.util.*;import org.jgroups.util.Queue;import java.io.*;import java.net.*;import java.util.*;/** * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will * be multicast (to all group members), whereas point-to-point messages * (msg.dest != null) will be unicast to a single member. Uses a multicast and * a unicast socket.<p> * The following properties are being read by the UDP protocol<p> * param mcast_addr - the multicast address to use default is 228.8.8.8<br> * param mcast_port - (int) the port that the multicast is sent on default is 7600<br> * param ip_mcast - (boolean) flag whether to use IP multicast - default is true<br> * param ip_ttl - Set the default time-to-live for multicast packets sent out on this * socket. default is 32<br> * param use_packet_handler - If set, the mcast and ucast receiver threads just put * the datagram's payload (a byte buffer) into a queue, from where a separate thread * will dequeue and handle them (unmarshal and pass up). This frees the receiver * threads from having to do message unmarshalling; this time can now be spent * receiving packets. If you have lots of retransmissions because of network * input buffer overflow, consider setting this property to true (default is false). * @author Bela Ban */public class UDP extends Protocol implements Runnable { /** Socket used for * <ol> * <li>sending unicast packets and * <li>receiving unicast packets * </ol> * The address of this socket will be our local address (<tt>local_addr</tt>) */ DatagramSocket sock=null; /** * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket */ private static BoundedList last_ports_used=null; /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off. * If bind_port is null, then this options will be ignored */ int num_last_ports=100; /** IP multicast socket for <em>receiving</em> multicast packets */ MulticastSocket mcast_recv_sock=null; /** IP multicast socket for <em>sending</em> multicast packets */ MulticastSocket mcast_send_sock=null; /** * Traffic class for sending unicast and multicast datagrams. * Valid values are (check {@link #DatagramSocket.setTrafficClass(int)} for details): * <UL> * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI> * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE><, <b>decimal 4</b>/LI> * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI> * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal</b> 16</LI> * </UL> */ int tos=0; // valid values: 2, 4, 8, 16 /** The address (host and port) of this member */ IpAddress local_addr=null; /** The name of the group to which this member is connected */ String channel_name=null; UdpHeader udp_hdr=null; /** The multicast address (mcast address and port) this member uses */ IpAddress mcast_addr=null; /** The interface (NIC) to which the unicast and multicast sockets bind */ InetAddress bind_addr=null; /** Bind the receiver multicast socket to all available interfaces (requires JDK 1.4) */ boolean bind_to_all_interfaces=false; /** The port to which the unicast receiver socket binds. * 0 means to bind to any (ephemeral) port */ int bind_port=0; int port_range=1; // 27-6-2003 bgooren, Only try one port by default /** The multicast address used for sending and receiving packets */ String mcast_addr_name="228.8.8.8"; /** The multicast port used for sending and receiving packets */ int mcast_port=7600; /** The multicast receiver thread */ Thread mcast_receiver=null; /** The unicast receiver thread */ UcastReceiver ucast_receiver=null; /** Whether to enable IP multicasting. If false, multiple unicast datagram * packets are sent rather than one multicast packet */ boolean ip_mcast=true; /** The time-to-live (TTL) for multicast datagram packets */ int ip_ttl=64; /** The members of this group (updated when a member joins or leaves) */ final Vector members=new Vector(11); /** Pre-allocated byte stream. Used for serializing datagram packets. Will grow as needed */ final ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(1024); /** Send buffer size of the multicast datagram socket */ int mcast_send_buf_size=32000; /** Receive buffer size of the multicast datagram socket */ int mcast_recv_buf_size=64000; /** Send buffer size of the unicast datagram socket */ int ucast_send_buf_size=32000; /** Receive buffer size of the unicast datagram socket */ int ucast_recv_buf_size=64000; /** If true, messages sent to self are treated specially: unicast messages are * looped back immediately, multicast messages get a local copy first and - * when the real copy arrives - it will be discarded. Useful for Window * media (non)sense */ boolean loopback=true; /** Discard packets with a different version. Usually minor version differences are okay. Setting this property * to true means that we expect the exact same version on all incoming packets */ boolean discard_incompatible_packets=false; /** Sometimes receivers are overloaded (they have to handle de-serialization etc). * Packet handler is a separate thread taking care of de-serialization, receiver * thread(s) simply put packet in queue and return immediately. Setting this to * true adds one more thread */ boolean use_incoming_packet_handler=false; /** Used by packet handler to store incoming DatagramPackets */ Queue incoming_queue=null; /** Dequeues DatagramPackets from packet_queue, unmarshalls them and * calls <tt>handleIncomingUdpPacket()</tt> */ IncomingPacketHandler incoming_packet_handler=null; /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this * value uses an additional thread */ boolean use_outgoing_packet_handler=false; /** Used by packet handler to store outgoing DatagramPackets */ Queue outgoing_queue=null; OutgoingPacketHandler outgoing_packet_handler=null; /** If set it will be added to <tt>local_addr</tt>. Used to implement * for example transport independent addresses */ byte[] additional_data=null; /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller than the largest UDP datagram packet size */ int max_bundle_size=AUTOCONF.senseMaxFragSizeStatic(); /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or * max_bundle_timeout has been exceeded (whichever occurs faster) */ long max_bundle_timeout=20; /** Enabled bundling of smaller messages into bigger ones */ boolean enable_bundling=false; /** Used by BundlingOutgoingPacketHandler */ TimeScheduler timer=null; /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds * an entry with key=S and value= sender's IP address and port. */ HashMap addr_translation_table=new HashMap(); boolean use_addr_translation=false; /** The name of this protocol */ static final String name="UDP"; static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address"; /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However, * for multiple addresses on a Windows loopback device, this doesn't work * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same * value for all members of the same group. Default is true, for performance reasons */ boolean null_src_addresses=true; long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0; /** * public constructor. creates the UDP protocol, and initializes the * state variables, does however not start any sockets or threads */ public UDP() { ; } /** * debug only */ public String toString() { return "UDP(local address: " + local_addr + ')'; } public void resetStats() { num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0; } private BoundedList getLastPortsUsed() { if(last_ports_used == null) last_ports_used=new BoundedList(num_last_ports); return last_ports_used; } public long getNumMessagesSent() {return num_msgs_sent;} public long getNumMessagesReceived() {return num_msgs_received;} public long getNumBytesSent() {return num_bytes_sent;} public long getNumBytesReceived() {return num_bytes_received;} public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";} public void setBindAddress(String bind_addr) throws UnknownHostException { this.bind_addr=InetAddress.getByName(bind_addr); } public boolean getBindToAllInterfaces() {return bind_to_all_interfaces;} public void setBindToAllInterfaces(boolean flag) {this.bind_to_all_interfaces=flag;} public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;} public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;} public boolean isEnableBundling() {return enable_bundling;} public void setEnableBundling(boolean flag) {enable_bundling=flag;} public int getMaxBundleSize() {return max_bundle_size;} public void setMaxBundleSize(int size) {max_bundle_size=size;} public long getMaxBundleTimeout() {return max_bundle_timeout;} public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;} /* ----------------------- Receiving of MCAST UDP packets ------------------------ */ public void run() { DatagramPacket packet; byte receive_buf[]=new byte[65535]; int len, sender_port; byte[] tmp, data; InetAddress sender_addr; // moved out of loop to avoid excessive object creations (bela March 8 2001) packet=new DatagramPacket(receive_buf, receive_buf.length); while(mcast_receiver != null && mcast_recv_sock != null) { try { packet.setData(receive_buf, 0, receive_buf.length); mcast_recv_sock.receive(packet); sender_addr=packet.getAddress(); sender_port=packet.getPort(); len=packet.getLength(); data=packet.getData(); if(len == 4) { // received a diagnostics probe if(data[0] == 'd' && data[1] == 'i' && data[2] == 'a' && data[3] == 'g') { handleDiagnosticProbe(sender_addr, sender_port); continue; } } if(log.isTraceEnabled()){ StringBuffer sb=new StringBuffer("received (mcast) "); sb.append(len).append(" bytes from ").append(sender_addr).append(':'); sb.append(sender_port).append(" (size=").append(len).append(" bytes)"); log.trace(sb.toString()); } if(len > receive_buf.length) { if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " + "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " + "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length); } if(use_incoming_packet_handler) { tmp=new byte[len]; System.arraycopy(data, 0, tmp, 0, len); incoming_queue.add(new IncomingQueueEntry(mcast_addr, sender_addr, sender_port, tmp)); } else handleIncomingUdpPacket(mcast_addr, sender_addr, sender_port, data); } catch(SocketException sock_ex) { if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex); break; } catch(InterruptedIOException io_ex) { // thread was interrupted ; // go back to top of loop, where we will terminate loop } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("failure in multicast receive()", ex); Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !) } } if(log.isDebugEnabled()) log.debug("multicast thread terminated"); } private void handleDiagnosticProbe(InetAddress sender, int port) { try { byte[] diag_rsp=getDiagResponse().getBytes(); DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender, port); if(log.isDebugEnabled()) log.debug("sending diag response to " + sender + ':' + port); sock.send(rsp); } catch(Throwable t) { if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ':' + port + ", exception=" + t); } } private String getDiagResponse() { StringBuffer sb=new StringBuffer(); sb.append(local_addr).append(" (").append(channel_name).append(')'); sb.append(" [").append(mcast_addr_name).append(':').append(mcast_port).append("]\n"); sb.append("Version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n"); sb.append("bound to ").append(bind_addr).append(':').append(bind_port).append('\n'); sb.append("members: ").append(members).append('\n'); return sb.toString(); } /* ------------------------------------------------------------------------------- */ /*------------------------------ Protocol interface ------------------------------ */ public String getName() { return name; } public void init() throws Exception { if(use_incoming_packet_handler) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -