⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 batchqueueservice.cs

📁 C# 版本的一个三层商业架构
💻 CS
字号:
using System;
using System.Collections;
using System.Threading;
using System.Messaging;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Channels.Tcp;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using CSLA;
using CSLA.Resources;

namespace CSLA.BatchQueue.Server
{
  /// <summary>
  /// Implements the batch queue service.
  /// </summary>
  /// <remarks>
  /// This class implements the server-side batch queue functionality.
  /// It must be hosted within some process, typically a Windows Service.
  /// It may also be hosted within a console application, which is
  /// useful for testing and debugging.
  /// </remarks>
  public class BatchQueueService
  {
    static TcpServerChannel _channel;
    static MessageQueue _queue;
    static Thread _monitor;
    static System.Timers.Timer _timer;
    static volatile bool _running;
    static Hashtable _activeEntries = Hashtable.Synchronized(new Hashtable());

    static AutoResetEvent _sync = new AutoResetEvent(false);
    static ManualResetEvent _waitToEnd = new ManualResetEvent(false);

    static BatchQueueService()
    {
      _timer = new System.Timers.Timer();
      _timer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);
    }

    /// <summary>
    /// Returns the name of the batch queue server.
    /// </summary>
    public static string Name
    {
      get
      {
        return LISTENER_NAME;
      }
    }

    #region Start/Stop

    /// <summary>
    /// Starts the batch queue.
    /// </summary>
    /// <remarks>
    /// Call this as your Windows Service starts up to
    /// start the batch queue itself. This will cause
    /// the queue to start listening for user requests
    /// via remoting and to the MSMQ for queued jobs.
    /// </remarks>
    public static void Start()
    {
      _timer.AutoReset = false;

      // open and/or create queue
      if(MessageQueue.Exists(QUEUE_NAME))
        _queue = new MessageQueue(QUEUE_NAME);
      else
        _queue = MessageQueue.Create(QUEUE_NAME);
      _queue.MessageReadPropertyFilter.Extension = true;

      // start reading from queue
      _running = true;
      _monitor = new Thread(new ThreadStart(MonitorQueue));
      _monitor.Name = "MonitorQueue";
      _monitor.Start();

      // start remoting for Server.BatchQueue
      if(_channel == null)
      {
        // set application name (virtual root name)
        RemotingConfiguration.ApplicationName = LISTENER_NAME;

        // set up channel
        Hashtable properties = new Hashtable();
        properties["name"] = "TcpBinary";
        properties["port"] = PORT.ToString();
        BinaryServerFormatterSinkProvider svFormatter = 
          new BinaryServerFormatterSinkProvider();

        //TODO: comment the following line for .NET 1.0
        svFormatter.TypeFilterLevel = 
          System.Runtime.Serialization.Formatters.TypeFilterLevel.Full;

        _channel = new TcpServerChannel(properties, svFormatter);
        ChannelServices.RegisterChannel(_channel);

        // register our class
        RemotingConfiguration.RegisterWellKnownServiceType( 
          typeof(Server.BatchQueue), "BatchQueue.rem", 
          WellKnownObjectMode.SingleCall);
      }
      else
        _channel.StartListening(null);

      System.Text.StringBuilder sb = new System.Text.StringBuilder();

      sb.AppendFormat("{0}\n", Strings.GetResourceString("BatchQueueProcStarted"));
      sb.AppendFormat("{0}{1}\n", Strings.GetResourceString("BatchQueueProcName"), Name);
      sb.AppendFormat("{0}{1}\n", Strings.GetResourceString("BatchQueueProcPort"), PORT);
      sb.AppendFormat("{0}{1}\n", Strings.GetResourceString("BatchQueueProcQueue"), QUEUE_NAME);
      sb.AppendFormat("{0}{1}\n", Strings.GetResourceString("BatchQueueProcMaxJobs"), MAX_ENTRIES);
      System.Diagnostics.EventLog.WriteEntry(Name, sb.ToString(), 
        System.Diagnostics.EventLogEntryType.Information);
    }

    /// <summary>
    /// Stops the batch queue.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Call this as your Windows Service is stopping. It
    /// stops the batch queue, causing it to stop listening
    /// for user requests via remoting and to stop looking for
    /// jobs in the MSMQ queue.
    /// </para><para>
    /// NOTE that this method will not return until any
    /// currently active (executing) batch jobs have completed.
    /// </para>
    /// </remarks>
    public static void Stop()
    {
      // stop remoting for Server.BatchQueue
      _channel.StopListening(null);

      // signal to stop working 
      _running = false;
      _sync.Set();
      _monitor.Join();
      // close the queue
      _queue.Close();

      if(_activeEntries.Count > 0)
      {
        // wait for work to end
        _waitToEnd.WaitOne();
      }
    }

    #endregion

    #region Process messages

    // this will be running on a background thread
    static void MonitorQueue()
    {
      while(_running)
      {
        ScanQueue();
        _sync.WaitOne();
      }
    }

    // this runs on a threadpool thread
    static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
      _timer.Stop();
      _sync.Set();
    }

    // this is called by MonitorQueue
    static void ScanQueue()
    {
      Message msg;
      DateTime holdUntil;
      DateTime nextWake = DateTime.MaxValue;

      MessageEnumerator en = _queue.GetMessageEnumerator();
      while(en.MoveNext())
      {
        msg = en.Current;
        holdUntil = 
          DateTime.Parse(System.Text.Encoding.ASCII.GetString(msg.Extension));
        if(holdUntil <= DateTime.Now)
        {
          if(_activeEntries.Count < MAX_ENTRIES)
          {
            ProcessEntry(_queue.ReceiveById(msg.Id));
          }
          else
          {
            // the queue is busy, go to sleep
            return;
          }
        }
        else
        {
          if(holdUntil < nextWake)
          {
            // find the minimum holduntil value
            nextWake = holdUntil;
          }
        }
      }

      if(nextWake < DateTime.MaxValue && nextWake > DateTime.Now)
      {
        // we have at least one entry holding, so set the
        // timer to wake us when it should be run
        _timer.Interval = nextWake.Subtract(DateTime.Now).TotalMilliseconds;
        _timer.Start();
      }
    }

    static void ProcessEntry(Message msg)
    {
      // get entry from queue
      BatchEntry entry;
      BinaryFormatter formatter = new BinaryFormatter();
      entry = (BatchEntry)formatter.Deserialize(msg.BodyStream);

      // make active
      entry.Info.SetStatus(BatchEntryStatus.Active);
      _activeEntries.Add(entry.Info.ID, entry.Info);

      // start processing entry on background thread
      ThreadPool.QueueUserWorkItem(new WaitCallback(entry.Execute));
    }

    // called by BatchEntry when it is done processing so
    // we know that it is complete and we can start another
    // job if needed
    internal static void Deactivate(BatchEntry entry)
    {
      _activeEntries.Remove(entry.Info.ID);
      _sync.Set();
      if(!_running && _activeEntries.Count == 0)
      {
        // indicate that there are no active workers
        _waitToEnd.Set();
      }
    }

    #endregion

    #region Enqueue/Dequeue/LoadEntries

    internal static void Enqueue(BatchEntry entry)
    {
      Message msg = new Message();
      BinaryFormatter f = new BinaryFormatter();

      msg.Label = entry.ToString();
      msg.Priority = entry.Info.Priority;
      msg.Extension = 
        System.Text.Encoding.ASCII.GetBytes(
        entry.Info.HoldUntil.ToString());
      entry.Info.SetMessageID(msg.Id);
      f.Serialize(msg.BodyStream, entry);

        _queue.Send(msg);

      _sync.Set();
    }

    internal static void Dequeue(BatchEntryInfo entry)
    {
      string msgID = null;

      string label = entry.ToString();
      MessageEnumerator en = _queue.GetMessageEnumerator();
      _queue.MessageReadPropertyFilter.Label = true;

      while(en.MoveNext())
      {
        if(en.Current.Label == label)
        {
          // we found a match
          msgID = en.Current.Id;
        }
      }

      if(msgID != null)
        _queue.ReceiveById(msgID);
    }

    internal static void LoadEntries(BatchEntries list)
    {
      // load our list of BatchEntry objects
      BinaryFormatter formatter = new BinaryFormatter();
      Server.BatchEntry entry;

      // get all active entries
      lock(_activeEntries.SyncRoot)
      {
        foreach(DictionaryEntry de in _activeEntries)
        {
          list.Add((BatchEntryInfo)de.Value);
        }
      }

      // get all queued entries
      Message [] msgs = _queue.GetAllMessages();
      foreach(Message msg in msgs)
      {
        entry = (Server.BatchEntry)formatter.Deserialize(msg.BodyStream);
        entry.Info.SetMessageID(msg.Id);
        list.Add(entry.Info);
      }
    }

    #endregion

    #region Utility functions

    static string QUEUE_NAME
    {
      get
      {
        return @".\private$\" + ConfigurationSettings.AppSettings["QueueName"];
      }
    }

    static string LISTENER_NAME
    {
      get
      {
        return ConfigurationSettings.AppSettings["ListenerName"];
      }
    }

    static int PORT
    {
      get
      {
        return Convert.ToInt32(ConfigurationSettings.AppSettings["ListenerPort"]);
      }
    }

    static int MAX_ENTRIES
    {
      get
      {
        return Convert.ToInt32(
          ConfigurationSettings.AppSettings["MaxActiveEntries"]);
      }
    }

    #endregion


  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -