inetechoxchange.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 323 行

JAVA
323
字号
//
// Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// For more information contact:
//	Brian Weaver	<weave@oculan.com>
//	http://www.opennms.org/
//
// Tab Stop = 8
//
//
import java.lang.*;
import java.net.*;
import java.io.*;
import java.util.*;

/**
 * Stress-test client for a TCP echo service.
 *
 * <p>{@link #main} periodically opens a new connection to port 7 on the
 * loopback address and attaches a {@link Sender}/{@link Receiver} thread pair
 * to it. The sender writes a length-prefixed random payload, the receiver
 * reads the echoed bytes back and compares them with what was sent. The two
 * threads take strict turns through a shared {@link BufferXchange}. The main
 * thread polls per-thread counters and reports any thread that stopped making
 * progress.</p>
 */
public class InetEchoXchange
{
    /** Host the echo connections are opened to. */
    private static final String ECHO_HOST = "127.0.0.1";

    /** TCP port the echo connections are opened to. */
    private static final int ECHO_PORT = 7;

    /** Minimum time between two connection attempts: 60000 ms, i.e. one minute. */
    private static final long SPAWN_INTERVAL_MS = 60000L;

    /** No new connection is attempted once this many pairs are registered. */
    private static final int MAX_PAIRS = 128;

    /** Pause between two progress checks, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 10000L;

    /** Exclusive upper bound for the random payload length. */
    private static final int PAYLOAD_BOUND = 4096;

    /**
     * Hand-off point between one {@link Sender} and one {@link Receiver}.
     *
     * <p>The exchange is either in the "received" state (the sender may
     * transmit the next payload) or in the "sent" state (a payload is in
     * flight and the receiver must verify it). Every state change calls
     * {@code notifyAll()} on this object, so threads waiting on the exchange's
     * monitor are woken up.</p>
     */
    static class BufferXchange
    {
        private static final int RECV = 0;
        private static final int SENT = 1;

        /** Last payload handed over by the sender; {@code null} until the first send. */
        private byte[] m_sent;

        /** Either {@link #RECV} or {@link #SENT}. */
        private int m_state;

        /** Creates an exchange in the "received" state with no payload. */
        BufferXchange()
        {
            this.m_sent = null;
            this.m_state = RECV;
        }

        /** @return {@code true} while no payload is outstanding. */
        synchronized boolean isReceived()
        {
            return this.m_state == RECV;
        }

        /** @return {@code true} while a payload is waiting to be verified. */
        synchronized boolean isSent()
        {
            return this.m_state == SENT;
        }

        /** Marks the outstanding payload as verified and wakes all waiters. */
        synchronized void setRecipt()
        {
            this.m_state = RECV;
            this.notifyAll();
        }

        /**
         * Records a payload as sent and wakes all waiters.
         *
         * @param buffer the bytes that were written to the stream; the array
         *               is stored by reference, not copied
         */
        synchronized void setSent(byte[] buffer)
        {
            this.m_state = SENT;
            this.m_sent = buffer;
            this.notifyAll();
        }

        /** @return the payload passed to the most recent {@link #setSent}, or {@code null}. */
        synchronized byte[] getBuffer()
        {
            return this.m_sent;
        }
    }

    /**
     * Thread that writes random, length-prefixed payloads to a stream, one at
     * a time, waiting for the previous payload to be acknowledged through the
     * shared {@link BufferXchange} before sending the next.
     */
    static class Sender
        extends Thread
    {
        private DataOutputStream m_stream;
        private BufferXchange m_xchange;
        private Random m_rand;

        /** Number of payloads sent so far; guarded by this thread object's monitor. */
        private int m_monitor;

        /**
         * @param xchange exchange shared with the matching {@link Receiver}
         * @param outs    stream the payloads are written to
         */
        Sender(BufferXchange xchange, OutputStream outs)
        {
            super("Sender" + (System.currentTimeMillis() / 1000) % (24*3600));
            m_xchange = xchange;
            m_stream = new DataOutputStream(new BufferedOutputStream(outs));
            m_rand = new Random(System.currentTimeMillis());
            m_monitor = 0;
        }

        /** @return the number of payloads sent so far. */
        synchronized int getMonitor()
        {
            return m_monitor;
        }

        /**
         * Sends payloads until interrupted while waiting or until the stream
         * fails. In both cases the stack trace is printed and the thread ends.
         */
        public void run()
        {
            for(;;)
            {
                int len = m_rand.nextInt(PAYLOAD_BOUND);
                byte[] data = new byte[len];
                m_rand.nextBytes(data);

                //
                // send the data now
                //
                // The exchange's monitor is held across the write so that the
                // receiver cannot observe the "sent" state before the bytes
                // have been flushed.
                //
                synchronized(m_xchange)
                {
                    while(!m_xchange.isReceived())
                    {
                        try
                        {
                            m_xchange.wait();
                        }
                        catch(InterruptedException e)
                        {
                            e.printStackTrace();
                            return;
                        }
                    }

                    try
                    {
                        m_stream.writeInt(len);
                        m_stream.write(data, 0, data.length);
                        m_stream.flush();
                    }
                    catch(IOException ioE)
                    {
                        ioE.printStackTrace();
                        return;
                    }

                    m_xchange.setSent(data);
                }

                synchronized(this)
                {
                    m_monitor++;
                }
            }
        }
    }

    /**
     * Thread that reads length-prefixed payloads from a stream and checks each
     * against the payload most recently published on the shared
     * {@link BufferXchange}.
     */
    static class Receiver
        extends Thread
    {
        private DataInputStream m_stream;
        private BufferXchange m_xchange;

        /** Number of payloads verified so far; guarded by this thread object's monitor. */
        private int m_monitor;

        /**
         * @param xchange exchange shared with the matching {@link Sender}
         * @param ins     stream the echoed payloads are read from
         */
        Receiver(BufferXchange xchange, InputStream ins)
        {
            super("Receiver" + (System.currentTimeMillis() / 1000) % (24*3600));
            m_stream = new DataInputStream(new BufferedInputStream(ins, 8192));
            m_xchange = xchange;
            m_monitor = 0;
        }

        /** @return the number of payloads verified so far. */
        synchronized int getMonitor()
        {
            return m_monitor;
        }

        /**
         * Verifies payloads until the thread is interrupted while waiting, the
         * stream fails, or the data read differs from the data sent. A
         * mismatch is reported on standard output; the other two conditions
         * print a stack trace. In every case the thread ends.
         */
        public void run()
        {
            for(;;)
            {
                synchronized(m_xchange)
                {
                    while(!m_xchange.isSent())
                    {
                        try
                        {
                            m_xchange.wait();
                        }
                        catch(InterruptedException e)
                        {
                            // Stop on interruption, exactly as Sender does.
                            // Going back to wait() here would make the thread
                            // impossible to shut down.
                            e.printStackTrace();
                            return;
                        }
                    }

                    byte[] compdata = m_xchange.getBuffer();
                    try
                    {
                        int len = m_stream.readInt();
                        if(len != compdata.length)
                        {
                            System.out.println("Invalid Stream Marker (" + len + " != "  + compdata.length + ")");
                            return;
                        }

                        byte[] data = new byte[len];
                        m_stream.readFully(data);

                        // Lengths are already known to match, so this is a
                        // pure content comparison.
                        if(!Arrays.equals(data, compdata))
                        {
                            System.out.println("Invalid Stream Data");
                            return;
                        }

                        m_xchange.setRecipt();
                    }
                    catch(IOException ioE)
                    {
                        ioE.printStackTrace();
                        return;
                    }
                }

                synchronized(this)
                {
                    m_monitor++;
                }
            }
        }
    }

    /** Immutable holder for two arbitrary objects. */
    static class Pair
    {
        private Object m_first;
        private Object m_second;

        /**
         * @param first  value returned by {@link #getFirst}
         * @param second value returned by {@link #getSecond}
         */
        Pair(Object first, Object second)
        {
            m_first = first;
            m_second = second;
        }

        /** @return the first element. */
        Object getFirst()
        {
            return m_first;
        }

        /** @return the second element. */
        Object getSecond()
        {
            return m_second;
        }
    }

    /**
     * Opens one echo connection and starts a sender/receiver pair on it.
     *
     * <p>If anything fails before both threads are running, the stack trace is
     * printed and the socket, if it was opened, is closed so that it does not
     * leak.</p>
     *
     * @return a {@link Pair} holding the started {@link Sender} (first) and
     *         {@link Receiver} (second), or {@code null} on failure
     */
    private static Pair startEchoPair()
    {
        Socket s = null;
        boolean started = false;
        try
        {
            s = new Socket(ECHO_HOST, ECHO_PORT);
            s.setTcpNoDelay(true);

            BufferXchange xchange = new BufferXchange();
            Receiver recv = new Receiver(xchange, s.getInputStream());
            Sender snd = new Sender(xchange, s.getOutputStream());

            recv.start();
            snd.start();
            started = true;

            return new Pair(snd, recv);
        }
        catch(Exception e)
        {
            e.printStackTrace();
            return null;
        }
        finally
        {
            if(!started && s != null)
            {
                try
                {
                    s.close();
                }
                catch(IOException ignored)
                {
                    // best effort: the connection is being abandoned anyway
                }
            }
        }
    }

    /**
     * Compares every pair's current counters with the values remembered from
     * the previous check and prints a line for the first thread of each pair
     * whose counter did not move. The remembered values are refreshed only
     * when both threads of a pair made progress.
     *
     * @param srlist  list of {@link Pair}s of ({@link Sender}, {@link Receiver})
     * @param cntlist list of {@code int[2]} holding the last seen
     *                (sender, receiver) counts, parallel to {@code srlist}
     */
    private static void reportStalledPairs(List srlist, List cntlist)
    {
        for(int x = 0; x < srlist.size(); x++)
        {
            Pair workers = (Pair) srlist.get(x);
            int[] seen = (int[]) cntlist.get(x);

            int y = ((Sender) workers.getFirst()).getMonitor();
            if(y == seen[0])
            {
                System.out.println("Count has not changed for " + workers.getFirst());
                continue;
            }

            int z = ((Receiver) workers.getSecond()).getMonitor();
            if(z == seen[1])
            {
                System.out.println("Count has not changed for " + workers.getSecond());
                continue;
            }

            seen[0] = y;
            seen[1] = z;
        }
    }

    /**
     * Runs forever: starts a new echo pair at most once per
     * {@link #SPAWN_INTERVAL_MS} while fewer than {@link #MAX_PAIRS} are
     * registered, and checks all pairs for progress every
     * {@link #POLL_INTERVAL_MS}.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        long start = 0;
        List srlist = new ArrayList();
        List cntlist = new ArrayList();

        for(;;)
        {
            long now = System.currentTimeMillis();
            if((now - start) > SPAWN_INTERVAL_MS && srlist.size() < MAX_PAIRS) // one minute has passed
            {
                // The timestamp is refreshed even if the attempt fails, so a
                // dead echo service is retried once per interval, not in a
                // tight loop.
                start = System.currentTimeMillis();
                System.out.println("Starting new echo thread");

                Pair workers = startEchoPair();
                if(workers != null)
                {
                    srlist.add(workers);
                    cntlist.add(new int[] { 0, 0 });
                }
            }

            try
            {
                Thread.sleep(POLL_INTERVAL_MS);
            }
            catch(InterruptedException ie)
            {
                ie.printStackTrace();
            }

            //
            // check the senders and receivers
            //
            reportStalledPairs(srlist, cntlist);
        } // end infinite loop
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?