⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractdestinationcache.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        // remove the consumers
        _consumers.clear();

        // unregister itself from the message manager
        MessageMgr.instance().removeEventListener(getDestination(), this);

        // remove the leases
        MessageLease[] leases = null;
        synchronized (_leases) {
            leases = (MessageLease[]) _leases.values().toArray(
                    new MessageLease[0]);
            _leases.clear();
        }

        for (int i = 0; i < leases.length; ++i) {
            MessageLease lease = leases[i];
            LeaseManager.instance().removeLease(lease);
        }
    }

    /**
     * Invoked when a message lease has expired.
     * <p>
     * The lease entry recorded for the message is dropped, the registered
     * consumers are notified and the reference is destroyed. For a
     * persistent message this work is done against one database connection,
     * which is committed on success, handed to
     * <code>SQLHelper.rollback</code> on failure, and always closed.
     * <p>
     * Failures are logged and never propagated to the caller. Each failure
     * is logged exactly once.
     *
     * @param object an instance of {@link MessageRef}
     */
    public void onLeaseExpired(Object object) {
        MessageRef reference = (MessageRef) object;
        String messageId = reference.getMessageId();
        synchronized (_leases) {
            _leases.remove(messageId);
        }

        // determine whether the message is persistent or not and take
        // the corresponding action
        try {
            if (reference.isPersistent()) {
                Connection connection = null;
                try {
                    connection = DatabaseService.getConnection();
                    persistentMessageExpired(reference, connection);
                    reference.destroy(connection);
                    connection.commit();
                } catch (JMSException exception) {
                    SQLHelper.rollback(connection);
                    // reported by the outer handler below
                    throw exception;
                } catch (Exception exception) {
                    SQLHelper.rollback(connection);
                    // Report the failure here, once, with the original
                    // exception so that its stack trace is kept. Wrapping
                    // it in a new JMSException only to have it logged again
                    // by the outer handler would duplicate the entry and
                    // lose the real cause.
                    _log.error("Failed to expire message", exception);
                } finally {
                    SQLHelper.close(connection);
                }
            } else {
                messageExpired(reference);
                reference.destroy();
            }
        } catch (JMSException exception) {
            _log.error("Failed to expire message", exception);
        }
    }

    /**
     * Performs a garbage collection pass over this cache.
     * <p>
     * An aggressive pass clears all persistent messages from the message
     * cache. When debug logging is enabled, the current message counts of
     * the cache are logged afterwards.
     *
     * @param aggressive if <code>true</code>, evict all persistent messages
     *                   from the cache
     */
    public void collectGarbage(boolean aggressive) {
        if (aggressive) {
            // drop every persistent message held by the cache
            _cache.clearPersistentMessages();
            if (_log.isDebugEnabled()) {
                String evicted = "Evicted all persistent messages from cache "
                        + getDestination().getName();
                _log.debug(evicted);
            }
        }

        if (!_log.isDebugEnabled()) {
            // nothing to report
            return;
        }

        // summarise the cache content: persistent, transient and total
        String summary = "DESTCACHE -" + getDestination().getName()
                + " Messages: P[" + _cache.getPersistentCount()
                + "] T[" + _cache.getTransientCount() + "] Total: ["
                + _cache.getMessageCount() + "]";
        _log.debug(summary);
    }

    /**
     * Initialise the cache from the database.
     * <p>
     * This method is abstract; each concrete cache supplies its own
     * implementation.
     *
     * @param connection the database connection to use
     * @throws JMSException         for any JMS error
     * @throws PersistenceException for any persistence error
     */
    protected abstract void init(Connection connection) throws JMSException,
            PersistenceException;

    /**
     * Add a message reference and its corresponding message to the cache.
     * <p>
     * This simply delegates to the underlying message cache.
     *
     * @param reference the reference to the message
     * @param message   the message
     */
    protected void addMessage(MessageRef reference, MessageImpl message) {
        _cache.addMessage(reference, message);
    }

    /**
     * Returns the message cache.
     * <p>
     * The cache instance itself is returned, not a copy.
     *
     * @return the message cache
     */
    protected DefaultMessageCache getMessageCache() {
        return _cache;
    }

    /**
     * Determines if there are any registered consumers.
     *
     * @return <code>true</code> if at least one consumer is registered,
     *         <code>false</code> if the set of consumers is empty
     */
    protected boolean hasActiveConsumers() {
        return !_consumers.isEmpty();
    }

    /**
     * Looks up a registered consumer endpoint by its identifier.
     *
     * @param consumerId the consumer identity
     * @return the consumer corresponding to <code>consumerId</code>, or
     *         <code>null</code> if none is registered
     */
    protected ConsumerEndpoint getConsumerEndpoint(long consumerId) {
        // consumers are keyed on the boxed identifier
        final Long key = new Long(consumerId);
        final Object endpoint = _consumers.get(key);
        return (ConsumerEndpoint) endpoint;
    }

    /**
     * Helper that copies the currently registered consumers into a new
     * array.
     *
     * @return the consumers of this cache; empty if none are registered
     */
    protected ConsumerEndpoint[] getConsumerArray() {
        // the zero-length array only conveys the element type to toArray()
        final ConsumerEndpoint[] type = new ConsumerEndpoint[0];
        return (ConsumerEndpoint[]) _consumers.values().toArray(type);
    }

    /**
     * Handles the expiry of a non-persistent message by notifying every
     * registered consumer that the message has been removed.
     *
     * @param reference the reference to the expired message
     * @throws JMSException for any error
     */
    protected void messageExpired(MessageRef reference)
            throws JMSException {
        final String expiredId = reference.getMessageId();

        // notify each consumer, working on an array copy of the consumers
        final ConsumerEndpoint[] endpoints = getConsumerArray();
        int index = 0;
        while (index < endpoints.length) {
            endpoints[index].messageRemoved(expiredId);
            index++;
        }
    }

    /**
     * Handles the expiry of a persistent message by notifying every
     * registered consumer that the message has been removed, passing on
     * the supplied database connection.
     *
     * @param reference  the reference to the expired message
     * @param connection the database connection to use
     * @throws JMSException         if a listener fails to handle the
     *                              expiration
     * @throws PersistenceException if there is a persistence related problem
     */
    protected void persistentMessageExpired(MessageRef reference,
                                            Connection connection)
            throws JMSException, PersistenceException {
        final String expiredId = reference.getMessageId();

        // notify each consumer, working on an array copy of the consumers
        final ConsumerEndpoint[] endpoints = getConsumerArray();
        int index = 0;
        while (index < endpoints.length) {
            endpoints[index].persistentMessageRemoved(expiredId, connection);
            index++;
        }
    }

    /**
     * Sets up a lease for the message if it has a TTL. The expiry time is
     * read from the message's JMSExpiration property; a value of 0 means
     * that the message never expires.
     *
     * @param reference a reference to the message
     * @param message   the message
     * @throws JMSException if the JMSExpiration property can't be accessed
     */
    protected void checkMessageExpiry(MessageRef reference,
                                      MessageImpl message) throws JMSException {
        final long expiryTime = message.getJMSExpiration();
        checkMessageExpiry(reference, expiryTime);
    }

    /**
     * Sets up a lease for the message if it has a TTL and no lease is
     * registered for it yet. An expiry time of 0 means that the message
     * never expires, in which case nothing is done.
     *
     * @param reference  a reference to the message
     * @param expiryTime the time when the message expires
     */
    protected void checkMessageExpiry(MessageRef reference,
                                      long expiryTime) {
        if (expiryTime == 0) {
            // the message never expires: no lease required
            return;
        }

        synchronized (_leases) {
            final String messageId = reference.getMessageId();
            if (_leases.containsKey(messageId)) {
                // a lease for this message already exists
                return;
            }

            // an expiry time that has already passed still yields a lease,
            // with the duration clamped to 1
            final long remaining = expiryTime - System.currentTimeMillis();
            final long duration = Math.max(1L, remaining);

            final MessageLease lease =
                    new MessageLease(reference, duration, this);
            LeaseManager.instance().addLease(lease);
            _leases.put(messageId, lease);
        }
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -