udp.java
来自「JGRoups源码」· Java 代码 · 共 980 行 · 第 1/3 页
JAVA
980 行
package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.Global;import org.jgroups.stack.IpAddress;import org.jgroups.util.BoundedList;import org.jgroups.util.Util;import java.io.IOException;import java.io.InterruptedIOException;import java.net.*;import java.util.*;/** * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will * be multicast (to all group members), whereas point-to-point messages * (msg.dest != null) will be unicast to a single member. Uses a multicast and * a unicast socket.<p> * The following properties are read by the UDP protocol: * <ul> * <li> param mcast_addr - the multicast address to use; default is 228.8.8.8. * <li> param mcast_port - (int) the port that the multicast is sent on; default is 7600 * <li> param ip_mcast - (boolean) flag whether to use IP multicast; default is true. * <li> param ip_ttl - the default time-to-live for multicast packets sent out on this * socket; default is 32. * <li> param use_packet_handler - boolean, defaults to false. * If set, the mcast and ucast receiver threads just put * the datagram's payload (a byte buffer) into a queue, from where a separate thread * will dequeue and handle them (unmarshal and pass up). This frees the receiver * threads from having to do message unmarshalling; this time can now be spent * receiving packets. If you have lots of retransmissions because of network * input buffer overflow, consider setting this property to true. * </ul> * @author Bela Ban * @version $Id: UDP.java,v 1.123 2006/10/23 05:52:47 belaban Exp $ */public class UDP extends TP implements Runnable { /** Socket used for * <ol> * <li>sending unicast packets and * <li>receiving unicast packets * </ol> * The address of this socket will be our local address (<tt>local_addr</tt>) */ DatagramSocket sock=null; /** * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket */ private static volatile BoundedList last_ports_used=null; /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off. * If bind_port is > 0, then this option will be ignored */ int num_last_ports=100; /** IP multicast socket for <em>receiving</em> multicast packets */ MulticastSocket mcast_recv_sock=null; /** IP multicast socket for <em>sending</em> multicast packets */ MulticastSocket mcast_send_sock=null; /** If we have multiple mcast send sockets, e.g. send_interfaces or send_on_all_interfaces enabled */ MulticastSocket[] mcast_send_sockets=null; /** * Traffic class for sending unicast and multicast datagrams. * Valid values are (check {@link DatagramSocket#setTrafficClass(int)} ); for details): * <UL> * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI> * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE><, <b>decimal 4</b>/LI> * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI> * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal</b> 16</LI> * </UL> */ int tos=8; // valid values: 2, 4, 8 (default), 16 /** The multicast address (mcast address and port) this member uses */ IpAddress mcast_addr=null; /** The multicast address used for sending and receiving packets */ String mcast_addr_name="228.8.8.8"; /** The multicast port used for sending and receiving packets */ int mcast_port=7600; /** The multicast receiver thread */ Thread mcast_receiver=null; /** The unicast receiver thread */ UcastReceiver ucast_receiver=null; /** Whether to enable IP multicasting. If false, multiple unicast datagram * packets are sent rather than one multicast packet */ boolean ip_mcast=true; /** The time-to-live (TTL) for multicast datagram packets */ int ip_ttl=64; /** Send buffer size of the multicast datagram socket */ int mcast_send_buf_size=32000; /** Receive buffer size of the multicast datagram socket */ int mcast_recv_buf_size=64000; /** Send buffer size of the unicast datagram socket */ int ucast_send_buf_size=32000; /** Receive buffer size of the unicast datagram socket */ int ucast_recv_buf_size=64000; /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However, * for multiple addresses on a Windows loopback device, this doesn't work * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same * value for all members of the same group. Default is true, for performance reasons */ // private boolean null_src_addresses=true; /** * Creates the UDP protocol, and initializes the * state variables, does however not start any sockets or threads. */ public UDP() { } /** * Setup the Protocol instance acording to the configuration string. * The following properties are read by the UDP protocol: * <ul> * <li> param mcast_addr - the multicast address to use default is 228.8.8.8 * <li> param mcast_port - (int) the port that the multicast is sent on default is 7600 * <li> param ip_mcast - (boolean) flag whether to use IP multicast - default is true * <li> param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32 * </ul> * @return true if no other properties are left. * false if the properties still have data in them, ie , * properties are left over and not handled by the protocol stack */ public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("num_last_ports"); if(str != null) { num_last_ports=Integer.parseInt(str); props.remove("num_last_ports"); } str=Util.getProperty(new String[]{Global.UDP_MCAST_ADDR, "jboss.partition.udpGroup"}, props, "mcast_addr", false, "228.8.8.8"); if(str != null) mcast_addr_name=str; str=Util.getProperty(new String[]{Global.UDP_MCAST_PORT, "jboss.partition.udpPort"}, props, "mcast_port", false, "7600"); if(str != null) mcast_port=Integer.parseInt(str); str=props.getProperty("ip_mcast"); if(str != null) { ip_mcast=Boolean.valueOf(str).booleanValue(); props.remove("ip_mcast"); } str=Util.getProperty(new String[]{Global.UDP_IP_TTL}, props, "ip_ttl", false, "64"); if(str != null) { ip_ttl=Integer.parseInt(str); props.remove("ip_ttl"); } str=props.getProperty("tos"); if(str != null) { tos=Integer.parseInt(str); props.remove("tos"); } str=props.getProperty("mcast_send_buf_size"); if(str != null) { mcast_send_buf_size=Integer.parseInt(str); props.remove("mcast_send_buf_size"); } str=props.getProperty("mcast_recv_buf_size"); if(str != null) { mcast_recv_buf_size=Integer.parseInt(str); props.remove("mcast_recv_buf_size"); } str=props.getProperty("ucast_send_buf_size"); if(str != null) { ucast_send_buf_size=Integer.parseInt(str); props.remove("ucast_send_buf_size"); } str=props.getProperty("ucast_recv_buf_size"); if(str != null) { ucast_recv_buf_size=Integer.parseInt(str); props.remove("ucast_recv_buf_size"); } str=props.getProperty("null_src_addresses"); if(str != null) { // null_src_addresses=Boolean.valueOf(str).booleanValue(); props.remove("null_src_addresses"); log.error("null_src_addresses has been deprecated, property will be ignored"); } if(props.size() > 0) { log.error("the following properties are not recognized: " + props); return false; } return true; } /* ----------------------- Receiving of MCAST UDP packets ------------------------ */ public void run() { DatagramPacket packet; byte receive_buf[]=new byte[65535]; int offset, len, sender_port; byte[] data; InetAddress sender_addr; Address sender; // moved out of loop to avoid excessive object creations (bela March 8 2001) packet=new DatagramPacket(receive_buf, receive_buf.length); while(mcast_receiver != null && mcast_recv_sock != null) { try { packet.setData(receive_buf, 0, receive_buf.length); mcast_recv_sock.receive(packet); sender_addr=packet.getAddress(); sender_port=packet.getPort(); offset=packet.getOffset(); len=packet.getLength(); data=packet.getData(); sender=new IpAddress(sender_addr, sender_port); if(len > receive_buf.length) { if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than " + "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " + "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length); } receive(mcast_addr, sender, data, offset, len); } catch(SocketException sock_ex) { if(trace) log.trace("multicast socket is closed, exception=" + sock_ex); break; } catch(InterruptedIOException io_ex) { // thread was interrupted } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("failure in multicast receive()", ex); Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !) } } if(log.isDebugEnabled()) log.debug("multicast thread terminated"); } public String getInfo() { StringBuffer sb=new StringBuffer(); sb.append("group_addr=").append(mcast_addr_name).append(':').append(mcast_port).append("\n"); return sb.toString(); } public void sendToAllMembers(byte[] data, int offset, int length) throws Exception { if(ip_mcast && mcast_addr != null) { _send(mcast_addr.getIpAddress(), mcast_addr.getPort(), true, data, offset, length); } else { ArrayList mbrs=new ArrayList(members); IpAddress mbr; for(Iterator it=mbrs.iterator(); it.hasNext();) { mbr=(IpAddress)it.next(); _send(mbr.getIpAddress(), mbr.getPort(), false, data, offset, length); } } } public void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception { _send(((IpAddress)dest).getIpAddress(), ((IpAddress)dest).getPort(), false, data, offset, length); } public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) { msg.setDest(dest); } public void postUnmarshallingList(Message msg, Address dest, boolean multicast) { msg.setDest(dest); } private void _send(InetAddress dest, int port, boolean mcast, byte[] data, int offset, int length) throws Exception { DatagramPacket packet=new DatagramPacket(data, offset, length, dest, port); try { if(mcast) { if(mcast_send_sock != null) { mcast_send_sock.send(packet); } else { if(mcast_send_sockets != null) { MulticastSocket s; for(int i=0; i < mcast_send_sockets.length; i++) { s=mcast_send_sockets[i]; try { s.send(packet); } catch(Exception e) { log.error("failed sending packet on socket " + s);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?