⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 commandprocessor.clr.cs

📁 rudp可靠保障得udp传输
💻 CS
字号:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Runtime.InteropServices;

using Helper.Debug;
using Helper.Threading.Collections;

namespace Helper.CommandProcessor
{
	sealed public class CommandProcessor
	{

		#region Variables

		private string _name;

		private List<Thread> _threads = new List<Thread>();

		private LockFreeQueue<ICommand> _commands = new LockFreeQueue<ICommand>();
		private int _commandsCount = 0;

		volatile private bool _isRunning = false;

		// Internals
		private int _busyThreads = 0;
		private int _maxThreadsCount = Environment.ProcessorCount * 2;
		private int _minThreadsCount = Environment.ProcessorCount;
		private int _uselessThreadTimeout = 15000; // after 15 second inactivity, thread is removed

		// Stats
		private long _totalExecutedCommands = 0;

		#endregion

		#region Constructor

		public CommandProcessor(string name)
		{
			_name = name;
			AppDomain.CurrentDomain.ProcessExit += new EventHandler(CurrentDomain_ProcessExit);
		}

		private void CurrentDomain_ProcessExit(object sender, EventArgs e)
		{
			Stop();
		}

		#endregion

		#region Properties

		/// <summary>
		/// Returns the current number of threads used by this processor.
		/// </summary>
		public int ConcurrentThreads
		{
			get
			{
				lock (_threads)
					return _threads.Count;
			}
		}

		/// <summary>
		/// Returns the total number of executed commands.
		/// </summary>
		public long TotalExecutedCommands
		{
			get
			{
				return _totalExecutedCommands;
			}
		}

		#endregion

		#region Start

		/// <summary>
		/// Start the processor
		/// </summary>
		public void Start()
		{
			_isRunning = true;

			for (int index = 0; index < _minThreadsCount; index++)
				AddThreadToPool();
		}

		#endregion

		#region Stop

		/// <summary>
		/// Stop the processor
		/// </summary>
		public void Stop()
		{

			if (!_isRunning)
				return;

			_isRunning = false;

			// Wake up all the threads
			lock (this)
				Monitor.PulseAll(this);

			// Force all the threads to stop
			lock (_threads)

				foreach (Thread thread in _threads)
					thread.Abort(); // Violently stop the thread.

			_threads.Clear();
		}

		#endregion

		#region AddCommand

		/// <summary>
		/// Add a new command
		/// </summary>
		/// <param name="command">The command to execute</param>
		public void AddCommand(ICommand command)
		{
			_commands.Enqueue(command);

			Interlocked.Increment(ref _commandsCount);

			// Wakeup a processing thread
			lock (this)
				Monitor.Pulse(this);
		}

		/// <summary>
		/// Add a new command
		/// </summary>
		/// <param name="command">The command to execute</param>
		public void AddCommand(ICommand command, ThreadPriority threadPriority)
		{
			// NO MORE USED !!!!!! NO USE OF PRIORITY
			_commands.Enqueue(command);

			Interlocked.Increment(ref _commandsCount);

			// Wakeup a processing thread
			lock (this)
				Monitor.Pulse(this);
		}

		#endregion

		#region Processing

		private void ThreadProc()
		{

			try
			{

				while (_isRunning)
				{

					//---- Wait until we wake up this thread
					if (_commandsCount < 1)
					{
						//-- Wait...
						bool hasTimeOut = false;
						lock (this)
							hasTimeOut = !Monitor.Wait(this, _uselessThreadTimeout);

						//-- The thread pool is NOT heavily used, remove this thread from the pool
						if (hasTimeOut)
						{
							lock (_threads)

								if (_threads.Count > _minThreadsCount)
								{
									_threads.Remove(Thread.CurrentThread);
									return;
								}
							continue;
						}
					}

					//---- Process at least one command, if fast enough can process several commands
					Interlocked.Increment(ref _busyThreads);

					//---- Thread pool is heavily used, add a thread to the pool
					if (_busyThreads == _threads.Count)
						AddThreadToPool();

					ProcessAllCommands();
					Interlocked.Decrement(ref _busyThreads);
				}
			}

			catch (ThreadAbortException)
			{
			}

			catch (Exception exception)
			{
				ExceptionsHandler.Handle(exception);
			}
		}

		/// <summary>
		/// To avoid too much context switching, we try to processing several commands.
		/// But this thread cannot execute during too much time.
		/// </summary>
		private void ProcessAllCommands()
		{
			while (_isRunning)
			{
				//---- Get the next command
				ICommand command;

				if (!_commands.TryDequeue(out command))
					return; // No more commands

				Interlocked.Decrement(ref _commandsCount);

				//---- Execute the command
				try
				{
					command.Execute();
				}

				catch (Exception exception)
				{
					ExceptionsHandler.Handle(exception);
					//Console.WriteLine(exception.Message);
				}

				// Statistics
				Interlocked.Increment(ref _totalExecutedCommands);

				/*

								// Process commands since more than 20 ms, allows context switching
								if (HighResClock.TicksToMs(HighResClock.NowTicks - ticks) > ThreadHelper.ThreadMaxExecutionTime)
								{
									ThreadHelper.StallThread();
									return;
								}*/
			}
		}

		#endregion

		#region AddThreadToPool

		private void AddThreadToPool()
		{
			lock (_threads)
			{

				if (_maxThreadsCount > -1 && _threads.Count >= _maxThreadsCount)
					return;

				Thread newThread = new Thread(new ThreadStart(ThreadProc));
				newThread.Name = _name;
				newThread.IsBackground = true;
				newThread.Priority = ThreadPriority.Normal;
				newThread.Start();

				_threads.Add(newThread);
			}
		}

		#endregion

	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -