📄 amr_pin.cpp
字号:
} while (1);
// unexpected
return -1;
}
/**
 * Copies one parsed AMR packet into a DataPacket and appends it to the
 * output queue.
 *
 * @param packet  Source packet; its payload (packet.packet, packet.packet_size)
 *                and timestamps (tStart, tStop) are copied, the source is not
 *                modified.
 * @return NOERROR on success, E_FAIL when aborting or when no output packet
 *         could be obtained, E_OUTOFMEMORY when the payload buffer could not
 *         be allocated.
 */
HRESULT CAMROutputPin::DeliverPacket(CAMRPacket &packet)
{
    // we don't accept packets while aborting...
    if (ev_abort.Check()) {
        return E_FAIL;
    }

    // obtain a new packet for the output
    DataPacket *outp = NULL;
    int ret = GetDataPacket(&outp);
    if (ret < 0 || !outp) return E_FAIL;

    outp->type = DataPacket::PACKET_TYPE_DATA;

    // take over the timestamps
    outp->rtStart = packet.tStart;
    outp->rtStop = packet.tStop;
    outp->has_time = true;

    // private copy of the payload; released by ~DataPacket()
    outp->size = packet.packet_size;
    outp->buf = (BYTE*)malloc(outp->size);
    if (!outp->buf && outp->size > 0) {
        // Allocation failed: the packet is not queued yet, so it is still
        // ours to destroy (queued packets are destroyed with delete as well).
        delete outp;
        return E_OUTOFMEMORY;
    }
    if (outp->size > 0) {
        memcpy(outp->buf, packet.packet, packet.packet_size);
    }

    // each packet is sync point
    outp->sync_point = TRUE;

    // the pending discontinuity flag is consumed by the first packet after it
    if (discontinuity) {
        outp->discontinuity = TRUE;
        discontinuity = false;
    } else {
        outp->discontinuity = FALSE;
    }

    {
        // insert into queue
        CAutoLock lck(&lock_queue);
        queue.AddTail(outp);
        ev_can_read.Set();

        // we allow buffering for a few seconds (might be useful for network playback);
        // once enough is buffered the "can write" event is cleared
        if (GetBufferTime_MS() >= buffer_time_ms) {
            ev_can_write.Reset();
        }
    }

    return NOERROR;
}
/**
 * Appends an end-of-stream marker packet to the output queue, signals that
 * the queue is readable and records that EOS has been queued.
 *
 * @return always NOERROR.
 */
HRESULT CAMROutputPin::DoEndOfStream()
{
    DataPacket *eos_marker = new DataPacket();
    eos_marker->type = DataPacket::PACKET_TYPE_EOS;

    // enqueue the EOS marker
    {
        CAutoLock queue_guard(&lock_queue);
        queue.AddTail(eos_marker);
        ev_can_read.Set();
    }

    eos_delivered = true;
    return NOERROR;
}
/**
 * Destroys every packet still waiting in the output queue and resets the
 * queue events: nothing left to read, writing allowed again.
 */
void CAMROutputPin::FlushQueue()
{
    CAutoLock queue_guard(&lock_queue);

    while (!queue.IsEmpty()) {
        // deleting a null pointer is a no-op, so no explicit check is needed
        delete queue.RemoveHead();
    }

    ev_can_read.Reset();
    ev_can_write.Set();
}
/**
 * Copies one queued data packet into a media sample obtained from
 * GetDeliveryBuffer() and delivers it.
 *
 * @param packet  Packet to deliver; payload, sync-point and discontinuity
 *                flags and (when has_time is set) timestamps are transferred
 *                to the sample.
 * @return the result of Deliver(), or E_FAIL when no sample could be
 *         obtained, the sample buffer is unusable, or the payload does not
 *         fit into the sample.
 */
HRESULT CAMROutputPin::DeliverDataPacket(DataPacket &packet)
{
    IMediaSample *sample = NULL;
    HRESULT hr = GetDeliveryBuffer(&sample, NULL, NULL, 0);
    if (FAILED(hr)) {
        return E_FAIL;
    }

    // we should have enough space in there
    long lsize = sample->GetSize();
    ASSERT(lsize >= packet.size);

    BYTE *buf = NULL;
    hr = sample->GetPointer(&buf);

    // ASSERT compiles away in release builds, so the size is verified at
    // runtime too: copying a larger payload would overrun the sample buffer.
    if (FAILED(hr) || !buf || lsize < packet.size ||
        (packet.size > 0 && !packet.buf)) {
        sample->Release();
        return E_FAIL;
    }

    //*************************************************************************
    //
    //  data
    //
    //*************************************************************************

    if (packet.size > 0) {
        memcpy(buf, packet.buf, packet.size);
    }
    sample->SetActualDataLength(packet.size);

    // sync point, discontinuity ?
    sample->SetSyncPoint(packet.sync_point ? TRUE : FALSE);
    sample->SetDiscontinuity(packet.discontinuity ? TRUE : FALSE);

    // do we have a time stamp ?
    if (packet.has_time) {
        sample->SetTime(&packet.rtStart, &packet.rtStop);
    }

    // deliver; our reference on the sample is dropped afterwards in any case
    hr = Deliver(sample);
    sample->Release();
    return hr;
}
/**
 * Reports how much stream time is currently held in the output queue.
 *
 * The span is measured between the start time of the first and of the last
 * queued data packet that carries a start time other than -1.
 *
 * @return the difference of those two start times divided by 10000, or 0
 *         when the queue is empty or holds no such data packet.
 */
__int64 CAMROutputPin::GetBufferTime_MS()
{
    CAutoLock queue_guard(&lock_queue);
    if (queue.IsEmpty()) return 0;

    __int64 oldest_start = -1;
    __int64 newest_start = -1;

    // walk forward to the first timestamped data packet
    for (POSITION pos = queue.GetHeadPosition(); pos; ) {
        DataPacket *candidate = queue.GetNext(pos);
        if (candidate->type != DataPacket::PACKET_TYPE_DATA) continue;
        if (candidate->rtStart == -1) continue;
        oldest_start = candidate->rtStart;
        break;
    }

    // walk backward to the last timestamped data packet
    for (POSITION pos = queue.GetTailPosition(); pos; ) {
        DataPacket *candidate = queue.GetPrev(pos);
        if (candidate->type != DataPacket::PACKET_TYPE_DATA) continue;
        if (candidate->rtStart == -1) continue;
        newest_start = candidate->rtStart;
        break;
    }

    if (oldest_start == -1 || newest_start == -1) return 0;

    // calculate time in milliseconds
    return (newest_start - oldest_start) / 10000;
}
/**
 * Output thread: waits for commands and, while running, drains the packet
 * queue downstream.
 *
 * CMD_EXIT ends the thread, CMD_STOP is only acknowledged, CMD_RUN enters the
 * delivery loop, anything else is answered with E_UNEXPECTED. The delivery
 * loop ends when a new request arrives, the abort event is set, or a
 * delivery fails.
 *
 * @return 0 when the thread exits.
 */
DWORD CAMROutputPin::ThreadProc()
{
    while (true) {
        DWORD cmd = GetRequest();
        switch (cmd) {
        case CMD_EXIT:
            Reply(0);
            return 0;

        case CMD_STOP:
            Reply(0);
            break;

        case CMD_RUN:
            {
                Reply(0);
                if (!IsConnected()) break;

                // deliver packets
                DWORD cmd2;
                HRESULT hr;

                do {
                    if (ev_abort.Check()) break;
                    hr = NOERROR;

                    // short timeout so pending requests are noticed promptly
                    HANDLE events[] = { ev_can_read, ev_abort };
                    DWORD ret = WaitForMultipleObjects(2, events, FALSE, 10);

                    if (ret == WAIT_OBJECT_0) {
                        // look for new packet in queue
                        DataPacket *packet = NULL;
                        {
                            CAutoLock lck(&lock_queue);

                            // FlushQueue() can empty the queue between the wait
                            // returning and this lock being taken, so never
                            // remove from an empty queue.
                            if (!queue.IsEmpty()) {
                                packet = queue.RemoveHead();
                            }
                            if (queue.IsEmpty()) ev_can_read.Reset();

                            // allow buffering
                            if (GetBufferTime_MS() < buffer_time_ms) ev_can_write.Set();
                        }

                        if (packet) {
                            // deliver either End Of Stream, a new segment, or a data packet
                            if (packet->type == DataPacket::PACKET_TYPE_EOS) {
                                DeliverEndOfStream();
                            } else if (packet->type == DataPacket::PACKET_TYPE_NEW_SEGMENT) {
                                hr = DeliverNewSegment(packet->rtStart, packet->rtStop, packet->rate);
                            } else if (packet->type == DataPacket::PACKET_TYPE_DATA) {
                                hr = DeliverDataPacket(*packet);
                            }

                            // the packet is owned by this thread once dequeued
                            delete packet;
                        }

                        if (FAILED(hr)) {
                            break;
                        }
                    }
                } while (!CheckRequest(&cmd2));
            }
            break;

        default:
            Reply(E_UNEXPECTED);
            break;
        }
    }
    return 0;
}
//-----------------------------------------------------------------------------
//
// CAMRReader class
//
//-----------------------------------------------------------------------------
/**
 * Wraps an IAsyncReader and starts reading at offset 0.
 *
 * @param rd  Reader to wrap; must not be null. A reference is added here and
 *            dropped again in the destructor.
 */
CAMRReader::CAMRReader(IAsyncReader *rd) :
    reader(rd),
    position(0)
{
    ASSERT(rd);
    reader->AddRef();
}
/**
 * Drops the reference held on the wrapped reader, if any.
 */
CAMRReader::~CAMRReader()
{
    if (!reader) return;

    reader->Release();
    reader = NULL;
}
/**
 * Queries the wrapped reader for the stream length.
 *
 * @param avail  Receives the number of bytes currently available.
 * @param total  Receives the total length.
 * @return 0 on success, -1 when the reader's Length() call fails.
 */
int CAMRReader::GetSize(__int64 *avail, __int64 *total)
{
    // Length() takes the total first, the available amount second
    const HRESULT length_result = reader->Length(total, avail);
    return FAILED(length_result) ? -1 : 0;
}
/**
 * Reports the current read position together with the available byte count.
 *
 * @param pos    Optional; receives the current position when not null.
 * @param avail  Passed to the reader's Length() to receive the available
 *               byte count.
 * @return 0 on success, -1 when the reader's Length() call fails.
 */
int CAMRReader::GetPosition(__int64 *pos, __int64 *avail)
{
    __int64 unused_total;
    if (FAILED(reader->Length(&unused_total, avail))) {
        return -1;
    }

    // current position
    if (pos != NULL) {
        *pos = position;
    }
    return 0;
}
/**
 * Moves the read position to an absolute offset. No I/O is performed; only
 * the stored position changes.
 *
 * @param pos  Target offset; must lie in [0, total length).
 * @return 0 on success, -1 when the length cannot be determined or pos is
 *         out of range (the position is left unchanged in that case).
 */
int CAMRReader::Seek(__int64 pos)
{
    __int64 avail, total;

    // without a valid length the range check below would read garbage
    if (GetSize(&avail, &total) < 0) return -1;
    if (pos < 0 || pos >= total) return -1;

    // seek
    position = pos;
    return 0;
}
int CAMRReader::Read(void *buf, int size)
{
__int64 avail, total;
GetSize(&avail, &total);
if (position + size > avail) {
return -1;
}
// TODO: Caching here !!!!
//TRACE(" - read, %I64d, %d\n", position, size);
HRESULT hr = reader->SyncRead(position, size, (BYTE*)buf);
if (FAILED(hr)) {
return -1;
}
// update position
position += size;
return 0;
}
/**
 * Creates an empty packet: end-of-stream type, zero timestamps, no flags set
 * and no payload buffer.
 */
DataPacket::DataPacket() :
    type(PACKET_TYPE_EOS),
    rtStart(0),
    rtStop(0),
    sync_point(FALSE),
    discontinuity(FALSE),
    buf(NULL),
    size(0)
{
    // These two members are read elsewhere (has_time when a data packet is
    // delivered, rate when a new segment is delivered), so give them defined
    // values. Assigned in the body so the result does not depend on the
    // members' declaration order.
    has_time = false;
    rate = 1.0;
}
/**
 * Releases the payload buffer, if one is attached.
 */
DataPacket::~DataPacket()
{
    // free() accepts a null pointer, so no explicit check is required
    free(buf);
    buf = NULL;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -