📄 messageconsumer.java
字号:
// Synchronizing with a possible "close".
    synchronized(this) {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, "--- " + this
                         + ": requests to receive a message.");

      if (closed)
        throw new IllegalStateException("Forbidden call on a closed consumer.");

      // A synchronous receive on a consumer (or session) that already has
      // asynchronous listeners is only traced as a warning; the call still
      // proceeds.
      if (messageListener != null) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Improper call as a"
                           + " listener exists for this consumer.");
      }
      else if (sess.msgListeners > 0) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Improper call as"
                           + " asynchronous consumers have already"
                           + " been set on the session.");
      }

      pendingReq = new ConsumerReceiveRequest(targetName, selector, timeOut,
                                              queueMode);
      pendingReq.setRequestId(sess.cnx.nextRequestId());
      receiving = true;

      // In case of a timer, scheduling the receive:
      if (timeOut > 0) {
        replyingTask = new ConsumerReplyTask(pendingReq);
        sess.schedule(replyingTask, timeOut);
      }
    }

    // Expecting an answer (done outside of the monitor so that close() can
    // get in while this thread is waiting for the reply):
    ConsumerMessages reply =
      (ConsumerMessages) sess.cnx.syncRequest(pendingReq);

    // Synchronizing again with a possible "close":
    synchronized(this) {
      receiving = false;
      pendingReq = null;
      if (replyingTask != null)
        replyingTask.cancel();

      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, this + ": received a"
                         + " reply.");

      Vector msgs = reply.getMessages();
      if (msgs != null && ! msgs.isEmpty()) {
        // Only the first message of the reply is delivered.
        com.scalagent.kjoram.messages.Message msg =
          (com.scalagent.kjoram.messages.Message) msgs.elementAt(0);
        String msgId = msg.getIdentifier();

        // Auto ack: acknowledging the message:
        if (sess.autoAck)
          sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
                                                       queueMode));
        // Session ack: passing the id for later ack or deny:
        else
          sess.prepareAck(targetName, msgId, queueMode);

        return Message.wrapMomMessage(sess, msg);
      }
      else
        return null;
    }
  }

  /**
   * API method: receives a message without arming any expiry timer.
   * <p>
   * Delegates to <code>receive(0)</code>; the reply timer is only scheduled
   * for a strictly positive time-out.
   *
   * @return  The received message, or <code>null</code> if the reply carried
   *          no message.
   *
   * @exception IllegalStateException  If the consumer is closed, or if the
   *              connection is broken.
   * @exception JMSSecurityException  If the requester is not a READER on the
   *              destination.
   * @exception JMSException  If the request fails for any other reason.
   */
  public Message receive() throws JMSException {
    return receive(0);
  }

  /**
   * API method: receives a message through <code>receive(-1)</code>.
   * <p>
   * NOTE(review): the negative time-out is presumably what tells the server
   * to answer immediately; that handling is not in this class -- confirm.
   *
   * @return  The received message, or <code>null</code> if the reply carried
   *          no message.
   *
   * @exception IllegalStateException  If the consumer is closed, or if the
   *              connection is broken.
   * @exception JMSSecurityException  If the requester is not a READER on the
   *              destination.
   * @exception JMSException  If the request fails for any other reason.
   */
  public Message receiveNoWait() throws JMSException {
    return receive(-1);
  }

  /**
   * API method: closes this consumer.
   * <p>
   * Waits for an ongoing <code>receive()</code> / <code>onMessage()</code>
   * critical section, removes the consumer from the session and from the
   * connection's requests table, unsets the listener and/or the
   * subscription on the server (best effort), and finally wakes up a
   * pending <code>receive()</code> with an empty reply.
   * <p>
   * Calling this method on an already closed consumer does nothing.
   *
   * @exception JMSException  Actually never thrown.
   */
  public void close() throws JMSException {
    // Ignoring the call if consumer is already closed:
    if (closed)
      return;

    if (JoramTracing.dbgClient)
      JoramTracing.log(JoramTracing.DEBUG, "--- " + this
                       + ": closing...");

    // Synchronizing with a possible receive() or onMessage() ongoing process.
    syncro();

    // Taking a snapshot of the pending request: the field may be reset to
    // null (end of a receive) or replaced (onMessage) by another thread
    // while this method runs, and the lock removed below must be answered
    // under the very same key.
    AbstractJmsRequest removedReq = pendingReq;

    // Removing this resource's reference from everywhere:
    Object lock = null;
    if (removedReq != null)
      lock = sess.cnx.requestsTable.remove(removedReq.getKey());
    sess.consumers.removeElement(this);

    // Unsetting the listener, if any:
    try {
      if (messageListener != null) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.DEBUG, "Unsetting listener.");

        if (queueMode) {
          ConsumerUnsetListRequest unsetLR =
            new ConsumerUnsetListRequest(true);
          // receive() resets pendingReq to null once it has its reply, and
          // only warns when called while a listener is set, so the field can
          // legitimately be null here: guarding against a
          // NullPointerException that would abort the close.
          // NOTE(review): the unset request is then sent without a cancelled
          // request id -- confirm the server accepts that.
          AbstractJmsRequest listenerReq = pendingReq;
          if (listenerReq != null)
            unsetLR.setCancelledRequestId(listenerReq.getRequestId());
          sess.cnx.syncRequest(unsetLR);
        }
      }
      if (durableSubscriber)
        sess.cnx.syncRequest(new ConsumerCloseSubRequest(targetName));
      else if (! queueMode)
        sess.cnx.syncRequest(new ConsumerUnsubRequest(targetName));
    }
    // A JMSException might be caught if the connection is broken. The close
    // goes on anyway, but the failure is traced instead of being lost.
    catch (JMSException jE) {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, this
                         + ": request failed while closing, ignored: " + jE);
    }

    // In the case of a pending "receive" request, replying by a null to it:
    if (lock != null && receiving) {
      if (JoramTracing.dbgClient)
        JoramTracing.log(JoramTracing.DEBUG, "Replying to the"
                         + " pending receive "
                         + removedReq.getRequestId()
                         + " with a null message.");

      sess.cnx.repliesTable.put(removedReq.getKey(), new ConsumerMessages());

      synchronized(lock) {
        lock.notify();
      }
    }

    // Synchronizing again:
    syncro();

    closed = true;

    if (JoramTracing.dbgClient)
      JoramTracing.log(JoramTracing.DEBUG, this + ": closed.");
  }

  /**
   * Returns when the consumer isn't busy executing a "onMessage" or a
   * "receive" anymore; method called for synchronization purposes.
   * <p>
   * The body is intentionally empty: acquiring and releasing this object's
   * monitor is the whole point.
   */
  synchronized void syncro() {}

  /**
   * Method called by the session daemon for passing an asynchronous message
   * delivery to the listener.
   * <p>
   * The message is denied if no listener is set, or if the listener (or the
   * wrapping of the message) fails; otherwise it is acknowledged right away
   * in auto-ack mode or registered on the session for a later ack/deny. In
   * queue mode a new listener request is then sent.
   *
   * @param message  The delivered message.
   */
  synchronized void onMessage(com.scalagent.kjoram.messages.Message message) {
    String msgId = message.getIdentifier();

    try {
      // If the listener has been unset without having stopped the
      // connection, this case might happen:
      if (messageListener == null) {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, this + ": an"
                           + " asynchronous delivery arrived"
                           + " for an improperly unset listener:"
                           + " denying the message.");
        sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                     queueMode, true));
      }
      else {
        // In session ack mode, preparing later ack or deny:
        if (! sess.autoAck)
          sess.prepareAck(targetName, msgId, queueMode);

        try {
          messageListener.onMessage(Message.wrapMomMessage(sess, message));
          // Auto ack: acknowledging the message:
          if (sess.autoAck)
            sess.cnx.asyncRequest(new ConsumerAckRequest(targetName, msgId,
                                                         queueMode));
        }
        // Catching a JMSException means that the building of the Joram
        // message went wrong: denying as expected by the spec:
        catch (JMSException jE) {
          JoramTracing.log(JoramTracing.ERROR, this
                           + ": error while processing the"
                           + " received message: " + jE);
          if (queueMode)
            sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                         queueMode));
          else
            sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                          queueMode));
        }
        // Catching a RuntimeException means that the client onMessage() code
        // is incorrect; denying as expected by the JMS spec (only done in
        // auto-ack mode; otherwise the id was already passed to the session):
        catch (RuntimeException rE) {
          JoramTracing.log(JoramTracing.ERROR, this
                           + ": RuntimeException thrown"
                           + " by the listener: " + rE);
          if (sess.autoAck && queueMode)
            sess.cnx.syncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                         queueMode));
          else if (sess.autoAck && ! queueMode)
            sess.cnx.asyncRequest(new ConsumerDenyRequest(targetName, msgId,
                                                          queueMode));
        }

        // Sending a new request if queue mode:
        if (queueMode) {
          pendingReq = new ConsumerSetListRequest(targetName, selector, true);
          pendingReq.setRequestId(sess.cnx.nextRequestId());
          // The consumer itself (not a lock) is registered for this request.
          sess.cnx.requestsTable.put(pendingReq.getKey(), this);
          sess.cnx.asyncRequest(pendingReq);
        }
      }
    }
    // Catching an IllegalStateException means that the acknowledgement or
    // denying went wrong because the connection has been lost. Nothing more
    // can be done here.
    catch (JMSException jE) {
      JoramTracing.log(JoramTracing.ERROR, this + ": " + jE);
    }
  }

  /**
   * The <code>ConsumerReplyTask</code> class is used by "receive" requests
   * with timer for taking care of answering them if the timer expires.
   */
  private class ConsumerReplyTask extends TimerTask {
    /** The request to answer. */
    private AbstractJmsRequest request;
    /** The reply to put in the connection's table. */
    private ConsumerMessages nullReply;

    /**
     * Constructs a <code>ConsumerReplyTask</code> instance.
     *
     * @param request  The request to answer.
     */
    ConsumerReplyTask(AbstractJmsRequest request) {
      this.request = request;
      this.nullReply = new ConsumerMessages(request.getRequestId(),
                                            targetName, queueMode);
    }

    /**
     * Method called when the timer expires, actually putting a null answer
     * in the replies table and unlocking the requester.
     * <p>
     * Does nothing if the request is no longer in the requests table.
     */
    public void run() {
      try {
        if (JoramTracing.dbgClient)
          JoramTracing.log(JoramTracing.WARN, "Receive request"
                           + " answered because timer expired");

        Lock lock = (Lock) sess.cnx.requestsTable.remove(request.getKey());

        // No entry left for this request: nobody to wake up.
        if (lock == null)
          return;

        synchronized (lock) {
          sess.cnx.repliesTable.put(request.getKey(), nullReply);
          lock.notify();
        }
      }
      // Nothing may escape from the timer task, but a failure must at least
      // be reported.
      catch (Exception e) {
        JoramTracing.log(JoramTracing.ERROR, "Receive request " 
                         + request.getRequestId()
                         + ": timer task failed: " + e);
      }
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -