// File: requestmanager.cs
//===============================================================================
// Microsoft patterns & practices
// Mobile Client Software Factory - July 2006
//===============================================================================
// Copyright Microsoft Corporation. All rights reserved.
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY
// OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
// FITNESS FOR A PARTICULAR PURPOSE.
//===============================================================================
// The example companies, organizations, products, domain names,
// e-mail addresses, logos, people, places, and events depicted
// herein are fictitious. No association with any real company,
// organization, product, domain name, email address, logo, person,
// places, or events is intended or should be inferred.
//===============================================================================
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Practices.Mobile.EndpointCatalog;
using Microsoft.Practices.Mobile.DataAccess;
using System.Threading;
using System.Net;
namespace Microsoft.Practices.Mobile.DisconnectedAgent
{
public sealed class RequestManager
{
// Singleton instance, created lazily by the Instance property.
private static volatile RequestManager instance;
// Guards creation of the singleton instance. Read-only so the lock target can never be swapped.
private static readonly object syncRoot = new object();
// Guards the dispatch command queue and the creation of the dispatch worker.
private static readonly object syncLockObject = new object();
// Queue holding the requests that are waiting to be dispatched.
private IRequestQueue requestQueue;
// Queue receiving the requests that could not be dispatched.
private IRequestQueue deadLetterQueue;
private IRequestDispatcher requestDispatcher;
private IEndpointCatalog endpointCatalog;
private IConnectionMonitor connectionMonitor;
// Current dispatch worker, or null when none is running. The worker clears this field from its
// own thread while other threads read it, hence volatile.
private volatile DispatchRequestThread thread;
// True while automatic dispatch (StartAutomaticDispatch) is active.
private bool running;
// Commands waiting to be executed by the dispatch worker; accessed under syncLockObject.
private Queue<Command> dispatchCommands;
/// <summary>
/// Event fired when the RequestManager has tried to dispatch a Request.
/// It could be a successful, failing or expired dispatch.
/// </summary>
/// <remarks>
/// The event is raised from the dispatch worker thread, after the request has been removed
/// from the pending queue.
/// </remarks>
public event EventHandler<RequestDispatchedEventArgs> RequestDispatched;
// Private: the only instance is the one created by the Instance property.
private RequestManager()
{
}
/// <summary>
/// Initializes the RequestManager for request dispatching with all the configurable elements.
/// </summary>
/// <remarks>
/// If automatic dispatch is running when this method is called, it is stopped first so that the
/// event handlers registered on the previous queue and connection monitor are removed. Call
/// <see cref="StartAutomaticDispatch"/> again afterwards to resume automatic dispatching.
/// </remarks>
/// <param name="requestQueue">Queue for the pending requests.</param>
/// <param name="deadLetterQueue">Queue for the failures.</param>
/// <param name="connectionMonitor">IConnectionMonitor for connectivity events and info.</param>
/// <param name="requestDispatcher">Dispatcher to be used by the manager.</param>
/// <param name="endpointCatalog">Catalog to get the endpoint-specific information for the dispatching.</param>
public void Initialize(IRequestQueue requestQueue, IRequestQueue deadLetterQueue, IConnectionMonitor connectionMonitor,
    IRequestDispatcher requestDispatcher, IEndpointCatalog endpointCatalog)
{
    // Unsubscribe from the collaborators that are about to be replaced. Without this, the old
    // queue and monitor would keep references to this manager and keep triggering dispatches.
    // StopAutomaticDispatch is a no-op when automatic dispatch is not running.
    StopAutomaticDispatch();

    this.requestQueue = requestQueue;
    this.requestDispatcher = requestDispatcher;
    this.endpointCatalog = endpointCatalog;
    this.deadLetterQueue = deadLetterQueue;
    this.connectionMonitor = connectionMonitor;
    this.dispatchCommands = new Queue<Command>();
    running = false;
}
/// <summary>
/// Convenience overload that initializes the RequestManager from the minimum set of elements.
/// It builds a WebServiceRequestDispatcher over the given catalog, a "Requests" queue for the
/// pending requests and a "Dlq" queue for the failures, both in the given database, and then
/// delegates to the full Initialize overload.
/// </summary>
/// <param name="catalog">Catalog to get the endpoint-specific information for the dispatching.</param>
/// <param name="connectionMonitor">IConnectionMonitor for connectivity events and state info.</param>
/// <param name="queuesDatabase">Database in which the queues are created.</param>
public void Initialize(IEndpointCatalog catalog, IConnectionMonitor connectionMonitor, Database queuesDatabase)
{
    IRequestQueue pendingQueue = new DatabaseRequestQueue(queuesDatabase, "Requests");
    IRequestQueue failedQueue = new DatabaseRequestQueue(queuesDatabase, "Dlq");
    IRequestDispatcher dispatcher = new WebServiceRequestDispatcher(catalog);

    Initialize(pendingQueue, failedQueue, connectionMonitor, dispatcher, catalog);
}
/// <summary>
/// Starts automatic dispatching: from now on a dispatch is triggered whenever a request is
/// enqueued or the connection status changes. If a connection is already available, the
/// pending requests are dispatched right away. Does nothing when already running.
/// </summary>
public void StartAutomaticDispatch()
{
    if (!running)
    {
        running = true;
        requestQueue.RequestEnqueued += OnAutomaticDispatch;
        connectionMonitor.ConnectionStatusChanged += OnConnectionDispatch;

        bool online = connectionMonitor.IsConnected;
        if (online)
        {
            DispatchAllPendingRequestsForConnection();
        }
    }
}
/// <summary>
/// Stops automatic dispatching by removing the handlers registered by
/// StartAutomaticDispatch. Does nothing when automatic dispatch is not running.
/// </summary>
public void StopAutomaticDispatch()
{
    if (running)
    {
        connectionMonitor.ConnectionStatusChanged -= OnConnectionDispatch;
        requestQueue.RequestEnqueued -= OnAutomaticDispatch;
        running = false;
    }
}
// Handler for the pending queue's RequestEnqueued event: dispatches the pending requests
// when a connection is currently available.
private void OnAutomaticDispatch(object sender, EventArgs args)
{
    if (!connectionMonitor.IsConnected)
    {
        return;
    }

    DispatchAllPendingRequestsForConnection();
}
// Handler for the connection monitor's ConnectionStatusChanged event: asks the current
// dispatch worker (if any) to stop, waits briefly for it, and then dispatches the pending
// requests again when a connection is available.
private void OnConnectionDispatch(object sender, EventArgs args)
{
    // Upper bound, in milliseconds, for waiting on the current worker to finish.
    const int stopTimeoutMilliseconds = 2000;

    // Work on a snapshot: the worker sets the field to null from its own thread when it
    // finishes, so re-reading the field between the null check and the calls could throw
    // a NullReferenceException.
    DispatchRequestThread current = thread;
    if (current != null)
    {
        //Stop current dispatching
        current.Stop();
        current.Join(stopTimeoutMilliseconds);
    }
    if (connectionMonitor.IsConnected)
    {
        //Continue with new connection status
        DispatchAllPendingRequestsForConnection();
    }
}
/// <summary>
/// Queues a dispatch of the pending requests selected by
/// <see cref="GetRequestsForCurrentConnectionPrice"/>, i.e. those the pending queue returns
/// for the price of the current connection.
/// </summary>
public void DispatchAllPendingRequestsForConnection()
{
    Command selectForConnection = new Command(this, "GetRequestsForCurrentConnectionPrice");
    DispatchRequests(selectForConnection);
}
/// <summary>
/// Queues a dispatch of the pending requests selected by
/// <see cref="GetPendingRequestsByTag"/> for the given tag.
/// </summary>
/// <param name="tag">Tag passed to the pending queue to select the requests.</param>
public void DispatchPendingRequestsByTag(string tag)
{
    Command selectByTag = new Command(this, "GetPendingRequestsByTag", tag);
    DispatchRequests(selectByTag);
}
/// <summary>
/// Queues a dispatch of every request returned by <see cref="GetPendingRequests"/>,
/// without filtering by tag or connection price.
/// </summary>
public void DispatchAllPendingRequests()
{
    Command selectAll = new Command(this, "GetPendingRequests");
    DispatchRequests(selectAll);
}
/// <summary>
/// Queues a dispatch of a single request, looked up in the pending queue by its RequestId.
/// Nothing is dispatched if the pending queue no longer contains it.
/// </summary>
/// <param name="request">Request to dispatch; must not be null.</param>
public void DispatchRequest(Request request)
{
    Guard.ArgumentNotNull(request, "request");

    Command selectSingle = new Command(this, "GetRequest", request.RequestId);
    DispatchRequests(selectSingle);
}
// Enqueues the command and, when no dispatch worker exists, creates and starts one.
// Both steps happen under syncLockObject.
private void DispatchRequests(Command command)
{
    lock (syncLockObject)
    {
        dispatchCommands.Enqueue(command);

        if (thread != null)
        {
            // A worker already exists; it will pick the command up from the queue.
            return;
        }

        // Publish the worker in the field before starting it, as the worker clears the field itself.
        DispatchRequestThread worker = new DispatchRequestThread(this, dispatchCommands, syncLockObject);
        thread = worker;
        worker.Start();
    }
}
/// <summary>
/// Worker that executes the manager's queued commands on its own thread and dispatches the
/// requests each command returns. It runs until the command queue is empty or a stop is
/// requested, and detaches itself from the manager when it ends.
/// </summary>
private class DispatchRequestThread
{
    private Thread thread;
    private RequestManager manager;
    private Queue<Command> commands;
    // Set by Stop() from other threads and polled by the worker loop, hence volatile.
    private volatile bool stop;
    private object syncLockObject;

    /// <summary>
    /// Creates a worker bound to the given manager and command queue.
    /// </summary>
    /// <param name="manager">Manager whose queues, catalog, monitor and dispatcher are used.</param>
    /// <param name="commands">Queue of commands to execute; accessed under <paramref name="syncLockObject"/>.</param>
    /// <param name="syncLockObject">Lock shared with the manager to guard the command queue.</param>
    public DispatchRequestThread(RequestManager manager, Queue<Command> commands, object syncLockObject)
    {
        this.manager = manager;
        this.commands = commands;
        this.syncLockObject = syncLockObject;
    }

    /// <summary>
    /// Creates the underlying thread and starts processing commands.
    /// </summary>
    public void Start()
    {
        thread = new Thread(new ThreadStart(DispatchRequests));
        thread.Start();
    }

    // Worker loop: peeks the next command, dispatches its requests, and dequeues the command
    // only when it was processed without a stop being requested.
    private void DispatchRequests()
    {
        try
        {
            while (true)
            {
                IEnumerable<Request> requests;
                lock (syncLockObject)
                {
                    if (stop || commands.Count == 0)
                    {
                        // Detach while still holding the lock. A command enqueued after this point
                        // then sees no worker and starts a new one, instead of being stranded
                        // behind a worker that is about to exit.
                        Detach();
                        break;
                    }
                    requests = GetRequestsFromNextCommand();
                }
                foreach (Request request in requests)
                {
                    //If there is not a connection stops the thread and doesn't remove the command from the queue.
                    if (!manager.connectionMonitor.IsConnected)
                        stop = true;
                    // Requests for unknown endpoints are moved to the dead letter queue even
                    // when a stop is pending; the stop check comes afterwards.
                    if (!manager.endpointCatalog.EndpointExists(request.Endpoint))
                    {
                        manager.deadLetterQueue.Enqueue(request);
                        manager.requestQueue.Remove(request);
                        continue;
                    }
                    if (stop)
                        break;
                    try
                    {
                        if (manager.endpointCatalog.AddressExistsForEndpoint(request.Endpoint, manager.connectionMonitor.CurrentNetwork))
                        {
                            DispatchRequestInternal(request, manager.connectionMonitor.CurrentNetwork);
                        }
                    }
                    catch
                    {
                        //If there was an exception getting the currentNetwork or Endpoint
                        //the connection has been lost between the check and the get / DispatchRequestInternal
                        stop = true;
                        break;
                    }
                }
                lock (syncLockObject)
                {
                    // A stopped command stays at the head of the queue so a later worker retries it.
                    if (!stop)
                        commands.Dequeue();
                }
            }
        }
        finally
        {
            // Also covers the case where the loop is left through an exception.
            lock (syncLockObject)
            {
                Detach();
            }
        }
    }

    // Clears the manager's worker reference, but only if it still points at this worker, so a
    // newer worker is never unregistered by an older one. Call with syncLockObject held.
    private void Detach()
    {
        if (object.ReferenceEquals(manager.thread, this))
            manager.thread = null;
    }

    // Dispatches one request (unless it has expired), moves it to the dead letter queue on
    // failure, removes it from the pending queue and raises RequestDispatched.
    private void DispatchRequestInternal(Request request, string networkName)
    {
        DispatchResult result = DispatchResult.Expired;
        if (request.Behavior.Expiration == null || request.Behavior.Expiration >= DateTime.Now)
        {
            try
            {
                result = manager.requestDispatcher.Dispatch(request, networkName);
            }
            catch (WebException)
            {
                //Any WebException should fail the dispatch request
                result = DispatchResult.Failed;
            }
            if (result == DispatchResult.Failed)
            {
                // A failure without a connection leaves the request in the pending queue.
                if (!manager.connectionMonitor.IsConnected)
                    return;
                manager.deadLetterQueue.Enqueue(request);
            }
        }
        manager.requestQueue.Remove(request);

        // Copy the delegate first so a concurrent unsubscribe cannot null it between the check
        // and the call. The sender is the manager, which declares the event, not this private worker.
        EventHandler<RequestDispatchedEventArgs> handler = manager.RequestDispatched;
        if (handler != null)
            handler(manager, new RequestDispatchedEventArgs(request, result));
    }

    // Executes the command at the head of the queue without removing it and returns its result
    // as a sequence. A "GetRequest" command yields a single request or null; null becomes an
    // empty sequence.
    private IEnumerable<Request> GetRequestsFromNextCommand()
    {
        if (commands.Peek().CommandName == "GetRequest")
        {
            Request requestByGuid = (Request)commands.Peek().Execute();
            if (requestByGuid == null)
                return new Request[] { };
            else
                return new Request[] { requestByGuid };
        }
        else
            return (IEnumerable<Request>)commands.Peek().Execute();
    }

    /// <summary>
    /// Waits for the underlying thread to finish.
    /// </summary>
    /// <param name="timeout">Milliseconds to wait.</param>
    /// <returns>true if the thread finished (or was never started) before the timeout, otherwise false.</returns>
    public bool Join(int timeout)
    {
        // The underlying thread does not exist until Start() has run.
        Thread worker = thread;
        if (worker == null)
            return true;
        return worker.Join(timeout);
    }

    /// <summary>
    /// Requests the worker loop to stop; the loop checks the flag between requests and commands.
    /// </summary>
    public void Stop()
    {
        stop = true;
    }
}
/// <summary>
/// Gets the queue of requests that are waiting to be dispatched.
/// </summary>
public IRequestQueue RequestQueue
{
    get
    {
        return this.requestQueue;
    }
}
/// <summary>
/// Gets the queue that receives the requests that could not be dispatched.
/// </summary>
public IRequestQueue DeadLetterQueue
{
    get
    {
        return this.deadLetterQueue;
    }
}
/// <summary>
/// Waits for the inner dispatch thread to finish.
/// </summary>
/// <param name="timeout">Milliseconds to wait.</param>
/// <returns>true if there is no dispatch thread or it finished before the timeout, otherwise false.</returns>
public bool Join(int timeout)
{
    // Use a snapshot of the field rather than lock (this). The worker clears the field from its
    // own thread without taking that lock, so the lock never protected the null check. Holding
    // it for the whole wait also blocked Stop() and exposed the lock to external code.
    DispatchRequestThread current = thread;
    if (current == null)
        return true;
    return current.Join(timeout);
}
/// <summary>
/// Gets whether automatic dispatch is currently running.
/// </summary>
public bool Running
{
    get
    {
        return this.running;
    }
}
/// <summary>
/// Requests the dispatching thread, if any, to stop.
/// </summary>
public void Stop()
{
    // Use a snapshot of the field rather than lock (this). The worker clears the field from its
    // own thread without taking that lock, so the check-then-call could still hit a null
    // reference, and locking on this exposes the lock to external code.
    DispatchRequestThread current = thread;
    if (current != null)
        current.Stop();
}
#region Command Methods
/// <summary>
/// Looks up a request in the pending queue by its identifier.
/// This is the method named by the "GetRequest" dispatch command.
/// </summary>
/// <param name="requestId">Identifier of the request to look up.</param>
/// <returns>Whatever the pending queue returns for that identifier.</returns>
public Request GetRequest(Guid requestId)
{
    Request match = requestQueue.GetRequest(requestId);
    return match;
}
/// <summary>
/// Enumerates the requests the pending queue returns for the given tag.
/// This is an iterator: the queue is not queried until the result is enumerated.
/// </summary>
/// <param name="tag">Tag passed to the pending queue.</param>
/// <returns>A lazily evaluated sequence of the matching requests.</returns>
public IEnumerable<Request> GetPendingRequestsByTag(string tag)
{
    foreach (Request pending in this.requestQueue.GetRequests(tag))
    {
        yield return pending;
    }
}
/// <summary>
/// Enumerates every request the pending queue returns.
/// This is an iterator: the queue is not queried until the result is enumerated.
/// </summary>
/// <returns>A lazily evaluated sequence of the pending requests.</returns>
public IEnumerable<Request> GetPendingRequests()
{
    foreach (Request pending in this.requestQueue.GetRequests())
    {
        yield return pending;
    }
}
/// <summary>
/// Enumerates the requests the pending queue returns for the price of the current connection.
/// This is an iterator: the price is read and the queue queried only when the result is enumerated.
/// </summary>
/// <returns>
/// A lazily evaluated sequence of requests; empty when the connection price cannot be read.
/// </returns>
public IEnumerable<Request> GetRequestsForCurrentConnectionPrice()
{
    uint? price = null;
    try
    {
        price = connectionMonitor.CurrentConnectionPrice;
    }
    catch
    {
        //Exception getting the currentConnectionPrice seems connectivity issue
        price = null;
        // This method is public and can be enumerated when no dispatch worker exists, and the
        // worker clears the field from its own thread. Stop a snapshot, and only if there is one,
        // so the handler cannot throw a NullReferenceException.
        DispatchRequestThread current = thread;
        if (current != null)
            current.Stop();
    }
    if (price != null)
    {
        foreach (Request request in requestQueue.GetRequests((uint)price))
        {
            yield return request;
        }
    }
}
#endregion
/// <summary>
/// Gets the single RequestManager instance, creating it on first access.
/// Creation happens under a lock, with the instance re-checked inside the lock.
/// </summary>
public static RequestManager Instance
{
    get
    {
        RequestManager existing = instance;
        if (existing != null)
        {
            return existing;
        }

        lock (syncRoot)
        {
            if (instance == null)
            {
                instance = new RequestManager();
            }
            return instance;
        }
    }
}
}
}
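// Code-viewer keyboard shortcuts (page residue, not part of the original source file):
//   Copy code:        Ctrl + C
//   Search code:      Ctrl + F
//   Full-screen mode: F11
//   Toggle theme:     Ctrl + Shift + D
//   Show shortcuts:   ?
//   Increase font:    Ctrl + =
//   Decrease font:    Ctrl + -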