udp_nio.java.txt
来自「JGRoups源码」· 文本 代码 · 共 1,513 行 · 第 1/4 页
TXT
1,513 行
package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.LogicalAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * Multicast transport. Similar to UDP, but binds to multiple (or all) interfaces for sending and receiving
 * multicast and unicast traffic.<br/>
 * The list of interfaces can be set via a property (comma-delimited list of IP addresses or "all" for all
 * interfaces). Note that this class only works under JDK 1.4 and higher.<br/>
 * For each of the interfaces listed we create a Listener, which listens on the group multicast address and creates
 * a unicast datagram socket. The address of this member is determined at startup time, and is the host name plus
 * a timestamp (LogicalAddress). It does not change during the lifetime of the process. The LogicalAddress contains
 * a list of all unicast socket addresses to which we can send back unicast messages. When we send a message, the
 * Listener adds the sender's return address. When we receive a message, we add that address to our routing cache, which
 * contains logical addresses and physical addresses. When we need to send a unicast message, we first check whether
 * the logical address has a physical address associated with it in the cache. If so, we send a message to that address.
 * If not, we send the unicast message to <em>all</em> physical addresses contained in the LogicalAddress.<br/>
 * UDP_NIO guarantees that - in scenarios with multiple subnets and multi-homed machines - members do see each other.
 * There is some overhead in multicasting the same message on multiple interfaces, and potentially sending a unicast
 * on multiple interfaces as well, but the advantage is that we don't need stuff like bind_addr any longer. Plus,
 * the unicast routing caches should ensure that unicasts are only sent via 1 interface in almost all cases.
 *
 * @author Bela Ban Oct 2003
 * @version $Id: UDP_NIO.java.txt,v 1.1 2006/05/02 08:56:59 belaban Exp $
 * @deprecated Use UDP instead with send_interfaces or receive_interfaces properties defined
 */
public class UDP_NIO extends Protocol implements Receiver {

    /** Protocol name; this is the value returned by getName() */
    static final String name="UDP_NIO";

    /** Maintains a list of Connectors, one for each interface we're listening on */
    ConnectorTable ct=null;

    /** A List<String> of bind addresses, we create 1 Connector for each interface */
    List bind_addrs=null;

    /** The name of the group to which this member is connected */
    String group_name=null;

    /**
     * The multicast address (mcast address and port) this member uses (default: 230.1.2.3:7500).
     * NOTE(review): setProperties() initializes its local default address to "230.8.8.8" and the
     * port to 7500 - confirm which default is actually in effect.
     */
    InetSocketAddress mcast_addr=null;

    /** The address of this member. Valid for the lifetime of the JVM in which this member runs */
    LogicalAddress local_addr=new LogicalAddress(null, null);

    /** Logical address without list of physical addresses */
    LogicalAddress local_addr_canonical=local_addr.copy();

    /** Pre-allocated byte stream. Used for serializing datagram packets */
    ByteArrayOutputStream out_stream=new ByteArrayOutputStream(65535);

    /**
     * The port to which the unicast receiver socket binds.
     * 0 means to bind to any (ephemeral) port
     */
    int local_bind_port=0;

    /** Passed to ConnectorTable.listenOn() together with local_bind_port when start() sets up the listeners */
    int port_range=1; // 27-6-2003 bgooren, Only try one port by default

    /**
     * Whether to enable IP multicasting. If false, multiple unicast datagram
     * packets are sent rather than one multicast packet
     */
    boolean ip_mcast=true;

    /** The time-to-live (TTL) for multicast datagram packets */
    int ip_ttl=32;

    /** The members of this group (updated when a member joins or leaves) */
    Vector members=new Vector();

    /**
     * Header to be added to all messages sent via this protocol. It is
     * preallocated for efficiency
     */
    UdpHeader udp_hdr=null;

    /** Send buffer size of the multicast datagram socket */
    int mcast_send_buf_size=300000;

    /** Receive buffer size of the multicast datagram socket */
    int mcast_recv_buf_size=300000;

    /** Send buffer size of the unicast datagram socket */
    int ucast_send_buf_size=300000;

    /** Receive buffer size of the unicast datagram socket */
    int ucast_recv_buf_size=300000;

    /**
     * If true, messages sent to self are treated specially: unicast messages are
     * looped back immediately, multicast messages get a local copy first and -
     * when the real copy arrives - it will be discarded. Useful for Windows
     * media (non)sense
     * @deprecated This is used by default now
     */
    boolean loopback=true; //todo: remove

    /**
     * Sometimes receivers are overloaded (they have to handle de-serialization etc).
     * Packet handler is a separate thread taking care of de-serialization, receiver
     * thread(s) simply put packet in queue and return immediately. Setting this to
     * true adds one more thread
     */
    boolean use_packet_handler=false;

    /** Used by packet handler to store incoming DatagramPackets */
    Queue packet_queue=null;

    /**
     * If set it will be added to <tt>local_addr</tt>. Used to implement
     * for example transport independent addresses
     */
    byte[] additional_data=null;

    /**
     * Dequeues DatagramPackets from packet_queue, unmarshalls them and
     * calls <tt>handleIncomingUdpPacket()</tt>
     */
    PacketHandler packet_handler=null;

    /** Number of bytes to allocate to receive a packet. Needs to be set to be higher than frag_size
     * (handle CONFIG event) */
    static final int DEFAULT_RECEIVE_BUFFER_SIZE=120000; // todo: make settable and/or use CONFIG event

    /**
     * Creates the UDP_NIO protocol, and initializes the
     * state variables, does however not start any sockets or threads.
*/
    public UDP_NIO() {
    }

    /**
     * debug only
     *
     * @return a short description containing the local address of this protocol instance
     */
    public String toString() {
        // NOTE(review): the text says "UDP" although this class is UDP_NIO; left untouched because
        // callers or log scrapers may rely on the exact wording - confirm before changing.
        return "Protocol UDP(local address: " + local_addr + ')';
    }

    /**
     * Callback invoked with a received datagram.
     * <ul>
     * <li>A 4-byte payload spelling "diag" is a diagnostics probe: it is answered via
     *     {@link #handleDiagnosticProbe(SocketAddress)} and not processed any further.
     * <li>If the packet handler is enabled and its queue exists, a copy of the payload plus the sender
     *     address is queued and the method returns immediately.
     * <li>Otherwise (or if the queue turns out to be closed) the packet is handed to
     *     <tt>handleIncomingUdpPacket()</tt> directly on the calling thread.
     * </ul>
     * The payload of a DatagramPacket starts at <tt>packet.getOffset()</tt> within the buffer returned by
     * <tt>packet.getData()</tt>, not necessarily at index 0, so the offset is honoured everywhere the
     * payload is inspected or copied.
     *
     * @param packet the datagram that was received
     */
    public void receive(DatagramPacket packet) {
        int           len=packet.getLength();
        int           offset=packet.getOffset();
        byte[]        data=packet.getData();
        SocketAddress sender=packet.getSocketAddress();

        // received a diagnostics probe ?
        if(len == 4
                && data[offset] == 'd' && data[offset + 1] == 'i'
                && data[offset + 2] == 'a' && data[offset + 3] == 'g') {
            handleDiagnosticProbe(sender);
            return;
        }

        if(trace)
            log.trace("received " + len + " bytes from " + sender);

        if(use_packet_handler && packet_queue != null) {
            // the receive buffer is owned by the caller, so the queue gets its own copy of the payload
            byte[] tmp=new byte[len];
            System.arraycopy(data, offset, tmp, 0, len);
            try {
                Object[] arr=new Object[]{tmp, sender};
                packet_queue.add(arr);
                return;
            }
            catch(QueueClosedException e) {
                if(warn) log.warn("packet queue for packet handler thread is closed");
                // pass through to handleIncomingPacket()
            }
        }

        // Offset 0 (the common case) passes the caller's buffer through unchanged, exactly as before.
        // For a non-zero offset the payload is extracted first, because handleIncomingUdpPacket()
        // is not given an offset and would otherwise parse the wrong bytes.
        byte[] payload=data;
        if(offset != 0) {
            payload=new byte[len];
            System.arraycopy(data, offset, payload, 0, len);
        }
        handleIncomingUdpPacket(payload, sender);
    }

    /* ----------------------- Receiving of MCAST UDP packets ------------------------ */

//    public void run() {
//        DatagramPacket packet;
//        byte receive_buf[]=new byte[65000];
//        int len;
//        byte[] tmp1, tmp2;
//
//        // moved out of loop to avoid excessive object creations (bela March 8 2001)
//        packet=new DatagramPacket(receive_buf, receive_buf.length);
//
//        while(mcast_receiver != null && mcast_sock != null) {
//            try {
//                packet.setData(receive_buf, 0, receive_buf.length);
//                mcast_sock.receive(packet);
//                len=packet.getLength();
//                if(len == 1 && packet.getData()[0] == 0) {
//                    if(trace) if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "received dummy packet");
//                    continue;
//                }
//
//                if(len == 4) {  // received a diagnostics probe
//                    byte[] tmp=packet.getData();
//                    if(tmp[0] == 'd' && tmp[1] == 'i' && tmp[2] == 'a' && tmp[3] == 'g') {
//                        handleDiagnosticProbe(null, null);
//                        continue;
//                    }
//                }
//
//                if(trace)
//                    if(log.isInfoEnabled()) log.info("UDP_NIO.receive()", "received (mcast) " + packet.getLength() + " bytes from " +
//                            packet.getAddress() + ":" + packet.getPort() + " (size=" + len + " bytes)");
//                if(len >
receive_buf.length) {// if(log.isErrorEnabled()) log.error("UDP_NIO.run()", "size of the received packet (" + len + ") is bigger than " +// "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +// "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length);// }//// if(Version.compareTo(packet.getData()) == false) {// if(warn) log.warn("UDP_NIO.run()",// "packet from " + packet.getAddress() + ":" + packet.getPort() +// " has different version (" +// Version.printVersionId(packet.getData(), Version.version_id.length) +// ") from ours (" + Version.printVersionId(Version.version_id) +// "). This may cause problems");// }//// if(use_packet_handler) {// tmp1=packet.getData();// tmp2=new byte[len];// System.arraycopy(tmp1, 0, tmp2, 0, len);// packet_queue.add(tmp2);// } else// handleIncomingUdpPacket(packet.getData());// } catch(SocketException sock_ex) {// if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "multicast socket is closed, exception=" + sock_ex);// break;// } catch(InterruptedIOException io_ex) { // thread was interrupted// ; // go back to top of loop, where we will terminate loop// } catch(Throwable ex) {// if(log.isErrorEnabled()) log.error("UDP_NIO.run()", "exception=" + ex + ", stack trace=" + Util.printStackTrace(ex));// Util.sleep(1000); // so we don't get into 100% cpu spinning (should NEVER happen !)// }// }// if(log.isInfoEnabled()) log.info("UDP_NIO.run()", "multicast thread terminated");// } void handleDiagnosticProbe(SocketAddress sender) { try { byte[] diag_rsp=getDiagResponse().getBytes(); DatagramPacket rsp=new DatagramPacket(diag_rsp, 0, diag_rsp.length, sender); if(log.isInfoEnabled()) log.info("sending diag response to " + sender); ct.send(rsp); } catch(Throwable t) { if(log.isErrorEnabled()) log.error("failed sending diag rsp to " + sender + ", exception=" + t); } } String getDiagResponse() { StringBuffer sb=new StringBuffer(); sb.append(local_addr).append(" 
(").append(group_name).append(')'); sb.append(" [").append(mcast_addr).append("]\n"); sb.append("Version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n"); sb.append("physical addresses: ").append(local_addr.getPhysicalAddresses()).append('\n'); sb.append("members: ").append(members).append('\n'); return sb.toString(); } /* ------------------------------------------------------------------------------- */ /*------------------------------ Protocol interface ------------------------------ */ public String getName() { return name; } public void init() throws Exception { if(use_packet_handler) { packet_queue=new Queue(); packet_handler=new PacketHandler(); } } /** * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads */ public void start() throws Exception { if(log.isInfoEnabled()) log.info("creating sockets and starting threads"); if(ct == null) { ct=new ConnectorTable(mcast_addr, DEFAULT_RECEIVE_BUFFER_SIZE, mcast_recv_buf_size, ip_mcast, this); for(Iterator it=bind_addrs.iterator(); it.hasNext();) { String bind_addr=(String)it.next(); ct.listenOn(bind_addr, local_bind_port, port_range, DEFAULT_RECEIVE_BUFFER_SIZE, ucast_recv_buf_size, ucast_send_buf_size, ip_ttl, this); } // add physical addresses to local_addr List physical_addrs=ct.getConnectorAddresses(); // must be non-null and size() >= 1 for(Iterator it=physical_addrs.iterator(); it.hasNext();) { SocketAddress address=(SocketAddress)it.next(); local_addr.addPhysicalAddress(address); } if(additional_data != null) local_addr.setAdditionalData(additional_data); ct.start(); passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); if(use_packet_handler) packet_handler.start(); } } public void stop() { if(log.isInfoEnabled()) log.info("closing sockets and stopping threads"); if(packet_handler != null) packet_handler.stop(); if(ct != null) { ct.stop(); ct=null; } local_addr.removeAllPhysicalAddresses(); } /** * Setup the Protocol instance 
acording to the configuration string. * The following properties are being read by the UDP protocol: * <ul> * <li> param mcast_addr - the multicast address to use default is 224.0.0.200 * <li> param mcast_port - (int) the port that the multicast is sent on default is 7500 * <li> param ip_mcast - (boolean) flag whether to use IP multicast - default is true * <li> param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32 * </ul> * @return true if no other properties are left. * false if the properties still have data in them, ie , * properties are left over and not handled by the protocol stack */ public boolean setProperties(Properties props) { String str; List exclude_list=null; String mcast_addr_name="230.8.8.8"; int mcast_port=7500; super.setProperties(props); str=props.getProperty("bind_addrs"); if(str != null) { str=str.trim(); if("all".equals(str.toLowerCase())) { try { bind_addrs=determineAllBindInterfaces(); } catch(SocketException e) { e.printStackTrace(); bind_addrs=null; } } else { bind_addrs=Util.parseCommaDelimitedStrings(str); } props.remove("bind_addrs");
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?