buffer.java
来自「java语言开发的P2P流媒体系统」· Java 代码 · 共 560 行 · 第 1/2 页
JAVA
560 行
* Returns the buffer sequence number of the newest packet in the buffer.
*/
public synchronized long getNewestSeqNr()
{
    // Snapshot the field while holding this buffer's monitor (the method is synchronized).
    final long newest = this.newestSeqNr;
    return newest;
}
/**
 * Translates a {@link StreamPacket} sequence number to a buffer sequence number.
 * That's only possible if the concerning packet is in the buffer.
 *
 * @param streamPacketSeqNr The {@link StreamPacket} sequence number to be translated
 * @return The buffer sequence number of the {@link StreamPacket}, as a full 64-bit value
 *
 * @throws PeerException If no {@link StreamPacket} with the specified sequence number is in the buffer
 */
public synchronized long streamPacketSeqNrToBufferSeqNr(long streamPacketSeqNr) throws PeerException
{
    // The translation table is keyed by the boxed stream packet sequence number.
    Long bufferSeqNr = (Long)streamPacketSeqNrToBufferSeqNr.get(new Long(streamPacketSeqNr));
    if (bufferSeqNr == null)
    {
        throw new PeerException(Messages.getString("Buffer.PACKET_NOT_AVAILABLE")); //$NON-NLS-1$
    }
    // Return the buffer sequence number. longValue() is required here: the
    // previous intValue() silently truncated values above Integer.MAX_VALUE
    // even though this method returns a long.
    return bufferSeqNr.longValue();
}
/**
 * Tells whether the resume sequence number may currently be used.
 * <P>
 * The resume sequence number becomes valid as soon as a {@link StreamPacket}
 * is put into the buffer.
 *
 * @return the current value of the validity flag
 *
 * @see #getResumeSeqNr()
 */
public synchronized boolean isResumeSeqNrValid()
{
    final boolean valid = this.resumeSeqNrIsValid;
    return valid;
}
/**
 * Blocks the calling thread until the resume sequence number is valid.
 * Returns immediately if it already is.
 *
 * @see #isResumeSeqNrValid()
 * @see #getResumeSeqNr()
 */
public synchronized void waitForValidResumeData()
{
    for (;;)
    {
        if (resumeSeqNrIsValid)
        {
            return;
        }
        // Not valid yet: wait for the next wake-up, then re-check the flag.
        waitForNewPacket();
    }
}
/**
 * Returns the resume sequence number, i.e. the packet sequence number of the
 * newest {@link StreamPacket} in the buffer.
 * <P>
 * The returned value is only meaningful while {@link #isResumeSeqNrValid()}
 * returns <code>true</code>; this method itself does not check the flag.
 *
 * @return the stored resume sequence number
 *
 * @see #isResumeSeqNrValid()
 */
public synchronized long getResumeSeqNr()
{
    final long resume = this.resumeSeqNr;
    return resume;
}
/**
 * Waits up to <code>maxTimeToWait</code> milliseconds for the
 * {@link StreamPacket} with the specified packet sequence number to arrive.
 * If the packet is already in the buffer, this method returns immediately.
 * <P>
 * If the calling thread is interrupted while waiting, the interruption is
 * logged, the method keeps waiting for the remaining time, and the thread's
 * interrupt status is restored before the method returns.
 *
 * @param streamPacketSeqNr The {@link StreamPacket} sequence number to wait for
 * @param maxTimeToWait     Maximum time to wait in milliseconds; values
 *                          <code>&lt;= 0</code> mean "do not wait"
 * @return The {@link StreamPacket} with the specified packet sequence number, or
 *         <code>null</code> if the packet hasn't arrived within the
 *         specified time
 */
public synchronized Packet getFuturePacket(long streamPacketSeqNr, long maxTimeToWait)
{
    // Packet already available?
    try
    {
        return get(streamPacketSeqNrToBufferSeqNr(streamPacketSeqNr));
    }
    catch (PeerException ex)
    {
        // Packet not available yet: fall through and wait for it.
    }

    long now = System.currentTimeMillis();
    long waitUntil = now + maxTimeToWait;
    if (maxTimeToWait > 0 && waitUntil < now)
    {
        // The addition overflowed; without this a huge timeout would yield a
        // deadline in the past and the method would not wait at all.
        waitUntil = Long.MAX_VALUE;
    }

    boolean interrupted = false;
    try
    {
        while (waitUntil > (now = System.currentTimeMillis()))
        {
            try
            {
                // Strictly positive because of the loop condition, so this
                // never degenerates into an unbounded wait(0).
                wait(waitUntil - now);
            }
            catch (InterruptedException e)
            {
                Logger.severe("Buffer", "INTERNAL_ERROR", e); //$NON-NLS-1$ //$NON-NLS-2$
                // Remember the interruption; it is re-asserted in the finally
                // block so that the next wait() does not fail immediately.
                interrupted = true;
            }

            // Woken up (or timed out): check whether the packet is there now.
            long seqNr;
            try
            {
                seqNr = streamPacketSeqNrToBufferSeqNr(streamPacketSeqNr);
            }
            catch (PeerException e)
            {
                // Still missing; wait again if time is left.
                continue;
            }
            return get(seqNr);
        }
        return null;
    }
    finally
    {
        if (interrupted)
        {
            // Do not swallow the interruption: hand it back to the caller.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Returns whether the oldest packet in the buffer is at least
 * <code>milliseconds</code> old.
 * <P>
 * Note that this delegates to {@link #getAgeOfOldestPacket()}, which waits
 * while the buffer is empty.
 *
 * @param milliseconds the requested minimum age in milliseconds
 * @return <code>true</code> if the oldest packet's age is greater than or
 *         equal to <code>milliseconds</code>
 * @throws IllegalArgumentException if <code>milliseconds</code> is bigger
 *         than <code>MAX_TIME_IN_BUFFER</code>
 */
public synchronized boolean isPacketAvailableWithAge(int milliseconds)
{
    if (milliseconds <= MAX_TIME_IN_BUFFER)
    {
        final int oldestAge = getAgeOfOldestPacket();
        return oldestAge >= milliseconds;
    }
    throw new IllegalArgumentException(Messages.getString("Buffer.PACKET_AGE_IS_BIGGER_THAN_MAXIMUM")); //$NON-NLS-1$
}
/**
 * Returns the age of the oldest packet (in milliseconds), measured as the
 * difference between the current time and the packet's recorded arrival time.
 * <P>
 * If the buffer is empty, this method waits until it is not.
 *
 * @return the age in milliseconds, narrowed to <code>int</code>
 */
public synchronized int getAgeOfOldestPacket()
{
    // Wait for a packet if necessary
    while (buffer.size() == 0)
    {
        waitForNewPacket();
    }

    final long now = System.currentTimeMillis();
    // Buffer sequence number of the oldest packet, derived from the newest
    // sequence number and the number of buffered packets.
    final long oldestSeqNr = newestSeqNr - buffer.size() + 1;
    final Long arrivedAt = (Long)arrivalTime.get(new Long(oldestSeqNr));
    return (int)(now - arrivedAt.longValue());
}
/**
 * Waits on this buffer's monitor until the thread is woken up, which is
 * intended to happen when a new packet is put into the buffer.
 * <P>
 * The method performs a single <code>wait()</code>. If the wait is
 * interrupted, the interruption is logged and the method returns normally,
 * so callers that depend on a condition should re-check it afterwards.
 */
public synchronized void waitForNewPacket()
{
    try
    {
        this.wait();
    }
    catch (InterruptedException interruption)
    {
        Logger.severe("Buffer", "INTERNAL_ERROR", interruption); //$NON-NLS-1$ //$NON-NLS-2$
    }
}
/**
 * Resets the resume sequence number. {@link #isResumeSeqNrValid()}
 * will return <code>false</code> after a call to this method.
 * <P>
 * The resume sequence number will stay invalid until a new {@link StreamPacket} is put
 * into the buffer.
 * <P>
 * The method is synchronized because the flag is read under this buffer's
 * monitor by {@link #isResumeSeqNrValid()} and
 * {@link #waitForValidResumeData()}; writing it without the monitor would
 * not guarantee that those readers see the change.
 */
public synchronized void resetResumeSeqNr()
{
    resumeSeqNrIsValid = false;
}
/**
 * Tells whether a {@link StreamPacket} with a buffer sequence number
 * bigger than <code>seqNr</code> is available.
 * <P>
 * This is needed by the {@link Transmitter} to determine if a
 * {@link StandByPacket} is obsolete or not.
 * <P>
 * The scan runs under this buffer's monitor, like the other accessors of
 * <code>newestSeqNr</code>, so the upper bound is read consistently for the
 * whole loop.
 *
 * @param seqNr the buffer sequence number after which to look
 * @return <code>true</code> if any packet with a buffer sequence number in
 *         <code>(seqNr, newestSeqNr]</code> is a {@link StreamPacket}
 */
public synchronized boolean isNewerStreamPacketAvailable(long seqNr)
{
    // NOTE(review): get(i) is called for every number above seqNr; its
    // behavior for numbers that have already left the buffer is not
    // visible here - confirm callers never pass such a seqNr.
    for (long i = seqNr + 1; i <= newestSeqNr; i++)
    {
        if (get(i) instanceof StreamPacket)
        {
            return true;
        }
    }
    return false;
}
/**
 * Returns the header packet currently recorded as the newest one.
 *
 * @return the stored newest {@link HeaderPacket}
 *         (presumably <code>null</code> while none has been recorded - verify)
 */
public synchronized HeaderPacket getNewestHeaderPacket()
{
    final HeaderPacket newest = this.newestHeaderPacket;
    return newest;
}
/**
 * Returns the metadata packet currently recorded as the newest one.
 *
 * @return the newest {@link MetadataPacket}, or <code>null</code> if
 *         there has never been a metadata packet in the buffer
 */
public synchronized MetadataPacket getNewestMetadataPacket()
{
    final MetadataPacket newest = this.newestMetadataPacket;
    return newest;
}
/**
 * Looks up the header packet associated with the given stream packet.
 *
 * @param streamPacket the stream packet to look up; must not be a
 *                     {@link MetaStreamPacket}
 * @return the {@link HeaderPacket} stored for <code>streamPacket</code>
 *         (presumably <code>null</code> if none is stored - verify)
 * @throws IllegalArgumentException if <code>streamPacket</code> is a
 *         {@link MetaStreamPacket}
 */
public synchronized HeaderPacket getAssociatedHeaderPacket(StreamPacket streamPacket)
{
    if (!(streamPacket instanceof MetaStreamPacket))
    {
        final Object associated = associatedHeaderPacket.get(streamPacket);
        return (HeaderPacket)associated;
    }
    throw new IllegalArgumentException();
}
/**
 * Looks up the metadata packet associated with the given stream packet.
 *
 * @param streamPacket the stream packet to look up; must not be a
 *                     {@link MetaStreamPacket}
 * @return the {@link MetadataPacket} stored for <code>streamPacket</code>
 *         (presumably <code>null</code> if none is stored - verify)
 * @throws IllegalArgumentException if <code>streamPacket</code> is a
 *         {@link MetaStreamPacket}
 */
public synchronized MetadataPacket getAssociatedMetadataPacket(StreamPacket streamPacket)
{
    if (!(streamPacket instanceof MetaStreamPacket))
    {
        final Object associated = associatedMetadataPacket.get(streamPacket);
        return (MetadataPacket)associated;
    }
    throw new IllegalArgumentException();
}
/**
 * Returns a multi-line, human-readable dump of the buffer state.
 * <P>
 * While no newest sequence number is valid the result is just
 * <code>"Buffer: Empty"</code>. Otherwise it lists the packet count, the
 * newest buffer sequence number, the sequence numbers of the newest header
 * and metadata packets (or <code>none</code> if there is no such packet),
 * the resume sequence number (or <code>invalid</code>), and one line per
 * buffered packet with its packet sequence number (<code>-</code> for
 * packets that are not a {@link StreamPacket}), buffer sequence number and
 * arrival time.
 * <P>
 * The method is synchronized so the dump is taken under the same monitor
 * that guards the other accessors of this state.
 */
public synchronized String toString()
{
    if (!newestSeqNrIsValid)
    {
        return "Buffer: Empty"; //$NON-NLS-1$
    }

    // StringBuffer instead of repeated String += inside the packet loop.
    StringBuffer result = new StringBuffer();
    result.append("Buffer: " + buffer.size() + " packets\n"); //$NON-NLS-1$ //$NON-NLS-2$
    result.append("Newest Buffer Sequence Nr: " + newestSeqNr + "\n"); //$NON-NLS-1$ //$NON-NLS-2$

    // Guard against a missing header packet instead of failing with a
    // NullPointerException while building a diagnostic string.
    result.append("Newest Header Packet: "); //$NON-NLS-1$
    if (newestHeaderPacket != null)
    {
        result.append(newestHeaderPacket.getSeqNr() + "\n"); //$NON-NLS-1$
    }
    else
    {
        result.append("none\n"); //$NON-NLS-1$
    }

    // The newest metadata packet is null until the first one has been seen.
    result.append("Newest Metadata Packet: "); //$NON-NLS-1$
    if (newestMetadataPacket != null)
    {
        result.append(newestMetadataPacket.getSeqNr() + "\n"); //$NON-NLS-1$
    }
    else
    {
        result.append("none\n"); //$NON-NLS-1$
    }

    result.append("Resume Sequence Number: "); //$NON-NLS-1$
    if (resumeSeqNrIsValid)
    {
        result.append(resumeSeqNr + "\n"); //$NON-NLS-1$
    }
    else
    {
        result.append("invalid\n"); //$NON-NLS-1$
    }

    // newestSeqNrIsValid is known to be true here (see the early return).
    result.append("Packets in the Buffer: Packet Sequence Nr | Buffer Sequence Nr | Arrival Time\n"); //$NON-NLS-1$
    for (long packetNr = newestSeqNr - buffer.size() + 1; packetNr <= newestSeqNr; packetNr++)
    {
        // Fetch each packet once rather than once for the type test and
        // again for the cast.
        Object packet = get(packetNr);
        if (packet instanceof StreamPacket)
        {
            result.append(((StreamPacket)packet).getSeqNr() + " | " + packetNr + " | " + getArrivalTime(packetNr) + "\n"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
        }
        else
        {
            result.append("- | " + packetNr + " | " + getArrivalTime(packetNr) + "\n"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
        }
    }
    return result.toString();
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?