sessionconsumer.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 771 行 · 第 1/2 页

JAVA
771
字号
    /**     * Browse up to count messages.     *     * @param consumerId the consumer identifier     * @param count      the maximum number of messages to receive     * @return a list of {@link MessageImpl} instances     * @throws JMSException for any JMS error     */    public List browse(long consumerId, int count) throws JMSException {        ConsumerEndpoint consumer = getConsumer(consumerId);        if (!(consumer instanceof QueueBrowserEndpoint)) {            throw new JMSException("Can't browse messages: invalid consumer");        }        List messages = new ArrayList(count);        try {            _database.begin();            for (int i = 0; i < count && !_stop.get();) {                MessageHandle handle = consumer.receive(_stop);                if (handle == null) {                    break;                }                MessageImpl orig = handle.getMessage();                if (orig != null) {                    messages.add(copy(orig, handle));                    ++i;                }            }            _database.commit();        } catch (Exception exception) {            rethrow("Failed to browse messages", exception);        }        return messages;    }    /**     * Acknowledge that a message has been processed.     *     * @param consumerId the identity of the consumer performing the ack     * @param messageId  the message identifier     * @throws JMSException for any error     */    public synchronized void acknowledge(long consumerId, String messageId)            throws JMSException {        try {            _database.begin();            _sent.acknowledge(messageId, consumerId);            _database.commit();        } catch (Exception exception) {            rethrow("Failed to acknowledge message", exception);        }    }    /**     * Close the consumer.     *     * @throws JMSException for any eror     */    public synchronized void close() throws JMSException {        _log.debug("close");        stop();        _listener = null;        try {            _database.begin();            _sent.clear();            _database.commit();        } catch (Exception exception) {            rethrow(exception.getMessage(), exception);        }    }    /**     * Notifies that a message is available for a particular consumer.     *     * @param consumer the consumer     */    public void messageAvailable(ConsumerEndpoint consumer) {        if (queue(consumer)) {            try {                _runner.schedule();            } catch (InterruptedException exception) {                _log.error("Failed to schedule worker", exception);            }        }    }    /**     * Send messages to the client.     */    private void dispatch() {        final Condition timeout = TimedCondition.after(30 * 1000);        Condition done = new Condition() {            public boolean get() {                return _stop.get() || timeout.get();            }        };        _log.debug("dispatch");        int sent = 0;        while (sent < MAX_MESSAGES && !done.get()) {            ConsumerEndpoint consumer;            synchronized (_pending) {                if (!_pending.isEmpty()) {                    consumer = (ConsumerEndpoint) _pending.removeFirst();                } else {                    break;                }            }            if (wantsMessages(consumer)) {                if (consumer.isAsynchronous()) {                    if (send(consumer, done)) {                        ++sent;                    }                    if (needsScheduling(consumer)) {                        queue(consumer);                    }                } else {                    notifyMessageAvailable();                }            }        }        boolean empty;        synchronized (_pending) {            empty = _pending.isEmpty();        }        if (!empty && !_stop.get()) {            // reschedule this if needed            try {                _runner.schedule();            } catch (InterruptedException exception) {                _log.error("Failed to reschedule worker", exception);            }        }        _log.debug("dispatch[sent=" + sent + "]");    }    private void notifyMessageAvailable() {        try {            // notify the client sesssion.            _listener.onMessageAvailable();        } catch (RemoteException exception) {            _log.debug("Failed to notify client", exception);        }    }    private boolean queue(ConsumerEndpoint consumer) {        boolean queued = false;        if (!_stop.get()) {            synchronized (_pending) {                if (!_pending.contains(consumer)) {                    _pending.add(consumer);                    queued = true;                }            }        }        return queued;    }    private boolean send(ConsumerEndpoint consumer, Condition cancel) {        boolean sent = false;        MessageHandle handle = null;        try {            _database.begin();            try {                synchronized (_removeLock) {                    _consumerId = consumer.getId();                }                handle = consumer.receive(cancel);                if (handle != null) {                    MessageImpl message = handle.getMessage();                    if (message != null) {                        // send the client a copy.                        message = copy(message, handle);                        // clear any wait condition                        // @todo - possible race condition? Could                        // syncbronous client timeout and request again,                        // and this trash subsequent wait?                        consumer.setWaitingForMessage(null);                        _sent.preSend(handle);                        _database.commit();                        // send the message                        sent = send(message);                        if (sent) {                            _database.begin();                            _sent.postSend(handle);                            _database.commit();                        }                    }                } else {                    _database.commit();                }            } finally {                synchronized (_removeLock) {                    _consumerId = -1;                    _removeLock.notify();                }            }        } catch (Exception exception) {            cleanup(exception.getMessage(), exception);        }        if (!sent && handle != null) {            try {                _database.begin();                handle.release();                _database.commit();            } catch (Exception exception) {                cleanup("Failed to release unsent message", exception);            }        }        return sent;    }    /**     * Send the specified message to the client.     *     * @param message the message     * @return <code>true</code> if the message was successfully sent     */    protected boolean send(MessageImpl message) {        boolean delivered = false;        try {            // send the message to the listener.            delivered = _listener.onMessage(message);            if (_log.isDebugEnabled()) {                _log.debug("send[JMSMessageID=" + message.getMessageId()                        + ", delivered=" + delivered + "]");            }        } catch (RemoteException exception) {            _log.info("Failed to notify client", exception);        }        return delivered;    }    private boolean wantsMessages(ConsumerEndpoint consumer) {        boolean result = false;        if (consumer.isAsynchronous() || consumer.isWaitingForMessage()) {            result = true;        }        return result;    }    private boolean needsScheduling(ConsumerEndpoint consumer) {        boolean result = false;        if (wantsMessages(consumer) && consumer.getMessageCount() != 0) {            result = true;        }        return result;    }    private MessageImpl doReceive(long consumerId, final Condition wait)            throws JMSException {        ConsumerEndpoint consumer = getConsumer(consumerId);        Condition cancel;        if (wait != null) {            cancel = new Condition() {                public boolean get() {                    return _stop.get() || !wait.get();                }            };        } else {            cancel = _stop;        }        MessageImpl message = null;        try {            _database.begin();            MessageHandle handle = consumer.receive(cancel);            if (handle != null) {                // retrieve the message and copy it                message = handle.getMessage();                if (message != null) {                    message = copy(message, handle);                }            }            if (message == null) {                // no message available. Mark the consumer as (possibly) waiting                // for a message.                consumer.setWaitingForMessage(wait);            } else {                // clear any wait condition                consumer.setWaitingForMessage(null);                // if we have a non-null message then add it to the sent message                // cache. Additionally, if we are part of a global transaction                // then we must also send it to the ResourceManager for recovery.                _sent.preSend(handle);            }            _database.commit();        } catch (Exception exception) {            rethrow(exception.getMessage(), exception);        }        if (_log.isDebugEnabled()) {            if (message != null) {                _log.debug("doReceive(consumerId=" + consumerId +                        ") -> JMSMesssageID=" + message.getMessageId());            }        }        return message;    }    /**     * Helper to copy a message.     *     * @param message the message to copy     * @param handle  the handle the message came from     * @return a copy of the message     * @throws JMSException if the copy fails     */    private MessageImpl copy(MessageImpl message, MessageHandle handle)            throws JMSException {        MessageImpl result;        try {            result = (MessageImpl) message.clone();            result.setJMSRedelivered(handle.getDelivered());            result.setConsumerId(handle.getConsumerId());        } catch (JMSException exception) {            throw exception;        } catch (CloneNotSupportedException exception) {            _log.error(exception, exception);            throw new JMSException(exception.getMessage());        }        return result;    }    /**     * Returns the consumer endpoint given its identifier.     *     * @param consumerId the consumer identifier     * @return the consumer endpoint corresponding to <code>consumerId</code>     * @throws JMSException if the consumer doesn't exist     */    private ConsumerEndpoint getConsumer(long consumerId)            throws JMSException {        ConsumerEndpoint consumer                = (ConsumerEndpoint) _consumers.get(new Long(consumerId));        if (consumer == null) {            throw new JMSException("Consumer not registered: " + consumerId);        }        return consumer;    }    /**     * Helper to clean up after a failed call.     *     * @param message   the message to log     * @param exception the exception to log     */    private void cleanup(String message, Throwable exception) {        _log.error(message, exception);        try {            if (_database.isTransacted()) {                _database.rollback();            }        } catch (PersistenceException error) {            _log.warn("Failed to rollback after error", error);        }    }    /**     * Helper to clean up after a failed call, and rethrow.     *     * @param message   the message to log     * @param exception the exception     * @throws JMSException the original exception adapted to a     *                      <code>JMSException</code> if necessary     */    private void rethrow(String message, Throwable exception)            throws JMSException {        cleanup(message, exception);        if (exception instanceof JMSException) {            throw (JMSException) exception;        }        throw new JMSException(exception.getMessage());    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?