buffer.java

来自「java语言开发的P2P流媒体系统」· Java 代码 · 共 560 行 · 第 1/2 页

JAVA
560
字号
/* Stream-2-Stream - Peer to peer television and radio
 * October 13, 2005 - This file has been modified from the original P2P-Radio source
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 */

/* 
 * P2P-Radio - Peer to peer streaming system
 * Project homepage: http://p2p-radio.sourceforge.net/
 * Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
 * 
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */

package p2pradio;

import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.packets.security.*;
import stream2stream.network.PeerException;

import java.util.*;


/**
 * Stores and exchanges packets between the peer and its threads.
 * <P>
 * Every packet ist stored for {@link #MAX_TIME_IN_BUFFER} milliseconds.
 * <P>
 * The buffer uses its own sequence numbers (&quot;buffer sequence numbers&quot;).
 *  
 * @author Michael Kaufmann
 */ 
public class Buffer
{
	// Die Sequenznummern im Puffer sind nicht die gleichen wie
	// in den Paketen!
	private boolean newestSeqNrIsValid = false;
	private long newestSeqNr = 0;

	private Map buffer = new HashMap();
	private Map streamPacketSeqNrToBufferSeqNr = new HashMap();
	private Map arrivalTime = new HashMap();
	
	// F黵 Strompakete
	private boolean resumeSeqNrIsValid = false;
	private long resumeSeqNr;
		
	private SignatureGenerator signatureGenerator;
	
	// Mit Paketen verkn黳fte Header-/Metadaten-Pakete
	private Map associatedHeaderPacket = new HashMap();
	private Map associatedMetadataPacket = new HashMap();
	private HeaderPacket newestHeaderPacket;
	private MetadataPacket newestMetadataPacket;
	
	/**
	 * The time (in milliseconds) that a packet will stay in the buffer
	 */
	public static final long MAX_TIME_IN_BUFFER = 20000;  // ms
	
	
	public Buffer(SignatureGenerator signatureGenerator)
	{
		this.signatureGenerator = signatureGenerator;
	}

	/**
	 * Puts a packet into the buffer.
	 * The packet will be signed if necessary.
	 */
	public synchronized void put(Packet packet)
	{
		if (newestSeqNrIsValid)
		{
			newestSeqNr++;
		}

		newestSeqNrIsValid = true;
		
		Long key = new Long(newestSeqNr);
		
		arrivalTime.put(key, new Long(System.currentTimeMillis()));
		buffer.put(key, packet);
		
		// Wiederaufnahme-Daten sammeln und signieren, falls gew黱scht
		if (packet instanceof StreamPacket)
		{
			StreamPacket streamPacket = (StreamPacket)packet;
			Long streamPacketSeqNr = new Long(streamPacket.getSeqNr());
			streamPacketSeqNrToBufferSeqNr.put(streamPacketSeqNr, key);
			resumeSeqNr = streamPacket.getSeqNr();
			resumeSeqNrIsValid = true;
			
			if (signatureGenerator != null)
			{
				try
				{
					streamPacket.sign(signatureGenerator);
				}
				catch (Exception e)
				{
					Logger.warning("Buffer", "Buffer.PACKET_SIGNING_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
				}
			}
			
			// Header-/Metadatenpakete merken
			if (packet instanceof MetaStreamPacket)
			{
				if (packet instanceof HeaderPacket)
				{
					newestHeaderPacket = (HeaderPacket)packet;
				}
				else if (packet instanceof MetadataPacket)
				{
					newestMetadataPacket = (MetadataPacket)packet;
				}
			}
			else
			{
				if (packet instanceof DataPacket)
				{
					// Mit Spezialpaketen verkn黳fen
					associatedHeaderPacket.put(packet, newestHeaderPacket);
					associatedMetadataPacket.put(packet, newestMetadataPacket);
				}
			}
		}
				
		// Neues Paket verf黦bar  -- New Packet available
		notifyAll();  //wakes up ALL waiting threads; the scheduler decides which one will run
				
		// Alte Pakete suchen und entfernen
		
		long seqNrToCheck = newestSeqNr - buffer.size() + 1;
		
		//jhooks - Remove packets that are older than MAX_TIME_IN_BUFFER
		while (true)
		{
			key = new Long(seqNrToCheck); 
			
			// Ein Paket muss immer im Puffer bleiben
			if ((buffer.size() == 1) || ((System.currentTimeMillis() - ((Long)arrivalTime.get(key)).longValue()) <= MAX_TIME_IN_BUFFER))
			{
				// Mit dem Entfernen aufh鰎en
				break;
			}
			
			Packet removePacket = get(seqNrToCheck);
			if (removePacket instanceof StreamPacket)
			{
				StreamPacket streamPacket = (StreamPacket)removePacket;
				streamPacketSeqNrToBufferSeqNr.remove(new Long(streamPacket.getSeqNr()));
				
				if (removePacket instanceof DataPacket)
				{
					associatedHeaderPacket.remove(removePacket);
					associatedMetadataPacket.remove(removePacket);
				}
			}
			
			arrivalTime.remove(key);
			buffer.remove(key);
						
			seqNrToCheck++;
		}
	}
	
	/**
	 * Searches a packet in the buffer that has a specified age.
	 * 
	 * @param milliseconds The desired age in milliseconds
	 * @return The buffer sequence number of a packet that has approximately the desired age
	 */
	public synchronized long searchPacketWithAge(int milliseconds)
	{
		// Macht nur eine ganz grobe Altersabsch鋞zung (linear)!
		
		if (milliseconds < 0)
		{
			throw new IllegalArgumentException(Messages.getString("Buffer.PACKET_AGE_MUST_BE_POSITIVE")); //$NON-NLS-1$
		}
		
		long oldestSeqNr = newestSeqNr - buffer.size() + 1;
		
		long oldestAge = ((Long)arrivalTime.get(new Long(oldestSeqNr))).longValue();
		long newestAge = ((Long)arrivalTime.get(new Long(newestSeqNr))).longValue();
		
		long desiredAge = newestAge - milliseconds;
		
		if (desiredAge < oldestAge)
		{
			return oldestSeqNr;
		}
		
		return Math.round(oldestSeqNr + ((double)(desiredAge - oldestAge) / (newestAge - oldestAge)) * (newestSeqNr - oldestSeqNr));
	}
	
	/**
	 * Returns the packet with the specified buffer sequence
	 * number. If no such packet is in the buffer, this method
	 * returns <code>null</code>.
	 */
	public synchronized Packet get(long seqNr)
	{
		// Allererstes Paket?
		if ((seqNr == 0) && (buffer.size() == 0))
		{
			// Auf das erste Paket warten
			waitForNewPacket();
			
			return get(seqNr);
		}
		
		// Paket-R點kstand:
		// Sollte wegen dem Zweierkomplement auch mit
		// negativen Zahlen funktionieren
		long packetsArrear = (newestSeqNr - seqNr);
		
		if (packetsArrear >= buffer.size())
		{
			// Das Paket mit dieser Sequenznummer ist nicht mehr
			// im Puffer
			return null;
		}
		
		if (packetsArrear == -1)  //If the sequence number is 1 ahead, wait for a new packet
		{
			// Das n鋍hste eintreffende Paket wird verlangt
			waitForNewPacket();
			
			return get(seqNr);
		}
		
		if (packetsArrear < -1)
		{
			// Eine ung黮tige Sequenznummer wurde verlangt
			throw new IllegalArgumentException(Messages.getString("Buffer.INVALID_SEQUENCE_NUMBER_REQUESTED")); //$NON-NLS-1$
		}
		
		// Gew黱schtes Paket ist im Puffer
		return (Packet)buffer.get(new Long(seqNr));		
	}
	
	// Nur f黵 Paketverz鰃erung ben鰐igt
	
	/**
	 * Returns the arrival time of the packet with the buffer sequence
	 * number <code>seqNr</code>.
	 * 
	 * @see java.lang.System#currentTimeMillis()
	 */
	public synchronized long getArrivalTime(long seqNr)
	{
		if (!arrivalTime.containsKey(new Long(seqNr)))
		{
			// Die Ankunftszeit ist nicht mehr verf黦bar
			return -1;
		}
		else
		{
			return ((Long)arrivalTime.get(new Long(seqNr))).longValue();
		}
	}
	
	/**

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?