⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jointhread.java

📁 java语言开发的P2P流媒体系统
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* Stream-2-Stream - Peer to peer television and radio
 * October 13, 2005 - This file has been modified from the original P2P-Radio source
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 */

/* 
 * P2P-Radio - Peer to peer streaming system
 * Project homepage: http://p2p-radio.sourceforge.net/
 * Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
 * 
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */

package stream2stream.network;
import stream2stream.XML.SettingsXML;
import stream2stream.network.*;
import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.monitor.*;
import p2pradio.*;

import java.io.*;
import java.net.*;
import java.util.*;


/**
 * Searches a new supplier for its peer.
 * 
 * @author Michael Kaufmann
 */

public class JoinThread extends Thread
{
	private Peer peer;
	
	// Peer, der als erster angefragt wird
	// Falls es mit diesem Peer nicht klappt, wird die Wurzel genommen
	private RemotePeer startPeer;
	
	// Eigenes Socket
	private UniversalSocket socket;
	private UniversalSocket rootSocket;
	private UniversalSocket supplierSocket;
	private UniversalSocket universalMulticast;

	// Der zu findende Zulieferer
	private RemotePeer supplier;
	private RemotePeer supplierFather;
	private boolean shutdown;
	
	
	public static final int MAX_TREE_TRAVERSIONS = 3;
	
	// W鋒rend dieser Zeit muss der m鰃liche Zulieferer eine
	// Antwort geben
	public static final int JOIN_REQUEST_SOCKET_TIMEOUT = 1000;
	public static final int MULTICAST_TIMEOUT = 500;
	public static final String c = "JoinThread";

	/**
	 * Creates a <code>JoinThread</code> that searches a new supplier for
	 * the specified peer.
	 */
	public JoinThread(Peer peer)
	{
		super(Messages.getString("JoinThread.THREAD_NAME")); //$NON-NLS-1$
		this.peer = peer;
		startPeer = null;
		rootSocket = peer.getMessageSocket();
		
	}
	
	/**
	 * Creates a <code>JoinThread</code> that searches a new supplier for
	 * the specified peer, first asking <code>startPeer</code> if it can become the new supplier.
	 */
	public JoinThread(Peer peer, RemotePeer startPeer)
	{
		this(peer);
		this.startPeer = startPeer;
	}
	
	public void run()
	{
		if (peer.getMulticastAddress() != null)
		{
			Logger.info(c, c + ".MULTICAST", peer.getMulticastAddress());
			multicastJoin();
			
		}
		else
			p2pJoin();
	}
	private void multicastJoin()
	{
		InetSocketAddress groupAndPort = peer.getMulticastAddress();
		try {
			//System.out.println("1");
			MulticastSocket socket = new MulticastSocket(groupAndPort.getPort());
			//System.out.println("2");
			socket.joinGroup(groupAndPort.getAddress());
			//System.out.println("3");
			universalMulticast = new UniversalSocket(socket);
			//System.out.println("4");
			universalMulticast.setSoTimeout(MULTICAST_TIMEOUT);
			//System.out.println("5");
			universalMulticast.setBuffer(new byte [PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
			//System.out.println("6");
			universalMulticast.receive();  //See if we can see the stream
			//System.out.println("7");
			universalMulticast.setSoTimeout(StreamDispatcherForSupplier.SUPPLIER_CONNECTION_TIMEOUT);
			//System.out.println("8");
			peer.addSupplier(new RemotePeer(groupAndPort), null, universalMulticast);
			//System.out.println("9");
			
			//System.out.println("MULTICAST");
			
		} catch (Exception e) {
			p2pJoin();           //If any sort of problem arises, do a p2p join
		}
		
		
		
		
	}
	private void p2pJoin()
	{
		try
		{
			Logger.fine(c, c + ".JOINING"); //$NON-NLS-1$ //$NON-NLS-2$
			socket = new UniversalSocket(rootSocket.getMode());
			socket.setSoTimeout(JOIN_REQUEST_SOCKET_TIMEOUT);
			
			if (shutdown)
				return;
			
			if (startPeer != null)
			{
				doJoin(startPeer);  //jhooks - Traverse part of the tree for a supplier
				if (shutdown)
					return;
				
				if (supplier == null)
				{
					// Auch noch bei der Quelle beginnen
					doJoin(peer.getServer());
					if (shutdown)
						return;
				}
			
				// Das n鋍hste mal direkt die Quelle kontaktieren
				startPeer = null;
			}
			else
			{
				if (shutdown)
					return;
				doJoin(peer.getServer());  //jhooks - Traverse the entire tree for a supplier
				if (shutdown)
					return;
			}
		}
		catch (Exception e)
		{
			Logger.severe("JoinThread", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
		}
		finally
		{
			// Sockets schliessen
			if (socket != null)
			{
				try {
					socket.close();
				} catch (IOException e) {
				}
			}
			
			// Gefundenen Zulieferer melden
			if (supplier == null)
			{
				Logger.info("JoinThread", "JoinThread.NO_SUPPLIER_FOUND"); //$NON-NLS-1$ //$NON-NLS-2$
				
				// Das Netz verlassen (es konnte kein Zulieferer gefunden werden)
				peer.leave();  //TODO - VERY BAD, the client should at least try to connect
							   //Reconnect to the server with peer.connect(), call every minute
							   //Once a connection is obtained, try to join 
							   
			}
			else
			{
				//jhooks - I'm commenting this if statement, it is good to know what exactly is going on
				//if (!peer.wasOnceConnected())
				//{
					Logger.info("JoinThread", "JoinThread.SUPPLIER_FOUND"); //$NON-NLS-1$ //$NON-NLS-2$
				//}
				
				peer.addSupplier(supplier, supplierFather, supplierSocket);
			}
		}
	}
	

	private void doJoin(RemotePeer rootPeer)
	{
		// Der n鋍hste Peer, der angefragt wird
		RemotePeer possibleSupplier = rootPeer;
		
		// Der letzte besuchte Peer
		RemotePeer possibleSupplierFather = null; 
		
		// Wurde ein Paket vom anderen Peer empfangen?
		boolean packetReceived;

		// Die Nummer des Verbindungsversuchs
		int connectTryNumber;

		// Anzahl Anfragen bei der Quelle
		int treeTraversionNumber = 1;
		
		// Die Peers, die beim Herunterhangeln des Baums schon
		// mal angefragt wurden
		HashSet alreadySeenPeers = new HashSet();
		
		
		// Den Peer-Baum h鯿hstens MAX_TREE_TRAVERSIONS-mal traversieren
		while ((supplier == null) && (treeTraversionNumber <= MAX_TREE_TRAVERSIONS))
		{
			if (shutdown)
				return;
			connectTryNumber = 1;
			packetReceived = false;
			Packet outPacket = new JoinPacket(peer.getSocketAddress().getPort());; 
			Packet inPacket = null;
			
			if (possibleSupplier == rootPeer)
			{
				// Fies sein?
				while (!packetReceived && (connectTryNumber <= PacketFactory.UDP_RETRIES))
				{
					packetReceived = false;
					if (peer.getMisbehavior(Commands.BLAME_EVERYONE_AS_FREELOADER))
					{
						if (possibleSupplierFather != null)
						{
							peer.reportFreeloader(possibleSupplier, possibleSupplierFather);
						}
					}
			
				
					// Die Anzahl Verbindungsversuche zur點ksetzen
				
					try {
						rootSocket.send(outPacket);
						if (shutdown)
							return;
						inPacket = rootSocket.receive();
						if (shutdown)
							return;
						packetReceived = true;
					} catch (IOException e1) {
					}
				}
			}
			else{
			
		
			while (!packetReceived && (connectTryNumber <= PacketFactory.UDP_RETRIES))
			{
				packetReceived = false;
				
				try
				{
					if (peer.getMisbehavior(Commands.BLAME_EVERYONE_AS_FREELOADER))
					{
						if (possibleSupplierFather != null)
						{
							peer.reportFreeloader(possibleSupplier, possibleSupplierFather);
						}
					}
					if (shutdown)
						return;
					Logger.finer("JoinThread", "JoinThread.SENDING_REQUEST", possibleSupplier); //$NON-NLS-1$ //$NON-NLS-2$
					
					if (shutdown)
						return;
					socket.connect(possibleSupplier.getSocketAddress(), JOIN_REQUEST_SOCKET_TIMEOUT);
					if (socket.getMode() == SettingsXML.UDP)
						socket.setBuffer(new byte [PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
					else
						socket.setBuffer(new byte[PacketFactory.TCP_OUTPUTSTREAM_BUFFER]);
					if (shutdown)
						return;
					socket.send(outPacket);
					if (shutdown)
						return;
					inPacket = socket.receive();
					if (shutdown)
						return;
					socket.disconnect();
					if (shutdown)
						return;
					

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -