streamdispatcherforchildren.java

来自「java语言开发的P2P流媒体系统」· Java 代码 · 共 374 行

JAVA
374
字号
/* Stream-2-Stream - Peer to peer television and radio
 * October 13, 2005 - This file has been modified from the original P2P-Radio source
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 */

/* 
 * P2P-Radio - Peer to peer streaming system
 * Project homepage: http://p2p-radio.sourceforge.net/
 * Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
 * 
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */

package stream2stream.network;

import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.*;
import stream2stream.XML.SettingsXML;

import java.io.*;
import java.net.*;

/**
 * Processes all packets that it receives from its child.
 * <P>
 * First, the child has to identify itself. If the child refuses to do this,
 * this thread cuts the connection to the child.
 * <P>
 * After the child has been identified, this thread starts a {@link Transmitter} for the child.
 * <P>
 * The remaining task is to free resources after the child disconnects.
 *
 * @author Michael Kaufmann
 */
public class StreamDispatcherForChildren extends Thread
{
	/**
	 * Time within which the child must establish a TCP connection
	 * (presumably milliseconds, like the other timeout below -- not used in this class).
	 */
	public static final int TIME_FOR_CONNECTION_ESTABLISHMENT = 5000;
	
	/**
	 * Time within which the new peer must identify itself. Passed to
	 * <code>socket.setSoTimeout(...)</code> before the first packet is read.
	 */
	public static final int TIME_FOR_IDENTIFICATION = 5000;
	
	private Peer peer;
	private Listener listener;
	
	// As long as childPeer is null, the child has not identified itself yet
	private RemotePeer childPeer;
	// Set right after childPeer; may still be null for a short window (see the finally block in run())
	private RemotePeerData childData;
	private Packet packet;
	
	private UniversalSocket socket;
	// Written by shutdown() from a foreign thread and polled by run(),
	// therefore volatile so the dispatcher thread is guaranteed to see the change.
	private volatile boolean shutdown;
	// true: the next loop iteration reads a packet from the socket;
	// false: the packet handed to the constructor is processed first
	private boolean receive;
	private Transmitter transmitter;
	public static final String c = "StreamDispatcherForChildren"; //ClassName
	
	/**
	 * Creates a new dispatcher for a child. The first packet is read from the socket.
	 * 
	 * @param listener The listener that accepted the connection; its peer becomes the peer of this dispatcher
	 * @param socket The socket of the connection with the child
	 */
	public StreamDispatcherForChildren(Listener listener, UniversalSocket socket)
	{
		this(listener, socket, null);
	}
	
	/**
	 * Creates a new dispatcher for a child.
	 * 
	 * @param listener The listener that accepted the connection; its peer becomes the peer of this dispatcher
	 * @param socket The socket of the connection with the child
	 * @param packet A packet to process before anything is read from the socket,
	 *               or <code>null</code> to start by receiving from the socket
	 */
	public StreamDispatcherForChildren(Listener listener, UniversalSocket socket, Packet packet)
	{
		super(Messages.getString(c + ".THREAD_NAME")); //$NON-NLS-1$
		setDaemon(true);
		shutdown = false;
		
		this.packet = packet;
		this.peer = listener.getPeer();
		this.listener = listener;
		this.socket = socket;
		receive = packet == null;
	}
	
	
	/**
	 * Waits for the child to identify itself, starts a {@link Transmitter} for it
	 * and then keeps reading packets until the child leaves, the connection fails
	 * or {@link #shutdown()} is called. The finally block releases the connection.
	 */
	public void run()
	{
		Logger.finer(c, c + ".THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
		
		try
		{
			if (shutdown)
			{
				return;
			}
			
			// Set the identification timeout
			try
			{
				socket.setSoTimeout(TIME_FOR_IDENTIFICATION);
			}
			catch (Exception e)
			{
				Logger.warning(c, "COULD_NOT_SET_SOCKET_TIMEOUT"); //$NON-NLS-1$ //$NON-NLS-2$
				return;
			}
			if (shutdown)
			{
				return;
			}
		
			// Endless loop; every exit is a return that falls through to the finally block
			while (true)
			{
				if (hasChildBeenRemoved() || shutdown)
				{
					return;
				}
				
				if (receive)
				{
					try
					{
						packet = socket.receive();
					}
					catch (SocketTimeoutException e)
					{
						if (childPeer == null)
						{
							Logger.fine(c, c + ".CHILD_HAS_NOT_IDENTIFIED_ITSELF_TIME_IS_UP"); //$NON-NLS-1$ //$NON-NLS-2$
							return;
						}
						else
						{
							// This case should not occur.
							// NOTE(review): after identification a non-TCP socket gets the timeout
							// Listener.SERVER_UDP_UPDATE_INTERVAL, so a timeout here looks reachable
							// in that mode -- confirm whether "severe" is the intended level.
							Logger.severe(c, "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
							return;
						}
					}
					catch (EOFException e)
					{
						if (!hasChildBeenRemoved())
						{
							Logger.finer(c, c + ".STREAM_FROM_CHILD_ENDED"); //$NON-NLS-1$ //$NON-NLS-2$
						}
						return;
					}
					catch (IOException e)
					{
						if (!hasChildBeenRemoved())
						{
							// Log under the class-name constant, like every other call in this class
							Logger.fine(c, "IO_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
						}
						return;
					}
				}
				// From now on every iteration reads from the socket
				receive = true;
				
				if (shutdown)
				{
					return;
				}
				if (packet == null)
				{
					return;
				}
				if (hasChildBeenRemoved())
				{
					return;
				}
				if (shutdown)
				{
					return;
				}
				
				synchronized (peer)
				{
					// Has the child not identified itself yet?
					if (childPeer == null)
					{
						// Does the other peer only want a pong packet?
						if (packet instanceof PingPacket)
						{
							try
							{
								if (shutdown)
								{
									return;
								}
								socket.send(new PongPacket());
							}
							catch (IOException e)
							{
								// Deliberately not reported
							}
							
							// Close the connection, terminate the thread
							return;
						}
					
						
						if (!(packet instanceof IdentificationPacket))
						{
							Logger.fine(c, c + ".PEER_HAS_NOT_IDENTIFIED_ITSELF"); //$NON-NLS-1$ //$NON-NLS-2$
							
							// Terminate the thread and close the connection
							return;
						}
						else
						{	
							// The peer wants to identify itself: combine the remote address
							// of the connection with the port announced in the packet
							int port = ((IdentificationPacket)packet).getSenderPort();
							InetAddress addressOnly = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
							InetSocketAddress sender = new InetSocketAddress(addressOnly, port);
							if (shutdown)
							{
								return;
							}
							childPeer = peer.searchChildWithAddress(sender);
							if (shutdown)
							{
								// childPeer may be set while childData is still null here;
								// the finally block copes with that state
								return;
							}
							
							if (childPeer == null)
							{
								// Not a known child
								Logger.fine(c, c + ".CHILD_IS_NOT_KNOWN", sender); //$NON-NLS-1$ //$NON-NLS-2$
															
								// Terminate the thread and close the connection
								return;
							}
							
							childData = peer.getChildData(childPeer);
					
							// Attach this connection to the peer data
							childData.setSocket(socket);
							childData.setStreamDispatcher(this);
							if (shutdown)
							{
								return;
							}
							
							// Replace the identification timeout
							try
							{
								if (socket.getMode() == SettingsXML.TCP)
								{
									socket.setSoTimeout(0);
								}
								else
								{
									socket.setSoTimeout(Listener.SERVER_UDP_UPDATE_INTERVAL);
								}
							}
							catch (Exception e)
							{
								Logger.warning(c, "COULD_NOT_SET_SOCKET_TIMEOUT"); //$NON-NLS-1$ //$NON-NLS-2$
								
								// Terminate the thread and close the connection
								return;
							}
							if (shutdown)
							{
								return;
							}
							
							// Start the transmitter to the child peer
							if (packet instanceof IdentificationSeqNrPacket)
							{
								IdentificationSeqNrPacket idseqPacket = (IdentificationSeqNrPacket)packet;
								transmitter = new Transmitter(listener, childPeer, socket, idseqPacket.getResumeSeqNr(), idseqPacket.getHeaderSeqNr(), idseqPacket.getMetadataSeqNr());
							}
							else
							{
								transmitter = new Transmitter(listener, childPeer, socket);
							}
							if (shutdown)
							{
								return;
							}
							
							transmitter.start();
							childData.setTransmitter(transmitter);
														
							Logger.finer(c, c + ".NEW_CHILD", childPeer); //$NON-NLS-1$ //$NON-NLS-2$
						}										
					}
					else if (packet instanceof UDPUpdatePacket)
					{
						// Nothing to do
					}
					else if (packet instanceof LeavePacket)
					{		
						// The child wants to leave:
						// terminate the thread and remove the child
						return;
					}
					else
					{
						// Unexpected packet received; do not answer it
						Logger.fine(c, "UNEXPECTED_PACKET_RECEIVED", packet); //$NON-NLS-1$ //$NON-NLS-2$
					}
				}
			}
		}
		catch (Exception e)
		{
			Logger.severe(c, "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
		}
		finally
		{
			// Clean up
			
			// The connected-check and removeChild(...) must directly follow
			// each other, hence the lock on the peer
			synchronized(peer)
			{
				// Had the child already identified itself and been attached to its peer data?
				if (childPeer != null && childData != null)
				{
					// Has the child not been removed yet?
					// Caution: the child could already have joined again, so asking
					// peer.isChild(childPeer) would be wrong here -- ask the data object
					// that belongs to this connection instead.
					if (childData.isConnected())
					{
						peer.removeChild(childPeer);
					}
				}
				else
				{
					// The child had not identified itself, or the run was aborted after
					// the lookup but before its peer data was attached. Dereferencing
					// childData in that state used to throw a NullPointerException out
					// of this finally block and left the socket open.
					// NOTE(review): treating the half-identified case like the
					// unidentified one is an assumption -- confirm against Peer.
					closeSocketQuietly();
						
					// Remove the child that originally established this connection
					peer.removeUnconnectedChildren();
				}
			}
		}
	}
	
	
	/**
	 * Tells whether the identified child has been disconnected in the meantime.
	 * 
	 * @return <code>false</code> while the child is not identified (or its peer data
	 *         is not attached yet), otherwise whether its peer data reports "not connected"
	 */
	private boolean hasChildBeenRemoved()
	{
		if (childPeer == null || childData == null)
		{
			return false;
		}
		
		// Do not ask the peer object here (the child may have re-joined);
		// the data object of this connection is authoritative
		return !childData.isConnected();
	}
	
	/**
	 * Closes the socket and ignores an I/O failure while doing so.
	 */
	private void closeSocketQuietly()
	{
		try
		{
			socket.close();
		}
		catch (IOException ignored)
		{
			// Best effort: the connection is being dropped anyway
		}
	}
	
	/**
	 * Requests termination of this dispatcher: sets the shutdown flag, removes
	 * the dispatcher from the listener's dispatcher collection and shuts down the
	 * transmitter if one was started, otherwise closes the socket directly.
	 */
	public void shutdown()
	{
		shutdown = true;
		listener.getStreamDispatchers().remove(this);
		if (transmitter != null)
		{
			transmitter.shutdown();  //will close socket
		}
		else
		{
			closeSocketQuietly();
		}
	}
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?