udp.java.packethandler
来自「JGroups源码」· PACKETHANDLER 代码 · 共 593 行
PACKETHANDLER
593 行
package JavaGroups.JavaStack.Protocols;import java.io.*;import java.net.*;import java.util.*;import JavaGroups.*;import JavaGroups.JavaStack.*;class UdpHeader implements Serializable { public String group_addr=null; public UdpHeader(String n) {group_addr=n;} public String toString() { return "[UDP:group_addr=" + group_addr + "]"; }}public class UDP extends Protocol implements Runnable { class IncomingPacketHandler extends Thread { // +++ public void run() { DatagramPacket packet; try { while(true) { packet=(DatagramPacket)incoming_packets.Remove(); _Handle(packet); } } catch(QueueClosed closed) { System.err.println("IncomingPacketHandler: " + closed); } } } public class UcastReceiver extends Thread { public void run() { ByteArrayInputStream inp_stream; ObjectInputStream inp; Message msg; DatagramPacket packet; byte receive_buf[]=new byte[65535]; InetAddress tmp_addr=null; if(trace) Util.Print("UDP: ucast receiver was started"); while(true) { try { packet=new DatagramPacket(receive_buf, receive_buf.length); ucast_recv_sock.receive(packet); HandleIncomingUdpPacket(packet); } catch(Exception e) { System.err.println("UDP.UcastReceiver.run(): " + e); } } } } private DatagramSocket send_sock=null; // send UDP packets private DatagramSocket ucast_recv_sock=null; // receive UCAST UDP packets private MulticastSocket mcast_recv_sock=null; // receive MCAST UDP packets private Address local_addr=null; private String group_addr=null; private Address mcast_addr=null; private String mcast_addr_name=null; private String default_mcast_addr="239.255.10.10"; // site-local private int default_mcast_port=7500; private Thread mcast_receiver=null; private UcastReceiver ucast_receiver=null; private boolean is_linux=CheckForLinux(); private final int UDP_MAX_PACKET_SIZE=60000; private Properties properties=null; private boolean trace=false; private boolean ip_mcast=true; private int ip_ttl=31; // site-local private Vector members=new Vector(); Queue incoming_packets=new Queue(); // +++ 
remove Thread incoming_packet_handler; // +++ public UDP() {} public String toString() { return "Protocol UDP(local address: " + local_addr + ")"; } /* ------------------------------ Private methods -------------------------------- */ private boolean CheckForLinux() { String os=System.getProperty("os.name"); return os != null && os.equals("Linux") ? true : false; } /** If the sender is null, set our own address. We cannot just go ahead and set the address anyway, as we might be sending a message on behalf of someone else ! E.g. in case of retransmission, when the original sender has crashed, or in a FLUSH protocol when we have to return all unstable messages with the FLUSH_OK response. */ private void SetSourceAddress(Message msg) { if(msg.GetSrc() == null) msg.SetSrc(local_addr); } private void HandleIncomingUdpPacket(DatagramPacket packet) { // +++ remove try { incoming_packets.Add(packet); } catch(Exception e) { System.err.println("UDP.HandleIncomingPacket(): " + e); } } private synchronized void _Handle(DatagramPacket packet) { ByteArrayInputStream inp_stream; ObjectInputStream inp; Message msg=null; InetAddress tmp_addr=null; UdpHeader hdr=null; try { inp_stream=new ByteArrayInputStream(packet.getData()); inp=new ObjectInputStream(inp_stream); msg=(Message)inp.readObject(); hdr=(UdpHeader)msg.RemoveHeader(); } catch(Exception e) { System.err.println("UDP.HandleIncomingUdpPacket(): " + e); return; } if(trace) Util.Print("UDP.run(): received msg " + msg); /* Discard all messages destined for a channel with a different name */ String ch_name=null; if(hdr.group_addr != null) ch_name=hdr.group_addr; if(ch_name != null && !group_addr.equals(ch_name)) return; PassUp(new Event(Event.MSG, msg)); } /** Send a message to the address specified in dest */ private synchronized void SendUdpMessage(Message msg) throws Exception { Address dest, src; ByteArrayOutputStream out_stream; ObjectOutputStream out; byte buf[]; DatagramPacket packet; boolean done=false; 
dest=(Address)msg.GetDest(); if(dest == null) throw new Exception("UDP.SendUdpMessage(): destination address is null ! " + "(msg=" + msg + ")"); if(!(dest instanceof Address)) throw new Exception("UdpSenderThread.SendPacket(): destination address is not of " + "type Address !"); SetSourceAddress(msg); if(trace) Util.Print("UDP.SendUdpMessage(): msg is " + msg); out_stream=new ByteArrayOutputStream(256); out=new ObjectOutputStream(out_stream); out.writeObject(msg); buf=out_stream.toByteArray(); packet=new DatagramPacket(buf, buf.length, dest.GetIpAddress(), dest.GetPort()); if(dest.GetIpAddress().isMulticastAddress()) { try { mcast_recv_sock.send(packet); // all multicast messages } catch(Exception e) { System.err.println("UDP.SendUdpMessage(): " + e); } finally { return; } } while(!done) { try { send_sock.send(packet); // only for unicast messages if(is_linux) { send_sock.close(); send_sock=new DatagramSocket(); } done=true; } catch(Exception e) { System.err.println("UDP.SendUdpMessage: " + e); if(send_sock == null) break; } } } private synchronized void SendMultipleUdpMessages(Message msg, Vector dests) { Address dest; for(int i=0; i < dests.size(); i++) { dest=(Address)dests.elementAt(i); msg.SetDest(dest); try { SendUdpMessage(msg); } catch(Exception e) { System.err.println("UDP.SendMultipleUdpMessages(): " + e); } } } /** Create UDP sender and receiver sockets. Currently there are 2 sockets (sending and receiving). This is due to Linux's non-BSD compatibility in the JDK port (see ./design/sendto.txt). */ private void CreateSockets() throws Exception { InetAddress tmp_addr=null; Address addr; send_sock=new DatagramSocket(); ucast_recv_sock=new DatagramSocket(); // local_addr=new Address(ucast_recv_sock.getLocalAddress(), ucast_recv_sock.getLocalPort()); /** WORKAROUND: DatagramSocket.getLocalAddress() returns 'localhost', but we need the real address. This kludge can be removed as soon as the bug is fixed in the JDK. 
*/ local_addr=new Address(ucast_recv_sock.getLocalPort()); if(ip_mcast) { mcast_recv_sock=new MulticastSocket(default_mcast_port); mcast_recv_sock.setTTL((byte)ip_ttl); tmp_addr=mcast_addr_name != null ? InetAddress.getByName(mcast_addr_name) : InetAddress.getByName(default_mcast_addr); mcast_addr=new Address(tmp_addr, default_mcast_port); mcast_recv_sock.joinGroup(tmp_addr); // Util.Print("UDP joining group address " + tmp_addr); } if(trace) System.out.println("local_addr=" + local_addr + ", mcast_addr=" + mcast_addr); } private void CloseSockets() throws Exception { if(send_sock != null) { send_sock.close(); send_sock=null; } if(ucast_recv_sock != null) { ucast_recv_sock.close(); ucast_recv_sock=null; } if(mcast_recv_sock != null) { mcast_recv_sock.leaveGroup(mcast_addr.GetIpAddress()); mcast_addr=null; mcast_recv_sock.close(); mcast_recv_sock=null; } } private void HandleDownEvent(Event evt) { switch(evt.GetType()) { case Event.TMP_VIEW: case Event.VIEW_CHANGE: synchronized(members) { members.removeAllElements(); Vector tmpvec=((View)evt.GetArg()).GetMembers(); for(int i=0; i < tmpvec.size(); i++) members.addElement(tmpvec.elementAt(i)); } break; case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local) PassUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); break; case Event.FLUSH: PassUp(new Event(Event.FLUSH_OK)); break; case Event.STOP: StopWork(); PassUp(new Event(Event.STOP_OK)); break; case Event.CONNECT: group_addr=(String)evt.GetArg(); break; } } /* ------------------------------------------------------------------------------- */ /* -------------------- Receiving of MCAST/UCAST UDP packets --------------------- */ public void run() { ByteArrayInputStream inp_stream; ObjectInputStream inp; Message msg; DatagramPacket packet; byte receive_buf[]=new byte[65000]; InetAddress tmp_addr=null; if(trace) Util.Print("UDP: mcast receiver was started"); while(true) { try { packet=new DatagramPacket(receive_buf, receive_buf.length); 
mcast_recv_sock.receive(packet); HandleIncomingUdpPacket(packet); } catch(Exception e) { System.err.println("UDP.run(): " + e); } } } /* ------------------------------------------------------------------------------- */ /*------------------------------ Protocol interface ------------------------------ */ public String GetName() {return "UDP";} /** Setup the Protocol instance acording to the configuration string */ public boolean SetProperties(Properties props) { String str; this.props=props; str=props.getProperty("mcast_addr"); if(str != null) { mcast_addr_name=new String(str); props.remove("mcast_addr"); } str=props.getProperty("mcast_port"); if(str != null) { default_mcast_port=new Integer(str).intValue(); props.remove("mcast_port"); } str=props.getProperty("trace"); if(str != null) { trace=new Boolean(str).booleanValue(); props.remove("trace"); } str=props.getProperty("ip_mcast"); if(str != null) { ip_mcast=new Boolean(str).booleanValue(); props.remove("ip_mcast"); } str=props.getProperty("ip_ttl"); if(str != null) { ip_ttl=new Integer(str).intValue(); props.remove("ip_mcast"); } if(props.size() > 0) { System.err.println("UDP.SetProperties(): the following properties are not recognized:"); props.list(System.out); return false; } return true; } public void Up(Event evt) { if(evt.GetType() == Event.START) { PassUp(evt); StartWork(); PassUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); //PassUp(evt); PassUp(new Event(Event.START_OK)); return; } PassUp(evt); } /** * Caller by the layer above this layer. Usually we just put this Message * into the send queue and let one or more worker threads handle it. A worker thread * then removes the Message from the send queue, performs a conversion and adds the * modified Message to the send queue of the layer below it, by calling Down). 
*/ public void Down(Event evt) { Message msg; Object dest_addr; UdpHeader hdr; if(trace) Util.Print("UDP.Down(): event is " + evt); if(evt.GetType() != Event.MSG) { HandleDownEvent(evt); return; } msg=(Message)evt.GetArg(); /* Add header (includes channel name) */ hdr=new UdpHeader(group_addr); msg.AddHeader(hdr); dest_addr=msg.GetDest(); if(dest_addr == null) { // 'null' means send to all group members if(group_addr == null) { System.err.println("UDP.Down(): dest address of message is null, and " + "sending to default address fails as group_addr is null, too !" + " Discarding message."); return; } else if(group_addr instanceof String) { // send to group if(ip_mcast) msg.SetDest(mcast_addr); else { SendMultipleUdpMessages(msg, members); return; } } } try { SendUdpMessage(msg); } catch(Exception e) { System.err.println(e); } } private void StartWork() { CheckForLinux(); try { CreateSockets(); } catch(Exception e) { System.err.println(e); return; } if(ucast_receiver == null) { ucast_receiver=new UcastReceiver(); ucast_receiver.start(); } if(ip_mcast) { if(mcast_receiver == null) { mcast_receiver=new Thread(this); mcast_receiver.start(); } } incoming_packet_handler=new IncomingPacketHandler(); incoming_packet_handler.start(); } private synchronized void StopWork() { if(ip_mcast) if(mcast_receiver != null) { mcast_receiver.stop(); try { mcast_receiver.join(2000); } catch(Exception e) { System.err.println(e); } mcast_receiver=null; } if(ucast_receiver != null) { ucast_receiver.stop(); try { ucast_receiver.join(2000); } catch(Exception e) { System.err.println(e); } ucast_receiver=null; } try { CloseSockets(); } catch(Exception e) { System.err.println(e); } } /*--------------------------- End of Protocol interface -------------------------- */ }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?