// File: databaserequestqueue.cs
//===============================================================================
// Microsoft patterns & practices
// Mobile Client Software Factory - July 2006
//===============================================================================
// Copyright Microsoft Corporation. All rights reserved.
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY
// OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
// FITNESS FOR A PARTICULAR PURPOSE.
//===============================================================================
// The example companies, organizations, products, domain names,
// e-mail addresses, logos, people, places, and events depicted
// herein are fictitious. No association with any real company,
// organization, product, domain name, email address, logo, person,
// places, or events is intended or should be inferred.
//===============================================================================
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Practices.Mobile.DataAccess;
using System.Data.Common;
using System.Data;
using System.Globalization;
namespace Microsoft.Practices.Mobile.DisconnectedAgent
{
public class DatabaseRequestQueue : IRequestQueue
{
// Name of the table that stores the queued requests. Assigned once in the constructor
// and concatenated into the SQL statements built by this class.
private readonly string tableName;
// Database used to persist and query the queued requests. Assigned once in the constructor.
private readonly Database database;
/// <summary>
/// Initializes a new <see cref="DatabaseRequestQueue"/> persisted in the given database,
/// using the default table name "Requests" to store the requests.
/// </summary>
/// <param name="database">Database used for the queue persistence.</param>
public DatabaseRequestQueue(Database database) : this(database, "Requests") { }
/// <summary>
/// Initializes a new <see cref="DatabaseRequestQueue"/> persisted in the given database and table.
/// When the table does not exist yet, it is created together with its indexes.
/// </summary>
/// <param name="database">Database used for the queue persistence. Must not be null.</param>
/// <param name="tableName">Name of the table that stores the requests. Must not be null.</param>
public DatabaseRequestQueue(Database database, string tableName)
{
    Guard.ArgumentNotNull(database, "database");
    // The table name is concatenated into every SQL statement built by this class,
    // so reject null up front instead of failing later with an obscure SQL error.
    Guard.ArgumentNotNull(tableName, "tableName");

    this.tableName = tableName;
    this.database = database;

    if (!this.database.TableExists(tableName))
    {
        CreateDatabaseTables();
    }
}
/// <summary>
/// Creates the table that stores the queued requests, followed by an ascending and a
/// descending index on its Sequence column.
/// </summary>
private void CreateDatabaseTables()
{
    // Statements are executed in declaration order: the table first, then its indexes.
    string[] statements = new string[]
    {
        @"CREATE TABLE " + tableName + @"(
RequestId uniqueidentifier PRIMARY KEY NOT NULL,
Endpoint NVARCHAR(300) NOT NULL,
OnlineProxyType NVARCHAR(1000) NOT NULL,
MethodName NVARCHAR(200) NOT NULL,
Tag NVARCHAR(300),
Stamps int NOT NULL,
ReturnTargetType NVARCHAR(1000),
ReturnMethodName NVARCHAR(200),
ExceptionTargetType NVARCHAR(1000),
ExceptionMethodName NVARCHAR(200),
MaxRetries int NOT NULL,
CallParameters NTEXT,
Sequence int IDENTITY(1,1),
QueuedDate DATETIME NOT NULL,
MessageID UNIQUEIDENTIFIER NULL,
Expiration DATETIME NULL
)",
        "CREATE INDEX request_sequence_asc ON " + tableName + "(Sequence)",
        "CREATE INDEX request_sequence_desc ON " + tableName + "(Sequence desc)"
    };

    foreach (string statement in statements)
    {
        database.ExecuteNonQuery(statement);
    }
}
/// <summary>
/// Event raised by <see cref="Enqueue"/> after a request has been inserted into the queue table.
/// Handlers receive this queue as the sender and <see cref="EventArgs.Empty"/> as the arguments,
/// so the event itself carries no information about the enqueued request.
/// </summary>
public event EventHandler RequestEnqueued;
/// <summary>
/// Inserts the given request into the queue table and then raises <see cref="RequestEnqueued"/>.
/// The request's <c>Behavior.QueuedDate</c> is overwritten with the current local time
/// before it is stored.
/// </summary>
/// <param name="request">Request to be enqueued. The request itself and its Behavior,
/// MethodName, OnlineProxyType and Endpoint must not be null. CallParameters may be null,
/// in which case the CallParameters column is stored as NULL.</param>
public void Enqueue(Request request)
{
    Guard.ArgumentNotNull(request, "request");
    Guard.ArgumentNotNull(request.Behavior, "request.Behavior");
    Guard.ArgumentNotNull(request.MethodName, "request.MethodName");
    Guard.ArgumentNotNull(request.OnlineProxyType, "request.OnlineProxyType");
    Guard.ArgumentNotNull(request.Endpoint, "request.Endpoint");

    request.Behavior.QueuedDate = DateTime.Now;

    string serializedParameters = null;
    if (request.CallParameters != null)
    {
        serializedParameters = CallParametersSerializer.Serialize(request.CallParameters);
    }

    // CallParameters is optional. Without serialized parameters the column is written
    // as NULL (the reading side skips deserialization for DBNull) instead of
    // dereferencing a null string to compute the parameter size.
    object callParametersValue = DBNull.Value;
    int callParametersSize = 1;
    if (serializedParameters != null)
    {
        callParametersValue = serializedParameters;
        callParametersSize = serializedParameters.Length + 1;
    }

    string sql = @"INSERT " + tableName + @"
(
RequestId,
Endpoint,
OnlineProxyType,
MethodName,
Tag,
Stamps,
ReturnTargetType,
ReturnMethodName,
ExceptionTargetType,
ExceptionMethodName,
MaxRetries,
CallParameters,
QueuedDate,
MessageID,
Expiration)
VALUES(
" + database.BuildParameterName("RequestId") + @",
" + database.BuildParameterName("Endpoint") + @",
" + database.BuildParameterName("OnlineProxyType") + @",
" + database.BuildParameterName("MethodName") + @",
" + database.BuildParameterName("Tag") + @",
" + database.BuildParameterName("Stamps") + @",
" + database.BuildParameterName("ReturnTargetType") + @",
" + database.BuildParameterName("ReturnMethodName") + @",
" + database.BuildParameterName("ExceptionTargetType") + @",
" + database.BuildParameterName("ExceptionMethodName") + @",
" + database.BuildParameterName("MaxRetries") + @",
" + database.BuildParameterName("CallParameters") + @",
" + database.BuildParameterName("QueuedDate") + @",
" + database.BuildParameterName("MessageID") + @",
" + database.BuildParameterName("Expiration") + @")";

    // Callbacks are optional; each one is persisted as target type name + method name.
    string returnTargetType = null;
    string returnMethodName = null;
    string exceptionTargetType = null;
    string exceptionMethodName = null;
    if (request.Behavior.ReturnCallback != null)
    {
        returnTargetType = request.Behavior.ReturnCallback.TargetType.AssemblyQualifiedName;
        returnMethodName = request.Behavior.ReturnCallback.TargetMethodName;
    }
    if (request.Behavior.ExceptionCallback != null)
    {
        exceptionTargetType = request.Behavior.ExceptionCallback.TargetType.AssemblyQualifiedName;
        exceptionMethodName = request.Behavior.ExceptionCallback.TargetMethodName;
    }

    DbParameter[] parameters = new DbParameter[] {
        database.CreateParameter("RequestId", DbType.Guid, 1, request.RequestId),
        database.CreateParameter("Endpoint", DbType.String, 300, request.Endpoint),
        database.CreateParameter("OnlineProxyType", DbType.String, 1000, request.OnlineProxyType.AssemblyQualifiedName),
        database.CreateParameter("MethodName", DbType.String, 200, request.MethodName),
        database.CreateParameter("Tag", DbType.String, 300, request.Behavior.Tag),
        // Stamps and MaxRetries are stored in "int" columns; a 16-bit parameter type
        // cannot represent values above 32767, so they are sent as 32-bit integers.
        database.CreateParameter("Stamps", DbType.Int32, 1, request.Behavior.Stamps),
        database.CreateParameter("ReturnTargetType", DbType.String, 1000, returnTargetType),
        database.CreateParameter("ReturnMethodName", DbType.String, 200, returnMethodName),
        database.CreateParameter("ExceptionTargetType", DbType.String, 1000, exceptionTargetType),
        database.CreateParameter("ExceptionMethodName", DbType.String, 200, exceptionMethodName),
        database.CreateParameter("MaxRetries", DbType.Int32, 1, request.Behavior.MaxRetries),
        ((ISqlDatabase)database).CreateParameter("CallParameters", SqlDbType.NText, callParametersSize, callParametersValue),
        database.CreateParameter("QueuedDate", DbType.DateTime, 0, request.Behavior.QueuedDate),
        database.CreateParameter("MessageID", DbType.Guid, 0, request.Behavior.MessageId),
        database.CreateParameter("Expiration", DbType.DateTime, 0, request.Behavior.Expiration)
    };
    database.ExecuteNonQuery(sql, parameters);

    // Copy the delegate to a local so a handler unsubscribing on another thread between
    // the null check and the invocation cannot cause a NullReferenceException.
    EventHandler handler = RequestEnqueued;
    if (handler != null)
    {
        handler(this, EventArgs.Empty);
    }
}
/// <summary>
/// Counts the rows currently stored in the queue table.
/// </summary>
/// <returns>Number of requests in the queue.</returns>
public int GetCount()
{
    string countQuery = string.Concat("SELECT Count(*) FROM ", tableName);
    return (int)database.ExecuteScalar(countQuery);
}
/// <summary>
/// Returns the comma-separated column list shared by every SELECT statement of the queue.
/// The text starts and ends with a space so it can be placed directly after "SELECT".
/// </summary>
private static string GetColumns()
{
    const string columnList = @" RequestId,
Endpoint,
OnlineProxyType,
MethodName,
Tag,
Stamps,
ReturnTargetType,
ReturnMethodName,
ExceptionTargetType,
ExceptionMethodName,
MaxRetries,
CallParameters,
QueuedDate,
MessageID,
Expiration ";
    return columnList;
}
/// <summary>
/// Reads the request with the lowest sequence value without removing it from the queue.
/// </summary>
/// <returns>The next request in the queue, or null when the queue table has no rows.</returns>
public Request GetNextRequest()
{
    string query = "SELECT" + GetColumns() + @"
FROM " + tableName + @"
ORDER BY sequence";

    using (DbDataReader rows = database.ExecuteReader(query))
    {
        // Only the first row of the ordered result is materialized.
        return rows.Read() ? GetRequestFromReader(rows) : null;
    }
}
/// <summary>
/// Creates an iterator over the queued requests whose tag equals the given tag,
/// ordered by sequence. Being an iterator, the query is executed when enumeration starts.
/// </summary>
/// <param name="tag">Tag value the requests must match.</param>
/// <returns>IEnumerable with the matching requests.</returns>
public IEnumerable<Request> GetRequests(string tag)
{
    string query = "SELECT" + GetColumns() + @"
FROM " + tableName + @"
WHERE Tag = " + database.BuildParameterName("Tag") + @"
ORDER BY Sequence";
    DbParameter tagFilter = database.CreateParameter("Tag", DbType.String, 300, tag);

    using (DbDataReader rows = database.ExecuteReader(query, tagFilter))
    {
        while (rows.Read())
        {
            yield return GetRequestFromReader(rows);
        }
    }
}
/// <summary>
/// Creates an iterator over the queued requests whose stamps value is greater than or
/// equal to the given value, ordered by sequence. Being an iterator, the query is
/// executed when enumeration starts.
/// </summary>
/// <param name="stampsEqualOrMoreThan">Minimum stamps value a request must have to be returned.</param>
/// <returns>IEnumerable with the matching requests.</returns>
public IEnumerable<Request> GetRequests(uint stampsEqualOrMoreThan)
{
    string query = "SELECT" + GetColumns() + @"
FROM " + tableName + @"
WHERE Stamps >= " + database.BuildParameterName("Stamps") + @"
ORDER BY Sequence";
    DbParameter stampsFilter = database.CreateParameter("Stamps", DbType.UInt32, 0, stampsEqualOrMoreThan);

    using (DbDataReader rows = database.ExecuteReader(query, stampsFilter))
    {
        while (rows.Read())
        {
            yield return GetRequestFromReader(rows);
        }
    }
}
/// <summary>
/// Creates an iterator over every request in the queue, ordered by sequence.
/// Being an iterator, the query is executed when enumeration starts.
/// </summary>
/// <returns>IEnumerable with all the requests.</returns>
public IEnumerable<Request> GetRequests()
{
    string query = "SELECT" + GetColumns() + @"
FROM " + tableName + @"
ORDER BY Sequence";

    using (DbDataReader rows = database.ExecuteReader(query))
    {
        while (rows.Read())
        {
            yield return GetRequestFromReader(rows);
        }
    }
}
/// <summary>
/// Looks up the request with the given identifier in the queue.
/// </summary>
/// <param name="requestId">Identifier of the request to find.</param>
/// <returns>The matching request, or null if it is not found.</returns>
public Request GetRequest(Guid requestId)
{
    string query = "SELECT" + GetColumns() + @"
FROM " + tableName + @"
WHERE RequestId = " + database.BuildParameterName("RequestId");
    DbParameter idFilter = database.CreateParameter("RequestId", DbType.Guid, 1, requestId);

    using (DbDataReader rows = database.ExecuteReader(query, idFilter))
    {
        // RequestId is the primary key, so only the first row is ever considered.
        if (rows.Read())
        {
            return GetRequestFromReader(rows);
        }
        return null;
    }
}
/// <summary>
/// Deletes the row whose RequestId matches the given request.
/// Nothing happens when the request is not present in the queue.
/// </summary>
/// <param name="request">Request to be removed from the queue. Must not be null.</param>
public void Remove(Request request)
{
    Guard.ArgumentNotNull(request, "request");

    string deleteSql = "DELETE FROM " + tableName + " WHERE RequestId = " + database.BuildParameterName("RequestId");
    DbParameter idFilter = database.CreateParameter("RequestId", DbType.Guid, 1, request.RequestId);

    database.ExecuteNonQuery(deleteSql, new DbParameter[] { idFilter });
}
/// <summary>
/// Builds a <see cref="Request"/> (with a new OfflineBehavior) from the row the reader is
/// currently positioned on. Columns holding DBNull leave the corresponding property unset.
/// </summary>
/// <param name="reader">Reader positioned on a row of the queue table.</param>
/// <returns>The request described by the current row.</returns>
private static Request GetRequestFromReader(DbDataReader reader)
{
    Request result = new Request();
    result.Behavior = new OfflineBehavior();

    // Columns that are always read (no DBNull check).
    result.RequestId = (Guid)reader["RequestId"];
    result.Endpoint = (string)reader["Endpoint"];
    result.MethodName = (string)reader["MethodName"];
    result.OnlineProxyType = Type.GetType((string)reader["OnlineProxyType"]);
    result.Behavior.Stamps = (ushort)(int)reader["Stamps"];

    object tag = reader["Tag"];
    if (!(tag is DBNull))
    {
        result.Behavior.Tag = (string)tag;
    }

    // A callback is restored only when its target type column holds a value.
    if (!(reader["ReturnTargetType"] is DBNull))
    {
        result.Behavior.ReturnCallback = GetCallbackFrom(reader, "ReturnTargetType", "ReturnMethodName");
    }
    if (!(reader["ExceptionTargetType"] is DBNull))
    {
        result.Behavior.ExceptionCallback = GetCallbackFrom(reader, "ExceptionTargetType", "ExceptionMethodName");
    }

    object maxRetries = reader["MaxRetries"];
    if (!(maxRetries is DBNull))
    {
        result.Behavior.MaxRetries = Convert.ToUInt32(maxRetries, CultureInfo.InvariantCulture);
    }

    object callParameters = reader["CallParameters"];
    if (!(callParameters is DBNull))
    {
        result.CallParameters = CallParametersSerializer.Deserialize((string)callParameters);
    }

    result.Behavior.QueuedDate = (DateTime)reader["QueuedDate"];

    object messageId = reader["MessageId"];
    if (!(messageId is DBNull))
    {
        result.Behavior.MessageId = (Guid)messageId;
    }

    object expiration = reader["Expiration"];
    if (!(expiration is DBNull))
    {
        result.Behavior.Expiration = (DateTime)expiration;
    }

    return result;
}
/// <summary>
/// Rebuilds a <see cref="CommandCallback"/> from two columns of the current row: one holding
/// the target type name (resolved with <see cref="Type.GetType(string)"/>) and one holding
/// the target method name.
/// </summary>
/// <param name="reader">Reader positioned on a row of the queue table.</param>
/// <param name="typeNameColumn">Name of the column holding the callback target type name.</param>
/// <param name="methodNameColumn">Name of the column holding the callback method name.</param>
/// <returns>The callback described by the two columns.</returns>
private static CommandCallback GetCallbackFrom(DbDataReader reader, string typeNameColumn, string methodNameColumn)
{
    Type targetType = Type.GetType((string)reader[typeNameColumn]);
    string targetMethod = (string)reader[methodNameColumn];
    return new CommandCallback(targetType, targetMethod);
}
}
}
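// Code-viewer keyboard shortcuts (page residue, not part of the source file):
//   Copy code:            Ctrl + C
//   Search code:          Ctrl + F
//   Full-screen mode:     F11
//   Toggle theme:         Ctrl + Shift + D
//   Show shortcuts:       ?
//   Increase font size:   Ctrl + =
//   Decrease font size:   Ctrl + -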