📄 commandprocessor.cs
字号:
namespace NCindy.CommandProcessor
{
using NCindy.DataStructures;
using NCindy.Util.Logging;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
public sealed class CommandProcessor
{
private int _busyThreads;
private LockFreeQueue<ICommand> _commands = new LockFreeQueue<ICommand>();
private int _commandsCount;
private volatile bool _isRunning;
private int _maxThreadsCount = (Environment.get_ProcessorCount() * 2);
private int _minThreadsCount = Environment.get_ProcessorCount();
private string _name;
private List<Thread> _threads = new List<Thread>();
private long _totalExecutedCommands;
private int _uselessThreadTimeout = 0x3a98;
private static readonly ILogger log = LogFactory.CreateLogger(MethodBase.GetCurrentMethod().ReflectedType);
public CommandProcessor(string name)
{
this._name = name;
AppDomain.CurrentDomain.ProcessExit += new EventHandler(this.CurrentDomain_ProcessExit);
}
public void AddCommand(ICommand command)
{
this._commands.Enqueue(command);
Interlocked.Increment(ref this._commandsCount);
lock (this)
{
Monitor.Pulse(this);
}
}
public void AddCommand(ICommand command, ThreadPriority threadPriority)
{
this._commands.Enqueue(command);
Interlocked.Increment(ref this._commandsCount);
lock (this)
{
Monitor.Pulse(this);
}
}
private void AddThreadToPool()
{
lock (this._threads)
{
if ((this._maxThreadsCount <= -1) || (this._threads.get_Count() < this._maxThreadsCount))
{
Thread thread = new Thread(new ThreadStart(this.ThreadProc));
thread.Name = this._name;
thread.IsBackground = true;
thread.Priority = ThreadPriority.Normal;
thread.Start();
this._threads.Add(thread);
}
}
}
private void CurrentDomain_ProcessExit(object sender, EventArgs e)
{
this.Stop();
}
private void ProcessAllCommands()
{
while (this._isRunning)
{
ICommand item;
if (!this._commands.TryDequeue(out item))
{
return;
}
Interlocked.Decrement(ref this._commandsCount);
try
{
item.Execute();
}
catch (Exception)
{
}
Interlocked.Increment(ref this._totalExecutedCommands);
}
}
public void Start()
{
this._isRunning = true;
for (int i = 0; i < this._minThreadsCount; i++)
{
this.AddThreadToPool();
}
}
public void Stop()
{
if (this._isRunning)
{
this._isRunning = false;
lock (this)
{
Monitor.PulseAll(this);
}
lock (this._threads)
{
List<Thread>.Enumerator enumerator = this._threads.GetEnumerator();
try
{
while (enumerator.MoveNext())
{
enumerator.get_Current().Abort();
}
}
finally
{
enumerator.Dispose();
}
}
this._threads.Clear();
}
}
private void ThreadProc()
{
try
{
while (this._isRunning)
{
if (this._commandsCount < 1)
{
bool flag = false;
lock (this)
{
flag = !Monitor.Wait(this, this._uselessThreadTimeout);
}
if (flag)
{
lock (this._threads)
{
if (this._threads.get_Count() <= this._minThreadsCount)
{
continue;
}
this._threads.Remove(Thread.CurrentThread);
return;
}
}
}
Interlocked.Increment(ref this._busyThreads);
if (this._busyThreads == this._threads.get_Count())
{
this.AddThreadToPool();
}
this.ProcessAllCommands();
Interlocked.Decrement(ref this._busyThreads);
}
}
catch (ThreadAbortException)
{
}
catch (Exception exception)
{
log.Error("", exception);
}
}
public int ConcurrentThreads
{
get
{
lock (this._threads)
{
return this._threads.get_Count();
}
}
}
public long TotalExecutedCommands
{
get
{
return this._totalExecutedCommands;
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -