⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagemgr.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                } catch (Exception ignore) {
                    // no-op
                }
            }
        }
    }

    /**
     * This method is used to process persistent messages published through
     * the resource manager.
     *
     * @param connection - the database connection to use.
     * @param message - the message to process
     * @throws JMSException - if the message cannot be processed
     */
    protected void addPersistentMessage(Connection connection,
                                        MessageImpl message)
        throws JMSException {

        // Fetch the destination carried by the message. The cast assumes the
        // message always holds a JmsDestination.
        JmsDestination destination =
            (JmsDestination) message.getJMSDestination();
        if (destination == null) {
            // shouldn't really get here, since the message should have been
            // checked and prepared before being passed to this routine.
            _log.error("Can't locate destination for message");
            return;
        }

        try {
            // notify the listener for the destination that a persistent
            // message has arrived, and count it only if that succeeded
            notifyOnAddPersistentMessage(connection, destination, message);
            _messagesProcessed++;
        } catch (Exception exception) {
            // Persistence failures and every other failure are reported with
            // the same message text. The original exception is attached as
            // the linked exception so that its stack trace is not lost.
            JMSException failure = new JMSException(
                "Failed in addPersistentMessage : " + exception.toString());
            failure.setLinkedException(exception);
            throw failure;
        }
    }

    /**
     * Return the message given the specified message handle.
     * This will delegate to the appropriate {@link DestinationCache} or
     * {@link ConsumerManager}
     *
     * @param handle - the handle
     * @return MessageImpl - the associated message or null
     */
    MessageImpl getMessage(MessageHandle handle) {
        // a missing handle can never resolve to a message
        if (handle == null) {
            return null;
        }

        if (!(handle.getDestination() instanceof JmsTopic)) {
            // anything that is not a topic is treated as a queue, whose
            // messages are looked up through the destination cache
            DestinationCache queueCache =
                DestinationManager.instance().getDestinationCache(
                    handle.getDestination());
            return (queueCache == null)
                ? null
                : queueCache.getMessage(handle);
        }

        // topic handles are resolved through the endpoint of the consumer
        // named by the handle
        TopicConsumerEndpoint topicEndpoint = (TopicConsumerEndpoint)
            ConsumerManager.instance().getConsumerEndpoint(
                handle.getConsumerName());
        return (topicEndpoint == null)
            ? null
            : topicEndpoint.getMessage(handle);
    }

    /**
     * This method prepares the message without actually passing it through the
     * system. It is used by the {@link ResourceManager} to process incoming
     * messages.
     * <p>
     * If there are any issues with the message the method will throw an
     * exception
     *
     * @param message - the message
     * @throws JMSException - if the message is invalid or cannot be prep'ed
     */
    void checkAndPrepareMessage(MessageImpl message)
        throws JMSException {
        // reject a missing message outright
        if (message == null) {
            throw new JMSException("checkAndPrepareMessage failed for null message");
        }

        // stamp the acceptance time, hand out the next sequence number and
        // freeze the message against further modification
        message.setAcceptedTime(System.currentTimeMillis());
        message.setSequenceNumber(++_sequenceNumberGenerator);
        message.setReadOnly(true);

        // the destination is validated only after the message was stamped
        if (message.getJMSDestination() == null) {
            throw new JMSException("Null destination specified in message");
        }
    }

    /**
     * Returns true if there are any messages for the specified consumer
     *
     * @param consumer - the consumer to check
     * @return boolean - true if messages are queued
     * @throws JMSException - if the consumer can't be checked
     */
    public boolean hasMessages(ConsumerEndpoint consumer) throws JMSException {
        if (consumer != null) {
            // any positive count means at least one message is waiting
            int pending = consumer.getMessageCount();
            return pending > 0;
        }

        // a null consumer cannot be queried
        throw new JMSException("Can't call hasMessages with null consumer");
    }

    /**
     * Returns an iterator over the active destinations
     *
     * @return Iterator - iterator of JmsDestination objects
     */
    public Iterator getDestinations() {
        // the destination manager owns the set of destinations; simply
        // pass its iterator through
        Iterator destinations = DestinationManager.instance().destinations();
        return destinations;
    }

    /**
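     * Returns an iterator of active consumers registered to a given
     * destination
     *
     * @param destination - the destination to look up; must not be null
     * @return Iterator - iterator of {@link ConsumerEndpoint} objects, or
     *         null if there is no destination cache for the destination
     * @throws JMSException - if the destination is null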
     */
    public Iterator getConsumers(JmsDestination destination)
        throws JMSException {
        // a null destination is reported to the caller as an error
        if (null == destination) {
            throw new JMSException("destination is null in getConsumer");
        }

        DestinationCache cache =
            DestinationManager.instance().getDestinationCache(destination);
        if (cache != null) {
            return cache.getConsumers();
        }

        // no cache is available for this destination
        return null;
    }

    /**
     * Resolves a destination given its name
     *
     * @param       name                the name of the destination
     * @return      JmsDestination      if an active destination exists for
     *                                  the given name, else it returns
     *                                  <tt>null</tt>
     */
    public JmsDestination resolve(String name) {
        // the name-to-destination lookup is delegated to the destination
        // manager and its answer is returned untouched
        JmsDestination resolved =
            DestinationManager.instance().destinationFromString(name);
        return resolved;
    }

    /**
     * Resolves a consumer given its destination and an identity. Should look
     * at removing it from here.
     *
     * @param       destination         the destination
     * @param       id                  the identity of the consumer
     * @return      ConsumerEndpoint    if an active consumer exists for
     *                                  the given id, else it returns
     *                                  <tt>null</tt>
     */
    public ConsumerEndpoint resolveConsumer(JmsDestination destination,
                                            String id) {
        // only the id takes part in the lookup; the destination argument
        // is not consulted
        ConsumerEndpoint endpoint =
            ConsumerManager.instance().getConsumerEndpoint(id);
        return endpoint;
    }

    /**
     * Stop/start a consumer. When stopped, the consumer will not receive
     * messages until the consumer is re-started.
     * This is invoked when the underlying connection is stopped or started
     *
     * @param       consumer            the consumer to stop/start
     * @param       stop                when <tt>true</tt> stop the consumer
     *                                  else start it.
     */
    public void setStopped(ConsumerEndpoint consumer, boolean stop)
        throws JMSException {
        // TODO: not yet implemented for the consumer. The body is empty, so
        // the call is currently a no-op whatever the value of stop, and no
        // JMSException is ever raised from here.
    }

    /**
     * Add a message listener for a specific destination to be informed
     * when messages for the destination are added or removed from the
     * queue. Only one listener is held per destination, but the same
     * listener can be registered for multiple destinations.
     * <p>
     * If a listener is already registered for a particular destination
     * then the call fails silently and the existing listener is kept.
     *
     * @param destination - what messages to listen for
     * @param listener - a MessageManagerEventListener instance
     */
    public void addEventListener(JmsDestination destination,
                                 MessageManagerEventListener listener) {
        // nothing to register unless both arguments are supplied
        if ((destination == null) || (listener == null)) {
            return;
        }

        synchronized (_listeners) {
            // the first registration for a destination wins; later attempts
            // for the same destination are dropped without complaint
            boolean alreadyRegistered = _listeners.containsKey(destination);
            if (!alreadyRegistered) {
                _listeners.put(destination, listener);
            }
        }
    }

    /**
     * Remove the listener for the specified destination. If one is not
     * registered then ignore it.
     *
     * @param destination - destination that it listens for
     * @param listener - listener for that destination.
     */
    public void removeEventListener(JmsDestination destination,
                                    MessageManagerEventListener listener) {
        // both arguments must be present, otherwise the call is ignored
        if ((destination == null) || (listener == null)) {
            return;
        }

        synchronized (_listeners) {
            // whatever is registered under the destination is removed; the
            // listener argument itself is only checked for null
            boolean registered = _listeners.containsKey(destination);
            if (registered) {
                _listeners.remove(destination);
            }
        }
    }

    /**
     * Notify the listeners, registered for the destination that a message has
     * been added to the message manager.
     * <p>
     * All errors are propagated as JMSException exceptions
     *
     * @param destination - destination for which the message exists
     * @param message - message that was added
     * @return boolean - true if the message was processed
     * @throws JMSException - for any processing error
     */
    boolean notifyOnAddMessage(JmsDestination destination,
                               MessageImpl message) throws JMSException {
        // addEventListener and removeEventListener modify _listeners while
        // holding its monitor. The lookup takes the same monitor so that it
        // cannot race with a concurrent registration or removal. The lock
        // is released before the listener is called.
        MessageManagerEventListener listener;
        synchronized (_listeners) {
            listener = (MessageManagerEventListener) _listeners.get(destination);
        }

        if (listener != null) {
            // a listener is registered for the destination, so let it
            // process the message
            return listener.messageAdded(destination, message);
        }

        // no listener registered: let the {@link DestinationManager} handle
        // the message
        return DestinationManager.instance().messageAdded(destination,
            message);
    }

    /**
     * Notify the listeners, registered for the destination that a message has
     * been removed from the message manager. There may be several reasons why
     * this has happened (e.g. the message has expired, message has been
     * purged, message has been consumed etc).
     *
     * @param destination - destination for which the message exists
     * @param message - message that was removed
     * @throws JMSException for any processing error
     */
    void notifyOnRemoveMessage(JmsDestination destination,
                               MessageImpl message) throws JMSException {

        // addEventListener and removeEventListener modify _listeners while
        // holding its monitor. The lookup takes the same monitor so that it
        // cannot race with a concurrent registration or removal. The lock
        // is released before the listener is called.
        MessageManagerEventListener listener;
        synchronized (_listeners) {
            listener = (MessageManagerEventListener) _listeners.get(destination);
        }

        if (listener != null) {
            // send the notification to the registered listener
            listener.messageRemoved(destination, message);
        } else {
            // there is no registered listener, so send it to the
            // DestinationManager
            DestinationManager.instance().messageRemoved(destination, message);
        }
    }

    /**
     * Notify the listeners, registered for the destination that a persistent
     * message has been added to the message manager.
     *
     * @param connection - the database connection to use.
     * @param destination - destination for which the message exists
     * @param message - message that was added
     * @return boolean - true if the message was processed
     * @throws JMSException - if a processing error occurred
     * @throws PersistenceException - if a persistence error occurred
     */
    boolean notifyOnAddPersistentMessage(Connection connection,
                                         JmsDestination destination,
                                         MessageImpl message)
        throws JMSException, PersistenceException {

        // addEventListener and removeEventListener modify _listeners while
        // holding its monitor. The lookup takes the same monitor so that it
        // cannot race with a concurrent registration or removal. The lock
        // is released before the listener is called.
        MessageManagerEventListener listener;
        synchronized (_listeners) {
            listener = (MessageManagerEventListener) _listeners.get(destination);
        }

        if (listener != null) {
            // a listener is registered for the destination, so let it
            // process the message
            return listener.persistentMessageAdded(connection,
                destination, message);
        }

        // no listener registered: let the {@link DestinationManager} handle
        // the message
        return DestinationManager.instance().persistentMessageAdded(
            connection, destination, message);
    }

    /**
     * Notify the listeners, registered for the destination that a persistent
     * message has been removed from the message manager. There may be several
     * reasons why this has happened (e.g. the message has expired, message has
     * been purged, message has been consumed etc).
     *
     * @param connection - the database connection to use
     * @param destination - destination for which the message exists
     * @param message - message that was removed
     * @throws JMSException - for any processing problem
     * @throws PersistenceException - for any persistence related problem
     */
    void notifyOnRemovePersistentMessage(Connection connection,
                                         JmsDestination destination,
                                         MessageImpl message)
        throws JMSException, PersistenceException {

        // addEventListener and removeEventListener modify _listeners while
        // holding its monitor. The lookup takes the same monitor so that it
        // cannot race with a concurrent registration or removal. The lock
        // is released before the listener is called.
        MessageManagerEventListener listener;
        synchronized (_listeners) {
            listener = (MessageManagerEventListener) _listeners.get(destination);
        }

        if (listener != null) {
            // send the notification to the registered listener
            listener.persistentMessageRemoved(connection, destination,
                message);
        } else {
            // there is no registered listener, so send it to the
            // DestinationManager
            DestinationManager.instance().persistentMessageRemoved(connection,
                destination, message);
        }
    }

    /**
     * Return the maximum size of the cache
     *
     * @return int - maximum size of cache
     */
    public int getMaximumSize() {
        // hand back the value currently held in _maximumSize
        final int maximum = _maximumSize;
        return maximum;
    }

    /**
     * Notify the destruction of a handle.
     * <p>
     * If the handle has been destroyed then we need to do the following
     * 1. if the handle is for a queue then we can remove the message
     *    from the cache
     * 2. if the handle is for a topic then we need to see whether we can
     *    garbage collect it
     *
     * @param handle a TransientMessageHandle
     */
    public void handleDestroyed(MessageHandle handle) {
        // calls that carry no handle are ignored
        if (handle == null) {
            return;
        }

        boolean forTopic = handle.getDestination() instanceof JmsTopic;
        if (!forTopic) {
            // non-topic handles are deleted through the destination cache,
            // when one is available for the handle's destination
            DestinationCache queueCache =
                DestinationManager.instance().getDestinationCache(
                    handle.getDestination());
            if (queueCache != null) {
                queueCache.deleteMessage(handle);
            }
            return;
        }

        // topic handles are deleted through the endpoint of the consumer
        // named by the handle, when that endpoint can be found
        TopicConsumerEndpoint topicEndpoint = (TopicConsumerEndpoint)
            ConsumerManager.instance().getConsumerEndpoint(
                handle.getConsumerName());
        if (topicEndpoint != null) {
            topicEndpoint.deleteMessage(handle);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -