📄 udp.java.txt
字号:
if(use_outgoing_packet_handler) outgoing_packet_handler.start(); if(use_incoming_packet_handler) incoming_packet_handler.start(); } /** * Stops unicast and multicast receiver threads */ void stopThreads() { Thread tmp; // 1. Stop the multicast receiver thread if(mcast_receiver != null) { if(mcast_receiver.isAlive()) { tmp=mcast_receiver; mcast_receiver=null; closeMulticastSocket(); // will cause the multicast thread to terminate tmp.interrupt(); try { tmp.join(100); } catch(Exception e) { } tmp=null; } mcast_receiver=null; } // 2. Stop the unicast receiver thread if(ucast_receiver != null) { ucast_receiver.stop(); ucast_receiver=null; } // 3. Stop the in_packet_handler thread if(incoming_packet_handler != null) incoming_packet_handler.stop(); // 4. Stop the outgoing packet handler thread if(outgoing_packet_handler != null) outgoing_packet_handler.stop(); } void handleDownEvent(Event evt) { switch(evt.getType()) { case Event.TMP_VIEW: case Event.VIEW_CHANGE: synchronized(members) { members.removeAllElements(); Vector tmpvec=((View)evt.getArg()).getMembers(); for(int i=0; i < tmpvec.size(); i++) members.addElement(tmpvec.elementAt(i)); } break; case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local) passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); break; case Event.CONNECT: channel_name=(String)evt.getArg(); udp_hdr=new UdpHeader(channel_name); // removed March 18 2003 (bela), not needed (handled by GMS) // changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might // be needed if we run without GMS though passUp(new Event(Event.CONNECT_OK)); break; case Event.DISCONNECT: passUp(new Event(Event.DISCONNECT_OK)); break; case Event.CONFIG: if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); handleConfigEvent((HashMap)evt.getArg()); break; } } void handleConfigEvent(HashMap map) { if(map == null) return; if(map.containsKey("additional_data")) additional_data=(byte[])map.get("additional_data"); if(map.containsKey("send_buf_size")) { mcast_send_buf_size=((Integer)map.get("send_buf_size")).intValue(); ucast_send_buf_size=mcast_send_buf_size; } if(map.containsKey("recv_buf_size")) { mcast_recv_buf_size=((Integer)map.get("recv_buf_size")).intValue(); ucast_recv_buf_size=mcast_recv_buf_size; } setBufferSizes(); } /* ----------------------------- End of Private Methods ---------------------------------------- */ /* ----------------------------- Inner Classes ---------------------------------------- */ class IncomingQueueEntry { IpAddress dest=null; InetAddress sender=null; int port=-1; byte[] buf; public IncomingQueueEntry(IpAddress dest, InetAddress sender, int port, byte[] buf) { this.dest=dest; this.sender=sender; this.port=port; this.buf=buf; } public IncomingQueueEntry(byte[] buf) { this.buf=buf; } } public class UcastReceiver implements Runnable { boolean running=true; Thread thread=null; public void start() { if(thread == null) { thread=new Thread(this, "UDP.UcastReceiverThread"); thread.setDaemon(true); running=true; thread.start(); } } public void stop() { Thread tmp; if(thread != null && thread.isAlive()) { running=false; tmp=thread; thread=null; closeSocket(); // this will cause the thread to break out of its loop tmp.interrupt(); tmp=null; } thread=null; } public void run() { DatagramPacket packet; byte receive_buf[]=new byte[65535]; int len; byte[] data, tmp; InetAddress sender_addr; int sender_port; // moved out of loop to avoid excessive object creations (bela March 8 2001) packet=new DatagramPacket(receive_buf, receive_buf.length); while(running && thread != null && sock != null) { try { packet.setData(receive_buf, 0, receive_buf.length); sock.receive(packet); sender_addr=packet.getAddress(); sender_port=packet.getPort(); len=packet.getLength(); data=packet.getData(); if(log.isTraceEnabled()) log.trace(new StringBuffer("received (ucast) ").append(len).append(" bytes from "). append(sender_addr).append(':').append(sender_port)); if(len > receive_buf.length) { if(log.isErrorEnabled()) log.error("size of the received packet (" + len + ") is bigger than allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " + "Use the FRAG protocol and make its frag_size lower than " + receive_buf.length); } if(use_incoming_packet_handler) { tmp=new byte[len]; System.arraycopy(data, 0, tmp, 0, len); incoming_queue.add(new IncomingQueueEntry(local_addr, sender_addr, sender_port, tmp)); } else handleIncomingUdpPacket(local_addr, sender_addr, sender_port, data); } catch(SocketException sock_ex) { if(log.isDebugEnabled()) log.debug("unicast receiver socket is closed, exception=" + sock_ex); break; } catch(InterruptedIOException io_ex) { // thread was interrupted ; // go back to top of loop, where we will terminate loop } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("[" + local_addr + "] failed receiving unicast packet", ex); Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !) } } if(log.isDebugEnabled()) log.debug("unicast receiver thread terminated"); } } /** * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up * to the higher layer (done in handleIncomingUdpPacket()). */ class IncomingPacketHandler implements Runnable { Thread t=null; public void run() { byte[] data; IncomingQueueEntry entry; while(incoming_queue != null && incoming_packet_handler != null) { try { entry=(IncomingQueueEntry)incoming_queue.remove(); data=entry.buf; } catch(QueueClosedException closed_ex) { if(log.isDebugEnabled()) log.debug("packet_handler thread terminating"); break; } handleIncomingUdpPacket(entry.dest, entry.sender, entry.port, data); } } void start() { if(t == null || !t.isAlive()) { t=new Thread(this, "UDP.IncomingPacketHandler thread"); t.setDaemon(true); t.start(); } } void stop() { if(incoming_queue != null) incoming_queue.close(false); // should terminate the packet_handler thread too t=null; incoming_queue=null; } } /** * This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them * using the unicast or multicast socket */ class OutgoingPacketHandler implements Runnable { Thread t=null; byte[] buf; DatagramPacket packet; IpAddress dest; public void run() { Message msg; while(outgoing_queue != null && outgoing_packet_handler != null) { try { msg=(Message)outgoing_queue.remove(); handleMessage(msg); } catch(QueueClosedException closed_ex) { break; } catch(Throwable th) { if(log.isErrorEnabled()) log.error("exception sending packet", th); } msg=null; // let's give the poor garbage collector a hand... } if(log.isTraceEnabled()) log.trace("packet_handler thread terminating"); } protected void handleMessage(Message msg) throws Exception { send(msg); } void start() { if(t == null || !t.isAlive()) { t=new Thread(this, "UDP.OutgoingPacketHandler thread"); t.setDaemon(true); t.start(); } } void stop() { if(outgoing_queue != null) outgoing_queue.close(false); // should terminate the packet_handler thread too t=null; // outgoing_queue=null; } } /** * Bundles smaller messages into bigger ones. Collects messages in a list until * messages of a total of <tt>max_bundle_size bytes</tt> have accumulated, or until * <tt>max_bundle_timeout</tt> milliseconds have elapsed, whichever is first. Messages * are unbundled at the receiver. */ class BundlingOutgoingPacketHandler extends OutgoingPacketHandler { long total_bytes=0; /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */ final HashMap msgs=new HashMap(11); void start() { super.start(); t.setName("UDP.BundlingOutgoingPacketHandler thread"); } public void run() { Message msg=null, leftover=null; long start=0; while(outgoing_queue != null) { try { total_bytes=0; msg=leftover != null? leftover : (Message)outgoing_queue.remove(); // blocks until message is available start=System.currentTimeMillis(); leftover=waitForMessagesToAccumulate(msg, outgoing_queue, max_bundle_size, start, max_bundle_timeout); bundleAndSend(start); } catch(QueueClosedException closed_ex) { break; } catch(Throwable th) { if(log.isErrorEnabled()) log.error("exception sending packet", th); } } bundleAndSend(start); if(log.isTraceEnabled()) log.trace("packet_handler thread terminating"); } /** * Waits until max_size bytes have accumulated in the queue, or max_time milliseconds have elapsed. * When a message cannot be added to the ready-to-send bundle, it is returned, so the caller can * re-submit it again next time. * @param m * @param q * @param max_size * @param max_time
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -