📄 threadproxyfactory.h
字号:
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ThreadProxyFactory.h
//
// Factory for creating thread proxies.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace Concurrency
{
namespace details
{
/// <summary>
///     Abstract interface implemented by thread proxy factories: hands out and takes back thread proxies,
///     is reference counted, and exposes the TLS index used for per-thread execution resource data.
/// </summary>
struct IThreadProxyFactory
{
    /// <summary>
    ///     Obtains a thread proxy for the requested stack size and context priority.
    /// </summary>
    virtual Concurrency::IThreadProxy* RequestProxy(unsigned int stackSize, int contextPriority) = 0;

    /// <summary>
    ///     Gives a thread proxy back to the factory.
    /// </summary>
    virtual void ReclaimProxy(Concurrency::IThreadProxy* pThreadProxy) = 0;

    /// <summary>
    ///     Adds a reference to the factory.
    /// </summary>
    virtual LONG Reference() = 0;

    /// <summary>
    ///     Releases a reference on the factory.
    /// </summary>
    virtual LONG Release() = 0;

    /// <summary>
    ///     Returns the TLS index used to store per-thread execution resource data.
    /// </summary>
    virtual DWORD GetExecutionResourceTls() = 0;

    /// <summary>
    ///     Virtual destructor so that implementations can be destroyed through this interface.
    /// </summary>
    virtual ~IThreadProxyFactory()
    {
    }
};
class ThreadProxyFactoryManager;
/// <summary>
///     Common implementation for thread proxy factories. Idle proxies are kept in a small set of pools, one
///     per supported stack size, so that they can be handed out again instead of being recreated.
/// </summary>
/// <typeparam name="threadProxy">
///     The concrete thread proxy type the factory creates, pools and retires.
/// </typeparam>
/// <typeparam name="factoryType">
///     The concrete factory type that CreateFactory instantiates.
/// </typeparam>
template <typename threadProxy, typename factoryType>
class ThreadProxyFactory : public IThreadProxyFactory
{
public:

    /// <summary>
    ///     Returns a thread proxy from a pool of proxies, creating a new one, if needed.
    /// </summary>
    /// <param name="stackSize">
    ///     The required minimum stack size for the thread proxy, in the same units as s_proxyStackSize.
    ///     A value of 0 requests the default process stack size.
    /// </param>
    /// <param name="contextPriority">
    ///     The required thread priority for the thread proxy.
    /// </param>
    /// <returns>
    ///     A thread proxy prepared with the requested priority, or NULL if no pooled proxy was available and
    ///     Create returned NULL.
    /// </returns>
    virtual Concurrency::IThreadProxy* RequestProxy(unsigned int stackSize, int contextPriority)
    {
        threadProxy * pProxy = NULL;

        // Walk the pools in bucket order and take the first idle proxy whose stack is large enough.
        for (int i = 0; i < s_numBuckets; ++i)
        {
            // A size of 0 stands for "default process stack size", whose real value is not known here, so it
            // cannot be ordered against the explicit sizes. A default request may therefore only be served
            // from the default pool (otherwise it could be handed a smaller stack than the process default),
            // and an explicit request is never served from the default pool.
            bool fSuitable;
            if (stackSize == 0 || s_proxyStackSize[i] == 0)
            {
                fSuitable = (stackSize == s_proxyStackSize[i]);
            }
            else
            {
                fSuitable = (stackSize <= s_proxyStackSize[i]);
            }

            if (fSuitable)
            {
                pProxy = m_proxyPool[i].Pop();
                if (pProxy != NULL)
                    break;
            }
        }

        if (pProxy == NULL)
        {
            // Either we couldn't find a proxy in one of the pools, or we received a stack size
            // larger than the largest size we pool.
            pProxy = Create(stackSize);
        }

        if (pProxy != NULL)
        {
            Prepare(pProxy, contextPriority);
        }

        return pProxy;
    }

    /// <summary>
    ///     Returns a proxy back to the idle pool for reuse.
    /// </summary>
    /// <param name="pThreadProxy">
    ///     The thread proxy being returned. It is cast to the factory's proxy type and dereferenced, so it
    ///     must not be NULL.
    /// </param>
    virtual void ReclaimProxy(Concurrency::IThreadProxy* pThreadProxy)
    {
        threadProxy * pProxy = static_cast<threadProxy *>(pThreadProxy);

        // Only proxies whose stack size exactly matches a bucket are pooled.
        for (int i = 0; i < s_numBuckets; ++i)
        {
            if (pProxy->GetStackSize() == s_proxyStackSize[i])
            {
                // Pool the proxy while the bucket holds fewer than s_bucketSize entries; once the bucket is
                // full the proxy is left non-NULL so that it gets retired below.
                // NOTE(review): the count check and the push are separate calls, so the limit is presumably
                // a soft bound when proxies are reclaimed concurrently.
                if (m_proxyPool[i].Count() < s_bucketSize)
                {
                    m_proxyPool[i].Push(pProxy);
                    pProxy = NULL;
                }
                break;
            }
        }

        // Not pooled (no matching bucket, or the bucket was full): retire the proxy.
        if (pProxy != NULL)
        {
            Retire(pProxy);
        }
    }

    /// <summary>
    ///     Destroys a thread proxy factory.
    /// </summary>
    virtual ~ThreadProxyFactory()
    {
    }

    /// <summary>
    ///     Retires the proxies that are present in the free pools of a thread proxy factory, causing them to run to
    ///     completion, and exit.
    /// </summary>
    void RetireThreadProxies()
    {
        for (int i = 0; i < s_numBuckets; ++i)
        {
            // Flush hands back the first proxy of the pooled chain; the rest is reached through Next.
            threadProxy *pProxy = m_proxyPool[i].Flush();

            while (pProxy != NULL)
            {
                // Fetch the successor before retiring, so the proxy is not touched after Retire.
                threadProxy *pNextProxy = LockFreeStack<threadProxy>::Next(pProxy);

                // Retiring the proxy will cause it to perform any necessary cleanup, and exit its dispatch loop.
                Retire(pProxy);
                pProxy = pNextProxy;
            }
        }
    }

    /// <summary>
    ///     Creates a singleton thread proxy factory.
    /// </summary>
    /// <param name="pManager">
    ///     The factory manager, forwarded to the constructor of the concrete factory type.
    /// </param>
    /// <returns>
    ///     A newly allocated factory of type factoryType.
    /// </returns>
    static IThreadProxyFactory* CreateFactory(ThreadProxyFactoryManager * pManager)
    {
        // The per-bucket capacity is computed on first use: four proxies per processor.
        // NOTE(review): this lazy initialization is not synchronized - confirm that callers serialize
        // factory creation.
        if (s_bucketSize == 0)
        {
            s_bucketSize = 4*::Concurrency::GetProcessorCount();
        }

        ASSERT(s_bucketSize >= 4);
        return new factoryType(pManager);
    }

    /// <summary>
    ///     Initiates shutdown of the factory, and deletes it if shutdown can be completed inline.
    /// </summary>
    virtual void ShutdownFactory() =0;

    /// <summary>
    ///     Returns a TLS index used by thread proxies and subscribed threads to store per-thread data.
    /// </summary>
    virtual DWORD GetExecutionResourceTls()
    {
        return m_executionResourceTlsIndex;
    }

protected:

    /// <summary>
    ///     Protected constructor. All construction must go through the CreateFactory API.
    /// </summary>
    /// <param name="pManager">
    ///     The factory manager whose execution resource TLS index is cached by this factory.
    /// </param>
    ThreadProxyFactory(ThreadProxyFactoryManager * pManager) :
        m_executionResourceTlsIndex(pManager->GetExecutionResourceTls())
    { }

protected:

    /// <summary>
    ///     Creates a new thread proxy.
    /// </summary>
    /// <param name="stackSize">
    ///     The stack size for the thread proxy.
    /// </param>
    virtual threadProxy* Create(unsigned int stackSize) =0;

    /// <summary>
    ///     Retires a thread proxy.
    /// </summary>
    /// <param name="pProxy">
    ///     The proxy to retire.
    /// </param>
    virtual void Retire(threadProxy *pProxy) =0;

    /// <summary>
    ///     Prepares a thread proxy for use.
    /// </summary>
    /// <param name="pProxy">
    ///     The proxy to prepare.
    /// </param>
    /// <param name="contextPriority">
    ///     The required thread priority for the thread proxy.
    /// </param>
    virtual void Prepare(threadProxy *pProxy, int contextPriority)
    {
        //
        // Adjust the thread priority if necessary.
        //
        if (pProxy->GetPriority() != contextPriority)
        {
            pProxy->SetPriority(contextPriority);
        }
    }

    // Each factory supports a small number of pools of specific stack sizes.
    // Currently supported stack sizes are the default process stack size, 64KB, 256KB and 1024KB (1MB)
    static int s_numBuckets;

    // Upper limit on the number of idle proxies kept per pool; 0 until CreateFactory computes it.
    static int s_bucketSize;

    // Stack size served by each pool; the entry 0 denotes the default process stack size.
    static unsigned int s_proxyStackSize[4];

    // Cached copy of the execution resource TLS index that was created by the factory manager.
    DWORD m_executionResourceTlsIndex;

    // A list that will hold thread proxies.
    LockFreeStack<threadProxy> m_proxyPool[4];
};
// Number of stack-size pools; used as the loop bound over the four-element s_proxyStackSize and m_proxyPool arrays.
template <typename threadProxy, typename factoryType>
int ThreadProxyFactory<threadProxy, factoryType>::s_numBuckets = 4;

// Per-pool capacity. Starts at 0 and is computed by CreateFactory the first time a factory is created.
template <typename threadProxy, typename factoryType>
int ThreadProxyFactory<threadProxy, factoryType>::s_bucketSize = 0;

// Stack size of each pool, in ascending order; 0 stands for the default process stack size.
template <typename threadProxy, typename factoryType>
unsigned int ThreadProxyFactory<threadProxy, factoryType>::s_proxyStackSize[4] =
{
    0, 64, 256, 1024
};
/// <summary>
/// A factory that creates thread proxies for thread schedulers.
/// </summary>
class FreeThreadProxyFactory : public ThreadProxyFactory<FreeThreadProxy, FreeThreadProxyFactory>
{
public:
/// <summary>
/// Creates a new Win32 free thread proxy factory.
/// </summary>
FreeThreadProxyFactory(ThreadProxyFactoryManager * pManager)
    : ThreadProxyFactory(pManager)  // the base class caches the manager's execution resource TLS index
    , m_referenceCount(1)           // the factory starts out with a single reference
    , m_fShutdown(false)            // ReclaimProxy only pools proxies while this flag is false
{
}
/// <summary>
/// Destroys a free thread proxy factory.
/// </summary>
virtual ~FreeThreadProxyFactory()
{
    // Intentionally empty: this destructor performs no explicit cleanup of its own.
}
/// <summary>
/// Adds a reference to the thread proxy factory.
/// </summary>
LONG Reference()
{
    // Atomically bump the count and report the value produced by the increment.
    const LONG updatedCount = InterlockedIncrement(&m_referenceCount);
    return updatedCount;
}
/// <summary>
/// Releases a reference on the thread proxy factory.
/// </summary>
LONG Release()
{
    // Atomically drop one reference; the result is the number of references still outstanding.
    const LONG remaining = InterlockedDecrement(&m_referenceCount);

    if (remaining == 0)
    {
        // The last reference is gone: the factory destroys itself. Only the local copy of the count
        // may be used after this point.
        delete this;
    }

    return remaining;
}
/// <summary>
/// Returns a proxy back to the idle pool, for reuse.
/// </summary>
/// <param name="pThreadProxy">
/// The thread proxy being returned.
/// </param>
virtual void ReclaimProxy(Concurrency::IThreadProxy* pThreadProxy)
{
FreeThreadProxy * pProxy = static_cast<FreeThreadProxy *>(pThreadProxy);
// If the factory has been shut down, we should retire the proxy right away.
if (!m_fShutdown)
{
for (int i = 0; i < s_numBuckets; ++i)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -