broadcastbuffer.java
来自「java语言开发的P2P流媒体系统」· Java 代码 · 共 492 行
JAVA
492 行
/* Stream-2-Stream - Peer to peer television and radio
* October 13, 2005 - This file has been modified from the original P2P-Radio source
* Project homepage: http://s2s.sourceforge.net/
* Copyright (C) 2005-2006 Jason Hooks
*/
/*
* P2P-Radio - Peer to peer streaming system
* Project homepage: http://p2p-radio.sourceforge.net/
* Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
*
* ---------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
* ---------------------------------------------------------------------------
*/
package p2pradio.sources;
import stream2stream.network.*;
import java.util.Properties;
import p2pradio.*;
import p2pradio.packets.MetadataPacket;
import p2pradio.packets.HeaderPacket;
import p2pradio.packets.DataPacket;
import p2pradio.packets.PacketFactory;
import p2pradio.gui.MainFrame;
import p2pradio.logging.Logger;
import p2pradio.event.*;
import stream2stream.stationlist.LANPages;
import stream2stream.stationlist.YellowPages;
/**
* Sources use BroadcastBuffer objects to broadcast data.
* <P>
* The broadcast buffer automatically adds sequence numbers and time stamps
* to the packets and sends Metadata periodically
*
* @author Michael Kaufmann
*/
//jhooks - modified, 2 new constructors
public class BroadcastBuffer extends Thread
{
private Buffer buffer;
private int port;
private Metadata metadata;
// Informationen f黵 die Strompakete
private long startTime;
private boolean startTimeIsValid = false;
private boolean addYP;
private long nextSeqNr = 1;
private long lastSendTime;
private boolean lastSendTimeIsValid = false;
// Messung der Durchschnittsgeschwindigkeit
private long lastSpeedMeasurementTime;
private boolean lastSpeedMeasurementTimeIsValid = false;
private long bytesSinceLastSpeedMeasurement = 0;
private int speedMeasurementPeriod = 1000;
private int averageBytesPerSec = 0;
public static final int MAX_SPEED_MEASUREMENT_PERIOD = 32000;
public static final int MAX_NODATA_INTERVAL = 1000;
public static final byte[] NULL_BYTE = new byte[0];
private YellowPages yp;
private LANPages lp;
private Peer peer;
/**
* Creates a new Broadcast Buffer.
*
* @param buffer The buffer to be filled
*/
public BroadcastBuffer(Buffer buffer, int port)
{
this(buffer, port, Messages.getString("BroadcastBuffer.THREAD_NAME")); //$NON-NLS-1$
}
/**
* Creates a new Broadcast Buffer.
*
* @param buffer The buffer to be filled
* @param threadName The thread name of this Broadcast Buffer
*/
public BroadcastBuffer(Buffer buffer, int port, String threadName)
{
super(threadName);
metadata = new Metadata();
this.buffer = buffer;
this.port = port;
setDaemon(true);
sendMetadata();
setHeader(NULL_BYTE);
}
/**
* Creates a new Broadcast Buffer.
*
* @param buffer The buffer to be filled
* @param startTime The time that the server started in milliseconds
* @param port The p2pr network port
*/
public BroadcastBuffer(Buffer buffer, int port, Peer peer)
{
this(buffer, port);
addYP = false;
startTimeIsValid = true;
this.startTime = peer.getStartTime();
this.peer = peer;
//metadata = new Metadata();//peer.getUI().getMetadata();
metadata.setPublic(addYP);
}
/**
* Creates a new Broadcast Buffer.
*
* @param buffer The buffer to be filled
* @param startTime The time that the server started in milliseconds
* @param threadName The thread name of this Broadcast Buffer
*/
public BroadcastBuffer(Buffer buffer, int port, Peer peer, String threadName)
{
this(buffer, port, threadName);
startTimeIsValid = true;
//metadata = peer.getUI().getMetadata();
this.startTime = peer.getStartTime();
this.peer = peer;
}
/**
* Starts this Broadcast Buffer. It will periodically add Metadata packets.
*/
public void run()
{
Logger.finer("BroadcastBuffer", "BroadcastBuffer.THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
try
{
long now = System.currentTimeMillis();
long waitTime = MAX_NODATA_INTERVAL;
while(true)
{
try
{
synchronized(this)
{
if (waitTime > 0)
{
wait(waitTime);
}
}
}
catch (InterruptedException e)
{
}
now = System.currentTimeMillis();
if (lastSendTimeIsValid)
{
if (now - lastSendTime >= MAX_NODATA_INTERVAL)
{
// Es wurden keine Daten gesendet
sendNullData();
waitTime = MAX_NODATA_INTERVAL;
}
else
{
waitTime = MAX_NODATA_INTERVAL - (now - lastSendTime);
}
}
else
{
// Es wurden noch keine Daten gesendet
sendNullData();
waitTime = MAX_NODATA_INTERVAL;
}
}
}
catch (Exception e)
{
Logger.severe("BroadcastBuffer", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
}
}
public void setHeader(byte[] header)
{
setHeader(header, 0, header.length);
}
public void setHeader(byte[] header, int headerLength)
{
setHeader(header, 0, headerLength);
}
public void setHeader(byte[] header, int headerOffset, int headerLength)
{
if (header == null)
{
throw new NullPointerException();
}
else if ((headerOffset < 0) || (headerOffset > header.length) || (headerLength < 0) || ((headerOffset + headerLength) > header.length) || ((headerOffset + headerLength) < 0))
{
throw new IndexOutOfBoundsException();
}
// Maximale Paketgr鰏se
int maxHeaderSize = PacketFactory.STREAM_PACKET_MAX_SIZE - HeaderPacket.getMaxLength();
if (headerLength > maxHeaderSize)
{
throw new IllegalArgumentException(Messages.getString("BroadcastBuffer.HEADER_IS_TOO_LONG")); //$NON-NLS-1$
}
buffer.put(new HeaderPacket(getNextSeqNr(), getTimestamp(), header, headerOffset, headerLength));
}
/**
* That's the same as <code>sendData(data, 0, data.length)</code>
*
* @see #sendData(byte[],int,int)
*/
public void sendData(byte[] data)
{
sendData(data, 0, data.length);
}
/**
* That's the same as <code>sendData(data, 0, dataLength)</code>
*
* @see #sendData(byte[],int,int)
*/
public void sendData(byte[] data, int dataLength)
{
sendData(data, 0, dataLength);
}
/**
* Puts <code>data</code> in the buffer, but only starting at
* <code>dataOffset</code> and with length <code>dataLength</code>.
* <P>
* If the data is too big to be put in a single {@link DataPacket},
* the data is distributed to multiple packets.
*
* @see #sendData(byte[])
* @see #sendData(byte[],int)
*/
public void sendData(byte[] data, int dataOffset, int dataLength)
{
if (data == null)
{
throw new NullPointerException();
}
else if ((dataOffset < 0) || (dataOffset > data.length) || (dataLength < 0) || ((dataOffset + dataLength) > data.length) || ((dataOffset + dataLength) < 0))
{
throw new IndexOutOfBoundsException();
}
else if (dataLength == 0)
{
return;
}
// Maximale Paketgr鰏se
int maxDataSize = PacketFactory.STREAM_PACKET_MAX_SIZE - DataPacket.getMaxLength();
if (dataLength > maxDataSize)
{
// Die Daten m黶sen auf mehrere Pakete aufgeteilt werden
synchronized(buffer)
{
for (int i=0; i < dataLength / maxDataSize; i++)
{
buffer.put(new DataPacket(getNextSeqNr(), getTimestamp(), buffer.getNewestHeaderPacket().getSeqNr(), buffer.getNewestMetadataPacket().getSeqNr(), data, dataOffset + i * maxDataSize, maxDataSize));
}
buffer.put(new DataPacket(getNextSeqNr(), getTimestamp(), buffer.getNewestHeaderPacket().getSeqNr(), buffer.getNewestMetadataPacket().getSeqNr(), data, dataOffset + dataLength - (dataLength % maxDataSize), dataLength % maxDataSize));
}
}
else
{
// Nur ein Paket ist n鰐ig
buffer.put(new DataPacket(getNextSeqNr(), getTimestamp(), buffer.getNewestHeaderPacket().getSeqNr(), buffer.getNewestMetadataPacket().getSeqNr(), data, dataOffset, dataLength));
}
long now = System.currentTimeMillis();
lastSendTime = now;
lastSendTimeIsValid = true;
// Messung der Durchschnittsgeschwindigkeit
bytesSinceLastSpeedMeasurement += dataLength;
if (lastSpeedMeasurementTimeIsValid)
{
if (now - lastSpeedMeasurementTime >= speedMeasurementPeriod)
{
averageBytesPerSec = Math.round((float)bytesSinceLastSpeedMeasurement / ((float)(now - lastSpeedMeasurementTime) / 1000.0f));
metadata.setAverageByterate(averageBytesPerSec);
//sendMetadata();
if (speedMeasurementPeriod < MAX_SPEED_MEASUREMENT_PERIOD)
{
speedMeasurementPeriod *= 2;
}
lastSpeedMeasurementTime = now;
bytesSinceLastSpeedMeasurement = 0;
}
}
else
{
lastSpeedMeasurementTime = now;
lastSpeedMeasurementTimeIsValid = true;
}
}
/**
* Sets the current metadata.
*/
public void setMetadataSong(Metadata metadata)
{
//System.out.println("song1:" + metadata.getSongTitle() + "|song2:" + this.metadata.getSongTitle());
if (!metadata.getSongTitle().equals(this.metadata.getSongTitle()))
{
//System.out.println("fff");
// Sonst geht die Byterate verloren
metadata.setAverageByterate(averageBytesPerSec);
// Metadaten klonen und nur die Kopie behalten (sonst sind
// die Metadaten automatisch immer auf dem neusten Stand, weil
// die Referenzen auf dasselbe Objekt zeigen)
this.metadata.setSongTitle(metadata.getSongTitle());
this.metadata.setSongURL(metadata.getSongURL());
this.metadata.setPublic(true);
// Metadaten sofort senden
sendMetadata();
}
// Jetzt, wo wir die Metadaten haben, k鰊nen wir das
// Yellow Pages-System informieren
if (addYP && yp == null)
{
yp = new YellowPages(this, port);
yp.start();
}
if (lp == null)
{
lp = new LANPages(this);
lp.start();
}
}
public void setMetadata(Metadata metadata)
{
if (!metadata.equals(this.metadata))
{
//System.out.println("fff");
// Sonst geht die Byterate verloren
metadata.setAverageByterate(averageBytesPerSec);
// Metadaten klonen und nur die Kopie behalten (sonst sind
// die Metadaten automatisch immer auf dem neusten Stand, weil
// die Referenzen auf dasselbe Objekt zeigen)
this.metadata.setProperties((Properties)metadata.getProperties().clone());
this.metadata.setPublic(true);
// Metadaten sofort senden
sendMetadata();
}
if (addYP && yp == null)
{
yp = new YellowPages(this, port);
yp.start();
}
if (lp == null)
{
lp = new LANPages(this);
lp.start();
}
}
/**
* Puts <code>metadata</code> in the buffer.
*/
protected void sendMetadata()
{
// Da diese Klasse auch manchmal die Metadaten eigenh鋘dig
// 鋘dert (Geschwindigkeitsmessung), m黶sen die Metadaten
// nochmals geklont werden
buffer.put(new MetadataPacket(getNextSeqNr(), getTimestamp(), (Metadata)metadata.clone()));
lastSendTime = System.currentTimeMillis();
lastSendTimeIsValid = true;
}
private void sendNullData()
{
sendData(NULL_BYTE);
}
private synchronized long getTimestamp()
{
if (startTimeIsValid)
{
return System.currentTimeMillis() - startTime;
}
else
{
startTime = System.currentTimeMillis();
startTimeIsValid = true;
return System.currentTimeMillis() - startTime;
}
}
private synchronized long getNextSeqNr()
{
return nextSeqNr++;
}
// Required by YellowPages
public Metadata getMetadata()
{
return metadata;
}
public void removeFromYP()
{
addYP = false;
if (yp != null)
{
yp.shutdown();
Logger.info("BroadcastBuffer", "BroadcastBuffer.REMOVING_YP", "" + YellowPages.DEFAULT_WAIT_TIME / 60);
}
metadata.setPublic(false);
}
public void addToYP(UI ui)
{
Logger.info("BroadcastBuffer", "BroadcastBuffer.ADDING_YP", "" + YellowPages.FIRST_WAIT_TIME);
addYP = true;
metadata.setPublic(true);
if (yp !=null && !yp.isAlive())
{
//System.out.println("1");
yp.start();
}
else if(yp == null)
{
//System.out.println("2");
yp = new YellowPages(this, port);
yp.start();
}
else
;//System.out.println("3");
if (yp != null)
{
yp.addUI(ui);
}
}
public Peer getPeer()
{
return peer;
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?