📄 asyncio.cpp
字号:
// prevent further requests being queued.
// Also, WaitForNext will refuse to block if this is set,
// unless m_bWaiting is also set (which it will be when we release
// the critsec if there are any outstanding requests).
m_bFlushing = TRUE;
CAsyncRequest * preq;
while(preq = GetWorkItem()) {
preq->Cancel();
PutDoneItem(preq);
}
// now wait for any outstanding requests to complete
if (m_cItemsOut > 0) {
// can be only one person waiting
ASSERT(!m_bWaiting);
// this tells the completion routine that we need to be
// signalled via m_evAllDone when all outstanding items are
// done. It also tells WaitForNext to continue blocking.
m_bWaiting = TRUE;
} else {
// all done
// force m_evDone set so that even if list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
m_evDone.Set();
return S_OK;
}
}
ASSERT(m_bWaiting);
// wait without holding critsec
for (;;) {
m_evAllDone.Wait();
{
// hold critsec to check
CAutoLock lock(&m_csLists);
if (m_cItemsOut == 0) {
// now we are sure that all outstanding requests are on
// the done list and no more will be accepted
m_bWaiting = FALSE;
// force m_evDone set so that even if list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
m_evDone.Set();
return S_OK;
}
}
}
}
// Leave the flushing state.
//
// Clears m_bFlushing and then re-establishes the m_evDone invariant:
// the flush may have forced the event into the signalled state, so on
// exit it is signalled if and only if m_listDone still holds entries.
// The whole operation runs under m_csLists. Always returns S_OK.
HRESULT
CAsyncIo::EndFlush()
{
    CAutoLock lck(&m_csLists);

    m_bFlushing = FALSE;

    // nobody should still be waiting for outstanding items at this point
    ASSERT(!m_bWaiting);

    if (m_listDone.GetCount() <= 0) {
        // nothing left to collect - waiters on the done event must block
        m_evDone.Reset();
    } else {
        // completed requests remain - keep the done event signalled
        m_evDone.Set();
    }

    return S_OK;
}
// Create the worker thread if it does not exist yet.
//
// Returns S_OK when a thread handle is already held or when the thread
// was created; otherwise returns the CreateThread failure converted
// with HRESULT_FROM_WIN32.
HRESULT
CAsyncIo::StartThread(void)
{
    // already running - nothing to do
    if (m_hThread != NULL) {
        return S_OK;
    }

    // a stale stop request must not make the new thread exit at once
    m_evStop.Reset();

    DWORD idThread;
    m_hThread = CreateThread(NULL, 0, InitialThreadProc, this, 0, &idThread);
    if (m_hThread != NULL) {
        return S_OK;
    }

    // capture the error code once, before anything else can overwrite it
    DWORD dwLastError = GetLastError();
    return HRESULT_FROM_WIN32(dwLastError);
}
// Ask the worker thread to exit and release its handle.
//
// The stop event is signalled unconditionally. If a thread handle is
// held, this call blocks (with no timeout) until the thread has ended,
// then closes the handle and clears m_hThread. Always returns S_OK.
HRESULT
CAsyncIo::CloseThread(void)
{
    // signal the thread-exit object
    m_evStop.Set();

    if (m_hThread == NULL) {
        // no thread was ever started (or it was already closed)
        return S_OK;
    }

    WaitForSingleObject(m_hThread, INFINITE);
    CloseHandle(m_hThread);
    m_hThread = NULL;

    return S_OK;
}
// Request-list management. Each accessor holds m_csLists and keeps the
// list's event consistent with the list contents: set while there are
// entries, reset once the list is empty.
//
// GetWorkItem: take the request at the head of the work list.
// Returns null if the list is empty.
CAsyncRequest*
CAsyncIo::GetWorkItem()
{
    CAutoLock lock(&m_csLists);

    CAsyncRequest * const pHead = m_listWork.RemoveHead();

    // once the list has drained the work event must stop being signalled
    const bool fDrained = (m_listWork.GetCount() == 0);
    if (fDrained) {
        m_evWork.Reset();
    }

    return pHead;
}
// Take the request at the head of the done list (under m_csLists).
//
// m_evDone is reset when the done list becomes empty, except in the
// final stage of a flush (m_bFlushing set while m_bWaiting is clear).
// During a flush everything is moved to the done list and the
// application is expected to keep pulling until it gets nothing more,
// so in that stage the event is deliberately left alone even though
// the list is empty.
CAsyncRequest*
CAsyncIo::GetDoneItem()
{
    CAutoLock lck(&m_csLists);

    CAsyncRequest * const pHead = m_listDone.RemoveHead();

    const bool fListEmpty = (m_listDone.GetCount() == 0);
    // true unless we are flushing with no wait in progress
    const bool fMayReset = (!m_bFlushing || m_bWaiting);

    if (fListEmpty && fMayReset) {
        m_evDone.Reset();
    }

    return pHead;
}
// Queue a request on the work list, starting the worker thread if it is
// not already running.
//
// Returns:
//   VFW_E_WRONG_STATE - a flush is in progress; the request is not queued
//   the StartThread() failure code - no worker could be started; the
//                       request is not queued
//   E_OUTOFMEMORY     - the list could not accept the request
//   S_OK              - the request is on the work list and m_evWork is set
//
// A failure return always means the request was NOT placed on the work
// list. The thread is therefore started before the request is queued:
// if it were started afterwards, a StartThread() failure would be
// reported while the queue still referenced the request, and the caller
// could not tell whether the request was still in use.
HRESULT
CAsyncIo::PutWorkItem(CAsyncRequest* pRequest)
{
    CAutoLock lock(&m_csLists);

    if (m_bFlushing) {
        return VFW_E_WRONG_STATE;
    }

    // make sure a worker exists before committing the request to the
    // queue (StartThread returns S_OK at once if it is already running).
    // We hold m_csLists, so a freshly started worker cannot take
    // anything from the list until we return.
    HRESULT hr = StartThread();
    if (FAILED(hr)) {
        return hr;
    }

    if (!m_listWork.AddTail(pRequest)) {
        return E_OUTOFMEMORY;
    }

    // event should now be in a set state - force this
    m_evWork.Set();

    return hr;
}
// Append a completed request to the done list and signal m_evDone.
// There is no flushing check here - this is allowed while flushing.
//
// The caller must already hold m_csLists (asserted).
// Returns S_OK on success, E_OUTOFMEMORY if the list could not take
// the request (in which case the event is left untouched).
HRESULT
CAsyncIo::PutDoneItem(CAsyncRequest* pRequest)
{
    ASSERT(CritCheckIn(&m_csLists));

    if (!m_listDone.AddTail(pRequest)) {
        return E_OUTOFMEMORY;
    }

    // the list is non-empty now, so the event must be signalled
    m_evDone.Set();
    return S_OK;
}
// called on thread to process any active requests
void
CAsyncIo::ProcessRequests(void)
{
// lock to get the item and increment the outstanding count
CAsyncRequest * preq = NULL;
for (;;) {
{
CAutoLock lock(&m_csLists);
preq = GetWorkItem();
if (preq == NULL) {
// done
return;
}
// one more item not on the done or work list
m_cItemsOut++;
// release critsec
}
preq->Complete();
// regain critsec to replace on done list
{
CAutoLock l(&m_csLists);
PutDoneItem(preq);
if (--m_cItemsOut == 0) {
if (m_bWaiting) {
m_evAllDone.Set();
}
}
}
}
}
// Worker thread body - assumes that the DWORD thread param is the
// this pointer.
//
// Waits on the stop and work events. Each time the work event is the
// one reported, the pending requests are processed and the wait is
// repeated. Any other wait result (the stop event, or an error) ends
// the thread. Always returns 0.
DWORD
CAsyncIo::ThreadProc(void)
{
    // index 0 = stop, index 1 = work
    HANDLE rghWait[] = {m_evStop, m_evWork};
    const DWORD dwWorkSignalled = WAIT_OBJECT_0 + 1;

    while (WaitForMultipleObjects(2, rghWait, FALSE, INFINITE) == dwWorkSignalled) {
        // requests need processing
        ProcessRequests();
    }

    // stop event or wait failure - we should exit
    return 0;
}
// Perform a synchronous read request on the calling thread.
//
//   llPos   - position to read from
//   lLength - number of bytes requested
//   pBuffer - destination buffer
//
// If the position, the length and the buffer address all pass
// IsAligned, the read is forwarded to SyncReadAligned. Otherwise a
// local CAsyncRequest is set up and completed in place.
// Returns the HRESULT of whichever path was taken.
HRESULT
CAsyncIo::SyncRead(
    LONGLONG llPos,
    LONG lLength,
    BYTE* pBuffer)
{
    // Only the low-order bits of the address matter for the alignment
    // test. Convert the pointer through a 64-bit integer and mask to
    // 32 bits rather than casting it straight to LONG: a direct
    // pointer-to-LONG cast truncates on 64-bit targets and is diagnosed
    // or rejected by compilers there. The value passed to IsAligned has
    // the same bits as before.
    const LONG lBufferBits =
        static_cast<LONG>(reinterpret_cast<LONGLONG>(pBuffer) & 0xffffffff);

    if (IsAligned(llPos) &&
        IsAligned(lLength) &&
        IsAligned(lBufferBits)) {
        LONG cbUnused;
        return SyncReadAligned(llPos, lLength, pBuffer, &cbUnused, NULL);
    }

    // not aligned with requirements - use buffered file handle.
    //!!! might want to fix this to buffer the data ourselves?
    CAsyncRequest request;

    // NOTE(review): FALSE / NULL / 0 are presumably the "aligned" flag,
    // the context pointer and the user value - confirm against
    // CAsyncRequest::Request.
    HRESULT hr = request.Request(
                    this,
                    m_pStream,
                    llPos,
                    lLength,
                    FALSE,
                    pBuffer,
                    NULL,
                    0);
    if (FAILED(hr)) {
        return hr;
    }

    return request.Complete();
}
// Return the alignment through an out-parameter.
//
//   pl - receives the value of Alignment(); must not be NULL
//
// Returns S_OK on success, or E_POINTER if pl is NULL (the pointer is
// no longer dereferenced in that case).
HRESULT
CAsyncIo::Alignment(LONG *pl)
{
    if (pl == NULL) {
        return E_POINTER;
    }

    *pl = Alignment();
    return S_OK;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -