udp.java.packethandler

来自「JGRoups源码」· PACKETHANDLER 代码 · 共 593 行

PACKETHANDLER
593
字号
package JavaGroups.JavaStack.Protocols;

import java.io.*;
import java.net.*;
import java.util.*;
import JavaGroups.*;
import JavaGroups.JavaStack.*;


/**
 * Header that the UDP layer pushes onto every outgoing message. It carries the name of
 * the group the sender is connected to, so that receivers can discard traffic that was
 * sent for a different group.
 */
class UdpHeader implements Serializable {
    public String group_addr=null;

    public UdpHeader(String n) {group_addr=n;}

    public String toString() {
        return "[UDP:group_addr=" + group_addr + "]";
    }
}


/**
 * Protocol layer that sends and receives messages as UDP datagrams.
 *
 * Outgoing messages are serialized with Java object serialization and sent either to an
 * IP multicast address (when ip_mcast is true) or individually to every known member.
 * Incoming datagrams are read by two receiver threads (one for unicast, one for
 * multicast), queued in incoming_packets, and deserialized and passed up by a separate
 * handler thread.
 */
public class UDP extends Protocol implements Runnable {

    /**
     * Takes packets off the incoming_packets queue and hands them to _Handle(). The loop
     * ends when the queue reports that it has been closed.
     */
    class IncomingPacketHandler extends Thread {
        public void run() {
            DatagramPacket packet;

            try {
                while(true) {
                    packet=(DatagramPacket)incoming_packets.Remove();
                    _Handle(packet);
                }
            }
            catch(QueueClosed closed) {
                System.err.println("IncomingPacketHandler: " + closed);
            }
        }
    }


    /** Reads datagrams from the unicast receive socket and queues them for handling. */
    public class UcastReceiver extends Thread {
        public void run() {
            DatagramPacket packet;
            byte           receive_buf[]=new byte[65535];

            if(trace) Util.Print("UDP: ucast receiver was started");
            while(running) {
                try {
                    packet=new DatagramPacket(receive_buf, receive_buf.length);
                    ucast_recv_sock.receive(packet);
                    HandleIncomingUdpPacket(packet);
                }
                catch(Exception e) {
                    // StopWork() closes the socket to unblock receive(); that failure is expected
                    if(!running)
                        break;
                    System.err.println("UDP.UcastReceiver.run(): " + e);
                }
            }
        }
    }


    /** Upper bound on attempts to send one unicast packet before it is dropped. */
    private static final int   MAX_SEND_ATTEMPTS=3;

    private DatagramSocket     send_sock=null;         // send UDP packets
    private DatagramSocket     ucast_recv_sock=null;   // receive UCAST UDP packets
    private MulticastSocket    mcast_recv_sock=null;   // receive MCAST UDP packets
    private Address            local_addr=null;
    private String             group_addr=null;
    private Address            mcast_addr=null;
    private String             mcast_addr_name=null;
    private String             default_mcast_addr="239.255.10.10";  // site-local
    private int                default_mcast_port=7500;
    private Thread             mcast_receiver=null;
    private UcastReceiver      ucast_receiver=null;
    private boolean            is_linux=CheckForLinux();
    private final int          UDP_MAX_PACKET_SIZE=60000;
    private Properties         properties=null;
    private boolean            trace=false;
    private boolean            ip_mcast=true;
    private int                ip_ttl=31; // site-local
    private Vector             members=new Vector();

    /** True while the receiver threads are supposed to keep reading from their sockets. */
    private volatile boolean   running=false;

    Queue                      incoming_packets=new Queue();
    Thread                     incoming_packet_handler;


    public UDP() {}

    public String toString() {
        return "Protocol UDP(local address: " + local_addr + ")";
    }


    /* ------------------------------ Private methods -------------------------------- */

    /** Returns true if the "os.name" system property is exactly "Linux". */
    private boolean CheckForLinux() {
        return "Linux".equals(System.getProperty("os.name"));
    }


    /**
       If the sender is null, set our own address. We cannot just go ahead and set the address
       anyway, as we might be sending a message on behalf of someone else ! E.g. in case of
       retransmission, when the original sender has crashed, or in a FLUSH protocol when we
       have to return all unstable messages with the FLUSH_OK response.
     */
    private void SetSourceAddress(Message msg) {
        if(msg.GetSrc() == null)
            msg.SetSrc(local_addr);
    }


    /**
     * Queues a received datagram for the handler thread.
     *
     * The receiver threads reuse a single receive buffer for every datagram, so the payload
     * is copied into its own array before queueing. Otherwise the next receive() could
     * overwrite the bytes of a packet that is still waiting in the queue.
     */
    private void HandleIncomingUdpPacket(DatagramPacket packet) {
        try {
            byte data[]=new byte[packet.getLength()];
            System.arraycopy(packet.getData(), packet.getOffset(), data, 0, data.length);
            incoming_packets.Add(new DatagramPacket(data, data.length,
                                                    packet.getAddress(), packet.getPort()));
        }
        catch(Exception e) {
            System.err.println("UDP.HandleIncomingPacket(): " + e);
        }
    }


    /**
     * Deserializes the message contained in a datagram, removes the UDP header and passes
     * the message up as an Event.MSG. Messages whose header names a group other than the
     * one this layer is connected to are silently discarded.
     *
     * NOTE(review): the payload is read with Java object deserialization straight from the
     * network. That is unsafe when peers are not trusted; consider an ObjectInputFilter or
     * a different wire format.
     */
    private synchronized void _Handle(DatagramPacket packet) {
        ObjectInputStream    inp;
        Message              msg=null;
        UdpHeader            hdr=null;

        try {
            // Only the bytes that belong to this datagram, not the whole backing array
            inp=new ObjectInputStream(new ByteArrayInputStream(packet.getData(),
                                                               packet.getOffset(),
                                                               packet.getLength()));
            msg=(Message)inp.readObject();
            hdr=(UdpHeader)msg.RemoveHeader();
        }
        catch(Exception e) {
            System.err.println("UDP.HandleIncomingUdpPacket(): " + e);
            return;
        }

        if(hdr == null) {
            // Not produced by a UDP layer; dereferencing the header would kill the handler thread
            System.err.println("UDP.HandleIncomingUdpPacket(): message has no UDP header, " +
                               "discarding it");
            return;
        }

        if(trace) Util.Print("UDP.run(): received msg " + msg);

        /* Discard all messages destined for a channel with a different name. Comparing from
           the header's side keeps this safe while group_addr is still null. */
        String ch_name=hdr.group_addr;
        if(ch_name != null && !ch_name.equals(group_addr))
            return;

        PassUp(new Event(Event.MSG, msg));
    }


    /**
     * Serializes a message and sends it to the address stored in its destination field.
     * Multicast destinations go out through the multicast socket, everything else through
     * the unicast send socket. Socket errors are reported on System.err and not rethrown.
     *
     * @throws Exception if the destination is null or not an Address, or if the message
     *                   cannot be serialized
     */
    private synchronized void SendUdpMessage(Message msg) throws Exception {
        Object                  raw_dest=msg.GetDest();
        Address                 dest;
        ByteArrayOutputStream   out_stream;
        ObjectOutputStream      out;
        byte                    buf[];
        DatagramPacket          packet;

        if(raw_dest == null)
            throw new Exception("UDP.SendUdpMessage(): destination address is null ! " +
                                "(msg=" + msg + ")");

        // Checked before the cast so that the descriptive error is actually reachable
        if(!(raw_dest instanceof Address))
            throw new Exception("UdpSenderThread.SendPacket(): destination address is not of " +
                                "type Address !");
        dest=(Address)raw_dest;

        SetSourceAddress(msg);

        if(trace) Util.Print("UDP.SendUdpMessage(): msg is " + msg);

        out_stream=new ByteArrayOutputStream(256);
        out=new ObjectOutputStream(out_stream);
        out.writeObject(msg);
        out.flush();
        buf=out_stream.toByteArray();
        packet=new DatagramPacket(buf, buf.length, dest.GetIpAddress(), dest.GetPort());

        if(dest.GetIpAddress().isMulticastAddress()) {
            try {
                mcast_recv_sock.send(packet); // all multicast messages
            }
            catch(Exception e) {
                System.err.println("UDP.SendUdpMessage(): " + e);
            }
            return;
        }

        SendUnicastPacket(packet);
    }


    /**
     * Sends one packet through the unicast send socket, retrying a bounded number of times.
     * A packet that still cannot be sent is dropped with an error message, so that a
     * permanent failure cannot block the caller forever.
     */
    private void SendUnicastPacket(DatagramPacket packet) {
        for(int attempt=1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
            if(send_sock == null) {
                System.err.println("UDP.SendUdpMessage: send socket is not open, " +
                                   "discarding message");
                return;
            }
            try {
                send_sock.send(packet);  // only for unicast messages
            }
            catch(Exception e) {
                System.err.println("UDP.SendUdpMessage: " + e);
                continue;
            }
            if(is_linux)
                RecycleSendSocket();
            return;
        }
        System.err.println("UDP.SendUdpMessage: giving up after " + MAX_SEND_ATTEMPTS +
                           " failed attempts");
    }


    /**
     * Replaces the send socket with a fresh one. This is done after every unicast send when
     * running on Linux. The new socket is created before the old one is closed, so a failure
     * to create it leaves the existing socket usable.
     */
    private void RecycleSendSocket() {
        try {
            DatagramSocket fresh=new DatagramSocket();
            send_sock.close();
            send_sock=fresh;
        }
        catch(Exception e) {
            System.err.println("UDP.SendUdpMessage: " + e);
        }
    }


    /**
     * Sends the same message to every address in dests, one unicast at a time. A failure
     * for one destination is reported and does not stop the remaining sends.
     */
    private synchronized void SendMultipleUdpMessages(Message msg, Vector dests) {
        // Work on a snapshot: the member list is rebuilt by HandleDownEvent() on view changes
        Vector  targets=(Vector)dests.clone();
        Address dest;

        for(int i=0; i < targets.size(); i++) {
            dest=(Address)targets.elementAt(i);
            msg.SetDest(dest);
            try {
                SendUdpMessage(msg);
            }
            catch(Exception e) {
                System.err.println("UDP.SendMultipleUdpMessages(): " + e);
            }
        }
    }


    /**
       Create UDP sender and receiver sockets. Currently there are 2 sockets
       (sending and receiving). This is due to Linux's non-BSD compatibility
       in the JDK port (see ./design/sendto.txt).
    */
    private void CreateSockets() throws Exception {
        InetAddress tmp_addr;

        send_sock=new DatagramSocket();
        ucast_recv_sock=new DatagramSocket();

        /*
           WORKAROUND: DatagramSocket.getLocalAddress() returns 'localhost', but we need the real
           address, so the local address is built from the port only. This kludge can be removed
           as soon as the bug is fixed in the JDK.
         */
        local_addr=new Address(ucast_recv_sock.getLocalPort());

        if(ip_mcast) {
            mcast_recv_sock=new MulticastSocket(default_mcast_port);
            mcast_recv_sock.setTTL((byte)ip_ttl);
            tmp_addr=InetAddress.getByName(mcast_addr_name != null ? mcast_addr_name
                                                                   : default_mcast_addr);
            mcast_addr=new Address(tmp_addr, default_mcast_port);
            mcast_recv_sock.joinGroup(tmp_addr);
        }

        if(trace)
            System.out.println("local_addr=" + local_addr + ", mcast_addr=" + mcast_addr);
    }


    /**
     * Closes all sockets that are currently open and clears the corresponding fields. The
     * multicast socket is closed even if leaving the multicast group fails.
     */
    private void CloseSockets() throws Exception {
        if(send_sock != null) {
            send_sock.close();
            send_sock=null;
        }

        if(ucast_recv_sock != null) {
            ucast_recv_sock.close();
            ucast_recv_sock=null;
        }

        if(mcast_recv_sock != null) {
            try {
                if(mcast_addr != null)
                    mcast_recv_sock.leaveGroup(mcast_addr.GetIpAddress());
            }
            finally {
                mcast_addr=null;
                mcast_recv_sock.close();
                mcast_recv_sock=null;
            }
        }
    }


    /** Handles every event travelling down the stack that is not a message. */
    private void HandleDownEvent(Event evt) {
        switch(evt.GetType()) {

        case Event.TMP_VIEW:
        case Event.VIEW_CHANGE:
            // Fetch the new list first, so a bad argument cannot leave 'members' emptied
            Vector tmpvec=((View)evt.GetArg()).GetMembers();
            synchronized(members) {
                members.removeAllElements();
                for(int i=0; i < tmpvec.size(); i++)
                    members.addElement(tmpvec.elementAt(i));
            }
            break;

        case Event.GET_LOCAL_ADDRESS:   // return local address -> Event(SET_LOCAL_ADDRESS, local)
            PassUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
            break;

        case Event.FLUSH:
            PassUp(new Event(Event.FLUSH_OK));
            break;

        case Event.STOP:
            StopWork();
            PassUp(new Event(Event.STOP_OK));
            break;

        case Event.CONNECT:
            group_addr=(String)evt.GetArg();
            break;
        }
    }

    /* ------------------------------------------------------------------------------- */


    /* -------------------- Receiving of MCAST/UCAST UDP packets --------------------- */

    /** Body of the multicast receiver thread: reads datagrams and queues them for handling. */
    public void run() {
        DatagramPacket packet;
        byte           receive_buf[]=new byte[65000];

        if(trace) Util.Print("UDP: mcast receiver was started");
        while(running) {
            try {
                packet=new DatagramPacket(receive_buf, receive_buf.length);
                mcast_recv_sock.receive(packet);
                HandleIncomingUdpPacket(packet);
            }
            catch(Exception e) {
                // StopWork() closes the socket to unblock receive(); that failure is expected
                if(!running)
                    break;
                System.err.println("UDP.run(): " + e);
            }
        }
    }

    /* ------------------------------------------------------------------------------- */


    /*------------------------------ Protocol interface ------------------------------ */

    public String GetName() {return "UDP";}


    /**
     * Sets up the protocol instance according to the configuration properties.
     *
     * Recognized keys are mcast_addr, mcast_port, trace, ip_mcast and ip_ttl. Every
     * recognized key is removed from props; if anything is left over afterwards, the
     * remaining entries are listed and false is returned.
     *
     * @param props the configuration; it is modified by this call
     * @return true if every property was recognized, false otherwise
     * @throws NumberFormatException if mcast_port or ip_ttl is not a valid integer
     */
    public boolean SetProperties(Properties props) {
        String     str;

        // 'props' is not declared in this file; presumably inherited from Protocol
        this.props=props;

        str=props.getProperty("mcast_addr");
        if(str != null) {
            mcast_addr_name=str;
            props.remove("mcast_addr");
        }

        str=props.getProperty("mcast_port");
        if(str != null) {
            default_mcast_port=Integer.parseInt(str);
            props.remove("mcast_port");
        }

        str=props.getProperty("trace");
        if(str != null) {
            trace=Boolean.valueOf(str).booleanValue();
            props.remove("trace");
        }

        str=props.getProperty("ip_mcast");
        if(str != null) {
            ip_mcast=Boolean.valueOf(str).booleanValue();
            props.remove("ip_mcast");
        }

        str=props.getProperty("ip_ttl");
        if(str != null) {
            ip_ttl=Integer.parseInt(str);
            // Must remove its own key, otherwise ip_ttl is reported as unrecognized below
            props.remove("ip_ttl");
        }

        if(props.size() > 0) {
            System.err.println("UDP.SetProperties(): the following properties are not recognized:");
            props.list(System.out);
            return false;
        }
        return true;
    }


    /**
     * Handles events travelling up the stack. On START the event is passed on, the sockets
     * and threads are started, and the local address plus START_OK are sent up. Every other
     * event is passed up unchanged.
     */
    public void Up(Event evt) {
        if(evt.GetType() == Event.START) {
            PassUp(evt);
            StartWork();
            PassUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
            PassUp(new Event(Event.START_OK));
            return;
        }

        PassUp(evt);
    }


    /**
     * Called by the layer above this layer. Events other than messages are handled by
     * HandleDownEvent(). A message gets a UdpHeader carrying the group name and is then
     * sent: to its destination if it has one, otherwise to the whole group, either through
     * IP multicast or as one unicast per known member.
     */
    public void Down(Event evt) {
        Message      msg;
        Object       dest_addr;
        UdpHeader    hdr;

        if(trace) Util.Print("UDP.Down(): event is " + evt);

        if(evt.GetType() != Event.MSG) {
            HandleDownEvent(evt);
            return;
        }

        msg=(Message)evt.GetArg();

        /* Add header (includes channel name) */
        hdr=new UdpHeader(group_addr);
        msg.AddHeader(hdr);

        dest_addr=msg.GetDest();

        if(dest_addr == null) {  // 'null' means send to all group members
            if(group_addr == null) {
                System.err.println("UDP.Down(): dest address of message is null, and " +
                                   "sending to default address fails as group_addr is null, too !" +
                                   " Discarding message.");
                return;
            }
            if(ip_mcast)
                msg.SetDest(mcast_addr);
            else {
                SendMultipleUdpMessages(msg, members);
                return;
            }
        }

        try {
            SendUdpMessage(msg);
        }
        catch(Exception e) {
            System.err.println(e);
        }
    }


    /**
     * Creates the sockets and starts the receiver and handler threads. If the sockets
     * cannot be created, the error is printed and no thread is started.
     */
    private void StartWork() {
        try {
            CreateSockets();
        }
        catch(Exception e) {
            System.err.println(e);
            return;
        }

        running=true;   // must be set before the receiver threads enter their loops

        if(ucast_receiver == null) {
            ucast_receiver=new UcastReceiver();
            ucast_receiver.start();
        }

        if(ip_mcast) {
            if(mcast_receiver == null) {
                mcast_receiver=new Thread(this);
                mcast_receiver.start();
            }
        }

        // StopWork() leaves the handler running, so only start one if none is alive
        if(incoming_packet_handler == null || !incoming_packet_handler.isAlive()) {
            incoming_packet_handler=new IncomingPacketHandler();
            incoming_packet_handler.start();
        }
    }


    /**
     * Stops the receiver threads and closes all sockets.
     *
     * The threads are not stopped with Thread.stop(). Instead the run flag is cleared and
     * the sockets are closed, which makes a blocked receive() fail so that the loops can
     * observe the flag and return. Each thread is then given up to two seconds to finish.
     */
    private synchronized void StopWork() {
        running=false;

        try {
            CloseSockets();
        }
        catch(Exception e) {
            System.err.println(e);
        }

        if(mcast_receiver != null) {
            JoinReceiver(mcast_receiver);
            mcast_receiver=null;
        }

        if(ucast_receiver != null) {
            JoinReceiver(ucast_receiver);
            ucast_receiver=null;
        }
    }


    /** Waits up to two seconds for a receiver thread to end, keeping the interrupt status. */
    private void JoinReceiver(Thread receiver) {
        try {
            receiver.join(2000);
        }
        catch(InterruptedException e) {
            System.err.println(e);
            Thread.currentThread().interrupt();
        }
    }

    /*--------------------------- End of Protocol interface -------------------------- */

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?