📄 streamdispatcherforsupplier.java
字号:
}
else if (packet instanceof StandByPacket)
{
// Nur ein Stand By-Paket pro Zuliefererwechsel akzeptieren
if (!supplierFatherIsChanging)
{
supplierFatherIsChanging = true;
supplierFatherChangeStart = System.currentTimeMillis();
Logger.finer(c, c + ".SUPPLIER_IS_ITSELF_SEARCHING_NEW_SUPPLIER"); //$NON-NLS-1$ //$NON-NLS-2$
// Kinder informieren -- Inform children
peer.getBuffer().put(packet);
}
else
{
Logger.fine(c, c + ".SUPERFLUOUS_STANDBY_PACKET_WAS_IGNORED"); //$NON-NLS-1$ //$NON-NLS-2$
}
}
else if (packet instanceof StreamPacket)
{
// Ein Paket des Stroms ist angekommen
StreamPacket streamPacket = (StreamPacket)packet;
// Signatur 黚erpr黤en
if (Peer.verifyStreamPackets && (peer.getSignatureChecker() != null))
{
try
{
if (!streamPacket.verify(peer.getSignatureChecker()))
{
Logger.fine(c, c + ".SUPPLIER_HAS_FALSIFIED_PACKET_INVALID_SIGNATURE"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln und als Freeloader melden
return;
}
}
catch (Exception e)
{
Logger.fine(c, c + ".ERROR_WHILE_CHECKING_SIGNATURE"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln und als Freeloader melden
return;
}
}
Buffer buffer = peer.getBuffer();
// Die Wiederaufnahme-Sequenznummer kann auch vom
// vorherigen Zulieferer sein
if (buffer.isResumeSeqNrValid())
{
// Kommt das Paket aus der Vergangenheit?
if (streamPacket.getSeqNr() <= buffer.getResumeSeqNr())
{
Logger.fine(c, c + ".PACKET_FROM_SUPPLIER_COMES_FROM_THE_PAST"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln und als Freeloader melden
//return;
}
}
if (dataPacketReceived)
{
// assert(buffer.isResumeSeqNrValid());
// Das heisst, dass der Strom jetzt kontinuierlich sein muss,
// weil der Zulieferer "fest" ist und nicht gerade vorher
// gewechselt hat
// Fand ein Sequenznummernsprung statt?
if (streamPacket.getSeqNr() != (buffer.getResumeSeqNr() + 1))
{
//Logger.fine(c, c + ".SUPPLIER_HAS_MADE_SEQUENCE_NUMBER_LEAP"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln
//return;
}
}
// Ist es ein Header- oder Metadatenpaket?
if (streamPacket instanceof MetaStreamPacket)
{
if (streamPacket instanceof HeaderPacket)
{
// Hat das Header-Paket eine 鋖tere Nummer als jenes
// Header-Paket, das wir bereits haben?
if ((buffer.getNewestHeaderPacket() != null) && (streamPacket.getSeqNr() <= buffer.getNewestHeaderPacket().getSeqNr()))
{
// Der Zulieferer hat ein falsches Header-Paket geliefert
Logger.fine(c, c + ".INVALID_HEADER_PACKET"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln und als Freeloader melden
return;
}
}
else if (streamPacket instanceof MetadataPacket)
{
// Hat das Metadaten-Paket eine 鋖tere Nummer als jenes
// Metadaten-Paket, das wir bereits haben?
//System.out.println("metadata");
if ((buffer.getNewestMetadataPacket() != null) && ((streamPacket).getSeqNr() <= buffer.getNewestMetadataPacket().getSeqNr()))
{
// Der Zulieferer hat ein falsches Metadaten-Paket geliefert
Logger.fine(c, c + ".INVALID_METADATA_PACKET"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln und als Freeloader melden
return;
}
}
supplierFatherIsChanging = false;
buffer.put(streamPacket);
}
else
{
// Es ist ein Datenpaket (kein Header- oder Metadaten-Paket)
// Wurde kein Header-Paket geliefert, obwohl eines
// h鋞te geliefert werden m黶sen?
if (buffer.getNewestHeaderPacket() == null)
{
//Logger.fine("TCPDispatcherForSupplier", "TCPDispatcherForSupplier.HEADER_PACKET_IS_MISSING"); //$NON-NLS-1$ //$NON-NLS-2$
HeaderPacket multicastNullHeader = new HeaderPacket(((DataPacket)streamPacket).getHeaderSeqNr(), System.currentTimeMillis(), BroadcastBuffer.NULL_BYTE);
buffer.put(multicastNullHeader);
// Zulieferer wechseln
//return;
}
// Wurde kein Metadaten-Paket geliefert, obwohl eines
// h鋞te geliefert werden m黶sen?
if (buffer.getNewestMetadataPacket() == null)
{
//Logger.fine("TCPDispatcherForSupplier", "TCPDispatcherForSupplier.METADATA_PACKET_IS_MISSING"); //$NON-NLS-1$ //$NON-NLS-2$
MetadataPacket metadataFromAcceptPacket = new MetadataPacket(((DataPacket)streamPacket).getMetadataSeqNr(), System.currentTimeMillis(), peer.getUI().getMetadata());
buffer.put(metadataFromAcceptPacket);
// Zulieferer wechseln
//return;
}
/*
// Hat der Zulieferer ein ung黮tiges Headerpaket geliefert?
if (((DataPacket)streamPacket).getHeaderSeqNr() != buffer.getNewestHeaderPacket().getSeqNr())
{
Logger.fine(c, c + ".TOO_OLD_HEADER_PACKET"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln
return;
}
// Hat der Zulieferer ein ung黮tiges Metadatenpaket geliefert?
if (((DataPacket)streamPacket).getMetadataSeqNr() != buffer.getNewestMetadataPacket().getSeqNr())
{
Logger.fine(c, c + ".TOO_OLD_METADATA_PACKET"); //$NON-NLS-1$ //$NON-NLS-2$
// Zulieferer wechseln
return;
}*/
// Paket ist in Ordnung
supplierFatherIsChanging = false;
buffer.put(streamPacket);
timeStampOfLastDataPacket = streamPacket.getTimeStamp();
if (!dataPacketReceived)
{
// Die Zeitpunkte m黶sen gesetzt werden
startTimeLocal = System.currentTimeMillis();
startTimeStream = streamPacket.getTimeStamp();
dataPacketReceived = true;
}
}
}
else
{
// Nicht erwartetes Paket erhalten
// Nichts darauf antworten
Logger.fine(c, "UNEXPECTED_PACKET_RECEIVED", packet); //$NON-NLS-1$ //$NON-NLS-2$
}
}
if (speedTooLow())
{
// Thread beenden
supplierTooSlow = true;
return;
}
}
}
catch (Exception e)
{
Logger.severe(c, "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
}
finally
{
if (udpPeerUpdater != null)
udpPeerUpdater.shutdown();
// Diese F鋖le wurden schon behandelt
if (!shutdown && !supplierLeft && !supplierTooSlow)
{
// Zulieferer wechseln und den alten als
// Freeloader melden
peer.changeSupplier(false, true);
}
}
}
private boolean speedTooLow()
{
long now = System.currentTimeMillis();
if (supplierFatherIsChanging)
{
// Dem Zulieferer nur TIME_SUPPLIER_SWITCHING Zeit geben,
// um einen neuen Vater zu finden
if (now - supplierFatherChangeStart > TIME_SUPPLIER_SWITCHING)
{
// Selbst einen neuen Zulieferer suchen
Logger.fine(c, c + ".SUPPLIER_NEEDED_TOO_MUCH_TIME_TO_FIND_NEW_SUPPLIER_FOR_ITSELF"); //$NON-NLS-1$ //$NON-NLS-2$
peer.changeSupplier(true, false);
return true;
}
else
{
// Dem Zulieferer noch Zeit geben
return false;
}
}
// Eigentliche Verz鰃erungsmessung
if ((!dataPacketReceived && ((now - receiveStartTime) > TIMEOUT_FOR_FIRST_STREAM_PACKET))
|| (dataPacketReceived && (System.currentTimeMillis() - startTimeLocal) - (timeStampOfLastDataPacket - startTimeStream) > TIME_BACKLOG))
{
// Verz鰃erung zu gross, Zulieferer wechseln
Logger.fine(c, c + ".SUPPLIER_IS_TOO_SLOW"); //$NON-NLS-1$ //$NON-NLS-2$
peer.changeSupplier(true, true);
return true;
}
return false;
}
/**
* Stops this thread as soon as possible. (Use {@link #join()} to wait for this thread to die)
*/
public synchronized void shutdown()
{
if (udpPeerUpdater != null)
udpPeerUpdater.shutdown();
shutdown = true;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -