⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer.java

📁 java语言开发的P2P流媒体系统
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* Stream-2-Stream - Peer to peer television and radio
 * October 13, 2005 - This file has been modified from the original P2P-Radio source
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 */

/* 
 * P2P-Radio - Peer to peer streaming system
 * Project homepage: http://p2p-radio.sourceforge.net/
 * Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
 * 
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */

package stream2stream.network;
import p2pradio.logging.*;
import p2pradio.players.ListenBuffer;
import p2pradio.sources.BroadcastBuffer;
import p2pradio.packets.*;
import p2pradio.packets.security.*;
import stream2stream.XML.SettingsXML;
import p2pradio.event.*;

import java.io.*;
import java.net.*;
import java.util.*;

import p2pradio.*;

/**
 * A server or a normal peer in the P2P network.
 * It starts the threads and manages the buffer, the supplier and the children.
 *
 * @author Michael Kaufmann
 */
public class Peer extends RemotePeer
{
	private UI ui;
	// Der Server (die Wurzel des Baums)
	private RemotePeer server = null;
	
	// Ist dieser Peer der Server?
	private boolean isServer;
	
	// Maximale Upload-Bandbreite
	private double maxUploadBandwidth;
	
	// Der "Zulieferer"-Peer des Datenstroms
	private RemotePeer supplier = null;
	private RemotePeer supplierFather = null;
	private InetSocketAddress multicastAddress;
	
	private UniversalSocket supplierSocket;
	
	private Vector children = new Vector();
	private LinkedList localChildren = new LinkedList();
	private LinkedList remoteChildren = new LinkedList();
	private Map childrenData = new HashMap();
	private Vector bannedPeers = new Vector();
	
	private Random random = new Random();
	
	// Soll sich der Peer fies verhalten?
	private boolean misbehavior[] = new boolean[p2pradio.monitor.Commands.LAST_COMMAND];
	public static final long MISBEHAVIOR_PACKET_DELAY = 1000;
	
	// Das Monitor-Objekt dieses Peers
	private MonitorLogHandler monitorLogHandler;
		
	// War der Peer schon einmal verbunden?
	private boolean wasOnceConnected = false;
	
	 
	// Puffer
	private Buffer buffer;
	private BroadcastBuffer broadcastBuffer;
	
	// Threads
	private JoinThread joinThread;
	private ConnectThread connectThread;
	private FreeloaderForgetter freeloaderForgetter;
	private StreamDispatcherForSupplier streamDispatcherForSupplier;
	private Listener listener1;
	private Listener listener2;
	private UniversalSocket messageSocket;
	
	
	public static final int DEFAULT_PORT_NR = 2000;
	public static final int PLAYER_PORT_OFFSET = 2;
	public static final int WEBINTERFACE_PORT_OFFSET = 1;
	
	public static final int MONITOR_PORT = 39491;
	
	public static final int MAX_FREELOADER_COMPLAINTS = 3;
	public static final int MAX_NUMBER_OF_CHILDREN = 2;
	
	
	
	// Authentifizierung
	private SignatureGenerator signatureGenerator;
	private SignatureChecker signatureChecker;
	public static boolean signStreamPackets = Radio.signOrVerify;
	public static boolean verifyStreamPackets = Radio.signOrVerify;
	private long startTime;								   //Start Time (in milliseconds)
	private long Time_Server_Has_Been_Running_At_Startup;  //The time the server has been running when we start P2PRadio (in milliseconds)
															//Will always be 0 if we are the server
	private boolean lagIsValid, verifiedBandwidth;
	private boolean addYP, isConnected;
	private SettingsXML xml;
	private double LANUpload, LANDownload, internetMaxUpload, internetMaxDownload, internetUpload, internetDownload;
	private String IP;
	private int port, messageMode, streamMode, multicastTTL;
	private UDPPeerUpdater udpPeerUpdater;
	protected Peer(RemotePeer server, int myPort, boolean isServer, double maxUploadBandwidth, InetSocketAddress monitorAddress, UI ui ) throws PeerException
	{
		super(null);
		isConnected = false;
		this.isServer = isServer;
		this.server = server;
		port = myPort;
		this.ui = ui;
		xml = ui.getSettingsXML();
		readXML();
		lagIsValid = false;
		startTime=Time_Server_Has_Been_Running_At_Startup=0;
		try
		{
			InetSocketAddress ownAddress = new InetSocketAddress(InetAddress.getLocalHost(), myPort);
			setSocketAddress(ownAddress);
		}
		catch (UnknownHostException e)
		{
			// Sollte nicht vorkommen
			Logger.severe("Peer", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
		}
		
		
		
		// TCP-Socket zuerst erstellen, nachher weiss man die eigene IP-Adresse
		
		// Monitor starten
		monitorLogHandler = new MonitorLogHandler(this);
		monitorLogHandler.setFormatter(new LogFormatter());
		Logger.getLogger().addHandler(monitorLogHandler);
		
		if (Radio.enableMonitor)
		{
			if (monitorAddress != null)
			{
				monitorLogHandler.setMonitorAddress(monitorAddress);
			}
		}
		//Me - - - -Look over this
		// Authentifizierung
		if (signStreamPackets && isServer)
		{
			signatureGenerator = new SignatureGenerator();
		}
		
		// Puffer
		buffer = new Buffer(signatureGenerator);
		
		multicastTTL = xml.getTTL();
		if (isServer)
		{
			createListeners();
		}
		// FreeloaderForgetter initialisieren und gleich starten
		// (Der tut sowieso fast nichts)
		freeloaderForgetter = new FreeloaderForgetter(this);
		freeloaderForgetter.start();
		
		Time_Server_Has_Been_Running_At_Startup = 0;
		// Dem Netz beitreten, falls dieser Peer kein Server ist
		if (!isServer)
		{
			/*boolean exit = connectToServer();
			if (exit)
			{
				shutdown();
				return;
			}*/
			connectToServer();
		}
		
		// Dispatcher starten, falls dieser Peer der Server ist
		// Falls er nicht der Server ist, wird der Dispatcher nach
		// dem ersten erfolgreichen Join gestartet
		if (isServer)
		{	
			Logger.fine("Peer", "Peer.SERVER_READY"); //$NON-NLS-1$ //$NON-NLS-2$
		}
		
//		Radio.httpPlayer = new HttpPlayer(peer.getBuffer(), peer.getSocketAddress().getPort() + Peer.STREAM_PORT_OFFSET);
		
		
		
				
		//setPeer(peer);		//Set the GUI Mainframe's peer
		//System.out.println(peer.isDisconnected());
	}
	
	
	// Konstruktor f黵 Server
	
	/**
	 * Creates a new source peer.
	 * 
	 * @see #Peer(int,int,InetSocketAddress) 
	 */
	public Peer(int myPort, UI ui) throws PeerException
	{
		this(null, myPort, true, 0, null, ui);
	}
	
	// Konstruktor f黵 Server mit Monitor
	
	/**
	 * Creates a new source peer that sends notification messages to a monitor.
	 * 
	 * @see #Peer(int,int,InetSocketAddress)
	 */
	public Peer(int myPort, InetSocketAddress monitorAddress, UI ui) throws PeerException
	{
		this(null, myPort, true, 0, monitorAddress, ui);
	}
	
	
	
	// Konstruktor f黵 Server mit Bandbreite
	
	/**
	 * Creates a new source peer with specified bandwith.
	 * 
	 * @see #Peer(int,int,InetSocketAddress)
	 */
	
	public Peer(int myPort, double maxUploadBandwidth, UI ui) throws PeerException
	{
 		this(null, myPort, true, maxUploadBandwidth, null, ui);
	}
	
	// Konstruktor f黵 Server mit Bandbreite und Monitor
	
	/**
	 * Creates a new source peer with specified bandwidth that
	 * sends notification messages to a monitor.
	 *
	 * @param myPort The port number to use. myPort, (myPort+1)
	 *        and (myPort+2) must be available for both TCP and UDP sockets. 
	 * @param maxUploadBandwidth The maximal upload bandwidth that this peer can use (in KiloBytes/s).
	 * @param monitorAddress The address of the monitor to which
	 *        notification packets will be sent.
	 * 
	 * @throws PeerException If <code>myPort</code> is already occupied 
	 */
	public Peer(int myPort, double maxUploadBandwidth, InetSocketAddress monitorAddress, UI ui) throws PeerException
	{
		this(null, myPort, true, maxUploadBandwidth, monitorAddress, ui);
	}
	
	
	// Konstruktor f黵 normalen Peer
	/**
	 * Creates a normal peer that immediately joins the P2P network of the specified server.
	 */
	public Peer(RemotePeer server, int myPort, UI ui) throws PeerException
	{
		this(server, myPort, false, 0, null, ui);
	}
	
	// Konstruktor f黵 normalen Peer mit Bandbreite
	/**
	 * Creates a normal peer that immediately joins the P2P network of the specified server. The new peer will use less bandwidth than <code>maxUploadBandwidth</code>.
	 * 
	 * @param maxUploadBandwidth The maximal upload bandwidth that this peer can use (in KiloBytes/s).
	 */
	public Peer(RemotePeer server, int myPort, double maxUploadBandwidth, UI ui) throws PeerException
	{
		this(server, myPort, false, maxUploadBandwidth, null, ui);
	}
	private void readXML()
	{
		LANUpload = xml.getLANUpload();
		LANDownload = xml.getLANDownload();
		IP = xml.getIP();
		internetMaxUpload = xml.Get_Internet_Upload_Max();
		internetMaxDownload = xml.Get_Internet_Download_Max();
		internetUpload = xml.Get_Internet_Upload_Limit();
		internetDownload = xml.Get_Internet_Download_Limit();
		if(isServer)
		{
			verifiedBandwidth = xml.This_Server_Uses_Verified_Bandwidth();
			setMaxUploadBandwidth();
			messageMode = xml.getMessageMode();
			streamMode = xml.getStreamMode();
		}	
		
	}
	/**
	 * Connects to the server
	 */
	public void connectToServer()
	{
		connectThread = new ConnectThread(this);
		connectThread.start();
	}
	/**
	 * Re-joins the P2P network
	 */
	public synchronized void join()
	{	
		join(null);
	}
	
	protected synchronized void join(RemotePeer startPeer)
	{
			if (udpPeerUpdater == null && messageSocket.getMode() == SettingsXML.UDP)
			{
				udpPeerUpdater = new UDPPeerUpdater(messageSocket);
				udpPeerUpdater.start();
			}
				
			// Fehlt der Zulieferer 黚erhaupt?
			if (isServer || (supplier != null))
			{
				return;
			}
				
			if ((joinThread != null) && joinThread.isAlive())
			{
				// Der Verbindungsversuch dauert noch an
				return;
			}
			
			// (Wieder) mit dem Netz verbinden
			if (startPeer == null)
			{
				joinThread = new JoinThread(this);
			}
			else
			{
				joinThread = new JoinThread(this, startPeer);
			}
			
			
			joinThread.start();
	}

	/*
	// Diese Methode wird aufgerufen, falls der alte Zulieferer
	// verschwunden ist und sich nicht anst鋘dig verabschiedet hat
	// (passiert auch, wenn die Methode "leave" aufgerufen wird)
	synchronized void reConnect()
	{
		if (!keepDisconnected)
		{
			join();
		}
	}
	*/
	private void createListeners()
	{
		int TCP = UniversalSocket.TCP;
		int UDP = UniversalSocket.UDP;
		int MULTICAST = SettingsXML.MULTICAST_PLUS;
		if (messageMode == streamMode)
		{
			//We will have one listener since the transports are the same
			if (streamMode == TCP)
				listener1 = new Listener(this, Listener.TCP_BOTH);
			else if (streamMode == UDP)
				listener1 = new Listener(this, Listener.UDP_BOTH);
		}
		else if(streamMode == MULTICAST && messageMode == UDP)
		{
			listener1 = new Listener(this, Listener.MULTICAST_PLUS_STREAMS_AND_UDP_MESSAGES);
		}
		else
		{
			//We will have two listeners since the transports are different
			if (streamMode == MULTICAST)
				listener1 = new Listener(this, Listener.MULTICAST_PLUS_STREAMS);
			else if (streamMode == UDP)
				listener1 = new Listener(this, Listener.UDP_STREAMS);
			else if (streamMode == TCP)
				listener1 = new Listener(this, Listener.TCP_STREAMS);
			
			
			if (messageMode == UDP)
				listener2 = new Listener(this, Listener.UDP_MESSAGES);
			else if (messageMode == TCP)
				listener2 = new Listener(this, Listener.TCP_MESSAGES);
		
		}
			if (listener1 != null)
				listener1.start();
			if (listener2 != null)
				listener2.start();
	}
	/**
	 * Leaves the P2P network, then destroys all network objects within
	 * 
	 */
	public synchronized void shutdown(){
		leave();
		if (joinThread != null)
			joinThread.shutdown();
		if (connectThread != null)
			connectThread.shutdown();
		if (listener1 != null)
			listener1.shutdown();
		if (listener2 != null)
			listener2.shutdown();
		if (freeloaderForgetter != null)
			freeloaderForgetter.shutdown();
		if (udpPeerUpdater != null)
			udpPeerUpdater.shutdown();
	}
	/**
	 * Leaves the P2P network. Use {@link #join()} to rejoin.
	 */
	public synchronized void leave()
	{	
		if (isServer)
		{
			if (broadcastBuffer != null)
			{
				broadcastBuffer.removeFromYP();
			}
			
			return;
		}
		
		// Kann auch aufgerufen werden,
		// falls es keinen Zulieferer gibt
		
		// Zulieferer f黵 die verbleibenden Kinder
		RemotePeer redirection;
		
		if (supplier != null)
		{
			redirection = supplier;
			
			// Den Zulieferer entfernen
			removeSupplier(true);
		}
		else
		{
			redirection = server;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -