📄 transmitter.java
字号:
/* Stream-2-Stream - Peer to peer television and radio
* October 13, 2005 - This file has been modified from the original P2P-Radio source
* Project homepage: http://s2s.sourceforge.net/
* Copyright (C) 2005-2006 Jason Hooks
*/
/*
* P2P-Radio - Peer to peer streaming system
* Project homepage: http://p2p-radio.sourceforge.net/
* Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
*
* ---------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
* ---------------------------------------------------------------------------
*/
package stream2stream.network;
import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.monitor.Commands;
import java.io.*;
import p2pradio.*;
/**
*
*
* Sends all {@link StreamPacket}s that will be put into
* the buffer of a peer to a child.
*
* @author Michael Kaufmann
*/
/**jhooks - sends the nsv/ogg/mp3/... stream from the {@link Buffer} to a child
/* Note - the {@link Buffer} is used to exchange the stream between classes
*/
public class Transmitter extends Thread
{
public static final int MAX_WAITTIME_FOR_PACKET = 5000;
private Peer peer;
private Listener listener;
private RemotePeer childPeer;
private UniversalSocket socket;
private boolean resumeDataIsValid = false;
private boolean shutdown = false;
private long resumeSeqNr;
private long headerSeqNr;
private long metadataSeqNr;
private RemotePeerData childData;
private Transmitter selfReference;
/**
* Creates a new transmitter that waits for {@link StreamPacket}s of the specified peer
* and sends them to a child peer.supplierPeer.isMulticastAddress() && socket.getMode() == SettingsXML.UDP
*
* @param peer The peer whose {@link StreamPacket}s will be sent
* @param childPeer The child peer
* @param outputStream The output stream of the child peer
*/
public Transmitter(Listener listener, RemotePeer childPeer, UniversalSocket socket)
{
super(Messages.getString("Transmitter.THREAD_NAME")); //$NON-NLS-1$
setDaemon(true);
selfReference = this;
this.peer = listener.getPeer();
this.listener = listener;
this.childPeer = childPeer;
this.socket = socket;
this.childData = peer.getChildData(childPeer);
}
/**
* Creates a new transmitter that waits for {@link StreamPacket}s of the specified peer
* and sends them to a child peer.
* <P>
* The transmitter tries to resume the stream with the packet with sequence number <code>resumeSeqNr</code>.
*
* @param peer The peer whose {@link StreamPacket}s will be sent
* @param childPeer The child peer
* @param outputStream The output stream of the child peer
*/
public Transmitter(Listener listener, RemotePeer childPeer, UniversalSocket socket, long resumeSeqNr, long headerSeqNr, long metadataSeqNr)
{
this(listener, childPeer, socket);
this.resumeSeqNr = resumeSeqNr;
this.headerSeqNr = headerSeqNr;
this.metadataSeqNr = metadataSeqNr;
resumeDataIsValid = true;
}
public void run()
{
Logger.finer("Transmitter", "Transmitter.THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
try
{
long seqNr = 0;
Packet packet = null;
Buffer buffer = peer.getBuffer();
if (shutdown)
return;
if (resumeDataIsValid)
{
boolean packetIsAvailable = true;
try
{
seqNr = buffer.streamPacketSeqNrToBufferSeqNr(resumeSeqNr);
}
catch (PeerException e)
{
// Paket ist nicht vorhanden
packetIsAvailable = false;
}
if (packetIsAvailable)
{
packet = buffer.get(seqNr);
}
else
{
// Will der Peer ein Paket aus der Zukunft? -- Does the Peer want a package from the future?
buffer.waitForValidResumeData();
// Die neuste Sequenznummer ist jene, mit der dieser
// Peer den Strom wiederaufnehmen w黵de
long newestSeqNr = buffer.getResumeSeqNr();
if (resumeSeqNr - newestSeqNr > 0)
{
// Eine Zeit lang warten auf das gew黱schte Paket
packet = buffer.getFuturePacket(resumeSeqNr, MAX_WAITTIME_FOR_PACKET);
if (shutdown)
return;
if (packet == null)
{
// Das Paket ist nicht gekommen, obwohl es aus der
// Zukunft war
Logger.fine("Transmitter", "Transmitter.CHILD_WANTS_INVALID_SEQUENCE_NUMBER", childPeer); //$NON-NLS-1$ //$NON-NLS-2$
// Kind entfernen
synchronized(peer)
{
// Sichergehen, dass es ein Kind ist (sonst g鋌e es eine Exception)
if (peer.isChild(childPeer))
{
peer.removeChild(childPeer);
}
}
return;
}
else
{
if (shutdown)
return;
try
{
seqNr = buffer.streamPacketSeqNrToBufferSeqNr(resumeSeqNr);
}
catch (PeerException e)
{
// Sollte nicht vorkommen
Logger.severe("Transmitter", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
}
else
{
// Das gew黱schte Paket ist aus der Vergangenheit
// Einfach das aktuelle Paket 黚ermitteln
// Wird nachher automatisch gemacht
}
}
}
if (shutdown)
return;
if (packet == null)
{
// Das neuste Paket anfordern
seqNr = buffer.getNewestSeqNr();
packet = buffer.get(seqNr);
while (packet == null)
{
if (shutdown)
return;
// Das Paket ist schon wieder weg
seqNr++;
packet = buffer.get(seqNr);
}
}
if (shutdown)
return;
// Falls StandBy-/Header-/Metadaten-Paket:
// Auf ein normales Strompaket warten
while ((packet == null) || !(packet instanceof DataPacket))
{
if (shutdown)
return;
seqNr++;
packet = buffer.get(seqNr);
}
try
{
// Muss das Headerpaket gesendet werden?
if (!resumeDataIsValid || (headerSeqNr < ((DataPacket)packet).getHeaderSeqNr()))
{
socket.send(buffer.getAssociatedHeaderPacket((DataPacket)packet));
}
if (shutdown)
return;
// Muss das Metadatenpaket gesendet werden?
if (!resumeDataIsValid || (metadataSeqNr < ((DataPacket)packet).getMetadataSeqNr()))
{
socket.send(buffer.getAssociatedMetadataPacket((DataPacket)packet));
}
if (shutdown)
return;
// Mit der regul鋜en Paket黚ertragung beginnen
while(true)
{
// Ist das Paket ein StandBy-Paket?
// In diesem Fall sollte das StandBy-Paket nur
// weitergesendet werden, wenn bisher kein neues
// Strom-Paket in den Puffer eingef黦t wurde
//Is the packet a StandBy packet?
//In this case, the StandBy packet should be further-sent only, if no new packets were inserted into the buffer
if (!((packet instanceof StandByPacket) && buffer.isNewerStreamPacketAvailable(seqNr)))
{
socket.send(packet);
}
if (shutdown)
return;
seqNr++;
packet = buffer.get(seqNr);
if (childData != null && !childData.isConnected())
{
return;
}
if ((packet != null) && peer.getMisbehavior(Commands.DROP_SOME_PACKETS))
{
// 10% aller Pakete verwerfen -- 10% of all packages rejected
if (new java.util.Random().nextInt(10) == 0)
{
// Dieses Paket verwerfen und n鋍hstes Paket holen
seqNr++;
packet = buffer.get(seqNr);
}
}
if (shutdown)
return;
// War das verlangte Paket nicht mehr im Puffer?
if (packet == null)
{
Logger.fine("Transmitter", "Transmitter.CHILD_IS_TOO_SLOW_AND_WILL_BE_REMOVED", childPeer); //$NON-NLS-1$ //$NON-NLS-2$
synchronized(peer)
{
// Sichergehen, dass es noch ein Kind ist (sonst g鋌e es eine Exception)
if (peer.isChild(childPeer))
{
peer.removeChild(childPeer);
}
}
return;
}
if (shutdown)
return;
if ((packet instanceof DataPacket) && peer.getMisbehavior(Commands.CHANGE_PACKET_DATA))
{
// Das erste Bit im ersten Byte des Pakets umkehren
DataPacket dataPacket = (DataPacket)packet;
byte data = dataPacket.getData()[dataPacket.getDataOffset()];
data = (byte)(data ^ (1 << 7));
dataPacket.getData()[dataPacket.getDataOffset()] = data;
}
//if (packet instanceof MetadataPacket)
//System.out.println("Sending metadata");
if (shutdown)
return;
if (peer.getMisbehavior(Commands.DELAY_PACKETS))
{
long arrivalTime = buffer.getArrivalTime(seqNr);
if (arrivalTime != -1)
{
long sleepTime = (arrivalTime + Peer.MISBEHAVIOR_PACKET_DELAY) - System.currentTimeMillis();
if (sleepTime > 0)
{
try
{
if (shutdown)
return;
Thread.sleep(sleepTime);
}
catch (InterruptedException e)
{
}
}
}
}
}
}
catch (IOException e)
{
// Dem TCPDispatcherForChildren Zeit geben
Thread.yield();
// Ist dieses Kind entfernt worden?
if (childData != null && !childData.isConnected())
{
// Es gibt nichts mehr zu tun
return;
}
else
{
Logger.fine("Transmitter", "IO_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
// Der TCPDispatcherForChildren r鋟mt die Ressourcen
// alleine auf
return;
}
}
}
catch (Exception e)
{
Logger.severe("Transmitter", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
}
finally
{
// Siehe Kommentare bei TCPDispatcherForChildren
synchronized(peer)
{
if (childData != null && childData.isConnected())
{
peer.removeChild(childPeer);
}
}
}
}
public void shutdown()
{
shutdown = true;
try {
socket.close();
} catch (IOException e) {
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -