⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagemgr.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     */
    public void add(Connection connection, MessageImpl message)
            throws JMSException {
        final JmsDestination target = (JmsDestination) message.getJMSDestination();

        // Persistent quality of service is used only when BOTH the message's
        // delivery mode and its destination are persistent. The destination
        // is consulted only for PERSISTENT messages (short-circuit &&).
        final boolean persistent =
                message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
                && DestinationManager.instance().isPersistent(target);

        if (!persistent) {
            // every other combination gets the non-persistent quality of
            // service; the supplied connection is not used on this path
            addNonPersistentMessage(message);
            return;
        }
        addPersistentMessage(connection, message);
    }

    /**
     * Hands a non-persistent message to whichever component handles its
     * destination.
     * <p/>
     * If an event listener is registered for the message's destination, that
     * listener is notified; otherwise the message is passed to the
     * {@link DestinationManager}.
     *
     * @param message the message to add
     * @throws JMSException if the message cannot be processed
     */
    private void addNonPersistentMessage(MessageImpl message)
            throws JMSException {
        final JmsDestination target = (JmsDestination) message.getJMSDestination();
        final MessageManagerEventListener registered =
                (MessageManagerEventListener) _listeners.get(target);

        if (registered == null) {
            // nobody registered for this destination: let the
            // DestinationManager handle the message
            DestinationManager.instance().messageAdded(target, message);
            return;
        }

        // a listener (destination cache) is registered: let it process the
        // message
        registered.messageAdded(target, message);
    }

    /**
     * Adds a persistent message using a database connection obtained from
     * the {@link DatabaseService}.
     * <p/>
     * The message is written to the database, the listeners for its
     * destination are notified using the same connection, and the work is
     * then committed. If any step fails the connection is rolled back, the
     * failure is logged and a {@link JMSException} is thrown. The connection
     * is always closed before this method returns.
     *
     * @param message the message to add
     * @throws JMSException if the message cannot be processed. The
     *                      underlying failure is available through
     *                      {@link JMSException#getLinkedException()}
     */
    private void addPersistentMessage(MessageImpl message) throws JMSException {
        JmsDestination destination =
                (JmsDestination) message.getJMSDestination();

        Connection connection = null;
        // do all persistent work in this block
        try {
            connection = DatabaseService.getConnection();

            // add the message to the database
            DatabaseService.getAdapter().addMessage(connection, message);

            // notify all listeners that a persistent message has arrived
            notifyOnAddPersistentMessage(connection, destination, message);

            // commit the work
            connection.commit();
        } catch (Exception exception) {
            // NOTE(review): connection is still null here if getConnection()
            // itself failed; SQLHelper.rollback/close are presumed to
            // tolerate null - confirm.
            SQLHelper.rollback(connection);
            _log.error("Failed to make message persistent", exception);

            // keep the original failure attached so that callers can inspect
            // the real cause instead of only its string form
            JMSException error = new JMSException(
                    "Failed to make message persistent: " +
                    exception.toString());
            error.setLinkedException(exception);
            throw error;
        } finally {
            SQLHelper.close(connection);
        }
    }

    /**
     * Processes a persistent message published through the resource manager,
     * using a database connection supplied by the caller.
     * <p/>
     * Only the listener notification is performed here; this method neither
     * commits nor rolls back the supplied connection.
     *
     * @param connection the database connection to use
     * @param message    the message to process
     * @throws JMSException if the message cannot be processed. The
     *                      underlying failure is available through
     *                      {@link JMSException#getLinkedException()}
     */
    private void addPersistentMessage(Connection connection,
                                      MessageImpl message) throws JMSException {
        JmsDestination destination = (JmsDestination) message.getJMSDestination();
        try {
            // notify all listeners that a persistent message has arrived
            notifyOnAddPersistentMessage(connection, destination, message);
        } catch (Exception exception) {
            // A single handler is enough: PersistenceException was previously
            // caught separately but handled identically. The original
            // failure is attached so its stack trace is not lost.
            JMSException error = new JMSException(
                    "Failed in addPersistentMessage : "
                    + exception.toString());
            error.setLinkedException(exception);
            throw error;
        }
    }

    /**
     * Validates a message and stamps it before it is passed through the
     * system.
     * <p/>
     * The message must be non-null and must carry a destination that is a
     * {@link JmsDestination}. A valid message is given an accepted time (the
     * current time in milliseconds) and the next sequence number, and is
     * then flagged read-only.
     *
     * @param message the message to prepare
     * @throws JMSException if the message is <tt>null</tt>
     * @throws InvalidDestinationException if the message has no destination,
     *         or its destination is not a {@link JmsDestination}
     */
    public void prepare(MessageImpl message)
            throws JMSException {
        if (message == null) {
            throw new JMSException("Null message");
        }

        final Destination target = message.getJMSDestination();
        if (target == null) {
            throw new InvalidDestinationException("Message has no destination");
        } else if (!(target instanceof JmsDestination)) {
            throw new InvalidDestinationException(
                    "Destination not a JmsDestination");
        }

        // mark the message as accepted "now"
        final long acceptedAt = new Date().getTime();
        message.setAcceptedTime(acceptedAt);

        // NOTE(review): the increment is not synchronized within this
        // method - confirm callers serialize access to prepare().
        message.setSequenceNumber(++_sequenceNumberGenerator);

        message.setReadOnly(true);
    }

    /**
     * Looks up a destination by name through the {@link DestinationManager}.
     *
     * @param name the name of the destination
     * @return the destination the DestinationManager returns for the name;
     *         per the original contract this is <tt>null</tt> when no active
     *         destination exists for it
     */
    public JmsDestination resolve(String name) {
        final JmsDestination found =
                DestinationManager.instance().getDestination(name);
        return found;
    }

    /**
     * Looks up a consumer endpoint by its identity through the
     * {@link ConsumerManager}.
     * <p/>
     * The lookup uses the consumer identifier only; the destination argument
     * is not consulted. (The original note suggested this method should
     * eventually be removed from this class.)
     *
     * @param destination the destination (currently unused)
     * @param consumerId  the consumer identifier
     * @return the endpoint the ConsumerManager returns for the identifier;
     *         per the original contract this is <tt>null</tt> when no active
     *         consumer exists for it
     */
    public ConsumerEndpoint resolveConsumer(JmsDestination destination,
                                            String consumerId) {
        final ConsumerEndpoint endpoint =
                ConsumerManager.instance().getConsumerEndpoint(consumerId);
        return endpoint;
    }

    /**
     * Stop/start a consumer. When stopped, the consumer is intended not to
     * receive messages until it is re-started. This is invoked when the
     * underlying connection is stopped or started.
     * <p/>
     * The current implementation is empty: calling this method has no
     * effect.
     *
     * @param consumer the consumer to stop/start
     * @param stop     when <tt>true</tt> stop the consumer else start it.
     * @throws JMSException declared by the signature; never thrown by the
     *                      current (empty) implementation
     */
    public void setStopped(ConsumerEndpoint consumer, boolean stop)
            throws JMSException {
        // TODO: not implemented yet - stopping/starting the consumer still
        // needs to be done here
    }

    /**
     * Registers the listener to be informed about messages for a specific
     * destination.
     * <p/>
     * At most one listener is kept per destination: if a listener is already
     * registered for the destination the call fails silently and the
     * existing registration is left untouched. The same listener may be
     * registered for several destinations. A <tt>null</tt> destination or
     * listener is ignored.
     *
     * @param destination the destination whose messages to listen for
     * @param listener    the listener to register for that destination
     */
    public void addEventListener(JmsDestination destination,
                                 MessageManagerEventListener listener) {
        if (destination == null || listener == null) {
            return;
        }

        // check-then-put is done under one lock so the first registration
        // for a destination wins
        synchronized (_listeners) {
            final boolean alreadyRegistered =
                    _listeners.containsKey(destination);
            if (!alreadyRegistered) {
                _listeners.put(destination, listener);
            }
        }
    }

    /**
     * Removes the listener registered for the specified destination. If none
     * is registered the call is ignored, as is a <tt>null</tt> destination
     * or listener.
     * <p/>
     * NOTE(review): whatever listener is registered for the destination is
     * removed; it is not compared with the <tt>listener</tt> argument, which
     * is only checked for <tt>null</tt>.
     *
     * @param destination the destination whose listener is removed
     * @param listener    the listener for that destination
     */
    public void removeEventListener(JmsDestination destination,
                                    MessageManagerEventListener listener) {
        if (destination == null || listener == null) {
            return;
        }

        synchronized (_listeners) {
            final boolean registered = _listeners.containsKey(destination);
            if (registered) {
                _listeners.remove(destination);
            }
        }
    }

    /**
     * Announces that a persistent message has been added to the message
     * manager.
     * <p/>
     * If an event listener is registered for the destination, that listener
     * is notified; otherwise the notification goes to the
     * {@link DestinationManager}. The supplied connection is passed along in
     * both cases.
     *
     * @param connection  the database connection to use
     * @param destination the message destination
     * @param message     the message that was added
     * @throws JMSException         if a processing error occurred
     * @throws PersistenceException if a persistence error occurred
     */
    private void notifyOnAddPersistentMessage(Connection connection,
                                              JmsDestination destination,
                                              MessageImpl message)
            throws JMSException, PersistenceException {
        final MessageManagerEventListener registered =
                (MessageManagerEventListener) _listeners.get(destination);

        if (registered == null) {
            // nobody registered for this destination: let the
            // DestinationManager handle the message
            DestinationManager.instance().persistentMessageAdded(
                    connection, destination, message);
            return;
        }

        // a listener (destination cache) is registered: let it process the
        // message
        registered.persistentMessageAdded(connection, destination, message);
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -