broadcastbuffer.java

来自「java语言开发的P2P流媒体系统」· Java 代码 · 共 492 行

JAVA
492
字号
/* Stream-2-Stream - Peer to peer television and radio
 * October 13, 2005 - This file has been modified from the original P2P-Radio source
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 */

/* 
 * P2P-Radio - Peer to peer streaming system
 * Project homepage: http://p2p-radio.sourceforge.net/
 * Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
 * 
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */
 
package p2pradio.sources;
import stream2stream.network.*;
import java.util.Properties;

import p2pradio.*;
import p2pradio.packets.MetadataPacket;
import p2pradio.packets.HeaderPacket;
import p2pradio.packets.DataPacket;
import p2pradio.packets.PacketFactory;
import p2pradio.gui.MainFrame;
import p2pradio.logging.Logger;
import p2pradio.event.*;
import stream2stream.stationlist.LANPages;
import stream2stream.stationlist.YellowPages;


/**
 * Sources use BroadcastBuffer objects to broadcast data.
 * <P>
 * The broadcast buffer automatically adds sequence numbers and time stamps
 * to the packets and sends Metadata periodically
 * 
 * @author Michael Kaufmann
 */
//jhooks - modified, 2 new constructors
public class BroadcastBuffer extends Thread
{
	// Buffer that receives every packet created by this class
	private Buffer buffer;
	// Port handed to the YellowPages client when it is created
	private int port;
	// Metadata owned by this broadcast buffer; a clone is sent with every Metadata packet
	private Metadata metadata;
	
	// Information for the stream packets
	// Reference time (System.currentTimeMillis() based) that packet timestamps are relative to
	private long startTime;
	private boolean startTimeIsValid = false;
	// Whether a YellowPages client should be created by setMetadata()/setMetadataSong()
	private boolean addYP;
	// Sequence number that the next packet will get
	private long nextSeqNr = 1;
	
	// Time (System.currentTimeMillis()) of the last non-empty data or metadata send
	private long lastSendTime;
	private boolean lastSendTimeIsValid = false;	
	
	// Measurement of the average speed
	private long lastSpeedMeasurementTime;
	private boolean lastSpeedMeasurementTimeIsValid = false;
	private long bytesSinceLastSpeedMeasurement = 0;
	// Current measurement period in milliseconds; doubled after each measurement
	// while it is below MAX_SPEED_MEASUREMENT_PERIOD
	private int speedMeasurementPeriod = 1000;
	private int averageBytesPerSec = 0;
	
	// Upper bound for speedMeasurementPeriod, in milliseconds
	public static final int MAX_SPEED_MEASUREMENT_PERIOD = 32000;
	
	// Interval in milliseconds after which run() calls sendNullData() if nothing was sent
	public static final int MAX_NODATA_INTERVAL = 1000;
	// Shared empty array used as the initial header and as the "null data" payload
	public static final byte[] NULL_BYTE = new byte[0];
	
	// Station list clients; created lazily (stay null until first needed)
	private YellowPages yp;
	private LANPages lp;
	// Only set by the constructors that take a Peer; null otherwise
	private Peer peer;
	
		
	
	/**
	 * Creates a new Broadcast Buffer whose thread name is taken from
	 * <code>Messages.getString("BroadcastBuffer.THREAD_NAME")</code>.
	 * 
	 * @param buffer The buffer to be filled
	 * @param port The port that is passed to the Yellow Pages client when it is created
	 */	
	public BroadcastBuffer(Buffer buffer, int port)
	{
		this(buffer, port, Messages.getString("BroadcastBuffer.THREAD_NAME")); //$NON-NLS-1$
	}
	
	/**
	 * Creates a new Broadcast Buffer that runs as a daemon thread with the
	 * given name.
	 * <P>
	 * A Metadata packet (built from a fresh {@link Metadata} object) and an
	 * empty Header packet are put in the buffer right away.
	 * 
	 * @param buffer The buffer to be filled
	 * @param port The port that is passed to the Yellow Pages client when it is created
	 * @param threadName The thread name of this Broadcast Buffer
	 */
	public BroadcastBuffer(Buffer buffer, int port, String threadName)
	{
		super(threadName);
		setDaemon(true);

		this.port = port;
		this.buffer = buffer;
		this.metadata = new Metadata();

		// Both calls below need buffer and metadata to be assigned already
		sendMetadata();
		setHeader(NULL_BYTE);
	}
	/**
	 * Creates a new Broadcast Buffer that takes its start time from a peer
	 * and is initially not public (no Yellow Pages entry).
	 * <P>
	 * Note: the Metadata and Header packets put in the buffer by the delegated
	 * constructor are time-stamped before the peer's start time is applied;
	 * only later packets are relative to <code>peer.getStartTime()</code>.
	 * 
	 * @param buffer The buffer to be filled
	 * @param port The port that is passed to the Yellow Pages client when it is created
	 * @param peer The peer whose start time is used as the reference for time stamps;
	 *             must not be <code>null</code>
	 */
	public BroadcastBuffer(Buffer buffer, int port, Peer peer)
	{
		this(buffer, port);

		// From now on time stamps are relative to the peer's start time
		this.peer = peer;
		this.startTime = peer.getStartTime();
		this.startTimeIsValid = true;

		// Not listed in the Yellow Pages until addToYP() is called
		this.addYP = false;
		metadata.setPublic(false);
	}
	/**
	 * Creates a new, named Broadcast Buffer that takes its start time from a peer.
	 * <P>
	 * Note: the Metadata and Header packets put in the buffer by the delegated
	 * constructor are time-stamped before the peer's start time is applied;
	 * only later packets are relative to <code>peer.getStartTime()</code>.
	 * 
	 * @param buffer The buffer to be filled
	 * @param port The port that is passed to the Yellow Pages client when it is created
	 * @param peer The peer whose start time is used as the reference for time stamps;
	 *             must not be <code>null</code>
	 * @param threadName The thread name of this Broadcast Buffer
	 */
	public BroadcastBuffer(Buffer buffer, int port, Peer peer, String threadName)
	{
		this(buffer, port, threadName);

		// From now on time stamps are relative to the peer's start time
		this.peer = peer;
		this.startTime = peer.getStartTime();
		this.startTimeIsValid = true;
	}
	
	/**
	 * Runs the keep-alive loop of this Broadcast Buffer.
	 * <P>
	 * The loop waits until {@link #MAX_NODATA_INTERVAL} milliseconds have
	 * passed since the last recorded send (or immediately after the first
	 * wait if nothing has been sent yet) and then calls
	 * <code>sendNullData()</code>.
	 * <P>
	 * The thread ends when it is interrupted (the interrupt flag is restored)
	 * or when an unexpected exception is thrown, which is logged as severe.
	 * <P>
	 * NOTE(review): <code>sendNullData()</code> passes a zero-length array to
	 * <code>sendData</code>, which returns early for zero-length data, so with
	 * the code in this class no packet is produced by this loop - confirm
	 * whether that is intended.
	 */
	public void run()
	{	
		Logger.finer("BroadcastBuffer", "BroadcastBuffer.THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
			
		try
		{
			long waitTime = MAX_NODATA_INTERVAL;
						
			while (true)
			{
				try
				{
					synchronized(this)
					{
						// wait(0) would block until notified, so only wait for positive intervals
						if (waitTime > 0)
						{
							wait(waitTime);
						}
					}
				}
				catch (InterruptedException e)
				{
					// An interrupt is a request to stop: keep the flag set for
					// whoever inspects this thread and leave the loop
					Thread.currentThread().interrupt();
					return;
				}
					
				long now = System.currentTimeMillis();
				
				if (lastSendTimeIsValid && (now - lastSendTime < MAX_NODATA_INTERVAL))
				{
					// Something was sent recently: sleep for the rest of the interval
					waitTime = MAX_NODATA_INTERVAL - (now - lastSendTime);
				}
				else
				{
					// No data has been sent yet, or none within the interval
					sendNullData();
					
					waitTime = MAX_NODATA_INTERVAL;
				}
			}
		}
		catch (Exception e)
		{
			Logger.severe("BroadcastBuffer", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
		}
	}

	/**
	 * That's the same as <code>setHeader(header, 0, header.length)</code>
	 * 
	 * @see #setHeader(byte[],int,int)
	 */
	public void setHeader(byte[] header)
	{
		final int wholeLength = header.length;
		setHeader(header, 0, wholeLength);
	}
	
	/**
	 * That's the same as <code>setHeader(header, 0, headerLength)</code>
	 * 
	 * @see #setHeader(byte[],int,int)
	 */
	public void setHeader(byte[] header, int headerLength)
	{
		final int fromStart = 0;
		setHeader(header, fromStart, headerLength);
	}
	
	/**
	 * Puts a new {@link HeaderPacket} in the buffer. Its payload is the part
	 * of <code>header</code> that starts at <code>headerOffset</code> and is
	 * <code>headerLength</code> bytes long.
	 * 
	 * @param header The array that contains the header
	 * @param headerOffset Index of the first header byte
	 * @param headerLength Number of header bytes
	 * @throws NullPointerException if <code>header</code> is <code>null</code>
	 * @throws IndexOutOfBoundsException if offset and length do not describe
	 *         a range inside <code>header</code>
	 * @throws IllegalArgumentException if the header does not fit in a single packet
	 */
	public void setHeader(byte[] header, int headerOffset, int headerLength)
	{
		if (header == null)
		{
			throw new NullPointerException();
		}

		// A negative end index means that offset + length overflowed
		final int headerEnd = headerOffset + headerLength;
		final boolean offsetInside = (headerOffset >= 0) && (headerOffset <= header.length);
		final boolean rangeInside = (headerLength >= 0) && (headerEnd >= 0) && (headerEnd <= header.length);

		if (!(offsetInside && rangeInside))
		{
			throw new IndexOutOfBoundsException();
		}

		// Maximum payload size of one header packet
		final int maxHeaderSize = PacketFactory.STREAM_PACKET_MAX_SIZE - HeaderPacket.getMaxLength();

		if (headerLength > maxHeaderSize)
		{
			throw new IllegalArgumentException(Messages.getString("BroadcastBuffer.HEADER_IS_TOO_LONG")); //$NON-NLS-1$
		}

		final long seqNr = getNextSeqNr();
		final long timestamp = getTimestamp();
		buffer.put(new HeaderPacket(seqNr, timestamp, header, headerOffset, headerLength));
	}


	/**
	 * Sends the whole array; equivalent to
	 * <code>sendData(data, 0, data.length)</code>.
	 * 
	 * @param data The bytes to send
	 * @see #sendData(byte[],int,int)
	 */
	public void sendData(byte[] data)
	{
		final int wholeLength = data.length;
		sendData(data, 0, wholeLength);
	}

	/**
	 * Sends the first <code>dataLength</code> bytes of the array; equivalent to
	 * <code>sendData(data, 0, dataLength)</code>.
	 * 
	 * @param data The array that contains the bytes to send
	 * @param dataLength Number of bytes to send
	 * @see #sendData(byte[],int,int)
	 */
	public void sendData(byte[] data, int dataLength)
	{
		final int fromStart = 0;
		sendData(data, fromStart, dataLength);
	}
	
	/**
	 * Puts <code>data</code> in the buffer, but only starting at
	 * <code>dataOffset</code> and with length <code>dataLength</code>.
	 * <P>
	 * If the data is too big to be put in a single {@link DataPacket},
	 * the data is distributed to multiple packets. No empty trailing packet
	 * is created when the length is an exact multiple of the maximum
	 * payload size.
	 * <P>
	 * Zero-length data is ignored: nothing is put in the buffer and neither
	 * the last send time nor the speed measurement is updated.
	 * 
	 * @param data The array that contains the bytes to send
	 * @param dataOffset Index of the first byte to send
	 * @param dataLength Number of bytes to send
	 * @throws NullPointerException if <code>data</code> is <code>null</code>
	 * @throws IndexOutOfBoundsException if offset and length do not describe
	 *         a range inside <code>data</code>
	 * @see #sendData(byte[])
	 * @see #sendData(byte[],int)
	 */
	public void sendData(byte[] data, int dataOffset, int dataLength)
	{
		if (data == null)
		{
			throw new NullPointerException();
		}
		else if ((dataOffset < 0) || (dataOffset > data.length) || (dataLength < 0) || ((dataOffset + dataLength) > data.length) || ((dataOffset + dataLength) < 0))
		{
			// The last condition catches an int overflow of offset + length
			throw new IndexOutOfBoundsException();
		}
		else if (dataLength == 0)
		{
			return;
		}
		
		// Maximum payload size of one data packet
		int maxDataSize = PacketFactory.STREAM_PACKET_MAX_SIZE - DataPacket.getMaxLength();
		
		if (dataLength > maxDataSize)
		{
			// The data has to be distributed to several packets
			int fullPackets = dataLength / maxDataSize;
			int remainder = dataLength % maxDataSize;
			
			// Hold the buffer's monitor so that the packets of one call
			// are put in the buffer as one group
			synchronized(buffer)
			{
				for (int i = 0; i < fullPackets; i++)
				{
					putDataPacket(data, dataOffset + i * maxDataSize, maxDataSize);
				}
				
				// Only send the rest if there is one; otherwise an empty
				// packet would be put in the buffer
				if (remainder > 0)
				{
					putDataPacket(data, dataOffset + dataLength - remainder, remainder);
				}
			}
		}
		else
		{
			// A single packet is sufficient
			putDataPacket(data, dataOffset, dataLength);
		}
		
		long now = System.currentTimeMillis();
		
		lastSendTime = now;
		lastSendTimeIsValid = true;
		
		updateSpeedMeasurement(now, dataLength);
	}
	
	/**
	 * Puts one {@link DataPacket} in the buffer that refers to the newest
	 * Header and Metadata packets currently known to the buffer.
	 */
	private void putDataPacket(byte[] data, int offset, int length)
	{
		buffer.put(new DataPacket(getNextSeqNr(), getTimestamp(), buffer.getNewestHeaderPacket().getSeqNr(), buffer.getNewestMetadataPacket().getSeqNr(), data, offset, length));
	}
	
	/**
	 * Accounts for <code>sentBytes</code> bytes sent at time <code>now</code>
	 * and, once the current measurement period is over, stores the new
	 * average byte rate in the metadata.
	 */
	private void updateSpeedMeasurement(long now, int sentBytes)
	{
		bytesSinceLastSpeedMeasurement += sentBytes;
		
		if (!lastSpeedMeasurementTimeIsValid)
		{
			// First data ever: start the first measurement period
			lastSpeedMeasurementTime = now;
			lastSpeedMeasurementTimeIsValid = true;
			return;
		}
		
		if (now - lastSpeedMeasurementTime >= speedMeasurementPeriod)
		{
			averageBytesPerSec = Math.round((float)bytesSinceLastSpeedMeasurement / ((float)(now - lastSpeedMeasurementTime) / 1000.0f));
			
			// The new rate is only stored here; it is sent with the next Metadata packet
			metadata.setAverageByterate(averageBytesPerSec);
			
			// Measure over longer and longer periods until the maximum is reached
			if (speedMeasurementPeriod < MAX_SPEED_MEASUREMENT_PERIOD)
			{
				speedMeasurementPeriod *= 2;
			}
			
			lastSpeedMeasurementTime = now;
			bytesSinceLastSpeedMeasurement = 0;
		}
	}

	/**
	 * Takes over song title and song URL from <code>metadata</code> if the
	 * song title differs from the current one, marks the metadata as public
	 * and sends it immediately.
	 * <P>
	 * In any case the station list clients are created and started if they
	 * do not exist yet (the Yellow Pages client only if it was requested).
	 * <P>
	 * Side effect: the average byte rate of the passed <code>metadata</code>
	 * object is overwritten when the title differs.
	 * 
	 * @param metadata The metadata that carries the new song information
	 */
	public void setMetadataSong(Metadata metadata)
	{
		final boolean sameTitle = metadata.getSongTitle().equals(this.metadata.getSongTitle());

		if (!sameTitle)
		{
			// Otherwise the byte rate would get lost
			metadata.setAverageByterate(averageBytesPerSec);

			// Copy the values instead of keeping the reference; otherwise our
			// metadata would silently follow every change made by the caller
			this.metadata.setSongTitle(metadata.getSongTitle());
			this.metadata.setSongURL(metadata.getSongURL());
			this.metadata.setPublic(true);

			// Send the metadata immediately
			sendMetadata();
		}

		// Now that we have the metadata, the Yellow Pages system can be informed
		if (yp == null && addYP)
		{
			yp = new YellowPages(this, port);
			yp.start();
		}

		if (lp == null)
		{
			lp = new LANPages(this);
			lp.start();
		}
	}
	/**
	 * Sets the current metadata.
	 * <P>
	 * If <code>metadata</code> is not equal to the current metadata, a copy
	 * of its properties is taken over, the metadata is marked as public and
	 * sent immediately.
	 * <P>
	 * In any case the station list clients are created and started if they
	 * do not exist yet (the Yellow Pages client only if it was requested).
	 * <P>
	 * Side effect: the average byte rate of the passed <code>metadata</code>
	 * object is overwritten when it differs from the current metadata.
	 * 
	 * @param metadata The new metadata
	 */
	public void setMetadata(Metadata metadata)
	{
		final boolean unchanged = metadata.equals(this.metadata);

		if (!unchanged)
		{
			// Otherwise the byte rate would get lost
			metadata.setAverageByterate(averageBytesPerSec);

			// Keep a clone of the properties only; otherwise our metadata
			// would silently follow every change made by the caller
			final Properties propertiesCopy = (Properties)metadata.getProperties().clone();
			this.metadata.setProperties(propertiesCopy);
			this.metadata.setPublic(true);

			// Send the metadata immediately
			sendMetadata();
		}

		if (yp == null && addYP)
		{
			yp = new YellowPages(this, port);
			yp.start();
		}

		if (lp == null)
		{
			lp = new LANPages(this);
			lp.start();
		}
	}

	/**
	 * Puts a {@link MetadataPacket} with a clone of the current metadata in
	 * the buffer and records the current time as the last send time.
	 */
	protected void sendMetadata()
	{
		final long seqNr = getNextSeqNr();
		final long timestamp = getTimestamp();

		// This class sometimes changes the metadata itself (speed
		// measurement), so the packet must get its own clone
		final Metadata snapshot = (Metadata)metadata.clone();

		buffer.put(new MetadataPacket(seqNr, timestamp, snapshot));

		lastSendTimeIsValid = true;
		lastSendTime = System.currentTimeMillis();
	}
	
	/**
	 * Passes the empty array {@link #NULL_BYTE} to <code>sendData</code>.
	 * <P>
	 * NOTE(review): <code>sendData(byte[],int,int)</code> as implemented in
	 * this class returns early when the length is 0, so this call neither
	 * puts a packet in the buffer nor updates the last send time. Confirm
	 * whether a keep-alive packet was intended here.
	 */
	private void sendNullData()
	{
		sendData(NULL_BYTE);
	}
	
	/**
	 * Returns the number of milliseconds that have passed since the start time.
	 * <P>
	 * If no start time is known yet, the current time becomes the start time.
	 */
	private synchronized long getTimestamp()
	{
		if (!startTimeIsValid)
		{
			// First call: the stream starts now
			startTime = System.currentTimeMillis();
			startTimeIsValid = true;
		}

		return System.currentTimeMillis() - startTime;
	}
	
	/**
	 * Returns the sequence number for the next packet and advances the
	 * counter by one. The first number returned is 1.
	 */
	private synchronized long getNextSeqNr()
	{
		final long seqNr = nextSeqNr;
		nextSeqNr = seqNr + 1;
		return seqNr;
	}
	
	/**
	 * Returns the metadata object held by this Broadcast Buffer (the object
	 * itself, not a copy). Required by YellowPages.
	 */
	public Metadata getMetadata()
	{
		return this.metadata;
	}
	
	/**
	 * Stops announcing this station in the Yellow Pages: shuts down the
	 * Yellow Pages client if there is one (and logs that), and marks the
	 * metadata as not public.
	 */
	public void removeFromYP()
	{
		addYP = false;

		final boolean hasClient = (yp != null);
		if (hasClient)
		{
			yp.shutdown();
			Logger.info("BroadcastBuffer", "BroadcastBuffer.REMOVING_YP", "" + YellowPages.DEFAULT_WAIT_TIME / 60);
		}

		metadata.setPublic(false);
	}
	/**
	 * Announces this station in the Yellow Pages: marks the metadata as
	 * public, makes sure that a running Yellow Pages client exists and
	 * registers <code>ui</code> with it.
	 * <P>
	 * A client that is no longer alive is replaced by a new one instead of
	 * being started again, because a finished thread cannot be restarted
	 * (this assumes that <code>YellowPages</code> is a
	 * <code>java.lang.Thread</code> - TODO confirm).
	 * 
	 * @param ui The user interface that is passed to the client's <code>addUI</code>
	 */
	public void addToYP(UI ui)
	{
		Logger.info("BroadcastBuffer", "BroadcastBuffer.ADDING_YP", "" + YellowPages.FIRST_WAIT_TIME);
		addYP = true;
		metadata.setPublic(true);
		
		// Every client created in this class is started right away, so a
		// client that is not alive has already finished; calling start() on
		// it again would throw an IllegalThreadStateException
		if (yp == null || !yp.isAlive())
		{
			yp = new YellowPages(this, port);
			yp.start();
		}
		
		yp.addUI(ui);
	}
	/**
	 * Returns the peer this Broadcast Buffer was created with, or
	 * <code>null</code> if it was created by a constructor without a peer.
	 */
	public Peer getPeer()
	{
		return this.peer;
	}
	
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?