//------------------------------------------------------------------------------
// <copyright company="Telligent Systems">
//     Copyright (c) Telligent Systems Corporation.  All rights reserved.
// </copyright> 
//------------------------------------------------------------------------------

using System;
using System.Threading;
using System.Collections;
using System.Configuration;
using CommunityServer.Configuration;

#region Credits
// Stephen Toub
// stoub@microsoft.com
// 
// ManagedThreadPool.cs
// ThreadPool written in 100% managed code.  Mimics the core functionality of
// the System.Threading.ThreadPool class.
//
// http://www.gotdotnet.com/Community/UserSamples/Details.aspx?SampleGuid=bf59c98e-d708-4f8e-9795-8bae1825c3b6
//
// HISTORY:
// v1.0.1 - Disposes of items remaining in queue when the queue is emptied
//		  - Catches errors thrown during execution of delegates
//		  - Added reset to semaphore, called during empty queue
//		  - Catches errors when unable to dequeue delegates
// v1.0.0 - Original version
// 
// August 27, 2002
// v1.0.1
#endregion

namespace CommunityServer.Components
{
	/// <summary>Implementation of Dijkstra's PV Semaphore based on the Monitor class.</summary>
	public class Semaphore
	{
		#region Member Variables
		/// <summary>The number of units allotted by this semaphore.</summary>
		private int _count;
		#endregion

		#region Construction
		/// <summary> Initialize the semaphore as a binary semaphore.</summary>
		public Semaphore() : this(1) 
		{
		}

		/// <summary> Initialize the semaphore as a counting semaphore.</summary>
		/// <param name="count">Initial number of threads that can take out units from this semaphore.</param>
		/// <exception cref="ArgumentException">Throws if the count argument is less than 0.</exception>
		public Semaphore(int count) 
		{
			if (count < 0) throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
			_count = count;
		}
		#endregion

		#region Synchronization Operations
		/// <summary>V the semaphore (add 1 unit to it).</summary>
		public void AddOne() { V(); }

		/// <summary>P the semaphore (take out 1 unit from it).</summary>
		public void WaitOne() { P(); }

		/// <summary>P the semaphore (take out 1 unit from it).</summary>
		public void P() 
		{
			// Lock so we can work in peace.  This works because lock is actually
			// built around Monitor.
			lock(this) 
			{
				// Wait until a unit becomes available.  We need to wait in a loop
				// because a pulsed thread must reacquire the lock before it returns
				// from Monitor.Wait, and another thread may take the unit first.  The
				// loop would also be required if the Monitor.Pulse statements were
				// changed to Monitor.PulseAll statements.
				while(_count <= 0) Monitor.Wait(this, Timeout.Infinite);
				_count--;
			}
		}

		/// <summary>V the semaphore (add 1 unit to it).</summary>
		public void V() 
		{
			// Lock so we can work in peace.  This works because lock is actually
			// built around Monitor.
			lock(this) 
			{
				// Release our hold on the unit of control.  Then wake one thread
				// waiting on this object to tell it that a unit is available.
				_count++;
				Monitor.Pulse(this);
			}
		}

		/// <summary>Resets the semaphore to the specified count.  Should be used cautiously.</summary>
		public void Reset(int count)
		{
			lock(this) { _count = count; }
		}
		#endregion
	}

	/// <summary>Managed thread pool.</summary>
	public class ManagedThreadPool
	{
		#region Constants
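		/// <summary>
		/// Maximum number of threads the thread pool has at its disposal.  Defaults to 5;
		/// the static constructor overrides it from configuration when available.
		/// </summary>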
		private static int _maxWorkerThreads = 5;

		#endregion

		#region Member Variables
		/// <summary>Queue of all the callbacks waiting to be executed.</summary>
		static Queue _waitingCallbacks;
		/// <summary>
		/// Used to signal that a worker thread is needed for processing.  Note that multiple
		/// threads may be needed simultaneously and as such we use a semaphore instead of
		/// an auto reset event.
		/// </summary>
		static Semaphore _workerThreadNeeded;
		/// <summary>List of all worker threads at the disposal of the thread pool.</summary>
		static ArrayList _workerThreads;
		/// <summary>Number of threads currently active.</summary>
		static int _inUseThreads;
		#endregion

		#region Construction
		/// <summary>Initialize the thread pool.</summary>
		static ManagedThreadPool()
		{
			try
			{
				_maxWorkerThreads = CSConfiguration.GetConfig().QueuedThreads;
			}
			catch
			{
				// Ignore configuration errors and keep the default thread count.
			}
			// Create our thread stores; we handle synchronization ourselves
			// as we may run into situations where multiple operations need to be atomic.
			// We keep track of the threads we've created just for good measure; not actually
			// needed for any core functionality.
			_waitingCallbacks = new Queue();
			_workerThreads = new ArrayList();
			_inUseThreads = 0;

			// Create our "thread needed" semaphore with no units available yet
			_workerThreadNeeded = new Semaphore(0);
			
			// Create all of the worker threads
			for(int i=0; i<_maxWorkerThreads; i++)
			{
				// Create a new thread and add it to the list of threads.
				System.Threading.Thread newThread = new System.Threading.Thread(new ThreadStart(ProcessQueuedItems));
				_workerThreads.Add(newThread);

				// Configure the new thread and start it
				newThread.Name = "ManagedPoolThread #" + i.ToString();
				newThread.IsBackground = true;
				newThread.Start();
			}
		}
		#endregion

		#region Public Methods
		/// <summary>Queues a user work item to the thread pool.</summary>
		/// <param name="callback">
		/// A WaitCallback representing the delegate to invoke when the thread in the 
		/// thread pool picks up the work item.
		/// </param>
		public static void QueueUserWorkItem(WaitCallback callback)
		{
			// Queue the delegate with no state
			QueueUserWorkItem(callback, null);
		}

		/// <summary>Queues a user work item to the thread pool.</summary>
		/// <param name="callback">
		/// A WaitCallback representing the delegate to invoke when the thread in the 
		/// thread pool picks up the work item.
		/// </param>
		/// <param name="state">
		/// The object that is passed to the delegate when serviced from the thread pool.
		/// </param>
		public static void QueueUserWorkItem(WaitCallback callback, object state)
		{
			// Create a waiting callback that contains the delegate and its state.
			// Add it to the processing queue, and signal that data is waiting.
			WaitingCallback waiting = new WaitingCallback(callback, state);
			lock(_waitingCallbacks.SyncRoot) { _waitingCallbacks.Enqueue(waiting); }
			_workerThreadNeeded.AddOne();
		}

		/// <summary>Empties the work queue of any queued work items.</summary>
		public static void EmptyQueue()
		{
			lock(_waitingCallbacks.SyncRoot) 
			{ 
				try 
				{
					// Try to dispose of all remaining state
					foreach(object obj in _waitingCallbacks)
					{
						WaitingCallback callback = (WaitingCallback)obj;
						if (callback.State is IDisposable) ((IDisposable)callback.State).Dispose();
					}
				} 
				catch
				{
					// Make sure an error isn't thrown.
				}

				// Clear all waiting items and reset the number of worker threads currently needed
				// to be 0 (there is nothing for threads to do)
				_waitingCallbacks.Clear();
				_workerThreadNeeded.Reset(0);
			}
		}
		#endregion

		#region Properties
		/// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
		public static int MaxThreads { get { return _maxWorkerThreads; } }
		/// <summary>Gets the number of currently active threads in the thread pool.</summary>
		public static int ActiveThreads { get { return _inUseThreads; } }
		/// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
		public static int WaitingCallbacks { get { lock(_waitingCallbacks.SyncRoot) { return _waitingCallbacks.Count; } } }
		#endregion

		#region Thread Processing
		/// <summary>A thread worker function that processes items from the work queue.</summary>
		private static void ProcessQueuedItems()
		{
			// Process indefinitely
			while(true)
			{
				// Get the next item in the queue.  If there is nothing there, go to sleep
				// for a while until we're woken up when a callback is waiting.
				WaitingCallback callback = null;
				while (callback == null)
				{
					// Try to get the next callback available.  We need to lock on the 
					// queue in order to make our count check and retrieval atomic.
					lock(_waitingCallbacks.SyncRoot)
					{
						if (_waitingCallbacks.Count > 0)
						{
							try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); } 
							catch{} // make sure not to fail here
						}
					}

					// If we can't get one, go to sleep.
					if (callback == null) _workerThreadNeeded.WaitOne();
				}

				// We now have a callback.  Execute it.  Make sure to accurately
				// record how many callbacks are currently executing.
				try 
				{
					Interlocked.Increment(ref _inUseThreads);
					callback.Callback(callback.State);
				} 
				catch
				{
					// Make sure we don't throw here.  Errors are not our problem.
				}
				finally 
				{
					Interlocked.Decrement(ref _inUseThreads);
				}
			}
		}
		#endregion

		/// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
		private class WaitingCallback
		{
			#region Member Variables
			/// <summary>Callback delegate for the callback.</summary>
			private WaitCallback _callback;
			/// <summary>State with which to call the callback delegate.</summary>
			private object _state;
			#endregion

			#region Construction
			/// <summary>Initialize the callback holding object.</summary>
			/// <param name="callback">Callback delegate for the callback.</param>
			/// <param name="state">State with which to call the callback delegate.</param>
			public WaitingCallback(WaitCallback callback, object state)
			{
				_callback = callback;
				_state = state;
			}
			#endregion

			#region Properties
			/// <summary>Gets the callback delegate for the callback.</summary>
			public WaitCallback Callback { get { return _callback; } }
			/// <summary>Gets the state with which to call the callback delegate.</summary>
			public object State { get { return _state; } }
			#endregion
		}
	}
}
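
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original file).  A minimal example of how
// QueueUserWorkItem might be called with a state object and how a caller
// could wait for the queued items to finish.  The namespace, class, and
// member names below are hypothetical and exist only for illustration.
// Assumptions: the pool has at least one worker thread (otherwise Run would
// block forever), and Run is not called concurrently, since it keeps its
// bookkeeping in static fields.
// ---------------------------------------------------------------------------
namespace CommunityServer.Components.Samples
{
	/// <summary>Hypothetical example: queue work items and wait for all of them.</summary>
	public class ManagedThreadPoolUsageSketch
	{
		/// <summary>Guards _sum and _remaining.</summary>
		private static readonly object _sync = new object();
		/// <summary>Running total of the integer states seen by the callbacks.</summary>
		private static int _sum;
		/// <summary>Number of queued items that have not yet run.</summary>
		private static int _remaining;
		/// <summary>Signaled by the callback that finishes last.</summary>
		private static ManualResetEvent _done;

		/// <summary>Queues the integers 1..itemCount and blocks until all have been processed.</summary>
		/// <returns>The sum of the integers passed to the callbacks.</returns>
		public static int Run(int itemCount)
		{
			if (itemCount <= 0) return 0;

			_sum = 0;
			_remaining = itemCount;
			_done = new ManualResetEvent(false);

			for (int i = 1; i <= itemCount; i++)
			{
				// The boxed int is handed back to the callback as its state.
				ManagedThreadPool.QueueUserWorkItem(new WaitCallback(AddToSum), i);
			}

			// Block until the last callback signals completion.
			_done.WaitOne();
			return _sum;
		}

		/// <summary>Callback executed on a pool thread for each queued item.</summary>
		private static void AddToSum(object state)
		{
			bool last;
			lock (_sync)
			{
				_sum += (int)state;
				_remaining--;
				last = (_remaining == 0);
			}

			// Signal outside the lock once every queued item has run.
			if (last) _done.Set();
		}
	}
}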
