📄 asyncio.cpp
字号:
// Also WaitForNext will refuse to block if this is set
// unless m_bWaiting is also set which it will be when we release
// the critsec if there are any outstanding).
m_bFlushing = TRUE;
CAsyncRequest * preq;
while((preq = GetWorkItem()) != 0)
{
preq->Cancel();
PutDoneItem(preq);
}
// now wait for any outstanding requests to complete
if(m_cItemsOut > 0)
{
// can be only one person waiting
ASSERT(!m_bWaiting);
// this tells the completion routine that we need to be
// signalled via m_evAllDone when all outstanding items are
// done. It also tells WaitForNext to continue blocking.
m_bWaiting = TRUE;
}
else
{
// all done
// force m_evDone set so that even if list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
m_evDone.Set();
return S_OK;
}
}
ASSERT(m_bWaiting);
// wait without holding critsec
for(;;)
{
m_evAllDone.Wait();
{
// hold critsec to check
CAutoLock lock(&m_csLists);
if(m_cItemsOut == 0)
{
// now we are sure that all outstanding requests are on
// the done list and no more will be accepted
m_bWaiting = FALSE;
// force m_evDone set so that even if list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
m_evDone.Set();
return S_OK;
}
}
}
}
// Leave the flushing state.
//
// Clears m_bFlushing while holding m_csLists, then re-synchronises the
// m_evDone event with the done list: BeginFlush may have forced the event
// into the signalled state, so here it is put back to "signalled if and
// only if m_listDone holds at least one item".
//
// Always returns S_OK.
HRESULT
CAsyncIo::EndFlush()
{
    CAutoLock lock(&m_csLists);

    m_bFlushing = FALSE;

    // nobody may still be blocked inside BeginFlush at this point
    ASSERT(!m_bWaiting);

    const bool bDonePending = (m_listDone.GetCount() > 0);
    if(!bDonePending)
    {
        // nothing to collect - the event must not stay signalled
        m_evDone.Reset();
        return S_OK;
    }

    // completed items are waiting to be collected
    m_evDone.Set();
    return S_OK;
}
// Start the worker thread if it is not already running.
//
// Returns S_OK when a thread handle is already held or when the thread
// was created; otherwise returns the CreateThread failure converted with
// HRESULT_FROM_WIN32. The stop event is reset before the thread is
// created so the new thread does not see a stale stop request.
HRESULT
CAsyncIo::StartThread(void)
{
    // already have a thread - nothing to do
    if(m_hThread != NULL)
    {
        return S_OK;
    }

    m_evStop.Reset();

    DWORD idThread;
    m_hThread = CreateThread(NULL,              // default security
                             0,                 // default stack size
                             InitialThreadProc, // entry point
                             this,              // thread parameter
                             0,                 // run immediately
                             &idThread);
    if(m_hThread != NULL)
    {
        return S_OK;
    }

    // creation failed - report the Win32 error as an HRESULT
    const DWORD dwLastError = GetLastError();
    return HRESULT_FROM_WIN32(dwLastError);
}
// Ask the worker thread to exit, wait for it, and release its handle.
//
// The stop event is signalled unconditionally. If a thread handle is
// held, this blocks (with no timeout) until the thread has terminated,
// then closes the handle and clears m_hThread.
//
// Always returns S_OK.
HRESULT
CAsyncIo::CloseThread(void)
{
    // tell the thread proc to leave its wait loop
    m_evStop.Set();

    if(m_hThread == NULL)
    {
        // no thread was ever started (or it was already closed)
        return S_OK;
    }

    WaitForSingleObject(m_hThread, INFINITE);
    CloseHandle(m_hThread);
    m_hThread = NULL;

    return S_OK;
}
// Work-list management. All list access happens under m_csLists, and the
// m_evWork event is kept in step with the list: it is reset here as soon
// as the list is found to be empty.
//
// Removes and returns the request at the head of the work list.
// Returns null if the list is empty.
CAsyncRequest*
CAsyncIo::GetWorkItem()
{
    CAutoLock lck(&m_csLists);

    CAsyncRequest * const pHead = m_listWork.RemoveHead();

    // keep the event consistent with the list contents
    const bool bNowEmpty = (m_listWork.GetCount() == 0);
    if(bNowEmpty)
    {
        m_evWork.Reset();
    }

    return pHead;
}
// Remove and return the request at the head of the done list
// (whatever m_listDone.RemoveHead() yields when the list is empty).
//
// Afterwards m_evDone is reset if the done list is now empty - except in
// the final stage of a flush (m_bFlushing set and m_bWaiting clear).
// During a flush every request is pushed onto the done list and the
// application is expected to keep pulling until nothing is left; once
// BeginFlush is no longer waiting for outstanding requests the event is
// deliberately left signalled so that the caller does not block on an
// empty list. While BeginFlush is still waiting (m_bWaiting set), requests
// are still outstanding, so the event tracks the list as usual.
CAsyncRequest*
CAsyncIo::GetDoneItem()
{
    CAutoLock lock(&m_csLists);

    CAsyncRequest * const pHead = m_listDone.RemoveHead();

    if(m_listDone.GetCount() != 0)
    {
        // more completed items remain - leave the event alone
        return pHead;
    }

    // flushing, and everything outstanding has already been moved
    // onto the done list
    const bool bFinalFlushStage = (m_bFlushing && !m_bWaiting);
    if(!bFinalFlushStage)
    {
        m_evDone.Reset();
    }

    return pHead;
}
// Queue a request on the work list and make sure the worker thread exists.
//
// pRequest - request to append to m_listWork.
//
// Returns:
//   S_OK               the request is on the work list and m_evWork is set
//   VFW_E_WRONG_STATE  a flush is in progress; nothing was queued
//   E_OUTOFMEMORY      the list could not accept the item; nothing was queued
//   other failure      StartThread() failed; nothing was queued
//
// On every failure return the request is NOT on the work list. The worker
// thread is therefore started *before* the item is appended: if the item
// were appended first and thread creation then failed, the caller would
// receive a failure code while the list still referenced the request, and
// a caller that frees the request on failure would leave a dangling
// pointer behind. A freshly started thread simply waits on m_evWork (and
// any list access it makes is serialised by m_csLists, held here), so
// starting it ahead of the append is harmless.
HRESULT
CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
{
    CAutoLock lock(&m_csLists);

    // no new work is accepted while flushing
    if(m_bFlushing)
    {
        return VFW_E_WRONG_STATE;
    }

    // start the thread now if not already started
    HRESULT hr = StartThread();
    if(FAILED(hr))
    {
        return hr;
    }

    if(!m_listWork.AddTail(pRequest))
    {
        return E_OUTOFMEMORY;
    }

    // event should now be in a set state - force this
    m_evWork.Set();
    return hr;
}
// Append a request to the done list. This is permitted while flushing.
//
// The caller must already hold m_csLists (asserted in debug builds).
//
// Returns S_OK after the item has been added and m_evDone signalled,
// or E_OUTOFMEMORY if the list could not take the item.
HRESULT
CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
{
    ASSERT(CritCheckIn(&m_csLists));

    if(!m_listDone.AddTail(pRequest))
    {
        return E_OUTOFMEMORY;
    }

    // the list is non-empty now, so the event must be signalled
    m_evDone.Set();
    return S_OK;
}
// called on thread to process any active requests
void
CAsyncIo::ProcessRequests(void)
{
// lock to get the item and increment the outstanding count
CAsyncRequest * preq = NULL;
for(;;)
{
{
CAutoLock lock(&m_csLists);
preq = GetWorkItem();
if(preq == NULL)
{
// done
return;
}
// one more item not on the done or work list
m_cItemsOut++;
// release critsec
}
preq->Complete();
// regain critsec to replace on done list
{
CAutoLock l(&m_csLists);
PutDoneItem(preq);
if(--m_cItemsOut == 0)
{
if(m_bWaiting)
m_evAllDone.Set();
}
}
}
}
// Worker thread body (the thread parameter is assumed to have been the
// this pointer, which is how control arrives here).
//
// Waits on the stop event and the work event. Whenever the work event is
// the one that satisfied the wait, the queued requests are processed and
// the wait is repeated. Any other wait result - the stop event or an
// error - ends the thread with exit code 0.
DWORD
CAsyncIo::ThreadProc(void)
{
    // index 0: stop request, index 1: work available
    HANDLE rgWaitHandles[] = {m_evStop, m_evWork};
    const DWORD dwWorkSignalled = WAIT_OBJECT_0 + 1;

    for(;;)
    {
        const DWORD dwWait = WaitForMultipleObjects(2,
                                                    rgWaitHandles,
                                                    FALSE,      // wake on either
                                                    INFINITE);
        if(dwWait != dwWorkSignalled)
        {
            // stop event or wait failure - leave the thread
            return 0;
        }

        ProcessRequests();
    }
}
// Perform a synchronous read request on the calling thread.
//
// llPos   - position to read from
// lLength - number of bytes requested
// pBuffer - destination buffer
//
// If position, length and buffer address all satisfy IsAligned(), the
// read is forwarded to SyncReadAligned() and its result returned.
// Otherwise the data may not be aligned, so a temporary CAsyncRequest is
// set up on the stack and completed immediately on this thread.
//
// Returns the HRESULT from SyncReadAligned(), or from
// CAsyncRequest::Request() if that fails, or from
// CAsyncRequest::Complete().
HRESULT
CAsyncIo::SyncRead(
LONGLONG llPos,
LONG lLength,
BYTE * pBuffer)
{
    // Only the low-order bits of the address matter for the alignment
    // test. Convert the pointer through a pointer-sized integer and keep
    // the low 32 bits explicitly, rather than casting the pointer straight
    // to LONG, which is a truncating pointer cast on 64-bit targets.
    const LONG lBufferLowBits =
        static_cast<LONG>(reinterpret_cast<ULONG_PTR>(pBuffer) & 0xFFFFFFFF);

    if(IsAligned(llPos) &&
       IsAligned(lLength) &&
       IsAligned(lBufferLowBits))
    {
        LONG cbUnused;
        return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
    }

    // not aligned with requirements - use buffered file handle.
    //!!! might want to fix this to buffer the data ourselves?
    CAsyncRequest request;
    HRESULT hr = request.Request(this,
                                 m_pStream,
                                 llPos,
                                 lLength,
                                 FALSE,
                                 pBuffer,
                                 NULL,
                                 0);
    if(FAILED(hr))
    {
        return hr;
    }

    return request.Complete();
}
// Report the alignment through an out-parameter.
//
// pAlignment - receives the value of the parameterless Alignment().
//
// Returns S_OK on success; a null pAlignment is rejected by CheckPointer
// with E_POINTER.
HRESULT
CAsyncIo::Alignment(LONG *pAlignment)
{
    CheckPointer(pAlignment, E_POINTER);

    const LONG lAlign = Alignment();
    *pAlignment = lAlign;

    return S_OK;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -