⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 listener.java

📁 java语言开发的P2P流媒体系统
💻 JAVA
字号:
/* Stream-2-Stream - Peer to peer television and radio
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */
package stream2stream.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;

import p2pradio.logging.Logger;
import p2pradio.packets.PacketFactory;
import stream2stream.XML.SettingsXML;

/*
 * If the streams and messages transport types are the same e.g. (TCP streams, TCP messages), 
 * streams and messages will be received over the same socket.  
 * This means only one instance of listener will exist.
 * 
 * Otherwise, if the transport types are different (e.g. TCP streams, UDP messages), 
 * multiple instances of Listener will exist.  Streams and messages will be received over
 * different sockets.
 * 
 * Possible combinations ("m+" means multicast+; 2000 is the default port):
 * TCP 2000 messages * UDP Stream          - 2 instances
 * TCP 2000 messages * m+ 2000 Stream      - 2 instances
 * TCP 2000 messages * TCP 2000 Stream     - 1 instance
 * UDP 2000 messages * UDP Stream          - 1 instance
 * UDP 2000 messages * m+ 2000 Stream      - 1 instance
 * UDP 2000 messages * TCP 2000 Stream     - 2 instances
 *
 * 
 */

/**
 * Thread that opens a listening socket on the peer's port and hands every
 * accepted connection to a {@link MessageDispatcher} or a
 * {@link StreamDispatcherForChildren}, depending on the mode it was created with.
 * In the multicast+ modes it additionally starts a {@link Transmitter} that
 * sends to a multicast group before it begins accepting connections.
 *
 * <p>{@link #shutdown()} is meant to be called from another thread; the
 * {@code shutdown} flag is therefore volatile.</p>
 */
public class Listener extends Thread {
	private UniversalServerSocket socket;
	private MulticastSocket multicastSocket;
	private UniversalSocket universalMulticast;
	private ServerSocket tcpSocket;
	private DatagramSocket udpSocket;
	private Peer peer;
	private SettingsXML xml;
	private int mode;
	private boolean multicastPlus;
	private boolean tcp;
	private boolean messages;
	private boolean streams;
	// Written by shutdown() (caller's thread) and polled by run() (this thread).
	private volatile boolean shutdown;
	private HashSet messageDispatchers;
	private HashSet streamDispatchers;
	private Transmitter multicastTransmitter;
	private int port;
	private InetAddress group;
	private InetSocketAddress address;

	/** Timeout set on accepted UDP sockets when this peer is the server (10 seconds). */
	public static final int SERVER_UDP_UPDATE_INTERVAL=10000; //10 seconds
	
	/** Class name used as the source and key prefix for log messages. */
	public static final String c = "Listener";
	/** First multicast address that is probed. */
	public static final int LOWER_BOUND = 0xE0000100; // 224.0.1.0
	/** Last multicast address that is probed. */
	public static final int UPPER_BOUND = 0xEFFFFFFF;    // 239.255.255.255
	/** How long a multicast group must stay silent to be considered free. */
	public static final int TIME_TO_CHECK_FOR_OPEN_MULTICAST = 1000; //1 second

	// Listener modes; the constructor derives the tcp / messages / streams /
	// multicastPlus flags from them.
	public static final int TCP_MESSAGES = 0;
	public static final int TCP_STREAMS = 1;
	public static final int TCP_BOTH = 2;
	public static final int UDP_MESSAGES = 3;
	public static final int UDP_STREAMS = 4;
	public static final int UDP_BOTH = 5;
	public static final int MULTICAST_PLUS_STREAMS = 6;
	public static final int MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES = 7;

	/**
	 * Creates a listener for the given peer.
	 *
	 * @param peer the local peer; supplies the port, the settings and the multicast configuration
	 * @param mode one of the {@code TCP_*}, {@code UDP_*} or {@code MULTICAST_PLUS_*} constants
	 */
	public Listener(Peer peer, int mode)
	{
		super ("Listener");
		shutdown = false;
		this.peer = peer;
		xml = peer.getXML();
		this.mode = mode;
		multicastPlus = (mode == MULTICAST_PLUS_STREAMS || mode == MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
		tcp = (mode == TCP_MESSAGES || mode == TCP_STREAMS || mode == TCP_BOTH);
		messages = (mode == TCP_MESSAGES || mode == TCP_BOTH || mode == UDP_MESSAGES || mode == UDP_BOTH || mode == MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
		streams = (mode == TCP_STREAMS || mode == TCP_BOTH || mode == UDP_STREAMS || mode == UDP_BOTH || mode == MULTICAST_PLUS_STREAMS || mode == MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
		if (messages)
			messageDispatchers = new HashSet();
		if (streams)
			streamDispatchers = new HashSet();
		if (MessageDispatcher.acceptedPeers == null)
			MessageDispatcher.acceptedPeers = new HashSet();
	}

	/**
	 * Starts the multicast transmitter when in a multicast+ mode, binds the
	 * TCP or UDP socket on the peer's port and then accepts connections until
	 * {@link #shutdown()} is called. Each accepted socket is given to a new
	 * message dispatcher when this listener handles messages, otherwise to a
	 * new stream dispatcher.
	 */
	public void run()
	{
		String[] transport = {"UDP", "" + peer.getPort()};
		if (tcp)
			transport[0] = "TCP";
		
		if (multicastPlus)
		{
			serveMulticastStream();
		}
		
		if (streams)
			Logger.info("Listener", "Listener.LISTENING_STREAMS", transport);
		if (messages) 
			Logger.info("Listener", "Listener.LISTENING_MESSAGES", transport);
		
		try {
			if (tcp)
			{
				tcpSocket = new ServerSocket(peer.getPort());
				socket = new UniversalServerSocket(tcpSocket);
			}
			else
			{
				udpSocket = new DatagramSocket(peer.getPort());
				socket = new UniversalServerSocket(udpSocket);
			}
			socket.setSoTimeout(0);
		}
		catch (IOException e) {
			e.printStackTrace();
			if (socket == null)
			{
				// Binding failed: there is nothing to accept on. Stop here
				// instead of hitting a NullPointerException in the loop below.
				closeRawSockets();
				return;
			}
		}
		if (shutdown)
		{
			// shutdown() may have run before the socket existed and so could
			// not close it; release it here.
			try {
				socket.close();
			} catch (IOException ignored) {
			}
			return;
		}
		while (!shutdown)
		{
			UniversalSocket newSocket = null;
			try {
				newSocket = socket.accept();
				boolean udp = (newSocket.getMode() == SettingsXML.UDP);
				if (udp)
					newSocket.setBuffer(new byte[PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
				else
					newSocket.setBuffer(new byte[PacketFactory.TCP_OUTPUTSTREAM_BUFFER]);
				if (udp && peer.isServer())
					newSocket.setSoTimeout(SERVER_UDP_UPDATE_INTERVAL);
			} catch (IOException e) {
				closeQuietly(newSocket);
				// Accept the next connection. After shutdown() has closed the
				// listening socket the loop condition ends the thread.
				continue;
			}
			if (shutdown)
			{
				// Nobody will take ownership of this socket any more.
				closeQuietly(newSocket);
				return;
			}
			if (messages)
			{
				// The dispatcher is told whether it must serve streams as well.
				MessageDispatcher messageDispatcher = new MessageDispatcher(this, newSocket, streams);
				messageDispatcher.start();
				messageDispatchers.add(messageDispatcher);
			}
			else if (streams)
			{
				StreamDispatcherForChildren streamDispatcher = new StreamDispatcherForChildren(this, newSocket);
				streamDispatcher.start();
				streamDispatchers.add(streamDispatcher);
			}
		}
	}

	/** Closes an accepted socket, ignoring a failure to do so; {@code null} is allowed. */
	private static void closeQuietly(UniversalSocket s)
	{
		if (s == null)
			return;
		try {
			s.close();
		} catch (IOException ignored) {
			// best effort: the socket is being discarded anyway
		}
	}

	/** Closes the raw TCP/UDP sockets when they were bound but never wrapped. */
	private void closeRawSockets()
	{
		if (tcpSocket != null)
		{
			try {
				tcpSocket.close();
			} catch (IOException ignored) {
			}
		}
		if (udpSocket != null)
			udpSocket.close();
	}
	
	/**
	 * Determines the multicast address to send to and starts a transmitter for it.
	 * <ul>
	 * <li>Server: uses the address from the settings ("host" or "host:port");
	 *     a missing port is chosen at random, a missing address is probed for.</li>
	 * <li>Client that can serve two more peers and whose supplier is not a
	 *     multicast address: uses the peer's multicast address.</li>
	 * <li>Otherwise nothing is started.</li>
	 * </ul>
	 * If the configured address cannot be resolved or its port is invalid,
	 * the problem is reported and no transmitter is started.
	 */
	private void serveMulticastStream()
	{
		address = null;
		if (peer.isServer())
		{
			String multicastAddress = xml.getMulticastAddress();
			
			if (multicastAddress == null || multicastAddress.equals(""))
			{
				findOpenMulticastSocket();
			} 
			else
			{
				int colon = multicastAddress.indexOf(":");
				try {
					if (colon == -1)
					{
						group = InetAddress.getByName(multicastAddress);
						findOpenPort();
					}
					else
					{
						group = InetAddress.getByName(multicastAddress.substring(0, colon));
						int parsed = Integer.parseInt(multicastAddress.substring(colon + 1));
						if (parsed < 0 || parsed > 0xFFFF)
							throw new NumberFormatException("port out of range: " + parsed);
						port = parsed;
					}
				} catch (UnknownHostException e) {
					// Without a group the InetSocketAddress below would be a
					// wildcard address, which is not a usable destination.
					e.printStackTrace();
					return;
				} catch (NumberFormatException e) {
					e.printStackTrace();
					return;
				}
			}
			if (shutdown || group == null)
				return;
			address = new InetSocketAddress(group, port);
		}
		else if(peer.canServePeers(2) && !peer.getSupplier().isMulticastAddress())//is client and can serve 2 more peers
		{
			address = peer.getMulticastAddress();
		}
		else 
			return;
		peer.setMulticastAddress(address);
		try {
			multicastSocket = new MulticastSocket();
			multicastSocket.setTimeToLive(peer.getMulticastTTL());
			universalMulticast = new UniversalSocket(multicastSocket);
			universalMulticast.setBuffer(new byte [PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
			universalMulticast.connect(address);
			multicastTransmitter = new Transmitter(this, new RemotePeer(address), universalMulticast);
			multicastTransmitter.start();
			peer.addChild(new RemotePeer(address));
			peer.addRemoteChild(new RemotePeer(address));
			Logger.info(c, c + ".MULTICASTING", address);
		} catch (IOException e) {
			e.printStackTrace();
		}
		catch (PeerException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Check for a multicast IP address and port that is unused.
	 * Range is 224.0.1.0 through 239.255.255.255
	 *
	 * <p>A random port is bound first, then groups are probed starting at a
	 * random address and moving to the next consecutive one (wrapping at the
	 * upper bound). A group counts as free when nothing is received on it
	 * within {@link #TIME_TO_CHECK_FOR_OPEN_MULTICAST}. The result is left in
	 * the {@code group} and {@code port} fields. The search stops early when
	 * {@link #shutdown()} is called.</p>
	 */
	private void findOpenMulticastSocket()
	{
		openSocket();
		try {
			multicastSocket.setSoTimeout(TIME_TO_CHECK_FOR_OPEN_MULTICAST);
		} catch (SocketException e) {
			e.printStackTrace();
		}
		
		Random generator = new Random();
		int addr = LOWER_BOUND + generator.nextInt(UPPER_BOUND - LOWER_BOUND); //Somewhere between the upper and lower bound
		DatagramPacket packet = new DatagramPacket(new byte[1000],1000);
		boolean OK = false;
		while (!OK && !shutdown)
		{
			// Convert without modifying addr, so that the increment below
			// really yields the next consecutive address.
			byte[] b = toAddressBytes(addr);
			addr++;
			if (addr > UPPER_BOUND)
				addr = LOWER_BOUND;
			try {
				group = InetAddress.getByAddress(b);
				multicastSocket.joinGroup(group);
			} catch (IOException e) { // includes UnknownHostException
				continue;
			}
			
			try {
				// receive() shrinks the packet length to what was read; reset it.
				packet.setLength(1000);
				multicastSocket.receive(packet);
			} catch (SocketTimeoutException e) {
				OK = true;  //if the socket timed out, then the address is open
			}
			catch (IOException e) {
				// treated as "not verified free": try the next address
			}
			
			// Leave the probed group again; otherwise traffic of a busy group
			// would keep arriving and make every later candidate look occupied.
			try {
				multicastSocket.leaveGroup(group);
			} catch (IOException ignored) {
			}
		}
		if (multicastSocket != null)
			multicastSocket.close();
	}

	/** Returns the four bytes of an IPv4 address held in an int, most significant byte first. */
	private static byte[] toAddressBytes(int addr)
	{
		return new byte[] {
			(byte) (addr >>> 24),
			(byte) (addr >>> 16),
			(byte) (addr >>> 8),
			(byte) addr
		};
	}

	/**
	 * Picks a random port on which a multicast socket can currently be bound
	 * and stores it in {@code port}; the probing socket is closed again.
	 */
	private void findOpenPort()
	{
		openSocket();
		if (multicastSocket != null)
			multicastSocket.close();
	}

	/**
	 * Binds {@code multicastSocket} to a randomly chosen port, retrying with
	 * a new random port until binding succeeds. The chosen port is stored in
	 * {@code port}.
	 */
	private void openSocket()
	{
		Random generator = new Random();
		while (true)
		{
			port = 1024 + generator.nextInt((65535 - 2) - 1024);
			try {
				multicastSocket = new MulticastSocket(port);
			} catch (IOException e1) {
				// port not available, try another one
				continue;
			}
			break;
		}
	}
	
	/**
	 * Stops this listener: sets the shutdown flag, shuts down all registered
	 * dispatchers and the multicast transmitter, and closes the listening and
	 * multicast sockets.
	 */
	public void shutdown()
	{
		shutdown = true;
		// Work on snapshots: the sets may still change while we walk them
		// (e.g. the listener thread registering one more dispatcher).
		if (messageDispatchers != null)
		{
			Object[] dispatchers = messageDispatchers.toArray();
			for (int k = 0; k < dispatchers.length; k++)
				((MessageDispatcher)dispatchers[k]).shutdown();
		}
		if (streamDispatchers != null)
		{
			Object[] dispatchers = streamDispatchers.toArray();
			for (int k = 0; k < dispatchers.length; k++)
				((StreamDispatcherForChildren)dispatchers[k]).shutdown();
		}
		if (multicastTransmitter != null)
			multicastTransmitter.shutdown();
		if (socket != null)
		{
			try {
				socket.close();
			} catch (IOException ignored) {
				// best effort during shutdown
			}
		}
		if (multicastSocket != null)
			multicastSocket.close();
		
	}

	/** @return the peer this listener was created for */
	public Peer getPeer()
	{
		return peer;
	}

	/** @return the set of message dispatchers, or {@code null} when this listener does not handle messages */
	public HashSet getMessageDispatchers()
	{
		return messageDispatchers;
	}

	/** @return the set of stream dispatchers, or {@code null} when this listener does not handle streams */
	public HashSet getStreamDispatchers()
	{
		return streamDispatchers;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -