buffer.java

来自「java语言开发的P2P流媒体系统」· Java 代码 · 共 560 行 · 第 1/2 页

JAVA
560
字号
	 * Returns the buffer sequence number of the newest packet in the buffer.
	 */
	public synchronized long getNewestSeqNr()
	{	
		return newestSeqNr;
	}
	
	/**
	 * Translates a {@link StreamPacket} sequence number to a buffer sequence number.
	 * That's only possible if the concerning packet is in the buffer.
	 *
	 * @param streamPacketSeqNr The {@link StreamPacket} sequence number to be translated
	 * @return The buffer sequence number of the {@link StreamPacket}
	 *
	 * @throws PeerException If no {@link StreamPacket} with the specified sequence number is in the buffer
	 */
	public synchronized long streamPacketSeqNrToBufferSeqNr(long streamPacketSeqNr) throws PeerException
	{
		Long bufferSeqNr = (Long)streamPacketSeqNrToBufferSeqNr.get(new Long(streamPacketSeqNr));
		
		if (bufferSeqNr != null)
		{
			// Puffer-Sequenznummer zur點kgeben
			return bufferSeqNr.intValue();
		}
		else
		{
			throw new PeerException(Messages.getString("Buffer.PACKET_NOT_AVAILABLE")); //$NON-NLS-1$
		}
	}
	
	/**
	 * Returns whether there's a valid resume sequence number available.
	 * <P>
	 * A valid resume sequence number is available as soon as a {@link StreamPacket}
	 * is put into the buffer.
	 * 
	 * @see #getResumeSeqNr()
	 */
	public synchronized boolean isResumeSeqNrValid()
	{
		return resumeSeqNrIsValid;
	}
	
	/**
	 * Waits until valid resume data gets available.
	 * 
	 * @see #isResumeSeqNrValid()
	 * @see #getResumeSeqNr()
	 */
	public synchronized void waitForValidResumeData()
	{
		while (!resumeSeqNrIsValid)
		{
			waitForNewPacket();
		}		
	}
	
	/**
	 * Returns the resume sequence number. This number is only valid if
	 * {@link #isResumeSeqNrValid()} returns <code>true</code>.
	 * <P>
	 * The resume sequence number is the packet sequence number of the newest
	 * {@link StreamPacket} in the buffer.
	 * 
	 * @see #isResumeSeqNrValid()
	 */
	public synchronized long getResumeSeqNr()
	{
		return resumeSeqNr;
	}
	
	/**
	 * Waits <code>maxTimeToWait</code> milliseconds for the
	 * {@link StreamPacket} with the specified packet sequence number to arrive.
	 * If the packet is already in the buffer, this method returns immediately.
	 *
	 * @return The {@link StreamPacket} with the specified packet sequence number, or
	 *         <code>null</code> if the packet hasn't arrived within the
	 *         specified time 
	 */
	public synchronized Packet getFuturePacket(long streamPacketSeqNr, long maxTimeToWait)
	{
		// Paket schon verf黦bar?
		try
		{
			return get(streamPacketSeqNrToBufferSeqNr(streamPacketSeqNr));
		}
		catch (PeerException ex)
		{
			// Paket noch nicht verf黦bar
			
			long waitUntil = System.currentTimeMillis() + maxTimeToWait;
			
			long time;
			
			while (waitUntil > (time = System.currentTimeMillis()))
			{
				try
				{
					wait(waitUntil - time);
				}
				catch (InterruptedException e)
				{
					Logger.severe("Buffer", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
				}
				
				long seqNr = 0;
				boolean packetAvailable = true;
				
				try
				{
					seqNr = streamPacketSeqNrToBufferSeqNr(streamPacketSeqNr);
				}
				catch (PeerException e)
				{
					packetAvailable = false;
				}
				
				if (packetAvailable)
				{
					return get(seqNr);
				}
			}
			
			return null;
		}
	}	
	
	/**
	 * Returns whether a packet with the specified age is available.
	 */
	public synchronized boolean isPacketAvailableWithAge(int milliseconds)
	{
		if (milliseconds > MAX_TIME_IN_BUFFER)
		{
			throw new IllegalArgumentException(Messages.getString("Buffer.PACKET_AGE_IS_BIGGER_THAN_MAXIMUM")); //$NON-NLS-1$
		}
		
		return (getAgeOfOldestPacket() >= milliseconds);		
	}
	
	/**
	 * Returns the age of the oldest packet (in milliseconds)
	 */
	public synchronized int getAgeOfOldestPacket()
	{
		// Auf ein Paket warten, falls n鰐ig
		while (buffer.size() == 0)
		{
			waitForNewPacket();
		}
		
		return (int)(System.currentTimeMillis() - ((Long)arrivalTime.get(new Long(newestSeqNr - buffer.size() + 1))).longValue());
	}
	
	/**
	 * Waits until a new packet is put into the buffer.
	 */
	public synchronized void waitForNewPacket()
	{
		try
		{
			wait();
		}
		catch (InterruptedException e)
		{
			Logger.severe("Buffer", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
		}
	}
	
	/**
	 * Resets the resume sequence number. {@link #isResumeSeqNrValid()}
	 * will return <code>false</code> after a call to this method.
	 * <P>
	 * The resume sequence number will stay invalid until a new {@link StreamPacket} is put
	 * into the buffer.
	 */
	public void resetResumeSeqNr()
	{
		resumeSeqNrIsValid = false;
	}
	
	/**
	 * Tells whether a {@link StreamPacket} with a buffer sequence number
	 * bigger than <code>seqNr</code> is available.
	 * <P>
	 * This is needed by the {@link Transmitter} to determine if a
	 * {@link StandByPacket} is obsolete or not.
	 */
	public boolean isNewerStreamPacketAvailable(long seqNr)
	{
		for (long i=seqNr+1; i <= newestSeqNr; i++)
		{
			if (get(i) instanceof StreamPacket)
			{
				return true;
			}
		}
		
		return false;
	}
	
	public synchronized HeaderPacket getNewestHeaderPacket()
	{
		return newestHeaderPacket; 
	}
	
	/**
	 * Returns the newest metadata packet, or <code>null</code> if
	 * there has never been a metadata packet in the buffer.
	 */
	public synchronized MetadataPacket getNewestMetadataPacket()
	{
		return newestMetadataPacket;
	}
	
	public synchronized HeaderPacket getAssociatedHeaderPacket(StreamPacket streamPacket)
	{
		if (streamPacket instanceof MetaStreamPacket)
		{
			throw new IllegalArgumentException();
		}
		
		return (HeaderPacket)associatedHeaderPacket.get(streamPacket);
	}
	
	public synchronized MetadataPacket getAssociatedMetadataPacket(StreamPacket streamPacket)
	{
		if (streamPacket instanceof MetaStreamPacket)
		{
			throw new IllegalArgumentException();
		}
		
		return (MetadataPacket)associatedMetadataPacket.get(streamPacket);
	}
	
	public String toString()
	{
		if (!newestSeqNrIsValid)
		{
			return "Buffer: Empty"; //$NON-NLS-1$
		}
		
		String result = "Buffer: " + buffer.size() + " packets\n"; //$NON-NLS-1$ //$NON-NLS-2$
		result += "Newest Buffer Sequence Nr: " + newestSeqNr + "\n"; //$NON-NLS-1$ //$NON-NLS-2$
		result += "Newest Header Packet: " + newestHeaderPacket.getSeqNr() + "\n"; //$NON-NLS-1$ //$NON-NLS-2$
		result += "Newest Metadata Packet: " + newestMetadataPacket.getSeqNr() + "\n"; //$NON-NLS-1$ //$NON-NLS-2$
		result += "Resume Sequence Number: "; //$NON-NLS-1$
		
		if (resumeSeqNrIsValid)
		{
			result += resumeSeqNr + "\n"; //$NON-NLS-1$
		}
		else
		{
			result += "invalid\n"; //$NON-NLS-1$
		}
		
		if (newestSeqNrIsValid)
		{
			result += "Packets in the Buffer: Packet Sequence Nr | Buffer Sequence Nr | Arrival Time\n"; //$NON-NLS-1$
		
			for (long packetNr = newestSeqNr - buffer.size() + 1; packetNr <= newestSeqNr; packetNr++)
			{
				if (get(packetNr) instanceof StreamPacket)
				{
					result += ((StreamPacket)get(packetNr)).getSeqNr() + " | " + packetNr + " | " + getArrivalTime(packetNr) + "\n"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 
				}
				else
				{
					result += "- | " + packetNr + " | " + getArrivalTime(packetNr) + "\n"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
				}
			}
		}
		
		return result;
	}
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?