📄 udp.java.txt
字号:
finally { Util.closeOutputStream(out); } } private List bufferToList(DataInputStream instream, IpAddress dest) throws Exception { List l=new List(); DataInputStream in=null; int len; Message msg; Address src; try { len=instream.readInt(); src=Util.readAddress(instream); for(int i=0; i < len; i++) { msg=new Message(); msg.readFrom(instream); msg.setDest(dest); msg.setSrc(src); l.add(msg); } return l; } finally { Util.closeInputStream(in); } } /** * Create UDP sender and receiver sockets. Currently there are 2 sockets * (sending and receiving). This is due to Linux's non-BSD compatibility * in the JDK port (see DESIGN). */ void createSockets() throws Exception { InetAddress tmp_addr=null; // bind_addr not set, try to assign one by default. This is needed on Windows // changed by bela Feb 12 2003: by default multicast sockets will be bound to all network interfaces // CHANGED *BACK* by bela March 13 2003: binding to all interfaces did not result in a correct // local_addr. As a matter of fact, comparison between e.g. 0.0.0.0:1234 (on hostA) and // 0.0.0.0:1.2.3.4 (on hostB) would fail ! if(bind_addr == null) { InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()); if(interfaces != null && interfaces.length > 0) bind_addr=interfaces[0]; } if(bind_addr == null) bind_addr=InetAddress.getLocalHost(); if(bind_addr != null) if(log.isInfoEnabled()) log.info("sockets will use interface " + bind_addr.getHostAddress()); // 2. Create socket for receiving unicast UDP packets. The address and port // of this socket will be our local address (local_addr) if(bind_port > 0) { sock=createDatagramSocketWithBindPort(); } else { sock=createEphemeralDatagramSocket(); } if(tos > 0) { try { sock.setTrafficClass(tos); } catch(SocketException e) { log.warn("traffic class of " + tos + " could not be set, will be ignored", e); } } if(sock == null) throw new Exception("UDP.createSocket(): sock is null"); local_addr=new IpAddress(sock.getLocalAddress(), sock.getLocalPort()); if(additional_data != null) local_addr.setAdditionalData(additional_data); // 3. Create socket for receiving IP multicast packets if(ip_mcast) { // 3a. Create mcast receiver socket mcast_recv_sock=new MulticastSocket(mcast_port); mcast_recv_sock.setTimeToLive(ip_ttl); tmp_addr=InetAddress.getByName(mcast_addr_name); mcast_addr=new IpAddress(tmp_addr, mcast_port); if(bind_to_all_interfaces) { bindToAllInterfaces(mcast_recv_sock, mcast_addr.getIpAddress()); } else { if(bind_addr != null) mcast_recv_sock.setInterface(bind_addr); mcast_recv_sock.joinGroup(tmp_addr); } // 3b. Create mcast sender socket mcast_send_sock=new MulticastSocket(); mcast_send_sock.setTimeToLive(ip_ttl); if(bind_addr != null) mcast_send_sock.setInterface(bind_addr); if(tos > 0) { try { mcast_send_sock.setTrafficClass(tos); // high throughput } catch(SocketException e) { log.warn("traffic class of " + tos + " could not be set, will be ignored", e); } } } setBufferSizes(); if(log.isInfoEnabled()) log.info("socket information:\n" + dumpSocketInfo()); } private void bindToAllInterfaces(MulticastSocket s, InetAddress mcastAddr) throws IOException { SocketAddress tmp_mcast_addr=new InetSocketAddress(mcastAddr, mcast_port); Enumeration en=NetworkInterface.getNetworkInterfaces(); while(en.hasMoreElements()) { NetworkInterface i=(NetworkInterface)en.nextElement(); for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) { InetAddress addr=(InetAddress)en2.nextElement(); // if(addr.isLoopbackAddress()) // continue; s.joinGroup(tmp_mcast_addr, i); if(log.isTraceEnabled()) log.trace("joined " + tmp_mcast_addr + " on interface " + i.getName() + " (" + addr + ")"); break; } } } /** Creates a DatagramSocket with a random port. Because in certain operating systems, ports are reused, * we keep a list of the n last used ports, and avoid port reuse */ private DatagramSocket createEphemeralDatagramSocket() throws SocketException { DatagramSocket tmp=null; int localPort=0; while(true) { tmp=new DatagramSocket(localPort, bind_addr); // first time localPort is 0 if(num_last_ports <= 0) break; localPort=tmp.getLocalPort(); if(getLastPortsUsed().contains(new Integer(localPort))) { if(log.isDebugEnabled()) log.debug("local port " + localPort + " already seen in this session; will try to get other port"); try {tmp.close();} catch(Throwable e) {} localPort++; continue; } else { getLastPortsUsed().add(new Integer(localPort)); break; } } return tmp; } /** * Creates a DatagramSocket when bind_port > 0. Attempts to allocate the socket with port == bind_port, and * increments until it finds a valid port, or until port_range has been exceeded * @return DatagramSocket The newly created socket * @throws Exception */ private DatagramSocket createDatagramSocketWithBindPort() throws Exception { DatagramSocket tmp=null; // 27-6-2003 bgooren, find available port in range (start_port, start_port+port_range) int rcv_port=bind_port, max_port=bind_port + port_range; while(rcv_port <= max_port) { try { tmp=new DatagramSocket(rcv_port, bind_addr); break; } catch(SocketException bind_ex) { // Cannot listen on this port rcv_port++; } catch(SecurityException sec_ex) { // Not allowed to listen on this port rcv_port++; } // Cannot listen at all, throw an Exception if(rcv_port >= max_port + 1) { // +1 due to the increment above throw new Exception("UDP.createSockets(): cannot list on any port in range " + bind_port + '-' + (bind_port + port_range)); } } return tmp; } private String dumpSocketInfo() throws Exception { StringBuffer sb=new StringBuffer(128); sb.append("local_addr=").append(local_addr); sb.append(", mcast_addr=").append(mcast_addr); sb.append(", bind_addr=").append(bind_addr); sb.append(", ttl=").append(ip_ttl); if(sock != null) { sb.append("\nsock: bound to "); sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort()); sb.append(", receive buffer size=").append(sock.getReceiveBufferSize()); sb.append(", send buffer size=").append(sock.getSendBufferSize()); } if(mcast_recv_sock != null) { sb.append("\nmcast_recv_sock: bound to "); sb.append(mcast_recv_sock.getInterface().getHostAddress()).append(':').append(mcast_recv_sock.getLocalPort()); sb.append(", send buffer size=").append(mcast_recv_sock.getSendBufferSize()); sb.append(", receive buffer size=").append(mcast_recv_sock.getReceiveBufferSize()); } if(mcast_send_sock != null) { sb.append("\nmcast_send_sock: bound to "); sb.append(mcast_send_sock.getInterface().getHostAddress()).append(':').append(mcast_send_sock.getLocalPort()); sb.append(", send buffer size=").append(mcast_send_sock.getSendBufferSize()); sb.append(", receive buffer size=").append(mcast_send_sock.getReceiveBufferSize()); } return sb.toString(); } void setBufferSizes() { if(sock != null) { try { sock.setSendBufferSize(ucast_send_buf_size); } catch(Throwable ex) { if(log.isWarnEnabled()) log.warn("failed setting ucast_send_buf_size in sock: " + ex); } try { sock.setReceiveBufferSize(ucast_recv_buf_size); } catch(Throwable ex) { if(log.isWarnEnabled()) log.warn("failed setting ucast_recv_buf_size in sock: " + ex); } } if(mcast_recv_sock != null) { try { mcast_recv_sock.setSendBufferSize(mcast_send_buf_size); } catch(Throwable ex) { if(log.isWarnEnabled()) log.warn("failed setting mcast_send_buf_size in mcast_recv_sock: " + ex); } try { mcast_recv_sock.setReceiveBufferSize(mcast_recv_buf_size); } catch(Throwable ex) { if(log.isWarnEnabled()) log.warn("failed setting mcast_recv_buf_size in mcast_recv_sock: " + ex); } } if(mcast_send_sock != null) { try { mcast_send_sock.setSendBufferSize(mcast_send_buf_size); } catch(Throwable ex) { if(log.isWarnEnabled()) log.warn("failed setting mcast_send_buf_size in mcast_send_sock: " + ex); } try { mcast_send_sock.setReceiveBufferSize(mcast_recv_buf_size); } catch(Throwable ex) { if(log.isWarnEnabled()) log.warn("failed setting mcast_recv_buf_size in mcast_send_sock: " + ex); } } } /** * Closed UDP unicast and multicast sockets */ void closeSockets() { // 1. Close multicast socket closeMulticastSocket(); // 2. Close socket closeSocket(); } void closeMulticastSocket() { if(mcast_recv_sock != null) { try { if(mcast_addr != null) { mcast_recv_sock.leaveGroup(mcast_addr.getIpAddress()); } mcast_recv_sock.close(); // this will cause the mcast receiver thread to break out of its loop mcast_recv_sock=null; if(log.isDebugEnabled()) log.debug("multicast receive socket closed"); } catch(IOException ex) { } mcast_addr=null; } if(mcast_send_sock != null) { mcast_send_sock.close(); // this will cause the mcast receiver thread to break out of its loop mcast_send_sock=null; if(log.isDebugEnabled()) log.debug("multicast send socket closed"); } } void closeSocket() { if(sock != null) { sock.close(); sock=null; if(log.isDebugEnabled()) log.debug("socket closed"); } } /** * Starts the unicast and multicast receiver threads */ void startThreads() throws Exception { if(ucast_receiver == null) { //start the listener thread of the ucast_recv_sock ucast_receiver=new UcastReceiver(); ucast_receiver.start(); if(log.isDebugEnabled()) log.debug("created unicast receiver thread"); } if(ip_mcast) { if(mcast_receiver != null) { if(mcast_receiver.isAlive()) { if(log.isDebugEnabled()) log.debug("did not create new multicastreceiver thread as existing " + "multicast receiver thread is still running"); } else mcast_receiver=null; // will be created just below... } if(mcast_receiver == null) { mcast_receiver=new Thread(this, "UDP mcast receiver"); mcast_receiver.setPriority(Thread.MAX_PRIORITY); // needed ???? mcast_receiver.setDaemon(true); mcast_receiver.start(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -