consumerendpoint.java

Source code of a server that implements JMS; supports multiple adapters, DB, FTP, and multiple databases
     */
    public JmsServerSession getSession() {
        return _session;
    }
   
    /**
     * Deliver messages in the cache to the consumer
     *
     * @return <code>true</code> if the endpoint should be rescheduled
     */
    public abstract boolean deliverMessages();

    /**
     * The run method is used to asynchronously deliver the messages in the
     * cache to the consumer, by invoking {@link #deliverMessages}. 
     * <p>
     * It is scheduled by the {@link Scheduler}.
     */
    public void run() {
        synchronized (_lock) {
            if (!_closed) {
                boolean reschedule = deliverMessages();
                _scheduled = false;
                if (reschedule) {
                    schedule();
                }
            }
        }
    }

    // implementation of DestinationCacheEventListener.messageAdded
    public synchronized boolean messageAdded(MessageImpl message) {
        boolean added = false;

        // create a message handle
        try {
            // if the noLocal indicator is set and the message was published
            // on this consumer's own connection, report it as processed but
            // do not add it to the cache
            if (getNoLocal()
                && message.getConnectionId() == getConnectionId()) {
                // inform them that we have processed the message
                return true;
            }

            MessageHandle handle =
                MessageHandleFactory.getHandle(this, message);

            if (!_cache.containsHandle(handle)) {
                // if the message is not already in the cache then add it
                // and flag that we have added the message to the cache
                addMessage(handle, message);
                added = true;

                schedule();
            }
        } catch (JMSException exception) {
            _log.error("Failed to add message to endpoint", exception);
        }

        return added;
    }

    // implementation of DestinationCacheEventListener.messageRemoved
    public synchronized boolean messageRemoved(MessageImpl message) {
        boolean removed = false;

        try {
            //retrieve the message handle
            MessageHandle handle = 
                MessageHandleFactory.getHandle(this, message);

            if (_cache.containsHandle(handle)) {
                // the handle is known to be cached, so remove it
                removeMessage(handle);
                removed = true;
            }
        } catch (JMSException exception) {
            _log.error("Failed to remove message from endpoint", exception);
        }

        return removed;
    }

    // implementation of DestinationCacheEventListener.persistentMessageAdded
    public synchronized boolean persistentMessageAdded(Connection connection,
                                                       MessageImpl message)
        throws PersistenceException {

        return messageAdded(message);
    }

    // implementation of DestinationCacheEventListener.persistentMessageRemoved
    public synchronized boolean persistentMessageRemoved(Connection connection,
                                                         MessageImpl message)
        throws PersistenceException {

        return messageRemoved(message);
    }

    /**
     * Stop/start message delivery
     *
     * @param stop if <code>true</code>, stop message delivery; otherwise
     * start it
     */
    public synchronized void setStopped(boolean stop) {
        if (stop) {
            _stopped = true;
        } else {
            _stopped = false;
            // schedule message delivery if needed
            schedule();
        }
    }

    /**
     * This method will return all unacked messages to the queue and allow
     * them to be resent to the consumer with the redelivery flag on.
     */
    public synchronized void recover() {
        // default behaviour is to do nothing
    }

    /**
     * Close this endpoint.
     * <p>
     * This synchronizes with {@link #deliverMessages} before invoking
     * {@link #doClose}
     */
    public final void close() {
        _stopped = true;

        synchronized (_lock) {
            // synchronize with deliverMessages()            
            _scheduler.remove(this); // remove this, if it is scheduled
            _scheduled = false;

        }

        synchronized (this) {
            doClose();

            // clear all messages in the cache
            if (_cache != null) {
                _cache.clear();
            }

            _closed = true;
        }
    }

    /**
     * Set the message listener for this consumer. If a message listener is set
     * then messages will be scheduled to be sent to it when they are available.
     * <p>
     * Each consumer cache can only have a single message listener. To remove
     * the message listener call this method with a null argument.
     *
     * @param listener - the message listener to set, or null to remove it
     */
    public synchronized void setMessageListener(
        InternalMessageListener listener) {
        _listener = listener;
        if (listener == null) {
            // remove this from the scheduler
            _scheduler.remove(this);
            _scheduled = false;
        } else {
            // schedule it to run
            schedule();
        }
    }

    /**
     * Return the specified message to the cache.
     *
     * @param handle - handle to return
     */
    public synchronized void returnMessage(MessageHandle handle) {
        if (_cache != null) {
            addMessage(handle);
            schedule();
        }
    }

    /**
     * Return the next message to the client. This will also mark the message as
     * sent and move it to the sent queue
     *
     * @param wait - the number of milliseconds to wait
     * @return MessageHandle - handle to the next message in the list
     */
    public abstract MessageHandle receiveMessage(long wait);

    // implementation of GarbageCollectable.collectGarbage
    public void collectGarbage(boolean aggressive) {
        if (aggressive) {
            // clear all persistent messages in the cache
            _cache.clearPersistentMessages();
            if (_log.isDebugEnabled()) {
                _log.debug("Evicted all persistent messages from dest "
                    + getDestination().getName() + " and name "
                    + getId());
            }
        }

        if (_log.isDebugEnabled()) {
            _log.debug("ENDPOINT- " + getDestination().getName() + ":"
                + getPersistentId() + " Messages: P["
                + _cache.getPersistentCount() + "] T["
                + _cache.getTransientCount() + "] Handles: ["
                + _cache.getHandleCount() + "]");
        }
    }

    /**
     * Closes the endpoint
     */
    protected abstract void doClose();

    /**
     * Schedule asynchronous message delivery
     */
    protected void schedule() {
        if (!_stopped && !_closed && _listener != null && !_scheduled) {
            _scheduled = true;
            _scheduler.add(this);
        }
    }

    /**
     * Clear all messages in the cache, regardless of whether they are
     * persistent or non-persistent
     */
    protected void clearMessages() {
        _cache.clear();
    }

    /**
     * Check whether the vector of handles contains one or more persistent
     * handles
     *
     * @param collection - collection of {@link MessageHandle} objects
     * @return true if there is one or more persistent handles
     */
    protected boolean collectionHasPersistentHandles(Vector collection) {
        boolean result = false;
        // 'enum' is a reserved word since Java 5, so use another name
        Enumeration elements = collection.elements();

        while (elements.hasMoreElements()) {
            if (elements.nextElement() instanceof PersistentMessageHandle) {
                result = true;
                break;
            }
        }

        return result;
    }

    /**
     * Add the handle to the cache
     *
     * @param handle - the message handle to add
     */
    protected void addMessage(MessageHandle handle) {
        handle.setConsumerName(getPersistentId());
        _cache.addHandle(handle);

        // check to see whether the consumer is waiting to
        // be notified
        if (isWaitingForMessage()) {
            notifyMessageAvailable();
        }
    }

    /**
     * Cache a handle and its corresponding message
     *
     * @param handle the handle to cache
     * @param message the corresponding message to cache
     */
    protected void addMessage(MessageHandle handle, MessageImpl message) {
        handle.setConsumerName(getPersistentId());
        _cache.addMessage(handle, message);

        // check to see whether the consumer is waiting to
        // be notified
        if (isWaitingForMessage()) {
            notifyMessageAvailable();
        }
    }

    /**
     * Return the message for the specified handle
     *
     * @param handle - the handle
     * @return MessageImpl - the associated message
     */
    protected MessageImpl getMessage(MessageHandle handle) {
        return _cache.getMessage(handle);
    }

    /**
     * Remove the handle from the cache
     *
     * @param handle the handle to remove
     * @return <code>true</code> if the message was removed
     */
    protected boolean removeMessage(MessageHandle handle) {
        return _cache.removeHandle(handle);
    }

    /**
     * Determines if a message handle is already cached
     *
     * @return <code>true</code> if it is cached
     */
    protected boolean containsMessage(MessageHandle handle) {
        return _cache.containsHandle(handle);
    }

    /**
     * Remove and return the first message handle in the cache
     *
     * @return the first message handle or null if cache is empty
     */
    protected MessageHandle removeFirstMessage() {
        return _cache.removeFirstHandle();
    }

    /**
     * Delete the message with the specified handle from the cache
     *
     * @param handle the handle
     */
    protected void deleteMessage(MessageHandle handle) {
        _cache.removeMessage(handle.getMessageId());
    }

    /**
     * Determines if this endpoint has been stopped
     *
     * @return <code>true</code> if this endpoint has been stopped
     */
    protected final boolean isStopped() {
        return _stopped;
    }

    /**
     * Check if the consumer is waiting for a message. If it is then
     * notify it that a message has arrived
     */
    protected void notifyMessageAvailable() {
        // if we need to notify then send out the request
        if (isWaitingForMessage()) {
            clearWaitingForMessage();

            try {
                _session.onMessageAvailable(getClientId());
            } catch (Exception exception) {
                // log the failure rather than silently swallowing it
                _log.error("Error in notifyMessageAvailable", exception);
            }
        }
    }

    /**
     * Check whether the endpoint is waiting for a message
     *
     * @return boolean
     */
    protected final boolean isWaitingForMessage() {
        return _waitingForMessage;
    }

    /**
     * Set the waiting for message flag
     */
    protected final void setWaitingForMessage() {
        _waitingForMessage = true;
    }

    /**
     * Clear the waiting for message flag
     */
    protected final void clearWaitingForMessage() {
        _waitingForMessage = false;
    }

    /**
     * Helper for {@link #deliverMessages} implementations, to determine if
     * asynchronous message delivery should be stopped
     *
     * @return <code>true</code> if asynchronous message delivery should be 
     * stopped
     */
    protected boolean stopDelivery() {
        return (_stopped || getMessageCount() == 0 || _listener == null);
    }

}
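
The run() and schedule() methods above use the _scheduled flag so that an endpoint is handed to the scheduler at most once at a time, and run() re-queues the endpoint only when deliverMessages() reports that work remains. The following is a minimal, self-contained sketch of that pattern, not the OpenJMS implementation. Everything in it is hypothetical and chosen for illustration: the class name SelfSchedulingTask, the use of java.util.concurrent.Executor as a stand-in for the Scheduler, String messages in place of MessageHandle, and the batchSize limit that decides when a run asks to be rescheduled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a consumer endpoint; not an OpenJMS class. */
final class SelfSchedulingTask implements Runnable {
    private final Object lock = new Object();
    private final Executor scheduler;   // assumed stand-in for the Scheduler
    private final int batchSize;        // assumed limit on messages per run
    private final Queue<String> cache = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean scheduled;
    private boolean closed;

    SelfSchedulingTask(Executor scheduler, int batchSize) {
        this.scheduler = scheduler;
        this.batchSize = batchSize;
    }

    /** Cache a message and request asynchronous delivery. */
    void add(String message) {
        synchronized (lock) {
            cache.add(message);
            schedule();
        }
    }

    /** Deliver one batch, then re-queue this task only if work remains. */
    @Override
    public void run() {
        synchronized (lock) {
            if (closed) {
                return;
            }
            boolean reschedule = deliverBatch();
            scheduled = false;
            if (reschedule) {
                schedule();
            }
        }
    }

    /** Stop delivery and discard anything still cached. */
    void close() {
        synchronized (lock) {
            closed = true;
            cache.clear();
        }
    }

    /** Snapshot of the messages delivered so far. */
    List<String> delivered() {
        synchronized (lock) {
            return new ArrayList<>(delivered);
        }
    }

    // Moves up to batchSize messages out of the cache; returns true if
    // messages remain and another run is needed.
    private boolean deliverBatch() {
        for (int i = 0; i < batchSize && !cache.isEmpty(); i++) {
            delivered.add(cache.remove());
        }
        return !cache.isEmpty();
    }

    // The scheduled flag guarantees at most one pending run at a time.
    private void schedule() {
        if (!closed && !scheduled) {
            scheduled = true;
            scheduler.execute(this);
        }
    }

    public static void main(String[] args) {
        // A queue-backed executor makes the scheduling steps observable.
        Queue<Runnable> pending = new ArrayDeque<>();
        SelfSchedulingTask task = new SelfSchedulingTask(pending::add, 2);
        task.add("a");
        task.add("b");
        task.add("c");
        System.out.println("queued runs: " + pending.size());
        while (!pending.isEmpty()) {
            pending.remove().run();
        }
        System.out.println("delivered: " + task.delivered());
    }
}
```

In this sketch, adding three messages queues the task only once, because the flag is still set when the second and third messages arrive. The flag is cleared inside run() before the rescheduling decision, which mirrors the order of operations in the original run() method.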
