{ queue.pas }
unit Queue;
interface
uses
{$IFDEF LINUX}
SysUtils, Types, Classes, QGraphics, QControls, QForms, QDialogs,
QDBCtrls, QStdCtrls, QComCtrls, QExtCtrls, QButtons, OdacClx,
{$ELSE}
Windows, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
DBCtrls, ExtCtrls, StdCtrls, ComCtrls, Buttons, OdacVcl,
{$ENDIF}
{$IFDEF FPC}
LResources,
{$ENDIF}
DB, DBAccess, Ora, OraObjects, OraAQ, OdacDemoForm, OdacDemoFrame;
type
{ Demo frame for Oracle Advanced Queuing. Its handlers enqueue and dequeue
  messages (raw string or object payload), listen for pending messages,
  toggle asynchronous notification, and create / alter / drop / start / stop
  queue tables and queues. All activity is logged to meLog. }
TQueueFrame = class(TOdacDemoFrame)
{ --- Layout panels and the log memo; meLog receives the ENQUEUE / DEQUEUE /
  ASYNCNOTIFICATION lines written by the handlers --- }
ToolBar: TPanel;
Panel2: TPanel;
meLog: TMemo;
{ Queues used by btEnqueueClick; Initialize assigns them the frame's Connection }
QueueRaw: TOraQueue;
QueueObject: TOraQueue;
Panel1: TPanel;
btEnqueue: TSpeedButton;
btDequeue: TSpeedButton;
Panel3: TPanel;
{ rbUseRaw.Checked selects the raw string payload path; otherwise the object
  payload path is taken. rbUseObject is not read by the code in this unit. }
rbUseRaw: TRadioButton;
rbUseObject: TRadioButton;
btListen: TSpeedButton;
Panel4: TPanel;
{ Mirrors / toggles AsyncNotification on QueueRaw2 and QueueObject2 }
cbNotification: TCheckBox;
Panel5: TPanel;
Label1: TLabel;
Label2: TLabel;
Label3: TLabel;
{ Payload inputs: department number and department name }
edPayloadDeptNo: TEdit;
edPayloadDeptName: TEdit;
Panel6: TPanel;
Panel7: TPanel;
Label4: TLabel;
Label5: TLabel;
{ Message priority used on enqueue, and the wait timeout applied to dequeue / listen }
edPriority: TEdit;
edWaitTimeout: TEdit;
Panel8: TPanel;
{ --- Queue table administration --- }
btCreateTable: TSpeedButton;
btAlterTable: TSpeedButton;
btDropTable: TSpeedButton;
btCreateQueue: TSpeedButton;
Panel9: TPanel;
Label6: TLabel;
edTableName: TEdit;
Label7: TLabel;
edPayloadType: TEdit;
Label8: TLabel;
Label9: TLabel;
edComment: TEdit;
btReadTableProperties: TSpeedButton;
{ Administrative components; Initialize assigns them the frame's Connection }
QueueTable: TOraQueueTable;
QueueAdmin: TOraQueueAdmin;
{ Item indexes 0..3 map to qslEnqueueTime, qslPriority, qslPriorityEnqueueTime,
  qslEnqueueTimePriority (see btCreateTableClick / btReadTablePropertiesClick) }
cbSortList: TComboBox;
{ --- Queue administration --- }
btAlterQueue: TSpeedButton;
btDropQueue: TSpeedButton;
{ NOTE(review): 'Statr' looks like a typo for 'Start'; the name is presumably
  also referenced by the form resource, so it is left unchanged here. }
btStatrQueue: TSpeedButton;
btStopQueue: TSpeedButton;
btReadQueueProperties: TSpeedButton;
Panel10: TPanel;
Label10: TLabel;
edQueueName: TEdit;
Label11: TLabel;
edQueueTable: TEdit;
Label12: TLabel;
edMaxRetries: TEdit;
edQueueComment: TEdit;
Label13: TLabel;
Panel11: TPanel;
{ Second session, connected on demand by ConnectSessions. QueueRaw2 and
  QueueObject2 are the queues used for dequeue, listen and notification.
  NOTE(review): presumably both are bound to OraSession2 in the form
  resource - confirm. }
OraSession2: TOraSession;
QueueRaw2: TOraQueue;
QueueObject2: TOraQueue;
{ --- Message handling event handlers --- }
procedure btEnqueueClick(Sender: TObject);
procedure btDequeueClick(Sender: TObject);
procedure edWaitTimeoutChange(Sender: TObject);
procedure btListenClick(Sender: TObject);
procedure cbNotificationClick(Sender: TObject);
{ Logs an asynchronous notification. NOTE(review): presumably wired to the
  queues' message event in the form resource - confirm. }
procedure QueueMessage(Sender: TOraQueue; const MessageId: String;
const MessageProperties: TQueueMessageProperties);
{ --- Queue table administration event handlers --- }
procedure btCreateTableClick(Sender: TObject);
procedure btAlterTableClick(Sender: TObject);
procedure btDropTableClick(Sender: TObject);
procedure btReadTablePropertiesClick(Sender: TObject);
{ --- Queue administration event handlers --- }
procedure btCreateQueueClick(Sender: TObject);
procedure btAlterQueueClick(Sender: TObject);
procedure btDropQueueClick(Sender: TObject);
procedure btStatrQueueClick(Sender: TObject);
procedure btStopQueueClick(Sender: TObject);
procedure btReadQueuePropertiesClick(Sender: TObject);
private
{ Connects the main Connection and, if needed, OraSession2 }
procedure ConnectSessions;
public
{ Binds the queue components to Connection and seeds the UI controls }
procedure Initialize; override;
end;
implementation
{$IFNDEF FPC}
{$IFDEF CLR}
{$R *.nfm}
{$ENDIF}
{$IFDEF WIN32}
{$R *.dfm}
{$ENDIF}
{$IFDEF LINUX}
{$R *.xfm}
{$ENDIF}
{$ENDIF}
{ Binds the queue, queue-table and queue-admin components to the frame's
  connection and seeds the wait-timeout / notification controls from the
  current settings of QueueRaw2. }
procedure TQueueFrame.Initialize;
var
  MainSession: TOraSession;
begin
  inherited;

  // Resolve the cast once and share the session between all four components.
  MainSession := Connection as TOraSession;
  QueueRaw.Session := MainSession;
  QueueObject.Session := MainSession;
  QueueTable.Session := MainSession;
  QueueAdmin.Session := MainSession;

  edWaitTimeout.Text := IntToStr(QueueRaw2.DequeueOptions.WaitTimeout);

{$IFNDEF LINUX}
  cbNotification.Checked := QueueRaw2.AsyncNotification;
{$ELSE}
  // On LINUX builds the notification check box is switched off and disabled.
  cbNotification.Checked := False;
  cbNotification.Enabled := False;
{$ENDIF}
end;
{ Makes sure both sessions are connected: the frame's Connection first, then
  OraSession2, which receives the connection settings via AssignConnectionTo
  and is connected without a login prompt. }
procedure TQueueFrame.ConnectSessions;
begin
  Connection.Connect;

  // Nothing more to do when the second session is already up.
  if OraSession2.Connected then
    Exit;

  AssignConnectionTo(OraSession2);
  OraSession2.LoginPrompt := False;
  OraSession2.Connect;
end;
{ Enqueues one message built from the payload edits and logs the returned
  message id together with the payload. When rbUseRaw is checked the payload
  is the string "<DeptNo>, <DeptName>" sent through QueueRaw; otherwise an
  OBJ_ODAC_DEPT object is filled in and sent through QueueObject. }
procedure TQueueFrame.btEnqueueClick(Sender: TObject);
var
  Props: TQueueMessageProperties;
  RawText, NewMsgId: string;
  DeptObj: TOraObject;
begin
  Props := TQueueMessageProperties.Create;
  try
    // StrToInt raises EConvertError when the priority edit is not an integer.
    Props.Priority := StrToInt(edPriority.Text);

    if rbUseRaw.Checked then
    begin
      RawText := edPayloadDeptNo.Text + ', ' + edPayloadDeptName.Text;
      NewMsgId := QueueRaw.Enqueue(RawText, Props);
      meLog.Lines.Add('ENQUEUE: MsgId: ' + NewMsgId + ' Payload: ' + RawText);
      Exit; // the enclosing finally still releases Props
    end;

    // Object payload path.
    DeptObj := TOraObject.Create;
    try
      // The session must be connected before its OCISvcCtx property is used.
      OdacForm.OraSession.Connect;
      DeptObj.AllocObject(OdacForm.OraSession.OCISvcCtx, 'OBJ_ODAC_DEPT');

      try
        DeptObj.AttrAsInteger['DeptNo'] := StrToInt(edPayloadDeptNo.Text);
      except
        on E: EConvertError do
        begin
          // Send the user back to the offending field and abandon the enqueue.
          edPayloadDeptNo.SetFocus;
          ShowMessage('DeptNo must be an integer value');
          Exit;
        end;
      end;

      DeptObj.AttrAsString['DeptName'] := edPayloadDeptName.Text;
      NewMsgId := QueueObject.Enqueue(DeptObj, Props);

      // The logged values are read back from the object, not from the edits.
      meLog.Lines.Add('ENQUEUE: MsgId: ' + NewMsgId + ' Payload: ' +
        IntToStr(DeptObj.AttrAsInteger['DeptNo']) + ', ' +
        DeptObj.AttrAsString['DeptName']);
    finally
      DeptObj.Free;
    end;
  finally
    Props.Free;
  end;
end;
{ Dequeues one message and logs its id and payload. Uses QueueRaw2 with a
  string payload when rbUseRaw is checked, otherwise QueueObject2 with an
  object payload whose DeptNo / DeptName attributes are logged. }
procedure TQueueFrame.btDequeueClick(Sender: TObject);
var
  RawText, GotMsgId: string;
  DeptObj: TOraObject;
begin
  ConnectSessions;

  if rbUseRaw.Checked then
  begin
    GotMsgId := QueueRaw2.Dequeue(RawText);
    meLog.Lines.Add('DEQUEUE: MsgId: ' + GotMsgId + ' Payload: ' + RawText);
    Exit;
  end;

  // Object payload path.
  DeptObj := TOraObject.Create;
  try
    GotMsgId := QueueObject2.Dequeue(DeptObj);
    meLog.Lines.Add('DEQUEUE: MsgId: ' + GotMsgId + ' Payload: ' +
      IntToStr(DeptObj.AttrAsInteger['DeptNo']) + ', ' +
      DeptObj.AttrAsString['DeptName']);
  finally
    DeptObj.Free;
  end;
end;
{ Applies the wait timeout typed into edWaitTimeout to the dequeue options of
  QueueRaw2 and QueueObject2. While the text is not a valid integer (for
  example half-typed input) the previous timeout is kept.

  Only EConvertError is treated as "not a number yet"; any other exception is
  allowed to propagate instead of being silently swallowed by a bare except. }
procedure TQueueFrame.edWaitTimeoutChange(Sender: TObject);
var
  WaitTimeout: integer;
begin
  try
    WaitTimeout := StrToInt(edWaitTimeout.Text);
  except
    on EConvertError do
      Exit; // text is not an integer: leave the current timeout untouched
  end;

  QueueRaw2.DequeueOptions.WaitTimeout := WaitTimeout;
  QueueObject2.DequeueOptions.WaitTimeout := WaitTimeout;
end;
{ Listens on the QU_ODAC_RAW and QU_ODAC_DEPT queues, using the value typed
  into edWaitTimeout, and reports the address of the agent returned by Listen.
  StrToInt raises EConvertError when the timeout edit is not an integer. }
procedure TQueueFrame.btListenClick(Sender: TObject);
var
  Watched: TQueueAgents;
  Ready: TQueueAgent;
  Timeout: Integer;
begin
  ConnectSessions;

  Watched := TQueueAgents.Create;
  try
    Watched.Add.Address := 'QU_ODAC_RAW';
    Watched.Add.Address := 'QU_ODAC_DEPT';

    Ready := TQueueAgent.Create;
    try
      Timeout := StrToInt(edWaitTimeout.Text);
      QueueRaw2.Listen(Watched, Ready, Timeout);
      ShowMessage('A message is available in queue ' + Ready.Address);
    finally
      Ready.Free;
    end;
  finally
    Watched.Free;
  end;
end;
{ Switches asynchronous notification on or off for QueueRaw2 and QueueObject2
  to match the check box. If the change fails, the check box is reset to the
  current state of QueueRaw2 and the exception is re-raised. The whole body is
  compiled out on LINUX builds. }
procedure TQueueFrame.cbNotificationClick(Sender: TObject);
{$IFNDEF LINUX}
var
  Wanted: Boolean;
{$ENDIF}
begin
{$IFNDEF LINUX}
  Wanted := cbNotification.Checked;

  // Sessions are only connected when notification is being switched on.
  if Wanted then
    ConnectSessions;

  try
    QueueRaw2.AsyncNotification := Wanted;
    QueueObject2.AsyncNotification := Wanted;
  except
    cbNotification.Checked := QueueRaw2.AsyncNotification;
    raise;
  end;
{$ENDIF}
end;
{ Appends a log line with the message id and the name of the queue that
  raised the notification. MessageProperties is not used. }
procedure TQueueFrame.QueueMessage(Sender: TOraQueue; const MessageId: String;
  const MessageProperties: TQueueMessageProperties);
var
  LogLine: string;
begin
  LogLine := 'ASYNCNOTIFICATION: MsgId: ' + MessageId + ' Queue: ' + Sender.QueueName;
  meLog.Lines.Add(LogLine);
end;
{ Creates a queue table from the values entered in the table panel: name,
  payload type, sort list and comment. When no sort-list item is selected
  (index outside 0..3) the SortList property is left as it is. }
procedure TQueueFrame.btCreateTableClick(Sender: TObject);
begin
  QueueTable.QueueTableName := edTableName.Text;
  QueueTable.PayloadTypeName := edPayloadType.Text;

  // Combo box position -> sort list value.
  case cbSortList.ItemIndex of
    0: QueueTable.SortList := qslEnqueueTime;
    1: QueueTable.SortList := qslPriority;
    2: QueueTable.SortList := qslPriorityEnqueueTime;
    3: QueueTable.SortList := qslEnqueueTimePriority;
  end;

  QueueTable.Comment := edComment.Text;
  QueueTable.CreateQueueTable;
end;
{ Alters the queue table named in edTableName, passing the text of edComment
  to AlterQueueTable. }
procedure TQueueFrame.btAlterTableClick(Sender: TObject);
var
  NewComment: string;
begin
  QueueTable.QueueTableName := edTableName.Text;
  NewComment := edComment.Text;
  QueueTable.AlterQueueTable(NewComment);
end;
{ Drops the queue table whose name is entered in edTableName. }
procedure TQueueFrame.btDropTableClick(Sender: TObject);
var
  TargetTable: string;
begin
  TargetTable := edTableName.Text;
  QueueTable.QueueTableName := TargetTable;
  QueueTable.DropQueueTable;
end;
{ Reads the properties of the queue table named in edTableName and shows its
  payload type, sort list and comment in the table panel controls. }
procedure TQueueFrame.btReadTablePropertiesClick(Sender: TObject);
begin
  QueueTable.QueueTableName := edTableName.Text;
  QueueTable.ReadQueueTableProperties;

  edPayloadType.Text := QueueTable.PayloadTypeName;

  // Sort list value -> combo box position.
  case QueueTable.SortList of
    qslEnqueueTime:         cbSortList.ItemIndex := 0;
    qslPriority:            cbSortList.ItemIndex := 1;
    qslPriorityEnqueueTime: cbSortList.ItemIndex := 2;
    qslEnqueueTimePriority: cbSortList.ItemIndex := 3;
  end;

  edComment.Text := QueueTable.Comment;
end;
{ Creates a queue from the values entered in the queue panel: queue name,
  queue table, maximum retries and comment. StrToInt raises EConvertError
  when edMaxRetries does not hold an integer. }
procedure TQueueFrame.btCreateQueueClick(Sender: TObject);
var
  Retries: Integer;
begin
  QueueAdmin.QueueName := edQueueName.Text;
  QueueAdmin.QueueTableName := edQueueTable.Text;

  // Converted only after the two names above have been assigned.
  Retries := StrToInt(edMaxRetries.Text);
  QueueAdmin.MaxRetries := Retries;

  QueueAdmin.Comment := edQueueComment.Text;
  QueueAdmin.CreateQueue;
end;
{ Alters the queue named in edQueueName, passing the maximum retries and the
  comment from the queue panel; the two middle arguments of AlterQueue are
  passed as AQ_NOT_DEFINED. StrToInt raises EConvertError when edMaxRetries
  does not hold an integer. }
procedure TQueueFrame.btAlterQueueClick(Sender: TObject);
var
  Retries: Integer;
begin
  QueueAdmin.QueueName := edQueueName.Text;

  Retries := StrToInt(edMaxRetries.Text);
  QueueAdmin.AlterQueue(Retries, AQ_NOT_DEFINED, AQ_NOT_DEFINED,
    edQueueComment.Text);
end;
{ Drops the queue whose name is entered in edQueueName. }
procedure TQueueFrame.btDropQueueClick(Sender: TObject);
var
  TargetQueue: string;
begin
  TargetQueue := edQueueName.Text;
  QueueAdmin.QueueName := TargetQueue;
  QueueAdmin.DropQueue;
end;
{ Starts the queue whose name is entered in edQueueName. }
procedure TQueueFrame.btStatrQueueClick(Sender: TObject);
var
  TargetQueue: string;
begin
  TargetQueue := edQueueName.Text;
  QueueAdmin.QueueName := TargetQueue;
  QueueAdmin.StartQueue;
end;
{ Stops the queue whose name is entered in edQueueName. }
procedure TQueueFrame.btStopQueueClick(Sender: TObject);
var
  TargetQueue: string;
begin
  TargetQueue := edQueueName.Text;
  QueueAdmin.QueueName := TargetQueue;
  QueueAdmin.StopQueue;
end;
{ Reads the properties of the queue named in edQueueName and shows its queue
  table, maximum retries and comment in the queue panel controls. }
procedure TQueueFrame.btReadQueuePropertiesClick(Sender: TObject);
var
  RetriesText: string;
begin
  QueueAdmin.QueueName := edQueueName.Text;
  QueueAdmin.ReadQueueProperties;

  edQueueTable.Text := QueueAdmin.QueueTableName;

  RetriesText := IntToStr(QueueAdmin.MaxRetries);
  edMaxRetries.Text := RetriesText;

  edQueueComment.Text := QueueAdmin.Comment;
end;
initialization
{$IFDEF FPC}
{$I Queue.lrs}
{$ENDIF}
end.
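{ Code-viewer keyboard shortcuts (web page residue, not part of the unit):
  Copy code:        Ctrl + C
  Search code:      Ctrl + F
  Full-screen mode: F11
  Switch theme:     Ctrl + Shift + D
  Show shortcuts:   ?
  Increase font:    Ctrl + =
  Decrease font:    Ctrl + -
}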