sessionconsumer.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 771 行 · 第 1/2 页
JAVA
771 行
/** * Browse up to count messages. * * @param consumerId the consumer identifier * @param count the maximum number of messages to receive * @return a list of {@link MessageImpl} instances * @throws JMSException for any JMS error */ public List browse(long consumerId, int count) throws JMSException { ConsumerEndpoint consumer = getConsumer(consumerId); if (!(consumer instanceof QueueBrowserEndpoint)) { throw new JMSException("Can't browse messages: invalid consumer"); } List messages = new ArrayList(count); try { _database.begin(); for (int i = 0; i < count && !_stop.get();) { MessageHandle handle = consumer.receive(_stop); if (handle == null) { break; } MessageImpl orig = handle.getMessage(); if (orig != null) { messages.add(copy(orig, handle)); ++i; } } _database.commit(); } catch (Exception exception) { rethrow("Failed to browse messages", exception); } return messages; } /** * Acknowledge that a message has been processed. * * @param consumerId the identity of the consumer performing the ack * @param messageId the message identifier * @throws JMSException for any error */ public synchronized void acknowledge(long consumerId, String messageId) throws JMSException { try { _database.begin(); _sent.acknowledge(messageId, consumerId); _database.commit(); } catch (Exception exception) { rethrow("Failed to acknowledge message", exception); } } /** * Close the consumer. * * @throws JMSException for any eror */ public synchronized void close() throws JMSException { _log.debug("close"); stop(); _listener = null; try { _database.begin(); _sent.clear(); _database.commit(); } catch (Exception exception) { rethrow(exception.getMessage(), exception); } } /** * Notifies that a message is available for a particular consumer. * * @param consumer the consumer */ public void messageAvailable(ConsumerEndpoint consumer) { if (queue(consumer)) { try { _runner.schedule(); } catch (InterruptedException exception) { _log.error("Failed to schedule worker", exception); } } } /** * Send messages to the client. */ private void dispatch() { final Condition timeout = TimedCondition.after(30 * 1000); Condition done = new Condition() { public boolean get() { return _stop.get() || timeout.get(); } }; _log.debug("dispatch"); int sent = 0; while (sent < MAX_MESSAGES && !done.get()) { ConsumerEndpoint consumer; synchronized (_pending) { if (!_pending.isEmpty()) { consumer = (ConsumerEndpoint) _pending.removeFirst(); } else { break; } } if (wantsMessages(consumer)) { if (consumer.isAsynchronous()) { if (send(consumer, done)) { ++sent; } if (needsScheduling(consumer)) { queue(consumer); } } else { notifyMessageAvailable(); } } } boolean empty; synchronized (_pending) { empty = _pending.isEmpty(); } if (!empty && !_stop.get()) { // reschedule this if needed try { _runner.schedule(); } catch (InterruptedException exception) { _log.error("Failed to reschedule worker", exception); } } _log.debug("dispatch[sent=" + sent + "]"); } private void notifyMessageAvailable() { try { // notify the client sesssion. _listener.onMessageAvailable(); } catch (RemoteException exception) { _log.debug("Failed to notify client", exception); } } private boolean queue(ConsumerEndpoint consumer) { boolean queued = false; if (!_stop.get()) { synchronized (_pending) { if (!_pending.contains(consumer)) { _pending.add(consumer); queued = true; } } } return queued; } private boolean send(ConsumerEndpoint consumer, Condition cancel) { boolean sent = false; MessageHandle handle = null; try { _database.begin(); try { synchronized (_removeLock) { _consumerId = consumer.getId(); } handle = consumer.receive(cancel); if (handle != null) { MessageImpl message = handle.getMessage(); if (message != null) { // send the client a copy. message = copy(message, handle); // clear any wait condition // @todo - possible race condition? Could // syncbronous client timeout and request again, // and this trash subsequent wait? consumer.setWaitingForMessage(null); _sent.preSend(handle); _database.commit(); // send the message sent = send(message); if (sent) { _database.begin(); _sent.postSend(handle); _database.commit(); } } } else { _database.commit(); } } finally { synchronized (_removeLock) { _consumerId = -1; _removeLock.notify(); } } } catch (Exception exception) { cleanup(exception.getMessage(), exception); } if (!sent && handle != null) { try { _database.begin(); handle.release(); _database.commit(); } catch (Exception exception) { cleanup("Failed to release unsent message", exception); } } return sent; } /** * Send the specified message to the client. * * @param message the message * @return <code>true</code> if the message was successfully sent */ protected boolean send(MessageImpl message) { boolean delivered = false; try { // send the message to the listener. delivered = _listener.onMessage(message); if (_log.isDebugEnabled()) { _log.debug("send[JMSMessageID=" + message.getMessageId() + ", delivered=" + delivered + "]"); } } catch (RemoteException exception) { _log.info("Failed to notify client", exception); } return delivered; } private boolean wantsMessages(ConsumerEndpoint consumer) { boolean result = false; if (consumer.isAsynchronous() || consumer.isWaitingForMessage()) { result = true; } return result; } private boolean needsScheduling(ConsumerEndpoint consumer) { boolean result = false; if (wantsMessages(consumer) && consumer.getMessageCount() != 0) { result = true; } return result; } private MessageImpl doReceive(long consumerId, final Condition wait) throws JMSException { ConsumerEndpoint consumer = getConsumer(consumerId); Condition cancel; if (wait != null) { cancel = new Condition() { public boolean get() { return _stop.get() || !wait.get(); } }; } else { cancel = _stop; } MessageImpl message = null; try { _database.begin(); MessageHandle handle = consumer.receive(cancel); if (handle != null) { // retrieve the message and copy it message = handle.getMessage(); if (message != null) { message = copy(message, handle); } } if (message == null) { // no message available. Mark the consumer as (possibly) waiting // for a message. consumer.setWaitingForMessage(wait); } else { // clear any wait condition consumer.setWaitingForMessage(null); // if we have a non-null message then add it to the sent message // cache. Additionally, if we are part of a global transaction // then we must also send it to the ResourceManager for recovery. _sent.preSend(handle); } _database.commit(); } catch (Exception exception) { rethrow(exception.getMessage(), exception); } if (_log.isDebugEnabled()) { if (message != null) { _log.debug("doReceive(consumerId=" + consumerId + ") -> JMSMesssageID=" + message.getMessageId()); } } return message; } /** * Helper to copy a message. * * @param message the message to copy * @param handle the handle the message came from * @return a copy of the message * @throws JMSException if the copy fails */ private MessageImpl copy(MessageImpl message, MessageHandle handle) throws JMSException { MessageImpl result; try { result = (MessageImpl) message.clone(); result.setJMSRedelivered(handle.getDelivered()); result.setConsumerId(handle.getConsumerId()); } catch (JMSException exception) { throw exception; } catch (CloneNotSupportedException exception) { _log.error(exception, exception); throw new JMSException(exception.getMessage()); } return result; } /** * Returns the consumer endpoint given its identifier. * * @param consumerId the consumer identifier * @return the consumer endpoint corresponding to <code>consumerId</code> * @throws JMSException if the consumer doesn't exist */ private ConsumerEndpoint getConsumer(long consumerId) throws JMSException { ConsumerEndpoint consumer = (ConsumerEndpoint) _consumers.get(new Long(consumerId)); if (consumer == null) { throw new JMSException("Consumer not registered: " + consumerId); } return consumer; } /** * Helper to clean up after a failed call. * * @param message the message to log * @param exception the exception to log */ private void cleanup(String message, Throwable exception) { _log.error(message, exception); try { if (_database.isTransacted()) { _database.rollback(); } } catch (PersistenceException error) { _log.warn("Failed to rollback after error", error); } } /** * Helper to clean up after a failed call, and rethrow. * * @param message the message to log * @param exception the exception * @throws JMSException the original exception adapted to a * <code>JMSException</code> if necessary */ private void rethrow(String message, Throwable exception) throws JMSException { cleanup(message, exception); if (exception instanceof JMSException) { throw (JMSException) exception; } throw new JMSException(exception.getMessage()); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?