📄 streamdispatcherforsupplier.java
字号:
/* Stream-2-Stream - Peer to peer television and radio
* October 13, 2005 - This file has been modified from the original P2P-Radio source
* Project homepage: http://s2s.sourceforge.net/
* Copyright (C) 2005-2006 Jason Hooks
*/
/*
* P2P-Radio - Peer to peer streaming system
* Project homepage: http://p2p-radio.sourceforge.net/
* Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
*
* ---------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
* ---------------------------------------------------------------------------
*/
package stream2stream.network;
import stream2stream.XML.SettingsXML;
import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.sources.BroadcastBuffer;
import p2pradio.*;
import java.io.*;
import java.net.SocketException;
import java.net.SocketTimeoutException;
/**
* Processes all packets that it receives from the supplier.
* <P>
* This thread continually checks if the supplier is sending enough data. If necessary, it checks
* if the {@link StreamPacket}s have a correct signature. It also checks if the supplier
* tries to resume the stream at the correct position.
* <P>
* If the supplier is too slow or is cheating, this thread tells the
* peer to change the supplier.
*
* @author Michael Kaufmann
*/
//jhooks - receives the nsv/ogg/mp3/... stream from the supplier and sends it to the {@link Buffer}
//Note - the @link Buffer is used to exchange the stream between classes
public class StreamDispatcherForSupplier extends Thread
{
// The local peer that owns this dispatcher (set by the constructor)
private Peer peer;
// The remote peer acting as supplier for `peer` (set by the constructor)
private RemotePeer supplierPeer;
// Socket that run() reads the supplier's packets from
private UniversalSocket socket;
// Class name; used as the logger tag and as the prefix of message keys
public static final String c = "StreamDispatcherForSupplier";
// Length of one byte-rate measurement window, in milliseconds
// (compared against System.currentTimeMillis() differences in run())
public static final long TIME_TO_MEASURE_SPEED = 5000; // 5 seconds
// Should this thread be terminated
// as quickly as possible?
private boolean shutdown = false;
/* Speed measurement:
The timestamps in the stream packets can be used to work out
whether a backlog has built up.
*/
// Maximum permitted delay of the data stream
// NOTE(review): presumably milliseconds, like the other timing constants - confirm
public static final int TIME_BACKLOG = 3000;
// Amount of time granted to the supplier to find a new father (parent)
public static final int TIME_SUPPLIER_SWITCHING = 3000;
// Time available for establishing a connection
public static final int CONNECTION_ESTABLISHMENT_TIMEOUT = 5000;
// Timeout of the connection to the supplier
// Caution: do not choose this too short!
public static final int SUPPLIER_CONNECTION_TIMEOUT = 2000;
// Maximum waiting time for the first stream packet
// Must be >= SUPPLIER_CONNECTION_TIMEOUT
public static final int TIMEOUT_FOR_FIRST_STREAM_PACKET = 2000;
// Start times of the speed measurement, in "stream time" and in local time
private long startTimeStream;
private long startTimeLocal;
// Timestamp of the last received data packet
private long timeStampOfLastDataPacket;
// When did receiving data begin?
private long receiveStartTime;
// Has a DataPacket been received yet?
boolean dataPacketReceived = false;
// Is the supplier's father (parent) currently changing?
private boolean supplierFatherIsChanging = false;
// True until run() has done its one-time UDP setup (UDPPeerUpdater start)
private boolean firstTime = true;
// Time at which the supplier announced that its father is changing
private long supplierFatherChangeStart;
// bytes: content bytes received in the current measurement window;
// bytesPerSec: most recently computed average byte rate
private int bytes, bytesPerSec;
// Local start time of the current byte-rate measurement window
private long startTime;
// Local time at which the most recent packet was received
private long currentTime;
// Helper thread created and started by run() on the first packet in UDP mode
private UDPPeerUpdater udpPeerUpdater;
/**
 * Builds the dispatcher thread that processes packets arriving from the supplier.
 * <P>
 * The thread name is looked up through <code>Messages.getString</code> and the
 * thread is marked as a daemon. The thread is not started by this constructor.
 *
 * @param peer The peer that this dispatcher belongs to
 * @param supplierPeer The supplier of <code>peer</code>
 * @param socket The socket from which the packets of <code>supplierPeer</code> are received
 */
public StreamDispatcherForSupplier(Peer peer, RemotePeer supplierPeer, UniversalSocket socket)
{
    super(Messages.getString(c + ".THREAD_NAME")); //$NON-NLS-1$
    setDaemon(true);

    this.peer = peer;
    this.supplierPeer = supplierPeer;

    // The socket is stored exactly as handed in; this constructor does not
    // connect it (an earlier socket.connect(...) call here is disabled).
    this.socket = socket;

    // Byte-rate bookkeeping starts from zero
    this.bytesPerSec = 0;
    this.bytes = 0;
}
public void run()
{
Logger.finer(c, c + ".THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
// Ist der Zulieferer zu langsam?
boolean supplierTooSlow = false;
// Hat sich der Zulieferer verabschiedet?
boolean supplierLeft = false;
try
{
receiveStartTime = System.currentTimeMillis();
startTime = System.currentTimeMillis();
// Endlosschleife
while (true)
{
if (shutdown)
{
return;
}
boolean packetReceived = false;
Packet packet = null;
while(!packetReceived)
{
try
{
packet = socket.receive();
if (packet != null)
packetReceived = true;
bytes += packet.getContentLength();
currentTime = System.currentTimeMillis();
long timeElapsed = currentTime - startTime;
if (timeElapsed >= TIME_TO_MEASURE_SPEED || (bytesPerSec == 0.0 && timeElapsed >= 500 ) )
{
bytesPerSec = Math.round((float)bytes / ((float)(System.currentTimeMillis() - startTime) / 1000.0f));
peer.getUI().getMetadata().setAverageByterate(bytesPerSec);
startTime = System.currentTimeMillis();
bytes = 0;
}
}
catch (SocketTimeoutException e)
{
if (!shutdown)
{
Logger.fine(c, c +".SUPPLIER_HAS_SENT_NO_DATA_DURING_LONG_PERIOD"); //$NON-NLS-1$ //$NON-NLS-2$
}
return;
}
catch (EOFException e)
{
if (!shutdown)
{
Logger.finer(c, c + ".STREAM_FROM_SUPPLIER_ENDED"); //$NON-NLS-1$ //$NON-NLS-2$
}
return;
}
catch (IOException e)
{
if (!shutdown)
{
Logger.fine(c, "IO_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
}
return;
}
}
if (firstTime && !supplierPeer.isMulticastAddress() && socket.getMode() == SettingsXML.UDP)
{
try {
socket.setReceivedAddress();
} catch (SocketException e) {
} catch (IOException e) {
}
udpPeerUpdater = new UDPPeerUpdater(socket);
udpPeerUpdater.start();
firstTime = false;
}
if (shutdown)
{
return;
}
// System.out.println(actionPacket);
synchronized (peer)
{
if (packet instanceof LeaveAndRedirectPacket)
{
// Der Zulieferer ist gegangen
Logger.finer(c, c + ".SUPPLIER_HAS_SAID_GOODBYE"); //$NON-NLS-1$ //$NON-NLS-2$
RemotePeer newPossibleSupplier = ((LeaveAndRedirectPacket)packet).getRedirection();
// Change the supplier and report it as a freeloader
// (after all, it might only be pretending to leave)
peer.changeSupplier(newPossibleSupplier, false, true);
return;
}
else if (packet instanceof FreeloaderRedirectPacket)
{
// Dieser Peer wurde vom Zulieferer als Freeloader erkannt
// Nachgeben und sich vom Zulieferer trennen
Logger.fine(c, c + ".THIS_PEER_WAS_DISCOVERED_AS_FREELOADER"); //$NON-NLS-1$ //$NON-NLS-2$
RemotePeer newPossibleSupplier = ((FreeloaderRedirectPacket)packet).getRedirection();
// Change the supplier. Of course we report our
// former supplier as a freeloader; it might be
// treating this peer as a freeloader unjustly
peer.changeSupplier(newPossibleSupplier, false, true);
// Der Zulieferer hat sich verabschiedet
supplierLeft = true;
return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -