jmssession.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,204 行 · 第 1/2 页
JAVA
1,204 行
* Stops the session. */ void stop() { if (log.isLoggable(Level.FINE)) log.fine(toString() + " stopping"); synchronized (_consumers) { long timeout = Alarm.getCurrentTime() + SHUTDOWN_WAIT_TIME; while (_isRunning && Alarm.getCurrentTime() < timeout) { try { _consumers.wait(SHUTDOWN_WAIT_TIME); if (Alarm.isTest()) { return; } } catch (Throwable e) { log.log(Level.FINER, e.toString(), e); } } ArrayList<MessageConsumerImpl> consumers = new ArrayList<MessageConsumerImpl>(_consumers); for (MessageConsumerImpl consumer : consumers) { try { // XXX: should be stop()? consumer.stop(); } catch (Throwable e) { log.log(Level.FINE, e.toString(), e); } } } } /** * Commits the messages. */ public void commit() throws JMSException { checkOpen(); commit(false); } /** * Commits the messages. */ private void commit(boolean isXA) throws JMSException { _xid = null; if (! _isTransacted && ! _isXA) throw new IllegalStateException(L.l("commit() can only be called on a transacted session.")); _isXA = false; ArrayList<TransactedMessage> messages = _transactedMessages; if (messages != null) { try { for (int i = 0; i < messages.size(); i++) { messages.get(i).commit(); } } finally { messages.clear(); } } if (! isXA) acknowledge(); } /** * Acknowledge received */ public void acknowledge() throws JMSException { checkOpen(); if (_transactedMessages != null) { for (int i = _transactedMessages.size() - 1; i >= 0; i--) { TransactedMessage msg = _transactedMessages.get(i); if (msg instanceof ReceiveMessage) { _transactedMessages.remove(i); msg.commit(); } } } } /** * Recovers the messages. */ public void recover() throws JMSException { checkOpen(); if (_isTransacted) throw new IllegalStateException(L.l("recover() may not be called on a transacted session.")); if (_transactedMessages != null) { for (int i = _transactedMessages.size() - 1; i >= 0; i--) { TransactedMessage msg = _transactedMessages.get(i); if (msg instanceof ReceiveMessage) { _transactedMessages.remove(i); msg.rollback(); } } } } /** * Rollsback the messages. */ public void rollback() throws JMSException { checkOpen(); rollbackImpl(); } /** * Rollsback the messages. */ public void rollbackImpl() throws JMSException { if (! _isTransacted && ! _isXA) throw new IllegalStateException(L.l("rollback() can only be called on a transacted session.")); if (_transactedMessages != null) { for (int i = 0; i < _transactedMessages.size(); i++) _transactedMessages.get(i).rollback(); _transactedMessages.clear(); } } /** * Closes the session */ public void close() throws JMSException { if (_isClosed) return; try { stop(); } catch (Exception e) { log.log(Level.WARNING, e.toString(), e); } ArrayList<TransactedMessage> messages = _transactedMessages; if (messages != null && _xid == null) { _transactedMessages = null; try { for (int i = 0; i < messages.size(); i++) { messages.get(i).close(); } } catch (Exception e) { log.log(Level.WARNING, e.toString(), e); } } for (int i = 0; i < _consumers.size(); i++) { MessageConsumerImpl consumer = _consumers.get(i); try { consumer.close(); } catch (Exception e) { log.log(Level.WARNING, e.toString(), e); } } try { _connection.removeSession(this); } finally { _isClosed = true; } _classLoader = null; } protected void addConsumer(MessageConsumerImpl consumer) { _consumers.add(consumer); notifyMessageAvailable(); } protected void removeConsumer(MessageConsumerImpl consumer) { if (_consumers != null) _consumers.remove(consumer); } /** * Notifies the receiver. */ boolean notifyMessageAvailable() { synchronized (_consumers) { _hasMessage = true; if (_isRunning || ! _isAsynchronous || ! isActive()) return false; _isRunning = true; } ThreadPool.getThreadPool().schedule(this); // the yield is only needed for the regressions Thread.yield(); return true; } /** * Adds a message to the session message queue. */ public void send(AbstractDestination queue, Message appMessage, int deliveryMode, int priority, long timeout) throws JMSException { checkOpen(); if (queue == null) throw new UnsupportedOperationException(L.l("empty queue is not allowed for this session.")); MessageImpl message = _messageFactory.copy(appMessage); long now = Alarm.getExactTime(); long expiration = now + timeout; message.setJMSMessageID(queue.generateMessageID()); if (message.getJMSDestination() == null) message.setJMSDestination(queue); message.setJMSDeliveryMode(deliveryMode); if (message.getJMSTimestamp() == 0) message.setJMSTimestamp(now); if (message.getJMSExpiration() == 0) message.setJMSExpiration(expiration); message.setJMSPriority(priority); // ejb/0970 boolean isXA = false; try { if (_isTransacted && _tm != null && _tm.getTransaction() != null) isXA = true; } catch (Exception e) { log.log(Level.FINE, e.toString(), e); } if (_isTransacted || isXA) { if (_transactedMessages == null) _transactedMessages = new ArrayList<TransactedMessage>(); TransactedMessage transMsg = new SendMessage(queue, message); _transactedMessages.add(transMsg); if (_xid == null) enlist(); } else { if (log.isLoggable(Level.FINE)) log.fine(queue + " sending " + message); queue.send(this, message, priority, expiration); } } private void enlist() { if (_tm != null) { try { Transaction trans = _tm.getTransaction(); if (trans != null) trans.enlistResource(this); } catch (Exception e) { throw new RuntimeException(e); } } } private void delist() { if (_tm != null) { try { Transaction trans = _tm.getTransaction(); if (trans != null) trans.delistResource(this, 0); } catch (Exception e) { throw new RuntimeException(e); } } } /** * Adds a message to the session message queue. */ void addTransactedReceive(AbstractDestination queue, MessageImpl message) { message.setSession(this); if (_transactedMessages == null) _transactedMessages = new ArrayList<TransactedMessage>(); TransactedMessage transMsg = new ReceiveMessage(queue, message); _transactedMessages.add(transMsg); if (_tm != null && _transactedMessages.size() == 1) { enlist(); } } // // XA // public Session getSession() { return this; } public XAResource getXAResource() { return this; } /** * Returns true if the specified resource has the same RM. */ public boolean isSameRM(XAResource xa) throws XAException { return this == xa; } /** * Sets the transaction timeout in seconds. */ public boolean setTransactionTimeout(int timeout) throws XAException { return true; } /** * Gets the transaction timeout in seconds. */ public int getTransactionTimeout() throws XAException { return 0; } /** * Called when the resource is associated with a transaction. */ public void start(Xid xid, int flags) throws XAException { _xid = xid; } /** * Called when the resource is is done with a transaction. */ public void end(Xid xid, int flags) throws XAException { _xid = null; } /** * Called to start the first phase of the commit. */ public int prepare(Xid xid) throws XAException { return 0; } /** * Called to commit. */ public void commit(Xid xid, boolean onePhase) throws XAException { try { commit(true); } catch (Exception e) { throw new RuntimeException(e); } finally { delist(); _isXA = false; } } /** * Called to roll back. */ public void rollback(Xid xid) throws XAException { try { rollbackImpl(); } catch (Exception e) { throw new RuntimeException(e); } finally { delist(); _isXA = false; } } /** * Called to forget an Xid that had a heuristic commit. */ public void forget(Xid xid) throws XAException { } /** * Called to find Xid's that need recovery. */ public Xid[] recover(int flag) throws XAException { return null; } /** * Called to synchronously receive messages */ public void run() { Thread.currentThread().setContextClassLoader(_classLoader); boolean isValid = true; while (isValid) { isValid = false; _hasMessage = false; try { for (int i = 0; i < _consumers.size(); i++) { MessageConsumerImpl consumer = _consumers.get(i); while (isActive() && consumer.handleMessage(_messageListener)) { } } isValid = isActive(); } finally { synchronized (_consumers) { if (! isValid) _isRunning = false; else if (! _hasMessage) { _isRunning = false; isValid = false; } // notification, e.g. for shutdown _consumers.notifyAll(); } } } } public boolean isClosed() { return _isClosed; } /** * Checks that the session is open. */ public void checkOpen() throws javax.jms.IllegalStateException { if (_isClosed) throw new javax.jms.IllegalStateException(L.l("session is closed")); } /** * Verifies that multiple threads aren't using the session. * * 4.4.1 the client takes the responsibility. There's no * validation check. */ void checkThread() throws JMSException { Thread thread = _thread; if (thread != Thread.currentThread() && thread != null) { Exception e = new IllegalStateException(L.l("Can't use session from concurrent threads.")); log.log(Level.WARNING, e.toString(), e); } } @Override public String toString() { return getClass().getSimpleName() + "[]"; } abstract class TransactedMessage { abstract void commit() throws JMSException; abstract void rollback() throws JMSException; abstract void close() throws JMSException; } class SendMessage extends TransactedMessage { private final AbstractDestination _queue; private final MessageImpl _message; SendMessage(AbstractDestination queue, MessageImpl message) { _queue = queue; _message = message; } void commit() throws JMSException { _queue.send(JmsSession.this, _message, _message.getJMSPriority(), 0); } void rollback() throws JMSException { } void close() throws JMSException { commit(); } } class ReceiveMessage extends TransactedMessage { private final AbstractDestination _queue; private final String _msgId; ReceiveMessage(AbstractDestination queue, MessageImpl message) { _queue = queue; _msgId = message.getJMSMessageID(); if (_msgId == null) throw new NullPointerException(); } void commit() throws JMSException { _queue.acknowledge(_msgId); } void rollback() throws JMSException { _queue.rollback(_msgId); } void close() throws JMSException { rollback(); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?