📄 session.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - ScalAgent Distributed Technologies * Copyright (C) 1996 - Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): Nicolas Tachker (ScalAgent) */package com.scalagent.kjoram;import com.scalagent.kjoram.jms.*;import com.scalagent.kjoram.util.TimerTask;import java.util.*;import com.scalagent.kjoram.excepts.IllegalStateException;import com.scalagent.kjoram.excepts.*;public class Session{ public static final int SESSION_TRANSACTED = 0; public static final int AUTO_ACKNOWLEDGE = 1; public static final int CLIENT_ACKNOWLEDGE = 2; public static final int DUPS_OK_ACKNOWLEDGE = 3; /** Task for closing the session if it becomes pending. */ private TimerTask closingTask = null; /** <code>true</code> if the session's transaction is scheduled. */ private boolean scheduled = false; /** Timer for replying to expired consumers' requests. */ private com.scalagent.kjoram.util.Timer consumersTimer = null; /** The message listener of the session, if any. */ protected MessageListener messageListener = null; /** The identifier of the session. */ String ident; /** The connection the session belongs to. */ Connection cnx; /** <code>true</code> if the session is transacted. */ boolean transacted; /** The acknowledgement mode of the session. */ int acknowledgeMode; /** <code>true</code> if the session is closed. */ boolean closed = false; /** <code>true</code> if the session is started. */ boolean started = false; /** <code>true</code> if the session's acknowledgements are automatic. */ boolean autoAck; /** Vector of message consumers. */ Vector consumers; /** Vector of message producers. */ Vector producers; /** Vector of queue browsers. */ Vector browsers; /** FIFO queue holding the asynchronous server deliveries. */ com.scalagent.kjoram.util.Queue repliesIn; /** Daemon distributing asynchronous server deliveries. */ SessionDaemon daemon = null; /** Counter of message listeners. */ int msgListeners = 0; /** * Table holding the <code>ProducerMessages</code> holding producers' * messages and destinated to be sent at commit. * <p> * <b>Key:</b> destination name<br> * <b>Object:</b> <code>ProducerMessages</code> */ Hashtable sendings; /** * Table holding the identifiers of the messages delivered per * destination or subscription, and not acknowledged. * <p> * <b>Key:</b> destination or subscription name<br> * <b>Object:</b> <code>MessageAcks</code> instance */ Hashtable deliveries; /** The connection consumer delivering messages to the session, if any. */ ConnectionConsumer connectionConsumer = null; /** * Opens a session. * * @param cnx The connection the session belongs to. * @param transacted <code>true</code> for a transacted session. * @param acknowledgeMode 1 (auto), 2 (client) or 3 (dups ok). * * @exception JMSException In case of an invalid acknowledge mode. */ Session(Connection cnx, boolean transacted, int acknowledgeMode) throws JMSException { if (! transacted && acknowledgeMode != Session.AUTO_ACKNOWLEDGE && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE && acknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE) throw new JMSException("Can't create a non transacted session with an" + " invalid acknowledge mode."); this.ident = cnx.nextSessionId(); this.cnx = cnx; this.transacted = transacted; this.acknowledgeMode = acknowledgeMode; autoAck = ! transacted && acknowledgeMode != Session.CLIENT_ACKNOWLEDGE; consumers = new Vector(); producers = new Vector(); browsers = new Vector(); repliesIn = new com.scalagent.kjoram.util.Queue(); sendings = new Hashtable(); deliveries = new Hashtable(); // If the session is transacted and the transactions limited by a timer, // a closing task might be useful. if (transacted && cnx.factoryParameters.txPendingTimer != 0) closingTask = new SessionCloseTask(); cnx.sessions.addElement(this); if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG,this + ": created."); } /** Returns a String image of this session. */ public String toString() { return "Sess:" + ident; } /** * API method. * * @exception JMSException Actually never thrown. */ public int getAcknowledgeMode() throws JMSException { return acknowledgeMode; } /** * API method. * * @exception JMSException Actually never thrown. */ public boolean getTransacted() throws JMSException { return transacted; } /** * API method. * * @exception JMSException Actually never thrown. */ public void setMessageListener(MessageListener messageListener) throws JMSException { this.messageListener = messageListener; } /** * API method. * * @exception JMSException Actually never thrown. */ public MessageListener getMessageListener() throws JMSException { return messageListener; } /** * API method. * * @exception IllegalStateException If the session is closed. */ public Message createMessage() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new Message(); } /** * API method. * * @exception IllegalStateException If the session is closed. */ public TextMessage createTextMessage() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new TextMessage(); } /** * API method. * * @exception IllegalStateException If the session is closed. */ public TextMessage createTextMessage(String text) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); TextMessage message = new TextMessage(); message.setText(text); return message; } /** * API method. * * @exception IllegalStateException If the session is closed. */ public BytesMessage createBytesMessage() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new BytesMessage(); } /** * API method. * * @exception IllegalStateException If the session is closed. */ public MapMessage createMapMessage() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new MapMessage(); } /** * API method * * @exception IllegalStateException If the session is closed. */ public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new QueueBrowser(this, (Queue) queue, selector); } /** * API method * * @exception IllegalStateException If the session is closed. */ public QueueBrowser createBrowser(Queue queue) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new QueueBrowser(this, (Queue) queue, null); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the creation fails for any other reason. */ public MessageProducer createProducer(Destination dest) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new MessageProducer(this, (Destination) dest); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the creation fails for any other reason. */ public MessageConsumer createConsumer(Destination dest, String selector, boolean noLocal) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new MessageConsumer(this, (Destination) dest, selector, null, noLocal); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the creation fails for any other reason. */ public MessageConsumer createConsumer(Destination dest, String selector) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new MessageConsumer(this, (Destination) dest, selector); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the creation fails for any other reason. */ public MessageConsumer createConsumer(Destination dest) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new MessageConsumer(this, (Destination) dest, null); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the creation fails for any other reason. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new TopicSubscriber(this, (Topic) topic, name, selector, noLocal); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the creation fails for any other reason. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new TopicSubscriber(this, (Topic) topic, name, null, false); } /** * API method. * * @exception IllegalStateException If the session is closed. */ public Queue createQueue(String queueName) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); return new Queue(queueName); } /** * API method. * * @exception IllegalStateException If the session is closed. * @exception JMSException If the topic creation failed. */ public Topic createTopic(String topicName) throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); // Checks if the topic to retrieve is the administration topic: if (topicName.equals("#AdminTopic")) { try { GetAdminTopicReply reply = (GetAdminTopicReply) cnx.syncRequest(new GetAdminTopicRequest()); if (reply.getId() != null) return new Topic(reply.getId()); else throw new JMSException("AdminTopic could not be retrieved."); } catch (JMSException exc) { throw exc; } catch (Exception exc) { throw new JMSException("AdminTopic could not be retrieved: " + exc); } } return new Topic(topicName); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the request fails for any other reason. */ public TemporaryQueue createTemporaryQueue() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); SessCreateTDReply reply = (SessCreateTDReply) cnx.syncRequest(new SessCreateTQRequest()); String tempDest = reply.getAgentId(); return new TemporaryQueue(tempDest, cnx); } /** * API method. * * @exception IllegalStateException If the session is closed or if the * connection is broken. * @exception JMSException If the request fails for any other reason. */ public TemporaryTopic createTemporaryTopic() throws JMSException { if (closed) throw new IllegalStateException("Forbidden call on a closed session."); SessCreateTDReply reply = (SessCreateTDReply) cnx.syncRequest(new SessCreateTTRequest()); String tempDest = reply.getAgentId(); return new TemporaryTopic(tempDest, cnx); } /** API method. */ public synchronized void run() { int load = repliesIn.size(); com.scalagent.kjoram.messages.Message momMsg; String msgId; String targetName = connectionConsumer.targetName; boolean queueMode = connectionConsumer.queueMode; if (JoramTracing.dbgClient) JoramTracing.log(JoramTracing.DEBUG, "-- " + this + ": loaded with " + load + " message(s) and started."); try { // Processing the current number of messages in the queue: for (int i = 0; i < load; i++) { momMsg = (com.scalagent.kjoram.messages.Message) repliesIn.pop(); msgId = momMsg.getIdentifier(); // If no message listener has been set for the session, denying the // processed message: if (messageListener == null) { JoramTracing.log(JoramTracing.ERROR,this + ": an" + " asynchronous delivery arrived for" + " a non existing session listener:"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -