📄 messageconsumerlistener.java
字号:
/**
 * Starts the listener: subscribes to the target and moves the status
 * from INIT to RUN. Called by Session.
 *
 * @throws JMSException if the subscription request cannot be sent
 * @throws IllegalStateException if the listener is not in the INIT state
 */
synchronized void start() throws JMSException {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG, "MessageConsumerListener.start()");
  }

  if (status == Status.INIT) {
    subscribe(null);
    setStatus(Status.RUN);
  } else {
    // Should not happen
    throw new IllegalStateException("Status error");
  }
}

/**
 * Builds a ConsumerSetListRequest for this consumer and sends it through
 * the request multiplexer, registering this object for the reply.
 *
 * @param toAck identifiers handed to the request (null when called from
 *              start())
 * @throws JMSException if the request cannot be sent
 */
private void subscribe(String[] toAck) throws JMSException {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG, "MessageConsumerListener.subscribe()");
  }

  ConsumerSetListRequest req = new ConsumerSetListRequest(
      targetName, selector, queueMode, toAck, queueMessageReadMax);

  // Change the receive status before sending the request: subscribe()
  // is not synchronized, so the reply can be received before the end
  // of this method.
  setReceiveStatus(ReceiveStatus.WAIT_FOR_REPLY);

  rm.sendRequest(req, this);
  requestId = req.getRequestId();
}

/**
 * Closes the listener. Called by Session.
 *
 * Blocks while a message listener is executing (status ON_MSG), then
 * aborts the pending request, flushes the pending acknowledgements and
 * sets the status to CLOSE. Does nothing if the listener was never
 * started (INIT) or is already closed. In queue mode a
 * ConsumerUnsetListRequest is sent afterwards.
 *
 * If the calling thread is interrupted while waiting, the wait goes on
 * until the listener has returned, and the thread's interrupt flag is
 * restored before this method exits.
 *
 * @throws JMSException if a request cannot be sent
 */
public void close() throws JMSException {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG, "MessageConsumerListener.close()");
  }

  boolean interrupted = false;
  try {
    synchronized (this) {
      while (status == Status.ON_MSG) {
        try {
          // Wait for the message listener to return from onMessage()
          wait();
        } catch (InterruptedException exc) {
          // Do not re-interrupt here: wait() would throw again at once
          // and the loop would spin. The flag is restored in the
          // finally clause below.
          interrupted = true;
        }
      }

      if (status == Status.INIT || status == Status.CLOSE) {
        return;
      }

      rm.abortRequest(requestId);

      // If session ack mode is DUPS_OK
      acknowledge(0);

      setStatus(Status.CLOSE);
    }

    if (queueMode) {
      // Out of the synchronized block because it could lead to a
      // dead lock with the connection driver thread calling
      // replyReceived.
      ConsumerUnsetListRequest unsetLR = new ConsumerUnsetListRequest(queueMode);
      unsetLR.setTarget(targetName);
      unsetLR.setCancelledRequestId(requestId);
      rm.sendRequest(unsetLR);
    }
    // else useless for a topic because the subscription
    // is deleted (see MessageConsumer.close())
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}

/**
 * Sends one ConsumerAckRequest carrying every identifier currently held
 * in messagesToAck, then clears the buffer. Nothing is sent unless the
 * buffer holds more than threshold identifiers.
 *
 * A JMSException raised while sending is logged at ERROR level and not
 * propagated; in that case the buffer is left untouched.
 *
 * @param threshold number of buffered identifiers that must be exceeded
 *                  for the acknowledgement to be sent
 */
private void acknowledge(int threshold) {
  try {
    synchronized (messagesToAck) {
      if (messagesToAck.size() > threshold) {
        ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode);
        for (int i = 0; i < messagesToAck.size(); i++) {
          String msgId = (String) messagesToAck.elementAt(i);
          ack.addId(msgId);
        }
        rm.sendRequest(ack);
        messagesToAck.clear();
      }
    }
  } catch (JMSException exc) {
    if (logger.isLoggable(BasicLevel.ERROR)) {
      logger.log(BasicLevel.ERROR, "", exc);
    }
  }
}

/**
 * Handles a reply to the subscription request. Called by
 * RequestMultiplexer.
 *
 * @param reply the received reply; it is cast to ConsumerMessages
 * @return the value of queueMode (true in queue mode, false otherwise)
 * @throws AbortedRequestException if the listener is closed, or if
 *         pushing the messages fails with a StoppedQueueException or a
 *         JMSException
 */
public synchronized boolean replyReceived(AbstractJmsReply reply)
    throws AbortedRequestException {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG,
        "MessageConsumerListener.replyReceived(" + reply + ')');
  }

  if (status == Status.CLOSE) {
    throw new AbortedRequestException();
  }

  if (queueMode) {
    // 1- Change the status before pushing the
    // messages into the session queue.
    setReceiveStatus(ReceiveStatus.CONSUMING_REPLY);
  }

  try {
    ConsumerMessages cm = (ConsumerMessages) reply;
    // 2- increment messageCount (synchronized)
    messageCount += cm.getMessageCount();
    pushMessages(cm);
  } catch (StoppedQueueException exc) {
    throw new AbortedRequestException();
  } catch (JMSException exc) {
    throw new AbortedRequestException();
  }

  return queueMode;
}

/**
 * Pushes the received messages.
 * Currently two behaviors:
 * 1- SingleSessionConsumer pushes the message
 * in a single session (standard JMS)
 * 2- MultiSessionConsumer pushes the message
 * in several session (from a session pool)
 *
 * @param cm the received messages
 * @throws JMSException if the messages cannot be pushed
 */
public abstract void pushMessages(ConsumerMessages cm) throws JMSException;

/**
 * Notification that a request was aborted; this implementation ignores it.
 *
 * @param requestId identifier of the aborted request (unused)
 */
public void replyAborted(int requestId) {
  // Nothing to do.
}

/**
 * @return true if the status of this listener is CLOSE
 */
public synchronized boolean isClosed() {
  return (status == Status.CLOSE);
}

/**
 * @return the message listener held by this object
 */
public final MessageListener getMessageListener() {
  return listener;
}

/**
 * @return the queueMode flag of this consumer
 */
public final boolean getQueueMode() {
  return queueMode;
}

/**
 * @return the name of the target this consumer is bound to
 */
public final String getTargetName() {
  return targetName;
}

/**
 * Delivers one message to the given listener.
 *
 * decreaseMessageCount(ackMode) is called first, then the listener's
 * onMessage(). A RuntimeException thrown by the listener is logged at
 * DEBUG level and rethrown as a JMSException that carries it as its
 * linked exception.
 *
 * @param msg      the message to deliver
 * @param listener the listener to invoke
 * @param ackMode  acknowledgement mode, passed to decreaseMessageCount()
 * @throws JMSException if the listener throws a RuntimeException, or if
 *         decreaseMessageCount() fails
 */
protected void activateListener(
    Message msg, MessageListener listener, int ackMode) throws JMSException {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG,
        "MessageConsumerListener.onMessage(" + msg + ')');
  }

  // Consume one message
  decreaseMessageCount(ackMode);

  try {
    listener.onMessage(msg);

    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG,
          " -> consumer.onMessage(" + msg + ") returned");
    }
  } catch (RuntimeException re) {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG, "", re);
    }
    JMSException exc = new JMSException(re.toString());
    exc.setLinkedException(re);
    throw exc;
  }
}

/**
 * Delivers a message to an explicitly given listener; the delivery
 * policy is defined by the subclasses.
 *
 * @param msg      the message to deliver
 * @param listener the listener to invoke
 * @param ackMode  acknowledgement mode
 * @throws JMSException if the delivery fails
 */
public abstract void onMessage(
    Message msg, MessageListener listener, int ackMode) throws JMSException;

/**
 * Delivers a message to the listener held by this object.
 * Called by Session (standard JMS, mono-threaded).
 *
 * The status goes from RUN to ON_MSG for the duration of the listener
 * call and returns to RUN afterwards; threads waiting in close() are
 * then notified. The status check is done BEFORE entering the
 * try/finally: a delivery rejected because the listener is not in the
 * RUN state must leave the status untouched, otherwise a closed
 * listener would be put back to RUN.
 *
 * @param msg     the message to deliver
 * @param ackMode acknowledgement mode
 * @throws javax.jms.IllegalStateException if the status is not RUN
 * @throws JMSException if no listener is set or if the delivery fails
 */
public void onMessage(Message msg, int ackMode) throws JMSException {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG,
        "MessageConsumerListener.onMessage(" + msg + ')');
  }

  if (listener == null) {
    throw new JMSException("Null listener");
  }

  synchronized (this) {
    if (status != Status.RUN) {
      throw new javax.jms.IllegalStateException("Message listener closed");
    }
    setStatus(Status.ON_MSG);
  }

  // From here on this thread owns the ON_MSG state (close() waits while
  // the status is ON_MSG), so it is safe to go back to RUN when done.
  try {
    activateListener(msg, listener, ackMode);
  } finally {
    synchronized (this) {
      setStatus(Status.RUN);
      // Notify threads trying to close the listener.
      notifyAll();
    }
  }
}

/**
 * Acknowledges one message.
 *
 * In DUPS_OK_ACKNOWLEDGE mode the identifier is buffered in
 * messagesToAck; when not in queue mode the buffer is flushed once it
 * holds more than topicAckBufferMax identifiers. In any other mode a
 * ConsumerAckRequest is sent immediately for this single message.
 *
 * @param msgId   identifier of the message to acknowledge
 * @param ackMode acknowledgement mode of the session
 * @throws JMSException if the acknowledgement request cannot be sent
 */
void ack(String msgId, int ackMode) throws JMSException {
  if (ackMode == javax.jms.Session.DUPS_OK_ACKNOWLEDGE) {
    // All the operations on messagesToAck are expected to be
    // synchronized on the vector itself (see acknowledge()).
    // NOTE(review): relies on messagesToAck being a java.util.Vector,
    // whose addElement() is synchronized - confirm at the declaration.
    messagesToAck.addElement(msgId);
    if (!queueMode) {
      acknowledge(topicAckBufferMax);
    }
  } else {
    ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode);
    ack.addId(msgId);
    rm.sendRequest(ack);
  }
}

/**
 * Sends an ActivateConsumerRequest for the target with the flag set to
 * true.
 *
 * @throws JMSException if the request cannot be sent
 */
void activateMessageInput() throws JMSException {
  rm.sendRequest(new ActivateConsumerRequest(targetName, true));
}

/**
 * Sends an ActivateConsumerRequest for the target with the flag set to
 * false.
 *
 * @throws JMSException if the request cannot be sent
 */
void passivateMessageInput() throws JMSException {
  rm.sendRequest(new ActivateConsumerRequest(targetName, false));
}
} // closes the enclosing class, whose declaration starts outside this excerpt
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -