jmssession.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,204 行 · 第 1/2 页

JAVA
1,204
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.jms.connection;import com.caucho.jms.message.*;import com.caucho.jms.queue.*;import com.caucho.util.Alarm;import com.caucho.util.L10N;import com.caucho.util.ThreadPool;import com.caucho.util.ThreadTask;import javax.jms.*;import javax.jms.IllegalStateException;import javax.naming.*;import javax.transaction.*;import javax.transaction.xa.*;import java.io.Serializable;import java.util.ArrayList;import java.util.logging.Level;import java.util.logging.Logger;/** * Manages the JMS session. 
*/public class JmsSession implements XASession, ThreadTask, XAResource{  protected static final Logger log    = Logger.getLogger(JmsSession.class.getName());  protected static final L10N L = new L10N(JmsSession.class);  private static final long SHUTDOWN_WAIT_TIME = 10000;  private boolean _isXA;  private Xid _xid;  private TransactionManager _tm;    private boolean _isTransacted;  private int _acknowledgeMode;  private ClassLoader _classLoader;    private ConnectionImpl _connection;    private final ArrayList<MessageConsumerImpl> _consumers    = new ArrayList<MessageConsumerImpl>();  private MessageFactory _messageFactory = new MessageFactory();  private MessageListener _messageListener;  private boolean _isAsynchronous;  // 4.4.1 - client's responsibility  private Thread _thread;  // transacted messages  private ArrayList<TransactedMessage> _transactedMessages;  // true if the listener thread is running  private volatile boolean _isRunning;    private volatile boolean _isClosed;  private volatile boolean _hasMessage;  public JmsSession(ConnectionImpl connection,		     boolean isTransacted, int ackMode,                     boolean isXA)    throws JMSException  {    _classLoader = Thread.currentThread().getContextClassLoader();        _connection = connection;    _isXA = isXA;        _isTransacted = isTransacted;    _acknowledgeMode = ackMode;    if (isTransacted)      _acknowledgeMode = 0;    else {      switch (ackMode) {      case CLIENT_ACKNOWLEDGE:      case DUPS_OK_ACKNOWLEDGE:      case AUTO_ACKNOWLEDGE:	_acknowledgeMode = ackMode;	break;      default:        try {          log.warning(L.l("JmsSession {0} is an illegal acknowledge mode",                          ackMode));          // XXX: tck          // throw new JMSException(L.l("{0} is an illegal acknowledge mode", ackMode));          log.warning(L.l("JmsSession {0} is an illegal acknowledge mode",                          ackMode));          _acknowledgeMode = AUTO_ACKNOWLEDGE;        } catch (Exception 
e) {          log.log(Level.FINE, e.toString(), e);        }        break;      }    }    try {      InitialContext ic = new InitialContext();              _tm = (TransactionManager) ic.lookup("java:comp/TransactionManager");    } catch (Exception e) {      log.log(Level.FINER, e.toString(), e);    }        _connection.addSession(this);  }  /**   * Returns the connection.   */  ConnectionImpl getConnection()  {    return _connection;  }  /**   * Returns the ClassLoader.   */  ClassLoader getClassLoader()  {    return _classLoader;  }  /**   * Returns the connection's clientID   */  public String getClientID()    throws JMSException  {    return _connection.getClientID();  }  /**   * Returns true if the connection is active.   */  public boolean isActive()  {    return ! _isClosed && _connection.isActive();  }  /**   * Returns true if the connection is active.   */  boolean isStopping()  {    return _connection.isStopping();  }  /**   * Returns true if the session is in a transaction.   */  public boolean getTransacted()    throws JMSException  {    checkOpen();        return _isTransacted;  }  /**   * Returns the acknowledge mode for the session.   */  public int getAcknowledgeMode()    throws JMSException  {    checkOpen();        return _acknowledgeMode;  }  /**   * Returns the message listener   */  public MessageListener getMessageListener()    throws JMSException  {    checkOpen();        return _messageListener;  }  /**   * Sets the message listener   */  public void setMessageListener(MessageListener listener)    throws JMSException  {    checkOpen();        _messageListener = listener;    setAsynchronous();  }  /**   * Set true for a synchronous session.   */  void setAsynchronous()  {    _isAsynchronous = true;    notifyMessageAvailable();  }  /**   * Set true for a synchronous session.   */  boolean isAsynchronous()  {    return _isAsynchronous;  }  /**   * Creates a new byte[] message.   
*/
  public BytesMessage createBytesMessage() throws JMSException {
    checkOpen();

    return new BytesMessageImpl();
  }

  /**
   * Builds an empty map message for an open session.
   */
  public MapMessage createMapMessage() throws JMSException {
    checkOpen();

    return new MapMessageImpl();
  }

  /**
   * Builds a plain message carrying no body, for callers that only
   * need the header information.
   */
  public Message createMessage() throws JMSException {
    checkOpen();

    return new MessageImpl();
  }

  /**
   * Builds an empty object message for an open session.
   */
  public ObjectMessage createObjectMessage() throws JMSException {
    checkOpen();

    return new ObjectMessageImpl();
  }

  /**
   * Builds an object message and stores the given payload in it.
   *
   * @param obj the serializable payload.
   */
  public ObjectMessage createObjectMessage(Serializable obj)
    throws JMSException {
    checkOpen();

    final ObjectMessage objectMessage = createObjectMessage();
    objectMessage.setObject(obj);

    return objectMessage;
  }

  /**
   * Builds an empty stream message for an open session.
   */
  public StreamMessage createStreamMessage() throws JMSException {
    checkOpen();

    return new StreamMessageImpl();
  }

  /**
   * Builds an empty text message for an open session.
   */
  public TextMessage createTextMessage() throws JMSException {
    checkOpen();

    return new TextMessageImpl();
  }

  /**
   * Builds a text message and stores the given text in it.
   *
   * @param message the text to place in the message body.
   */
  public TextMessage createTextMessage(String message)
    throws JMSException {
    checkOpen();

    final TextMessage textMessage = createTextMessage();
    textMessage.setText(message);

    return textMessage;
  }

  /**
   * Builds a consumer for the destination, with no message selector
   * and with noLocal set to false.
   *
   * @param destination the destination to receive messages from.
   */
  public MessageConsumer createConsumer(Destination destination)
    throws JMSException {
    checkOpen();

    return createConsumer(destination, null, false);
  }

  /**
   * Creates a consumer to receive messages.
   *
   * @param destination the destination to receive messages from.
   * @param messageSelector query to restrict the messages.
*/  public MessageConsumer createConsumer(Destination destination,                                        String messageSelector)    throws JMSException  {    checkOpen();        return createConsumer(destination, messageSelector, false);  }  /**   * Creates a consumer to receive messages.   *   * @param destination the destination to receive messages from.   * @param messageSelector query to restrict the messages.   */  public MessageConsumer createConsumer(Destination destination,                                        String messageSelector,                                        boolean noLocal)    throws JMSException  {    checkOpen();    if (destination == null)      throw new InvalidDestinationException(L.l("destination is null.  Destination may not be null for Session.createConsumer"));    MessageConsumerImpl consumer;        if (destination instanceof AbstractQueue) {      AbstractQueue dest = (AbstractQueue) destination;      consumer = new MessageConsumerImpl(this, dest, messageSelector, noLocal);    }    else if (destination instanceof AbstractTopic) {      AbstractTopic dest = (AbstractTopic) destination;      consumer = new TopicSubscriberImpl(this, dest, messageSelector, noLocal);    }    else      throw new InvalidDestinationException(L.l("'{0}' is an unknown destination.  The destination must be a Resin JMS Destination.",						destination));        addConsumer(consumer);    return consumer;  }  /**   * Creates a producer to produce messages.   *   * @param destination the destination to send messages from.   */  public MessageProducer createProducer(Destination destination)    throws JMSException  {    checkOpen();    if (destination == null) {      return new MessageProducerImpl(this, null);    }        if (! (destination instanceof AbstractDestination))      throw new InvalidDestinationException(L.l("'{0}' is an unknown destination.  
The destination must be a Resin JMS destination for Session.createProducer.",						destination));    AbstractDestination dest = (AbstractDestination) destination;    return new MessageProducerImpl(this, dest);  }  /**   * Creates a QueueBrowser to browse messages in the queue.   *   * @param queue the queue to send messages to.   */  public QueueBrowser createBrowser(Queue queue)    throws JMSException  {    checkOpen();        return createBrowser(queue, null);  }  /**   * Creates a QueueBrowser to browse messages in the queue.   *   * @param queue the queue to send messages to.   */  public QueueBrowser createBrowser(Queue queue, String messageSelector)    throws JMSException  {    checkOpen();    if (queue == null)      throw new InvalidDestinationException(L.l("queue is null.  Queue may not be null for Session.createBrowser"));        if (! (queue instanceof AbstractQueue))      throw new InvalidDestinationException(L.l("'{0}' is an unknown queue.  The queue must be a Resin JMS Queue for Session.createBrowser.",						queue));        return new MessageBrowserImpl(this, (AbstractQueue) queue,				  messageSelector);  }  /**   * Creates a new queue.   */  public Queue createQueue(String queueName)    throws JMSException  {    checkOpen();    return _connection.createQueue(queueName);  }  /**   * Creates a temporary queue.   */  public TemporaryQueue createTemporaryQueue()    throws JMSException  {    checkOpen();        return new TemporaryQueueImpl(this);  }  /**   * Creates a new topic.   */  public Topic createTopic(String topicName)    throws JMSException  {    checkOpen();    return _connection.createTopic(topicName);  }  /**   * Creates a temporary topic.   */  public TemporaryTopic createTemporaryTopic()    throws JMSException  {    checkOpen();        return new TemporaryTopicImpl(this);  }  /**   * Creates a durable subscriber to receive messages.   *   * @param topic the topic to receive messages from.   
*/  public TopicSubscriber createDurableSubscriber(Topic topic, String name)    throws JMSException  {    checkOpen();        if (getClientID() == null)      throw new JMSException(L.l("connection may not create a durable subscriber because it does not have an assigned ClientID."));    return createDurableSubscriber(topic, name, null, false);  }  /**   * Creates a subscriber to receive messages.   *   * @param topic the topic to receive messages from.   * @param messageSelector topic to restrict the messages.   * @param noLocal if true, don't receive messages we've sent   */  public TopicSubscriber createDurableSubscriber(Topic topic,                                                 String name,                                                 String messageSelector,                                                 boolean noLocal)    throws JMSException  {    checkOpen();    if (topic == null)      throw new InvalidDestinationException(L.l("destination is null.  Destination may not be null for Session.createDurableSubscriber"));        if (! (topic instanceof AbstractTopic))      throw new InvalidDestinationException(L.l("'{0}' is an unknown destination.  The destination must be a Resin JMS Destination.",						topic));        AbstractTopic topicImpl = (AbstractTopic) topic;    if (_connection.getDurableSubscriber(name) != null) {      // jms/2130      // unsubscribe(name);      /*      throw new JMSException(L.l("'{0}' is already an active durable subscriber",				 name));      */    }    AbstractQueue queue = topicImpl.createSubscriber(this, name, noLocal);    TopicSubscriberImpl consumer;    consumer = new TopicSubscriberImpl(this, topicImpl, queue,				       messageSelector, noLocal);        _connection.putDurableSubscriber(name, consumer);        addConsumer(consumer);    return consumer;  }  /**   * Unsubscribe from a durable subscription.   
*/
  public void unsubscribe(String name) throws JMSException {
    checkOpen();

    // A null name is rejected before the connection is consulted.
    if (name == null) {
      throw new InvalidDestinationException(L.l("destination is null.  Destination may not be null for Session.unsubscribe"));
    }

    final TopicSubscriber removed = _connection.removeDurableSubscriber(name);

    if (removed == null) {
      throw new InvalidDestinationException(L.l("'{0}' is an unknown subscriber for Session.unsubscribe",
                                                name));
    }

    removed.close();
  }

  /**
   * Starts the session: logs at FINE when enabled, then signals that
   * messages may be available.
   */
  void start() {
    if (log.isLoggable(Level.FINE)) {
      log.fine(this + " active");
    }

    notifyMessageAvailable();
  }

  /**

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?