📄 udp.java.txt
字号:
incoming_queue=new Queue(); incoming_packet_handler=new IncomingPacketHandler(); } if(use_outgoing_packet_handler) { outgoing_queue=new Queue(); if(enable_bundling) { timer=stack != null? stack.timer : null; if(timer == null) throw new Exception("timer could not be retrieved"); outgoing_packet_handler=new BundlingOutgoingPacketHandler(); } else outgoing_packet_handler=new OutgoingPacketHandler(); } } /** * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads */ public void start() throws Exception { if(log.isDebugEnabled()) log.debug("creating sockets and starting threads"); createSockets(); passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); startThreads(); } public void stop() { if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads"); stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but... closeSockets(); // ... we'll leave it in there for now (doesn't do anything if already closed) } /** * Setup the Protocol instance acording to the configuration string * The following properties are being read by the UDP protocol * param mcast_addr - the multicast address to use default is 228.8.8.8 * param mcast_port - (int) the port that the multicast is sent on default is 7600 * param ip_mcast - (boolean) flag whether to use IP multicast - default is true * param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32 * @return true if no other properties are left. * false if the properties still have data in them, ie , * properties are left over and not handled by the protocol stack */ public boolean setProperties(Properties props) { String str; String tmp = null; super.setProperties(props); // PropertyPermission not granted if running in an untrusted environment with JNLP. 
try { tmp=System.getProperty("bind.address"); if(Boolean.getBoolean(IGNORE_BIND_ADDRESS_PROPERTY)) { tmp=null; } } catch (SecurityException ex){ } if(tmp != null) str=tmp; else str=props.getProperty("bind_addr"); if(str != null) { try { bind_addr=InetAddress.getByName(str); } catch(UnknownHostException unknown) { if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); return false; } props.remove("bind_addr"); } str=props.getProperty("bind_to_all_interfaces"); if(str != null) { bind_to_all_interfaces=new Boolean(str).booleanValue(); props.remove("bind_to_all_interfaces"); } str=props.getProperty("bind_port"); if(str != null) { bind_port=Integer.parseInt(str); props.remove("bind_port"); } str=props.getProperty("num_last_ports"); if(str != null) { num_last_ports=Integer.parseInt(str); props.remove("num_last_ports"); } str=props.getProperty("start_port"); if(str != null) { bind_port=Integer.parseInt(str); props.remove("start_port"); } str=props.getProperty("port_range"); if(str != null) { port_range=Integer.parseInt(str); props.remove("port_range"); } str=props.getProperty("mcast_addr"); if(str != null) { mcast_addr_name=str; props.remove("mcast_addr"); } str=props.getProperty("mcast_port"); if(str != null) { mcast_port=Integer.parseInt(str); props.remove("mcast_port"); } str=props.getProperty("ip_mcast"); if(str != null) { ip_mcast=Boolean.valueOf(str).booleanValue(); props.remove("ip_mcast"); } str=props.getProperty("ip_ttl"); if(str != null) { ip_ttl=Integer.parseInt(str); props.remove("ip_ttl"); } str=props.getProperty("tos"); if(str != null) { tos=Integer.parseInt(str); props.remove("tos"); } str=props.getProperty("mcast_send_buf_size"); if(str != null) { mcast_send_buf_size=Integer.parseInt(str); props.remove("mcast_send_buf_size"); } str=props.getProperty("mcast_recv_buf_size"); if(str != null) { mcast_recv_buf_size=Integer.parseInt(str); props.remove("mcast_recv_buf_size"); } str=props.getProperty("ucast_send_buf_size"); if(str != null) 
{ ucast_send_buf_size=Integer.parseInt(str); props.remove("ucast_send_buf_size"); } str=props.getProperty("ucast_recv_buf_size"); if(str != null) { ucast_recv_buf_size=Integer.parseInt(str); props.remove("ucast_recv_buf_size"); } str=props.getProperty("loopback"); if(str != null) { loopback=Boolean.valueOf(str).booleanValue(); props.remove("loopback"); } str=props.getProperty("discard_incompatible_packets"); if(str != null) { discard_incompatible_packets=Boolean.valueOf(str).booleanValue(); props.remove("discard_incompatible_packets"); } // this is deprecated, just left for compatibility (use use_incoming_packet_handler) str=props.getProperty("use_packet_handler"); if(str != null) { use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); props.remove("use_packet_handler"); if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead"); } str=props.getProperty("use_incoming_packet_handler"); if(str != null) { use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); props.remove("use_incoming_packet_handler"); } str=props.getProperty("use_outgoing_packet_handler"); if(str != null) { use_outgoing_packet_handler=Boolean.valueOf(str).booleanValue(); props.remove("use_outgoing_packet_handler"); } str=props.getProperty("max_bundle_size"); if(str != null) { int bundle_size=Integer.parseInt(str); if(bundle_size > max_bundle_size) { if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is greater than largest UDP fragmentation size (" + max_bundle_size + ')'); return false; } if(bundle_size <= 0) { if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0"); return false; } max_bundle_size=bundle_size; props.remove("max_bundle_size"); } str=props.getProperty("max_bundle_timeout"); if(str != null) { max_bundle_timeout=Long.parseLong(str); if(max_bundle_timeout <= 0) { if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid"); 
return false; } props.remove("max_bundle_timeout"); } str=props.getProperty("enable_bundling"); if(str != null) { enable_bundling=Boolean.valueOf(str).booleanValue(); props.remove("enable_bundling"); } str=props.getProperty("use_addr_translation"); if(str != null) { use_addr_translation=Boolean.valueOf(str).booleanValue(); props.remove("use_addr_translation"); } str=props.getProperty("null_src_addresses"); if(str != null) { null_src_addresses=Boolean.valueOf(str).booleanValue(); props.remove("null_src_addresses"); } if(props.size() > 0) { log.error("UDP.setProperties(): the following properties are not recognized: " + props); return false; } if(enable_bundling) { if(use_outgoing_packet_handler == false) if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true"); use_outgoing_packet_handler=true; } return true; } /** * DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous: * messages are received from the network rather than from a layer below. */ public void startUpHandler() { ; } /** * handle the UP event. * @param evt - the event being send from the stack */ public void up(Event evt) { switch(evt.getType()) { case Event.CONFIG: passUp(evt); if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); return; } passUp(evt); } /** * Caller by the layer above this layer. Usually we just put this Message * into the send queue and let one or more worker threads handle it. A worker thread * then removes the Message from the send queue, performs a conversion and adds the * modified Message to the send queue of the layer below it, by calling Down). 
*/
    public void down(Event evt) {
        // anything that is not a message is dispatched to the generic down-event handler
        if(evt.getType() != Event.MSG) {
            handleDownEvent(evt);
            return;
        }

        final Message message=(Message)evt.getArg();

        // tag the message with the udp_hdr field instead of creating a new UdpHeader per message
        // (patch by Roland Kurmann, March 20 2003)
        if(channel_name != null)
            message.putHeader(name, udp_hdr);

        // the destination is read before the observer gets to see the event
        final Object destination=message.getDest();

        // Protocol.passDown() is not called here, so the observer (e.g. PerfObserver) is notified
        // directly; this way performance numbers are still available for UDP
        if(observer != null) {
            observer.passDown(evt);
        }

        // a 'null' destination means: send to all group members
        if(destination == null) {
            if(!ip_mcast) {
                // no IP multicast: a separate UDP message is sent to each address
                sendMultipleUdpMessages(message, members);
                return;
            }
            if(mcast_addr == null) {
                if(log.isErrorEnabled()) {
                    log.error("dest address of message is null, and " +
                            "sending to default address fails as mcast_addr is null, too !" +
                            " Discarding message " + Util.printEvent(evt));
                }
                return;
            }
            // with IP multicast the multicast address becomes the destination of the message
            message.setDest(mcast_addr);
        }

        try {
            sendUdpMessage(message);
        }
        catch(Exception e) {
            if(log.isErrorEnabled()) {
                log.error("exception=" + e + ", msg=" + message + ", mcast_addr=" + mcast_addr);
            }
        }
    }


    /*--------------------------- End of Protocol interface -------------------------- */


    /* ------------------------------ Private Methods -------------------------------- */


    /**
     * If the sender is null, set our own address. We cannot just go ahead and set the address
     * anyway, as we might be sending a message on behalf of someone else ! E.g. in case of
     * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
     * have to return all unstable messages with the FLUSH_OK response.
     */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -