📄 session.java
字号:
checkThreadOfControl(); MessageConsumer cons; if (consumers != null) { for (int i = 0; i < consumers.size(); i++) { cons = (MessageConsumer) consumers.get(i); if (! cons.queueMode && cons.targetName.equals(name)) throw new JMSException("Can't delete durable subscription " + name + " as long as an active subscriber exists."); } } syncRequest(new ConsumerUnsubRequest(name)); } /** * API method. * * @exception JMSException */ public void close() throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.close()"); closer.close(); } /** * This class synchronizes the close. * Close can't be synchronized with 'this' * because the Session must be accessed * concurrently during its closure. So * we need a second lock. */ class Closer { synchronized void close() throws JMSException { doClose(); } } void doClose() throws JMSException { synchronized (this) { if (status == Status.CLOSE) return; } // Don't synchronize the consumer closure because // it could deadlock with message listeners or // client threads still using the session. Vector consumersToClose = (Vector)consumers.clone(); consumers.clear(); for (int i = 0; i < consumersToClose.size(); i++) { MessageConsumer mc = (MessageConsumer)consumersToClose.elementAt(i); try { mc.close(); } catch (JMSException exc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "", exc); } } Vector browsersToClose = (Vector)browsers.clone(); browsers.clear(); for (int i = 0; i < browsersToClose.size(); i++) { QueueBrowser qb = (QueueBrowser)browsersToClose.elementAt(i); try { qb.close(); } catch (JMSException exc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "", exc); } } Vector producersToClose = (Vector)producers.clone(); producers.clear(); for (int i = 0; i < producersToClose.size(); i++) { MessageProducer mp = (MessageProducer)producersToClose.elementAt(i); try { mp.close(); } catch (JMSException exc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "", exc); } } // This is now in removeMessageListener // called by MessageConsumer.close() // (see above)// try {// repliesIn.stop();// } catch (InterruptedException iE) {} stop(); // The requestor must be closed because // it could be used by a concurrent receive // as it is not synchronized (see receive()). receiveRequestor.close(); // Denying the non acknowledged messages: if (transacted) { rollback(); } else { deny(); } cnx.closeSession(this); synchronized (this) { setStatus(Status.CLOSE); } } /** * Starts the asynchronous deliveries in the session. * <p> * This method is called by a started connection. */ synchronized void start() { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.start()"); if (status == Status.CLOSE) return; if (status == Status.START) return; if (listenerCount > 0) { doStart(); } setStatus(Status.START); } private void doStart() { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.doStart()"); repliesIn.start(); daemon = new SessionDaemon(); daemon.setDaemon(false); daemon.start(); singleThreadOfControl = daemon.getThread(); } /** * Stops the asynchronous deliveries processing in the session. * <p> * This method must be carefully used. When the session is stopped, the * connection might very well going on pushing deliveries in the * session's queue. If the session is never re-started, these deliveries * will never be poped out, and this may lead to a situation of consumed * but never acknowledged messages. * <p> * This fatal situation never occurs as the <code>stop()</code> method is * either called by he <code>Session.close()</code> * and <code>Connection.stop()</code> methods, which first empties the * session's deliveries and forbid any further push. */ synchronized void stop() { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.stop()"); if (status == Status.STOP || status == Status.CLOSE) return; // DF: According to JMS 1.1 java doc // the method stop "blocks until receives in progress have completed." // But the JMS 1.1 specification doesn't mention this point. // So we don't implement it: a stop doesn't block until // receives have completed.// while (requestStatus != RequestStatus.NONE) {// try {// wait();// } catch (InterruptedException exc) {}// } doStop(); setStatus(Status.STOP); } private void doStop() { if (daemon != null) { daemon.stop(); daemon = null; singleThreadOfControl = null; } } /** * Method called by message producers when producing a message for * preparing the session to later commit it. * * @param dest The destination the message is destinated to. * @param msg The message. */ private void prepareSend( Destination dest, org.objectweb.joram.shared.messages.Message msg) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.prepareSend(" + dest + ',' + msg + ')'); checkClosed(); checkThreadOfControl(); // If the transaction was scheduled, cancelling: if (scheduled) closingTask.cancel(); ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName()); if (pM == null) { pM = new ProducerMessages(dest.getName()); sendings.put(dest.getName(), pM); } pM.addMessage(msg); // If the transaction was scheduled, re-scheduling it: if (scheduled) closingTask.start(); } /** * Method called by message consumers when receiving a message for * preparing the session to later acknowledge or deny it. * * @param name Name of the destination or of the proxy subscription * the message comes from. * @param id Identifier of the consumed message. * @param queueMode <code>true</code> if the message consumed comes from * a queue. */ private void prepareAck(String name, String id, boolean queueMode) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.prepareAck(" + name + ',' + id + ',' + queueMode + ')'); // If the transaction was scheduled, cancelling: if (scheduled) closingTask.cancel(); MessageAcks acks = (MessageAcks) deliveries.get(name); if (acks == null) { acks = new MessageAcks(queueMode); deliveries.put(name, acks); } acks.addId(id); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, " -> acks = " + acks); // If the transaction must be scheduled, scheduling it: if (closingTask != null) { scheduled = true; closingTask.start(); } } /** * Method acknowledging the received messages. * Called by Message. */ synchronized void acknowledge() throws JMSException { checkClosed(); if (transacted || acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE) { return; } doAcknowledge(); } /** * Method acknowledging the received messages. */ private void doAcknowledge() throws JMSException { Enumeration targets = deliveries.keys(); while (targets.hasMoreElements()) { String target = (String) targets.nextElement(); MessageAcks acks = (MessageAcks) deliveries.remove(target); mtpx.sendRequest( new SessAckRequest( target, acks.getIds(), acks.getQueueMode())); } } /** * Method denying the received messages. * * Called from: * - rollback -> synchronized client thread * - recover -> synchronized client thread * - close -> synchronized client thread * - onMessage -> not synchronized session daemon. * It is the only thread that can run into the session * (session mode = LISTENER) except for the method close that * can be called concurrently. But close() first stops the session * daemon and then calls deny(). * * The hashtable deliveries is also accessed from: * - acknowledge -> synchronized client thread * - commit -> synchronized client thread * - receive -> synchronized client thread. * - onMessage -> not synchronized session daemon (see above). */ private void deny() throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.deny()"); Enumeration targets = deliveries.keys(); while (targets.hasMoreElements()) { String target = (String) targets.nextElement(); MessageAcks acks = (MessageAcks) deliveries.remove(target); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, " -> acks = " + acks + ')'); SessDenyRequest deny = new SessDenyRequest( target, acks.getIds(), acks.getQueueMode()); if (acks.getQueueMode()) { requestor.request(deny); } else { mtpx.sendRequest(deny); } } } /** * Called by MessageConsumer * Not synchronized because ot it can be * concurrently called by close() * and Connection.stop(). */ javax.jms.Message receive( long requestTimeToLive, long waitTimeOut, MessageConsumer mc, String targetName, String selector, boolean queueMode) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.receive(" + requestTimeToLive + ',' + waitTimeOut + ',' + targetName + ',' + selector + ',' + queueMode + ')'); preReceive(mc); try { ConsumerMessages reply = null; ConsumerReceiveRequest request = new ConsumerReceiveRequest( targetName, selector, requestTimeToLive, queueMode); if (receiveAck) request.setReceiveAck(true); reply = (ConsumerMessages)receiveRequestor.request( request, waitTimeOut); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, " -> reply = " + reply); synchronized (this) { // The session may have been // closed in between. if (status == Status.CLOSE) { if (reply != null) { mtpx.deny(reply); } return null; } if (reply != null) { Vector msgs = reply.getMessages(); if (msgs != null && ! msgs.isEmpty()) { org.objectweb.joram.shared.messages.Message msg = (org.objectweb.joram.shared.messages.Message) msgs.get(0); String msgId = msg.getIdentifier(); // Auto ack: acknowledging the message: if (autoAck && ! receiveAck) { ConsumerAckRequest req = new ConsumerAckRequest( targetName, queueMode); req.addId(msgId); mtpx.sendRequest(req); } else { prepareAck(targetName, msgId, queueMode); } return Message.wrapMomMessage(this, msg); } else { return null; } } else { return null; } } } finally { postReceive(); } } /** * First stage before calling the proxy and waiting * for the reply. It is synchronized because it * locks the session in order to prevent any other * thread to make another operation. */ private synchronized void preReceive( MessageConsumer mc) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.preReceive(" + mc + ')'); // The message consumer may have been closed // after the first check (in MessageConsumer.receive()) // and before preReceive. mc.checkClosed(); checkClosed(); checkThreadOfControl(); // Don't call checkSessionMode because // we also check that the session mode is not // already set to RECEIVE. switch (sessionMode) { case SessionMode.NONE: setSessionMode(SessionMode.RECEIVE); break; default: throw new IllegalStateException("Illegal session mode"); } if (requestStatus != RequestStatus.NONE) throw new IllegalStateException("Illegal request status"); singleThreadOfControl = Thread.currentThread(); pendingMessageConsumer = mc; setRequestStatus(RequestStatus.RUN); } /** * Final stage after calling the reply has been returned * by the roxy. It releases the session and enables another * thread to call it. */ private synchronized void postReceive() { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.postReceive()"); singleThreadOfControl = null; pendingMessageConsumer = null; setRequestStatus(RequestStatus.NONE); setSessionMode(SessionMode.NONE); notifyAll(); } /** * Called here and by sub-classes. */ protected synchronized void addConsumer( MessageConsumer mc) { consumers.addElement(mc); } /** * Called by MessageConsumer. */ synchronized void closeConsumer(MessageConsumer mc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.closeConsumer(" + mc + ')'); consumers.removeElement(mc); if (pendingMessageConsumer == mc) { if (requestStatus == RequestStatus.RUN) { // Close the requestor. A call to abortRequest() // is not enough because the receiving thread // may call request() just after this thread // calls abort(). receiveRequestor.close(); // Wait for the end of the request try { while (requestStatus != RequestStatus.NONE) { wait(); } } catch (InterruptedException exc) {} // Create a new requestor. receiveRequestor = new Requestor(mtpx); } } } /** * Called by Connection (i.e. temporary destinations deletion) */ synchronized void checkConsumers(String agentId)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -