jmssession.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,204 行 · 第 1/2 页
JAVA
1,204 行
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * * Free Software Foundation, Inc. * 59 Temple Place, Suite 330 * Boston, MA 02111-1307 USA * * @author Scott Ferguson */package com.caucho.jms.connection;import com.caucho.jms.message.*;import com.caucho.jms.queue.*;import com.caucho.util.Alarm;import com.caucho.util.L10N;import com.caucho.util.ThreadPool;import com.caucho.util.ThreadTask;import javax.jms.*;import javax.jms.IllegalStateException;import javax.naming.*;import javax.transaction.*;import javax.transaction.xa.*;import java.io.Serializable;import java.util.ArrayList;import java.util.logging.Level;import java.util.logging.Logger;/** * Manages the JMS session. */public class JmsSession implements XASession, ThreadTask, XAResource{ protected static final Logger log = Logger.getLogger(JmsSession.class.getName()); protected static final L10N L = new L10N(JmsSession.class); private static final long SHUTDOWN_WAIT_TIME = 10000; private boolean _isXA; private Xid _xid; private TransactionManager _tm; private boolean _isTransacted; private int _acknowledgeMode; private ClassLoader _classLoader; private ConnectionImpl _connection; private final ArrayList<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>(); private MessageFactory _messageFactory = new MessageFactory(); private MessageListener _messageListener; private boolean _isAsynchronous; // 4.4.1 - client's responsibility private Thread _thread; // transacted messages private ArrayList<TransactedMessage> _transactedMessages; // true if the listener thread is running private volatile boolean _isRunning; private volatile boolean _isClosed; private volatile boolean _hasMessage; public JmsSession(ConnectionImpl connection, boolean isTransacted, int ackMode, boolean isXA) throws JMSException { _classLoader = Thread.currentThread().getContextClassLoader(); _connection = connection; _isXA = isXA; _isTransacted = isTransacted; _acknowledgeMode = ackMode; if (isTransacted) _acknowledgeMode = 0; else { switch (ackMode) { case CLIENT_ACKNOWLEDGE: case DUPS_OK_ACKNOWLEDGE: case AUTO_ACKNOWLEDGE: _acknowledgeMode = ackMode; break; default: try { log.warning(L.l("JmsSession {0} is an illegal acknowledge mode", ackMode)); // XXX: tck // throw new JMSException(L.l("{0} is an illegal acknowledge mode", ackMode)); log.warning(L.l("JmsSession {0} is an illegal acknowledge mode", ackMode)); _acknowledgeMode = AUTO_ACKNOWLEDGE; } catch (Exception e) { log.log(Level.FINE, e.toString(), e); } break; } } try { InitialContext ic = new InitialContext(); _tm = (TransactionManager) ic.lookup("java:comp/TransactionManager"); } catch (Exception e) { log.log(Level.FINER, e.toString(), e); } _connection.addSession(this); } /** * Returns the connection. */ ConnectionImpl getConnection() { return _connection; } /** * Returns the ClassLoader. */ ClassLoader getClassLoader() { return _classLoader; } /** * Returns the connection's clientID */ public String getClientID() throws JMSException { return _connection.getClientID(); } /** * Returns true if the connection is active. */ public boolean isActive() { return ! _isClosed && _connection.isActive(); } /** * Returns true if the connection is active. */ boolean isStopping() { return _connection.isStopping(); } /** * Returns true if the session is in a transaction. */ public boolean getTransacted() throws JMSException { checkOpen(); return _isTransacted; } /** * Returns the acknowledge mode for the session. */ public int getAcknowledgeMode() throws JMSException { checkOpen(); return _acknowledgeMode; } /** * Returns the message listener */ public MessageListener getMessageListener() throws JMSException { checkOpen(); return _messageListener; } /** * Sets the message listener */ public void setMessageListener(MessageListener listener) throws JMSException { checkOpen(); _messageListener = listener; setAsynchronous(); } /** * Set true for a synchronous session. */ void setAsynchronous() { _isAsynchronous = true; notifyMessageAvailable(); } /** * Set true for a synchronous session. */ boolean isAsynchronous() { return _isAsynchronous; } /** * Creates a new byte[] message. */ public BytesMessage createBytesMessage() throws JMSException { checkOpen(); return new BytesMessageImpl(); } /** * Creates a new map message. */ public MapMessage createMapMessage() throws JMSException { checkOpen(); return new MapMessageImpl(); } /** * Creates a message. Used when only header info is important. */ public Message createMessage() throws JMSException { checkOpen(); return new MessageImpl(); } /** * Creates an object message. */ public ObjectMessage createObjectMessage() throws JMSException { checkOpen(); return new ObjectMessageImpl(); } /** * Creates an object message. * * @param obj a serializable message. */ public ObjectMessage createObjectMessage(Serializable obj) throws JMSException { checkOpen(); ObjectMessage msg = createObjectMessage(); msg.setObject(obj); return msg; } /** * Creates a stream message. */ public StreamMessage createStreamMessage() throws JMSException { checkOpen(); return new StreamMessageImpl(); } /** * Creates a text message. */ public TextMessage createTextMessage() throws JMSException { checkOpen(); return new TextMessageImpl(); } /** * Creates a text message. */ public TextMessage createTextMessage(String message) throws JMSException { checkOpen(); TextMessage msg = createTextMessage(); msg.setText(message); return msg; } /** * Creates a consumer to receive messages. * * @param destination the destination to receive messages from. */ public MessageConsumer createConsumer(Destination destination) throws JMSException { checkOpen(); return createConsumer(destination, null, false); } /** * Creates a consumer to receive messages. * * @param destination the destination to receive messages from. * @param messageSelector query to restrict the messages. */ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkOpen(); return createConsumer(destination, messageSelector, false); } /** * Creates a consumer to receive messages. * * @param destination the destination to receive messages from. * @param messageSelector query to restrict the messages. */ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { checkOpen(); if (destination == null) throw new InvalidDestinationException(L.l("destination is null. Destination may not be null for Session.createConsumer")); MessageConsumerImpl consumer; if (destination instanceof AbstractQueue) { AbstractQueue dest = (AbstractQueue) destination; consumer = new MessageConsumerImpl(this, dest, messageSelector, noLocal); } else if (destination instanceof AbstractTopic) { AbstractTopic dest = (AbstractTopic) destination; consumer = new TopicSubscriberImpl(this, dest, messageSelector, noLocal); } else throw new InvalidDestinationException(L.l("'{0}' is an unknown destination. The destination must be a Resin JMS Destination.", destination)); addConsumer(consumer); return consumer; } /** * Creates a producer to produce messages. * * @param destination the destination to send messages from. */ public MessageProducer createProducer(Destination destination) throws JMSException { checkOpen(); if (destination == null) { return new MessageProducerImpl(this, null); } if (! (destination instanceof AbstractDestination)) throw new InvalidDestinationException(L.l("'{0}' is an unknown destination. The destination must be a Resin JMS destination for Session.createProducer.", destination)); AbstractDestination dest = (AbstractDestination) destination; return new MessageProducerImpl(this, dest); } /** * Creates a QueueBrowser to browse messages in the queue. * * @param queue the queue to send messages to. */ public QueueBrowser createBrowser(Queue queue) throws JMSException { checkOpen(); return createBrowser(queue, null); } /** * Creates a QueueBrowser to browse messages in the queue. * * @param queue the queue to send messages to. */ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkOpen(); if (queue == null) throw new InvalidDestinationException(L.l("queue is null. Queue may not be null for Session.createBrowser")); if (! (queue instanceof AbstractQueue)) throw new InvalidDestinationException(L.l("'{0}' is an unknown queue. The queue must be a Resin JMS Queue for Session.createBrowser.", queue)); return new MessageBrowserImpl(this, (AbstractQueue) queue, messageSelector); } /** * Creates a new queue. */ public Queue createQueue(String queueName) throws JMSException { checkOpen(); return _connection.createQueue(queueName); } /** * Creates a temporary queue. */ public TemporaryQueue createTemporaryQueue() throws JMSException { checkOpen(); return new TemporaryQueueImpl(this); } /** * Creates a new topic. */ public Topic createTopic(String topicName) throws JMSException { checkOpen(); return _connection.createTopic(topicName); } /** * Creates a temporary topic. */ public TemporaryTopic createTemporaryTopic() throws JMSException { checkOpen(); return new TemporaryTopicImpl(this); } /** * Creates a durable subscriber to receive messages. * * @param topic the topic to receive messages from. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkOpen(); if (getClientID() == null) throw new JMSException(L.l("connection may not create a durable subscriber because it does not have an assigned ClientID.")); return createDurableSubscriber(topic, name, null, false); } /** * Creates a subscriber to receive messages. * * @param topic the topic to receive messages from. * @param messageSelector topic to restrict the messages. * @param noLocal if true, don't receive messages we've sent */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { checkOpen(); if (topic == null) throw new InvalidDestinationException(L.l("destination is null. Destination may not be null for Session.createDurableSubscriber")); if (! (topic instanceof AbstractTopic)) throw new InvalidDestinationException(L.l("'{0}' is an unknown destination. The destination must be a Resin JMS Destination.", topic)); AbstractTopic topicImpl = (AbstractTopic) topic; if (_connection.getDurableSubscriber(name) != null) { // jms/2130 // unsubscribe(name); /* throw new JMSException(L.l("'{0}' is already an active durable subscriber", name)); */ } AbstractQueue queue = topicImpl.createSubscriber(this, name, noLocal); TopicSubscriberImpl consumer; consumer = new TopicSubscriberImpl(this, topicImpl, queue, messageSelector, noLocal); _connection.putDurableSubscriber(name, consumer); addConsumer(consumer); return consumer; } /** * Unsubscribe from a durable subscription. */ public void unsubscribe(String name) throws JMSException { checkOpen(); if (name == null) throw new InvalidDestinationException(L.l("destination is null. Destination may not be null for Session.unsubscribe")); TopicSubscriber subscriber = _connection.removeDurableSubscriber(name); if (subscriber == null) throw new InvalidDestinationException(L.l("'{0}' is an unknown subscriber for Session.unsubscribe", name)); subscriber.close(); } /** * Starts the session. */ void start() { if (log.isLoggable(Level.FINE)) log.fine(toString() + " active"); notifyMessageAvailable(); } /**
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?