jmssession.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,204 行 · 第 1/2 页

JAVA
1,204
字号
   * Stops the session.   */  void stop()  {    if (log.isLoggable(Level.FINE))      log.fine(toString() + " stopping");        synchronized (_consumers) {      long timeout = Alarm.getCurrentTime() + SHUTDOWN_WAIT_TIME;      while (_isRunning && Alarm.getCurrentTime() < timeout) {	try {	  _consumers.wait(SHUTDOWN_WAIT_TIME);		  if (Alarm.isTest()) {	    return;	  }	} catch (Throwable e) {	  log.log(Level.FINER, e.toString(), e);	}      }      ArrayList<MessageConsumerImpl> consumers	= new ArrayList<MessageConsumerImpl>(_consumers);            for (MessageConsumerImpl consumer : consumers) {	try {	  // XXX: should be stop()?	  	  consumer.stop();	} catch (Throwable e) {	  log.log(Level.FINE, e.toString(), e);	}      }    }  }    /**   * Commits the messages.   */  public void commit()    throws JMSException  {    checkOpen();        commit(false);  }    /**   * Commits the messages.   */  private void commit(boolean isXA)    throws JMSException  {    _xid = null;    if (! _isTransacted && ! _isXA)      throw new IllegalStateException(L.l("commit() can only be called on a transacted session."));    _isXA = false;    ArrayList<TransactedMessage> messages = _transactedMessages;    if (messages != null) {      try {	for (int i = 0; i < messages.size(); i++) {	  messages.get(i).commit();	}      } finally {	messages.clear();      }    }    if (! isXA)      acknowledge();  }    /**   * Acknowledge received   */  public void acknowledge()    throws JMSException  {    checkOpen();    if (_transactedMessages != null) {      for (int i = _transactedMessages.size() - 1; i >= 0; i--) {	TransactedMessage msg = _transactedMessages.get(i);	if (msg instanceof ReceiveMessage) {	  _transactedMessages.remove(i);	  msg.commit();	}      }    }  }    /**   * Recovers the messages.   
*/  public void recover()    throws JMSException  {    checkOpen();    if (_isTransacted)      throw new IllegalStateException(L.l("recover() may not be called on a transacted session."));    if (_transactedMessages != null) {      for (int i = _transactedMessages.size() - 1; i >= 0; i--) {	TransactedMessage msg = _transactedMessages.get(i);	if (msg instanceof ReceiveMessage) {	  _transactedMessages.remove(i);	  msg.rollback();	}      }    }  }    /**   * Rollsback the messages.   */  public void rollback()    throws JMSException  {    checkOpen();    rollbackImpl();  }    /**   * Rollsback the messages.   */  public void rollbackImpl()    throws JMSException  {    if (! _isTransacted && ! _isXA)      throw new IllegalStateException(L.l("rollback() can only be called on a transacted session."));    if (_transactedMessages != null) {      for (int i = 0; i < _transactedMessages.size(); i++)	_transactedMessages.get(i).rollback();      _transactedMessages.clear();    }  }    /**   * Closes the session   */  public void close()    throws JMSException  {    if (_isClosed)      return;    try {      stop();    } catch (Exception e) {      log.log(Level.WARNING, e.toString(), e);    }    ArrayList<TransactedMessage> messages = _transactedMessages;        if (messages != null && _xid == null) {      _transactedMessages = null;            try {	for (int i = 0; i < messages.size(); i++) {	  messages.get(i).close();	}      } catch (Exception e) {	log.log(Level.WARNING, e.toString(), e);      }    }    for (int i = 0; i < _consumers.size(); i++) {      MessageConsumerImpl consumer = _consumers.get(i);      try {	consumer.close();      } catch (Exception e) {	log.log(Level.WARNING, e.toString(), e);      }    }    try {      _connection.removeSession(this);    } finally {      _isClosed = true;    }    _classLoader = null;  }  protected void addConsumer(MessageConsumerImpl consumer)  {    _consumers.add(consumer);    notifyMessageAvailable();  }  protected void 
removeConsumer(MessageConsumerImpl consumer)  {    if (_consumers != null)      _consumers.remove(consumer);  }  /**   * Notifies the receiver.   */  boolean notifyMessageAvailable()  {    synchronized (_consumers) {      _hasMessage = true;      if (_isRunning || ! _isAsynchronous || ! isActive())	return false;      _isRunning = true;    }    ThreadPool.getThreadPool().schedule(this);    // the yield is only needed for the regressions    Thread.yield();    return true;  }  /**   * Adds a message to the session message queue.   */  public void send(AbstractDestination queue,                   Message appMessage,                   int deliveryMode,                   int priority,                   long timeout)    throws JMSException  {    checkOpen();        if (queue == null)      throw new UnsupportedOperationException(L.l("empty queue is not allowed for this session."));        MessageImpl message = _messageFactory.copy(appMessage);    long now = Alarm.getExactTime();    long expiration = now + timeout;    message.setJMSMessageID(queue.generateMessageID());    if (message.getJMSDestination() == null)      message.setJMSDestination(queue);    message.setJMSDeliveryMode(deliveryMode);    if (message.getJMSTimestamp() == 0)      message.setJMSTimestamp(now);    if (message.getJMSExpiration() == 0)      message.setJMSExpiration(expiration);    message.setJMSPriority(priority);    // ejb/0970    boolean isXA = false;    try {      if (_isTransacted && _tm != null && _tm.getTransaction() != null)        isXA = true;    } catch (Exception e) {      log.log(Level.FINE, e.toString(), e);    }        if (_isTransacted || isXA) {      if (_transactedMessages == null)	_transactedMessages = new ArrayList<TransactedMessage>();      TransactedMessage transMsg = new SendMessage(queue, message);            _transactedMessages.add(transMsg);      if (_xid == null)        enlist();    }    else {      if (log.isLoggable(Level.FINE))	log.fine(queue + " sending " + message);         
   queue.send(this, message, priority, expiration);    }  }  private void enlist()  {    if (_tm != null) {      try {        Transaction trans = _tm.getTransaction();        if (trans != null)          trans.enlistResource(this);      } catch (Exception e) {        throw new RuntimeException(e);      }    }  }  private void delist()  {    if (_tm != null) {      try {        Transaction trans = _tm.getTransaction();        if (trans != null)          trans.delistResource(this, 0);      } catch (Exception e) {        throw new RuntimeException(e);      }    }  }  /**   * Adds a message to the session message queue.   */  void addTransactedReceive(AbstractDestination queue,			    MessageImpl message)  {    message.setSession(this);        if (_transactedMessages == null)      _transactedMessages = new ArrayList<TransactedMessage>();        TransactedMessage transMsg = new ReceiveMessage(queue, message);          _transactedMessages.add(transMsg);    if (_tm != null && _transactedMessages.size() == 1) {      enlist();    }  }  //  // XA  //  public Session getSession()  {    return this;  }    public XAResource getXAResource()  {    return this;  }    /**   * Returns true if the specified resource has the same RM.   */  public boolean isSameRM(XAResource xa)    throws XAException  {    return this == xa;  }    /**   * Sets the transaction timeout in seconds.   */  public boolean setTransactionTimeout(int timeout)    throws XAException  {    return true;  }    /**   * Gets the transaction timeout in seconds.   */  public int getTransactionTimeout()    throws XAException  {    return 0;  }    /**   * Called when the resource is associated with a transaction.   */  public void start(Xid xid, int flags)    throws XAException  {    _xid = xid;  }    /**   * Called when the resource is is done with a transaction.   */  public void end(Xid xid, int flags)    throws XAException  {    _xid = null;  }    /**   * Called to start the first phase of the commit.   
*/  public int prepare(Xid xid)    throws XAException  {    return 0;  }    /**   * Called to commit.   */  public void commit(Xid xid, boolean onePhase)    throws XAException  {    try {      commit(true);    } catch (Exception e) {      throw new RuntimeException(e);    } finally {      delist();      _isXA = false;    }  }    /**   * Called to roll back.   */  public void rollback(Xid xid)    throws XAException  {    try {      rollbackImpl();    } catch (Exception e) {      throw new RuntimeException(e);    } finally {      delist();      _isXA = false;    }  }    /**   * Called to forget an Xid that had a heuristic commit.   */  public void forget(Xid xid)    throws XAException  {  }    /**   * Called to find Xid's that need recovery.   */  public Xid[] recover(int flag)    throws XAException  {    return null;  }  /**   * Called to synchronously receive messages   */  public void run()  {    Thread.currentThread().setContextClassLoader(_classLoader);        boolean isValid = true;    while (isValid) {      isValid = false;      _hasMessage = false;	      try {	for (int i = 0; i < _consumers.size(); i++) {	  MessageConsumerImpl consumer = _consumers.get(i);	  while (isActive() && consumer.handleMessage(_messageListener)) {	  }	}	isValid = isActive();      } finally {	synchronized (_consumers) {	  if (! isValid)	    _isRunning = false;	  else if (! _hasMessage) {	    _isRunning = false;	    isValid = false;	  }	  // notification, e.g. for shutdown	  _consumers.notifyAll();	}      }    }  }  public boolean isClosed()  {    return _isClosed;  }  /**   * Checks that the session is open.   */  public void checkOpen()    throws javax.jms.IllegalStateException  {    if (_isClosed)      throw new javax.jms.IllegalStateException(L.l("session is closed"));  }  /**   * Verifies that multiple threads aren't using the session.   *   * 4.4.1 the client takes the responsibility.  There's no   * validation check.   
*/
  void checkThread()
    throws JMSException
  {
    final Thread owner = _thread;

    // No recorded owner, or the owner is the caller: nothing to report.
    if (owner == null || owner == Thread.currentThread()) {
      return;
    }

    // The violation is only logged; it is never thrown.
    Exception violation = new IllegalStateException(L.l("Can't use session from concurrent threads."));

    log.log(Level.WARNING, violation.toString(), violation);
  }

  /**
   * Describes the session as its simple class name followed by "[]".
   */
  @Override
  public String toString()
  {
    String simpleName = getClass().getSimpleName();

    return simpleName + "[]";
  }

  /**
   * A unit of pending work held by the session until it is committed,
   * rolled back, or the session is closed.
   */
  abstract class TransactedMessage {
    /** Applies the pending work. */
    abstract void commit()
      throws JMSException;

    /** Discards the pending work. */
    abstract void rollback()
      throws JMSException;

    /** Called for each pending entry when the session closes. */
    abstract void close()
      throws JMSException;
  }

  /**
   * A pending send: the message is handed to the destination on commit.
   */
  class SendMessage extends TransactedMessage {
    private final AbstractDestination _queue;
    private final MessageImpl _message;

    SendMessage(AbstractDestination queue, MessageImpl message)
    {
      _queue = queue;
      _message = message;
    }

    /**
     * Sends the held message to the destination with the message's own
     * priority.
     */
    @Override
    void commit()
      throws JMSException
    {
      final int priority = _message.getJMSPriority();

      // NOTE(review): the last argument is a literal 0 here, whereas the
      // direct send path passes a computed expiration -- confirm intended.
      _queue.send(JmsSession.this, _message, priority, 0);
    }

    /**
     * Nothing was sent yet, so there is nothing to undo.
     */
    @Override
    void rollback()
      throws JMSException
    {
    }

    /**
     * Closing a pending send commits it.
     */
    @Override
    void close()
      throws JMSException
    {
      // NOTE(review): close commits the send instead of discarding it --
      // confirm this is the intended close semantics.
      commit();
    }
  }

  /**
   * A pending receive, identified by the received message's id.
   */
  class ReceiveMessage extends TransactedMessage {
    private final AbstractDestination _queue;
    private final String _msgId;

    /**
     * @throws NullPointerException if the message has no JMS message id
     */
    ReceiveMessage(AbstractDestination queue, MessageImpl message)
    {
      _queue = queue;

      final String id = message.getJMSMessageID();

      if (id == null) {
        throw new NullPointerException();
      }

      _msgId = id;
    }

    /**
     * Acknowledges the message id at the destination.
     */
    @Override
    void commit()
      throws JMSException
    {
      _queue.acknowledge(_msgId);
    }

    /**
     * Rolls the message id back at the destination.
     */
    @Override
    void rollback()
      throws JMSException
    {
      _queue.rollback(_msgId);
    }

    /**
     * Closing a pending receive rolls it back.
     */
    @Override
    void close()
      throws JMSException
    {
      rollback();
    }
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?