📄 listener.java
字号:
/* Stream-2-Stream - Peer to peer television and radio
* Project homepage: http://s2s.sourceforge.net/
* Copyright (C) 2005-2006 Jason Hooks
* ---------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
* ---------------------------------------------------------------------------
*/
package stream2stream.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import p2pradio.logging.Logger;
import p2pradio.packets.PacketFactory;
import stream2stream.XML.SettingsXML;
/*
* If streams and messages use the same transport type (e.g. TCP streams and TCP messages),
* both are received over the same socket.
* This means only one instance of Listener will exist.
*
* Otherwise, if the transport types differ (e.g. TCP streams and UDP messages),
* multiple instances of Listener will exist, and streams and messages will be received
* over different sockets.
*
* Possible combinations (m+ = multicast+; 2000 is the default port):
* TCP 2000 messages, UDP streams - 2 instances
* TCP 2000 messages, m+ 2000 streams - 2 instances
* TCP 2000 messages, TCP 2000 streams - 1 instance
* UDP 2000 messages, UDP streams - 1 instance
* UDP 2000 messages, m+ 2000 streams - 1 instance
* UDP 2000 messages, TCP 2000 streams - 2 instances
*/
/**
 * Thread that opens a TCP or UDP server socket on the peer's port, accepts
 * incoming connections and hands each one to a {@code MessageDispatcher} or a
 * {@code StreamDispatcherForChildren}. In the multicast+ modes it additionally
 * starts a {@code Transmitter} on a multicast address before it begins accepting.
 *
 * The {@code mode} passed to the constructor (one of the {@code TCP_*},
 * {@code UDP_*} or {@code MULTICAST_PLUS_*} constants) decides the transport
 * and whether this instance serves messages, streams, or both.
 *
 * {@link #shutdown()} is intended to be called from a thread other than the
 * one executing {@link #run()}; the shutdown flag is therefore volatile.
 */
public class Listener extends Thread {
    /** Receive timeout, in milliseconds, set on accepted UDP sockets when the peer is a server. */
    public static final int SERVER_UDP_UPDATE_INTERVAL=10000; //10 seconds
    /** Logging category / message-key prefix used by this class. */
    public static final String c = "Listener";
    /** Lowest multicast address probed, as a big-endian int. */
    public static final int LOWER_BOUND = 0xE0000100; // 224.0.1.0
    /** Highest multicast address probed, as a big-endian int. */
    public static final int UPPER_BOUND = 0xEFFFFFFF; // 239.255.255.255
    /** How long, in milliseconds, a candidate multicast group must stay silent to count as free. */
    public static final int TIME_TO_CHECK_FOR_OPEN_MULTICAST = 1000; //1 second

    // Listener modes (see the constructor for how each maps to transport/messages/streams).
    public static final int TCP_MESSAGES = 0;
    public static final int TCP_STREAMS = 1;
    public static final int TCP_BOTH = 2;
    public static final int UDP_MESSAGES = 3;
    public static final int UDP_STREAMS = 4;
    public static final int UDP_BOTH = 5;
    public static final int MULTICAST_PLUS_STREAMS = 6;
    public static final int MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES = 7;

    private UniversalServerSocket socket;
    private MulticastSocket multicastSocket;
    private UniversalSocket universalMulticast;
    private ServerSocket tcpSocket;
    private DatagramSocket udpSocket;
    private Peer peer;
    private SettingsXML xml;
    private int mode;
    private boolean multicastPlus;
    private boolean tcp;
    private boolean messages;
    private boolean streams;
    // Written by shutdown() and read by the listener thread, hence volatile.
    private volatile boolean shutdown;
    private HashSet messageDispatchers;
    private HashSet streamDispatchers;
    private Transmitter multicastTransmitter;
    private int port;
    private InetAddress group;
    private InetSocketAddress address;

    /**
     * Creates a listener for the given peer.
     *
     * @param peer the local peer; its port, settings and server/client role are read later
     * @param mode one of the {@code TCP_*}, {@code UDP_*} or {@code MULTICAST_PLUS_*} constants
     */
    public Listener(Peer peer, int mode)
    {
        super ("Listener");
        shutdown = false;
        this.peer = peer;
        xml = peer.getXML();
        this.mode = mode;
        multicastPlus = (mode == MULTICAST_PLUS_STREAMS || mode == MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
        tcp = (mode == TCP_MESSAGES || mode == TCP_STREAMS || mode == TCP_BOTH);
        messages = (mode == TCP_MESSAGES || mode == TCP_BOTH || mode == UDP_MESSAGES || mode == UDP_BOTH || mode == MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
        streams = (mode == TCP_STREAMS || mode == TCP_BOTH || mode == UDP_STREAMS || mode == UDP_BOTH || mode == MULTICAST_PLUS_STREAMS || mode == MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
        // Dispatcher sets exist only for the roles this listener actually serves.
        if (messages)
            messageDispatchers = new HashSet();
        if (streams)
            streamDispatchers = new HashSet();
        if (MessageDispatcher.acceptedPeers == null)
            MessageDispatcher.acceptedPeers = new HashSet();
    }

    /**
     * Starts multicast serving if requested, binds the server socket on the
     * peer's port and then accepts connections until {@link #shutdown()} is
     * called. When this listener handles messages, every accepted connection
     * goes to a {@code MessageDispatcher}; otherwise, when it handles streams,
     * to a {@code StreamDispatcherForChildren}.
     */
    public void run()
    {
        String[] transport = {"UDP", "" + peer.getPort()};
        if (tcp)
            transport[0] = "TCP";
        if (multicastPlus)
            serveMulticastStream();
        if (streams)
            Logger.info("Listener", "Listener.LISTENING_STREAMS", transport);
        if (messages)
            Logger.info("Listener", "Listener.LISTENING_MESSAGES", transport);

        try {
            if (tcp)
            {
                tcpSocket = new ServerSocket(peer.getPort());
                socket = new UniversalServerSocket(tcpSocket);
            }
            else
            {
                udpSocket = new DatagramSocket(peer.getPort());
                socket = new UniversalServerSocket(udpSocket);
            }
            socket.setSoTimeout(0);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        if (socket == null)
        {
            // Binding failed: there is nothing to accept on, and entering the
            // accept loop would only raise a NullPointerException.
            releaseRawSockets();
            return;
        }
        if (shutdown)
        {
            // shutdown() may have run before the socket existed and so could
            // not close it; release it here.
            closeListeningSocket();
            return;
        }

        while (!shutdown)
        {
            UniversalSocket newSocket = null;
            try {
                newSocket = socket.accept();
                if (newSocket.getMode() == SettingsXML.UDP)
                    newSocket.setBuffer(new byte[PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
                else
                    newSocket.setBuffer(new byte[PacketFactory.TCP_OUTPUTSTREAM_BUFFER]);
                if (newSocket.getMode() == SettingsXML.UDP && peer.isServer())
                    newSocket.setSoTimeout(SERVER_UDP_UPDATE_INTERVAL);
            } catch (IOException e) {
                closeQuietly(newSocket);
                // Accept the next connection
                continue;
            }
            if (shutdown)
            {
                // Do not leak a connection accepted while shutting down.
                closeQuietly(newSocket);
                return;
            }
            if (messages)
            {
                MessageDispatcher messageDispatcher = new MessageDispatcher(this, newSocket, streams);
                messageDispatcher.start();
                messageDispatchers.add(messageDispatcher);
                continue;
            }
            if (streams)
            {
                StreamDispatcherForChildren streamDispatcher = new StreamDispatcherForChildren(this, newSocket);
                streamDispatcher.start();
                streamDispatchers.add(streamDispatcher);
            }
        }
    }

    /**
     * Chooses the multicast address to transmit on and starts a
     * {@code Transmitter} bound to it. A server peer takes the address from the
     * settings (or searches for a free one); a client peer that can serve two
     * more peers and whose supplier is not a multicast address reuses the
     * peer's multicast address. In every other case, or when no usable address
     * can be determined, nothing is started.
     */
    private void serveMulticastStream()
    {
        address = null;
        if (peer.isServer())
        {
            address = resolveServerMulticastAddress();
            if (address == null)
                return;
        }
        else if (peer.canServePeers(2) && !peer.getSupplier().isMulticastAddress())
        {
            address = peer.getMulticastAddress();
        }
        else
            return;
        if (shutdown)
            return;
        peer.setMulticastAddress(address);
        try {
            multicastSocket = new MulticastSocket();
            multicastSocket.setTimeToLive(peer.getMulticastTTL());
            universalMulticast = new UniversalSocket(multicastSocket);
            universalMulticast.setBuffer(new byte [PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
            universalMulticast.connect(address);
            multicastTransmitter = new Transmitter(this, new RemotePeer(address), universalMulticast);
            multicastTransmitter.start();
            peer.addChild(new RemotePeer(address));
            peer.addRemoteChild(new RemotePeer(address));
            Logger.info(c, c + ".MULTICASTING", address);
        } catch (IOException e) {
            e.printStackTrace();
            // If no transmitter was created, nothing is using the socket.
            if (multicastTransmitter == null && multicastSocket != null)
                multicastSocket.close();
        }
        catch (PeerException e) {
            e.printStackTrace();
        }
    }

    /**
     * Determines the multicast socket address for a server peer. An empty
     * setting triggers a search for a free group and port; a setting without a
     * colon supplies the group and a random port is picked; "group:port"
     * supplies both.
     *
     * @return the address, or null if the configured value cannot be resolved
     *         or parsed, or if shutdown was requested during the search
     */
    private InetSocketAddress resolveServerMulticastAddress()
    {
        String multicastAddress = xml.getMulticastAddress();
        try {
            if (multicastAddress == null || multicastAddress.equals(""))
            {
                findOpenMulticastSocket();
            }
            else
            {
                int colon = multicastAddress.indexOf(":");
                if (colon == -1)
                {
                    group = InetAddress.getByName(multicastAddress);
                    findOpenPort();
                }
                else
                {
                    group = InetAddress.getByName(multicastAddress.substring(0, colon));
                    port = Integer.parseInt(multicastAddress.substring(colon + 1));
                }
            }
            if (shutdown || group == null)
                return null;
            return new InetSocketAddress(group, port);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            // Covers NumberFormatException from a non-numeric port as well as
            // an out-of-range port rejected by InetSocketAddress.
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Check for a multicast IP address and port that is unused.
     * Range is 224.0.1.0 through 239.255.255.255
     *
     * Starts at a random address in the range and walks upward (wrapping at the
     * upper bound). A group counts as unused when nothing is received on it
     * within {@link #TIME_TO_CHECK_FOR_OPEN_MULTICAST} milliseconds. On return,
     * {@code group} and {@code port} hold the result and the probe socket is closed.
     */
    private void findOpenMulticastSocket()
    {
        Random generator = new Random();
        openSocket(); // picks the random port and opens the probe socket on it
        try {
            multicastSocket.setSoTimeout(TIME_TO_CHECK_FOR_OPEN_MULTICAST);
        } catch (SocketException e1) {
            e1.printStackTrace();
        }
        int addr = LOWER_BOUND + generator.nextInt(UPPER_BOUND - LOWER_BOUND); //Somewhere between the upper and lower bound
        DatagramPacket packet = new DatagramPacket(new byte[1000],1000);
        boolean found = false;
        // Also stop when shutdown() is requested: it closes the probe socket,
        // after which every join would fail and the loop would never end.
        while (!found && !shutdown)
        {
            byte[] b = toAddressBytes(addr);
            // Advance to the next consecutive address, wrapping around.
            addr = (addr >= UPPER_BOUND) ? LOWER_BOUND : addr + 1;
            try {
                group = InetAddress.getByAddress(b);
                multicastSocket.joinGroup(group);
            } catch (IOException e) {
                continue;
            }
            try {
                multicastSocket.receive(packet);
            } catch (SocketTimeoutException e) {
                found = true; //if the socket timed out, then the address is open
            }
            catch (IOException e) {
                // Could not probe this group; move on to the next one.
            }
            if (!found)
            {
                // Leave the rejected group so its traffic cannot make the
                // next candidate look occupied.
                try {
                    multicastSocket.leaveGroup(group);
                } catch (IOException ignored) {
                }
            }
        }
        if (multicastSocket != null)
            multicastSocket.close();
    }

    /** Converts a big-endian int into the 4-byte form expected by {@code InetAddress.getByAddress}. */
    private static byte[] toAddressBytes(int addr)
    {
        return new byte[] {
            (byte) (addr >>> 24),
            (byte) (addr >>> 16),
            (byte) (addr >>> 8),
            (byte) addr
        };
    }

    /** Picks a random port on which a multicast socket can be opened, then closes that socket. */
    private void findOpenPort()
    {
        openSocket();
        if (multicastSocket != null)
            multicastSocket.close();
    }

    /**
     * Repeatedly picks a random port (1024 or above) until a
     * {@code MulticastSocket} can be opened on it; stores the port in
     * {@code port} and the socket in {@code multicastSocket}.
     */
    private void openSocket()
    {
        Random generator = new Random();
        while (true)
        {
            port = 1024 + generator.nextInt((65535 - 2) - 1024);
            try {
                multicastSocket = new MulticastSocket(port);
            } catch (IOException e1) {
                // Port unavailable; try another one.
                continue;
            }
            break;
        }
    }

    /**
     * Requests termination: sets the shutdown flag, shuts down every known
     * dispatcher and the multicast transmitter, and closes the listening and
     * multicast sockets.
     */
    public void shutdown()
    {
        shutdown = true;
        // Iterate over snapshots so that changes to the sets while dispatchers
        // are being shut down cannot invalidate the iteration.
        if (messageDispatchers != null)
        {
            Object[] snapshot = messageDispatchers.toArray();
            for (int k = 0; k < snapshot.length; k++)
                ((MessageDispatcher)snapshot[k]).shutdown();
        }
        if (streamDispatchers != null)
        {
            Object[] snapshot = streamDispatchers.toArray();
            for (int k = 0; k < snapshot.length; k++)
                ((StreamDispatcherForChildren)snapshot[k]).shutdown();
        }
        if (multicastTransmitter != null)
            multicastTransmitter.shutdown();
        closeListeningSocket();
        if (multicastSocket != null)
            multicastSocket.close();
    }

    /** Closes the listening socket if it exists, ignoring close failures. */
    private void closeListeningSocket()
    {
        if (socket != null)
        {
            try {
                socket.close();
            } catch (IOException ignored) {
            }
        }
    }

    /** Closes the raw TCP/UDP sockets that were opened but never wrapped successfully. */
    private void releaseRawSockets()
    {
        if (tcpSocket != null)
        {
            try {
                tcpSocket.close();
            } catch (IOException ignored) {
            }
        }
        if (udpSocket != null)
            udpSocket.close();
    }

    /** Closes an accepted connection if it exists, ignoring close failures. */
    private void closeQuietly(UniversalSocket s)
    {
        if (s != null)
        {
            try {
                s.close();
            } catch (IOException ignored) {
            }
        }
    }

    /** @return the peer this listener was created for */
    public Peer getPeer()
    {
        return peer;
    }

    /** @return the live set of message dispatchers, or null if this listener does not handle messages */
    public HashSet getMessageDispatchers()
    {
        return messageDispatchers;
    }

    /** @return the live set of stream dispatchers, or null if this listener does not handle streams */
    public HashSet getStreamDispatchers()
    {
        return streamDispatchers;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -