
📄 destinationcache.java

📁 Source code of a JMS server implementation; supports multiple adapters, DB, FTP, and multiple databases
💻 Java
     */
    public Iterator getConsumers() {
        return _consumers.iterator();
    }

    /**
     * Return the consumers as an array of {@link ConsumerEndpoint} objects
     * This is a safer way to get a list of consumers since it avoids
     * concurrent modification exceptions
     *
     * @return Object[] - an array of ConsumerEndpoint objects
     */
    Object[] getConsumersByArray() {
        return _consumers.toArray();
    }

    /**
     * This method is called when the {@link MessageMgr} adds a message
     * for this destination to the cache
     *
     * @param destination - the destination the message was added for
     * @param message - message added to cache
     */
    public abstract boolean messageAdded(JmsDestination destination,
                                         MessageImpl message);

    /**
     * This method is called when the {@link MessageMgr} removes a
     * message from the cache.
     *
     * @param destination - the destination the message was removed for
     * @param message - message removed from cache
     */
    public abstract void messageRemoved(JmsDestination destination,
                                        MessageImpl message);

    /**
     * Return the number of messages currently active for this destination
     *
     * @return int - number of active messages
     */
    public int getMessageCount() {
        return _cache.getHandleCount();
    }

    /**
     * Notify the listeners that a non-persistent message has been added to the
     * cache
     *
     * @param message - message that was added
     * @return boolean - true if at least one listener has processed it
     */
    abstract boolean notifyOnAddMessage(MessageImpl message);

    /**
     * Notify the listeners that a non-persistent message has been removed from
     * the cache
     *
     * @param message - message that was removed
     */
    abstract void notifyOnRemoveMessage(MessageImpl message);

    /**
     * Notify the listeners that a persistent message has been added to the
     * cache
     *
     * @param connection - the persistent connection to use
     * @param message - message that was added
     * @return boolean - true if at least one listener has processed it
     * @throws PersistenceException - if there is a persistence related error
     */
    boolean notifyOnAddPersistentMessage(Connection connection,
                                         MessageImpl message)
        throws PersistenceException {

        //default implementation
        return true;
    }

    /**
     * Notify the listeners that a persistent message has been removed from
     * the cache
     *
     * @param connection - the persistent connection to use.
     * @param message - message that was removed
     * @throws PersistenceException - if there is a persistence related error
     */
    void notifyOnRemovePersistentMessage(Connection connection,
                                         MessageImpl message)
        throws PersistenceException {
        //default implementation is empty
    }

    /**
     * Check whether there are any attached consumers to this cache
     *
     * @return boolean - true if there are attached consumers
     */
    abstract boolean hasActiveConsumers();

    /**
     * This method is called whenever a lease expires. It passes the
     * object that has expired.
     *
     * @param       leasedObject        reference to the leased object
     */
    public void onLeaseExpired(Object leasedObject) {
        if (leasedObject != null) {
            MessageHandle handle = (MessageHandle) leasedObject;

            // retrieve an instance of the message
            MessageImpl message = resolveExpiredMessage(handle);

            // determine whether the message is persistent or not and take
            // the corresponding action
            if (handle instanceof PersistentMessageHandle) {
                Connection connection = null;
                try {
                    connection = DatabaseService.getConnection();
                    persistentMessageRemoved(connection, getDestination(),
                        message);
                    connection.commit();
                } catch (Exception exception) {
                    SQLHelper.rollback(connection);
                    _log.error("Failure in onLeaseExpired", exception);
                } finally {
                    SQLHelper.close(connection);
                }
            } else {
                // notify its listeners that the non-persistent message has
                // been removed
                messageRemoved(getDestination(), message);
            }
        }
    }

    /**
     * Determines if this cache can be destroyed
     *
     * @return <code>true</code> if the cache can be destroyed, otherwise
     * <code>false</code>
     */
    public abstract boolean canDestroy();

    /**
     * Check to see if the message has a TTL. If so then set up a lease
     * for it. An expiry time of 0 means that the message never expires
     *
     * @param message - message to add
     */
    void checkMessageExpiry(MessageImpl message) {
        if (message != null) {
            _leaseHelper.addLease(message);
        }
    }

    /**
     * Destroy this cache.
     */
    synchronized void destroy() {
        // clear the cache
        _cache.clear();

        // remove the consumers
        _consumers.clear();

        // unregister itself from the message manager
        MessageMgr.instance().removeEventListener(getDestination(), this);

        // remove the lease
        _leaseHelper.clear();
    }

    /**
     * Close the cache and unregister all the consumers. Notify any and all
     * DestinationCacheLifecycleListeners.
     * <p>
     * Once the cache is closed it will no longer receive messages for this
     * destination.
     */
    public void shutdown() {
        destroy();
    }

    /**
     * Insert the specified handle to the handles cache.
     *
     * @param handle - handle to add
     */
    void addMessage(MessageHandle handle) {
        handle.setConsumerName(getDestination().getName());
        _cache.addHandle(handle);
    }

    /**
     * Add the following handle and corresponding message to their respective
     * caches
     *
     * @param handle - handle to add
     * @param message - the corresponding message to add
     */
    void addMessage(MessageHandle handle, MessageImpl message) {
        handle.setConsumerName(getDestination().getName());
        _cache.addMessage(handle, message);
    }

    /**
     * Return the message for the specified handle
     *
     * @param handle - the handle
     * @return MessageImpl - the associated message
     */
    MessageImpl getMessage(MessageHandle handle) {
        return _cache.getMessage(handle);
    }

    /**
     * Remove the message handle from the cache, if it exists.
     *
     * @param handle - handle to remove
     * @return boolean - true if it was removed
     */
    boolean removeMessage(MessageHandle handle) {
        return _cache.removeHandle(handle);
    }

    /**
     * Remove and return the first message handle in the cache
     *
     * @return MessageHandle - the first handle or null if cache is empty
     */
    final MessageHandle removeFirstMessage() {
        return _cache.removeFirstHandle();
    }

    /**
     * Return the message handles in the cache as an array
     *
     * @return Object[] - array of message handles
     */
    final Object[] toMessageArray() {
        return _cache.getHandleArray();
    }

    /**
     * Delete the message with the specified handle from the cache
     *
     * @param handle - the handle
     */
    void deleteMessage(MessageHandle handle) {
        _cache.removeMessage(handle.getMessageId());
    }

    // implementation of Identifiable.getId
    public String getId() {
        return _id;
    }

    // implementation of GarbageCollectable.collectGarbage
    public void collectGarbage(boolean aggressive) {
        if (aggressive) {
            // clear all persistent messages in the cache
            _cache.clearPersistentMessages();
            if (_log.isDebugEnabled()) {
                _log.debug("Evicted all persistent messages from cache "
                    + getDestination().getName());
            }
        }

        if (_log.isDebugEnabled()) {
            _log.debug("DESTCACHE -" + getDestination().getName()
                + " Messages: P[" + _cache.getPersistentCount()
                + "] T[" + _cache.getTransientCount() + "] Handles: ["
                + _cache.getHandleCount() + "]");
        }
    }

    /**
     * Resolve an expired message through its handle
     *
     * @param handle the expired message's handle
     * @return the expired message. May be null.
     */
    protected MessageImpl resolveExpiredMessage(MessageHandle handle) {
        return handle.getMessage();
    }

}
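
The Javadoc on getConsumersByArray() above says that returning an array is safer than handing out a live iterator because it avoids concurrent modification exceptions. The following is a minimal, standalone sketch of that idea, not code from this project. The class name SnapshotIterationSketch and both helper methods are hypothetical, and a plain java.util.ArrayList stands in for _consumers, whose actual type is not visible in this part of the listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration class; not part of the listed source file.
public class SnapshotIterationSketch {

    /**
     * Iterates over an array snapshot of the list and removes each element
     * from the live list while doing so. The snapshot is unaffected by the
     * removals, so no ConcurrentModificationException can occur.
     *
     * @return the number of elements visited
     */
    static int drainViaSnapshot(List<String> consumers) {
        Object[] snapshot = consumers.toArray();
        int visited = 0;
        for (Object consumer : snapshot) {
            consumers.remove(consumer);
            visited++;
        }
        return visited;
    }

    /**
     * Attempts the same removal while walking a live iterator.
     *
     * @return true if the fail-fast iterator threw
     *         ConcurrentModificationException
     */
    static boolean liveIterationFails(List<String> consumers) {
        try {
            for (Iterator<String> it = consumers.iterator(); it.hasNext();) {
                String consumer = it.next();
                // Structural change behind the iterator's back.
                consumers.remove(consumer);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> viaSnapshot =
            new ArrayList<>(Arrays.asList("a", "b", "c"));
        System.out.println("visited via snapshot: "
            + drainViaSnapshot(viaSnapshot));

        List<String> viaIterator =
            new ArrayList<>(Arrays.asList("a", "b", "c"));
        System.out.println("live iteration failed: "
            + liveIterationFails(viaIterator));
    }
}
```

The trade-off of the snapshot approach is that the caller may see consumers that have since been removed, or miss ones added after the copy was taken, so callers should treat the array as a point-in-time view.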
