📄 jointhread.java
字号:
/* Stream-2-Stream - Peer to peer television and radio
* October 13, 2005 - This file has been modified from the original P2P-Radio source
* Project homepage: http://s2s.sourceforge.net/
* Copyright (C) 2005-2006 Jason Hooks
*/
/*
* P2P-Radio - Peer to peer streaming system
* Project homepage: http://p2p-radio.sourceforge.net/
* Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
*
* ---------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
* ---------------------------------------------------------------------------
*/
package stream2stream.network;
import stream2stream.XML.SettingsXML;
import stream2stream.network.*;
import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.monitor.*;
import p2pradio.*;
import java.io.*;
import java.net.*;
import java.util.*;
/**
* Searches a new supplier for its peer.
*
* @author Michael Kaufmann
*/
public class JoinThread extends Thread
{
private Peer peer;
// Peer, der als erster angefragt wird
// Falls es mit diesem Peer nicht klappt, wird die Wurzel genommen
private RemotePeer startPeer;
// Eigenes Socket
private UniversalSocket socket;
private UniversalSocket rootSocket;
private UniversalSocket supplierSocket;
private UniversalSocket universalMulticast;
// Der zu findende Zulieferer
private RemotePeer supplier;
private RemotePeer supplierFather;
private boolean shutdown;
public static final int MAX_TREE_TRAVERSIONS = 3;
// W鋒rend dieser Zeit muss der m鰃liche Zulieferer eine
// Antwort geben
public static final int JOIN_REQUEST_SOCKET_TIMEOUT = 1000;
public static final int MULTICAST_TIMEOUT = 500;
public static final String c = "JoinThread";
/**
* Creates a <code>JoinThread</code> that searches a new supplier for
* the specified peer.
*/
public JoinThread(Peer peer)
{
super(Messages.getString("JoinThread.THREAD_NAME")); //$NON-NLS-1$
this.peer = peer;
startPeer = null;
rootSocket = peer.getMessageSocket();
}
/**
* Creates a <code>JoinThread</code> that searches a new supplier for
* the specified peer, first asking <code>startPeer</code> if it can become the new supplier.
*/
public JoinThread(Peer peer, RemotePeer startPeer)
{
this(peer);
this.startPeer = startPeer;
}
public void run()
{
if (peer.getMulticastAddress() != null)
{
Logger.info(c, c + ".MULTICAST", peer.getMulticastAddress());
multicastJoin();
}
else
p2pJoin();
}
private void multicastJoin()
{
InetSocketAddress groupAndPort = peer.getMulticastAddress();
try {
//System.out.println("1");
MulticastSocket socket = new MulticastSocket(groupAndPort.getPort());
//System.out.println("2");
socket.joinGroup(groupAndPort.getAddress());
//System.out.println("3");
universalMulticast = new UniversalSocket(socket);
//System.out.println("4");
universalMulticast.setSoTimeout(MULTICAST_TIMEOUT);
//System.out.println("5");
universalMulticast.setBuffer(new byte [PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
//System.out.println("6");
universalMulticast.receive(); //See if we can see the stream
//System.out.println("7");
universalMulticast.setSoTimeout(StreamDispatcherForSupplier.SUPPLIER_CONNECTION_TIMEOUT);
//System.out.println("8");
peer.addSupplier(new RemotePeer(groupAndPort), null, universalMulticast);
//System.out.println("9");
//System.out.println("MULTICAST");
} catch (Exception e) {
p2pJoin(); //If any sort of problem arises, do a p2p join
}
}
private void p2pJoin()
{
try
{
Logger.fine(c, c + ".JOINING"); //$NON-NLS-1$ //$NON-NLS-2$
socket = new UniversalSocket(rootSocket.getMode());
socket.setSoTimeout(JOIN_REQUEST_SOCKET_TIMEOUT);
if (shutdown)
return;
if (startPeer != null)
{
doJoin(startPeer); //jhooks - Traverse part of the tree for a supplier
if (shutdown)
return;
if (supplier == null)
{
// Auch noch bei der Quelle beginnen
doJoin(peer.getServer());
if (shutdown)
return;
}
// Das n鋍hste mal direkt die Quelle kontaktieren
startPeer = null;
}
else
{
if (shutdown)
return;
doJoin(peer.getServer()); //jhooks - Traverse the entire tree for a supplier
if (shutdown)
return;
}
}
catch (Exception e)
{
Logger.severe("JoinThread", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
}
finally
{
// Sockets schliessen
if (socket != null)
{
try {
socket.close();
} catch (IOException e) {
}
}
// Gefundenen Zulieferer melden
if (supplier == null)
{
Logger.info("JoinThread", "JoinThread.NO_SUPPLIER_FOUND"); //$NON-NLS-1$ //$NON-NLS-2$
// Das Netz verlassen (es konnte kein Zulieferer gefunden werden)
peer.leave(); //TODO - VERY BAD, the client should at least try to connect
//Reconnect to the server with peer.connect(), call every minute
//Once a connection is obtained, try to join
}
else
{
//jhooks - I'm commenting this if statement, it is good to know what exactly is going on
//if (!peer.wasOnceConnected())
//{
Logger.info("JoinThread", "JoinThread.SUPPLIER_FOUND"); //$NON-NLS-1$ //$NON-NLS-2$
//}
peer.addSupplier(supplier, supplierFather, supplierSocket);
}
}
}
private void doJoin(RemotePeer rootPeer)
{
// Der n鋍hste Peer, der angefragt wird
RemotePeer possibleSupplier = rootPeer;
// Der letzte besuchte Peer
RemotePeer possibleSupplierFather = null;
// Wurde ein Paket vom anderen Peer empfangen?
boolean packetReceived;
// Die Nummer des Verbindungsversuchs
int connectTryNumber;
// Anzahl Anfragen bei der Quelle
int treeTraversionNumber = 1;
// Die Peers, die beim Herunterhangeln des Baums schon
// mal angefragt wurden
HashSet alreadySeenPeers = new HashSet();
// Den Peer-Baum h鯿hstens MAX_TREE_TRAVERSIONS-mal traversieren
while ((supplier == null) && (treeTraversionNumber <= MAX_TREE_TRAVERSIONS))
{
if (shutdown)
return;
connectTryNumber = 1;
packetReceived = false;
Packet outPacket = new JoinPacket(peer.getSocketAddress().getPort());;
Packet inPacket = null;
if (possibleSupplier == rootPeer)
{
// Fies sein?
while (!packetReceived && (connectTryNumber <= PacketFactory.UDP_RETRIES))
{
packetReceived = false;
if (peer.getMisbehavior(Commands.BLAME_EVERYONE_AS_FREELOADER))
{
if (possibleSupplierFather != null)
{
peer.reportFreeloader(possibleSupplier, possibleSupplierFather);
}
}
// Die Anzahl Verbindungsversuche zur點ksetzen
try {
rootSocket.send(outPacket);
if (shutdown)
return;
inPacket = rootSocket.receive();
if (shutdown)
return;
packetReceived = true;
} catch (IOException e1) {
}
}
}
else{
while (!packetReceived && (connectTryNumber <= PacketFactory.UDP_RETRIES))
{
packetReceived = false;
try
{
if (peer.getMisbehavior(Commands.BLAME_EVERYONE_AS_FREELOADER))
{
if (possibleSupplierFather != null)
{
peer.reportFreeloader(possibleSupplier, possibleSupplierFather);
}
}
if (shutdown)
return;
Logger.finer("JoinThread", "JoinThread.SENDING_REQUEST", possibleSupplier); //$NON-NLS-1$ //$NON-NLS-2$
if (shutdown)
return;
socket.connect(possibleSupplier.getSocketAddress(), JOIN_REQUEST_SOCKET_TIMEOUT);
if (socket.getMode() == SettingsXML.UDP)
socket.setBuffer(new byte [PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
else
socket.setBuffer(new byte[PacketFactory.TCP_OUTPUTSTREAM_BUFFER]);
if (shutdown)
return;
socket.send(outPacket);
if (shutdown)
return;
inPacket = socket.receive();
if (shutdown)
return;
socket.disconnect();
if (shutdown)
return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -