⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagemgr.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
 */
package org.exolab.jms.messagemgr;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.transaction.TransactionManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.core.service.BasicService;
import org.exolab.core.service.ServiceException;
import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsQueue;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.config.AdministeredDestinations;
import org.exolab.jms.config.AdministeredQueue;
import org.exolab.jms.config.AdministeredTopic;
import org.exolab.jms.config.Configuration;
import org.exolab.jms.config.DatabaseConfiguration;
import org.exolab.jms.config.MessageManagerConfiguration;
import org.exolab.jms.config.Subscriber;
import org.exolab.jms.message.MessageHandle;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.DatabaseService;
import org.exolab.jms.persistence.PersistenceException;


/**
 * This is the active message handling component within the JMS server.
 * Messages are passed in and added to the appropriate dispatchers for delivery
 * to the clients.
 *
 * @version     $Revision: 1.93 $ $Date: 2003/08/17 01:32:24 $
 * @author      <a href="mailto:mourikis@intalio.com">Jim Mourikis</a>
 * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 */
public class MessageMgr extends BasicService {

    /**
     * The service name of the message manager
     */
    private final static String MM_SERVICE_NAME = "MessageManager";

    /**
     * Caches the singleton instance of the message manager.
     */
    private static MessageMgr _instance;

    /**
     * used to synchronise the singleton construction
     */
    private static final Object _block = new Object();

    /**
     * Maintain a list of registered MessageManagerEventListener objects, that
     * get notified when certain events occur in the MessageManager
     */
    private transient HashMap _listeners = new HashMap(1023);

    /**
     * The sequence number generator is used to differentiate messages arriving
     * on the same millisecond
     */
    private long _sequenceNumberGenerator = 0;

    /**
     * This is the maximum size the cache can reach before we are forced to
     * run garbage collection. Garbage collection will also execute in the
     * background periodically to remove processed messages from the cache.
     */
    private int _maximumSize = 2500;

    /**
     * Tracks the number of messages processed
     */
    private long _messagesProcessed;

    /**
     * The logger
     */
    private static final Log _log = LogFactory.getLog(MessageMgr.class);


    /**
     * Create and return an instance of the singleton. If the singleton already
     * exists then simply return it. If there is a problem creating the
     * singleton then throw an exception
     *
     * @return MessageMgr - the singleton instance
     * @throws MessageMgrException
     */
    public static MessageMgr createInstance() throws MessageMgrException {
        if (_instance == null) {
            synchronized (_block) {
                if (_instance == null) {
                    _instance = new MessageMgr();
                }
            }
        }
        return _instance;
    }

    /**
     * Return an instance to the MessageMgr singleton. This method assumes that
     * the singleton has already been created with a call to
     * {@link #createInstance}
     *
     * @return MessageMgr
     */
    public static MessageMgr instance() {
        return _instance;
    }

    /**
     * The constructor will register itself with the garbage collection
     * service.
     *
     * @throws MessageMgrException - if there are problems during construction
     */
    private MessageMgr() throws MessageMgrException {
        super(MM_SERVICE_NAME);
    }

    // ovverride BasicService.start
    public void start() throws ServiceException {
        try {
            DestinationManager.createInstance();
            ConsumerManager.createInstance();
        } catch (ServiceException exception) {
            throw exception;
        } catch (Exception exception) {
            String msg = "Failed to start MessageMgr";
            _log.error(msg, exception);
            throw new ServiceException(msg + ":" + exception);
        }
    }

    // implement BasicService.run
    public void run() {
        // do nothing
    }

    // override BasicService.stop
    public synchronized void stop() throws ServiceException {
        try {
            // destroy the consumer manager.
            ConsumerManager.instance().destroy();

            // destroy the destination manager.
            DestinationManager.instance().destroy();

            // clear state
            _listeners.clear();
        } catch (Exception error) {
            error.printStackTrace();
            throw new ServiceException("Failed to stop MessageMgr : " +
                error.toString());
        }

        // clear the static reference
        synchronized (_block) {
            _instance = null;

        }
    }

    /**
     * Create the specified destination. The destination is a container
     * for messages and consumers. Consumers listen for messages posted on a
     * particular destination.
     * <p>
     * This can be called multiple times without any side effects. If the
     * destination is null then it throws a JMSException
     *
     * @param destination - create this destination
     * @throws JMSException - if the params is null
     */
    public void addDestination(JmsDestination destination)
        throws JMSException {

        // check the methods preconditions
        if (destination == null) {
            throw new JMSException("Call to addDestination with null object");
        }

        DestinationManager.instance().createDestinationCache(destination);
    }

    /**
     * Remove this destination and all attached consumers. If the destination
     * is null then throw an exception.
     *
     * @param destination - the destination to remove
     * @throws JMSException
     */
    public void removeDestination(JmsDestination destination)
        throws JMSException {

        // check the method's preconditions
        if (destination == null) {
            throw new JMSException("Call to removeDestination with null object");
        }

        DestinationManager.instance().destroyDestinationCache(destination);
    }

    /**
     * Return true if the specified destination exists.
     *
     * @param destination - destination to check
     * @return boolean - true if a it exists
     */
    public boolean exists(JmsDestination destination) {
        return DestinationManager.instance().hasDestinationCache(destination);
    }

    /**
     * Add a message to the message manager for the specified destination.
     * If the message or the destination are null then throw a JMSException
     * <p>
     * If the destination, specified in the message, does not exist then
     * create it.
     * destinations
     *
     * @param       message             the message to add
     * @throws      JMSException        if the message cannot be added
     */
    public void add(MessageImpl message) throws JMSException {
        if (message != null) {
            // if the message is persistent then process it accordingly,
            // otherwise use the non-persistent quality of service
            if (message.isPersistent()) {
                addPersistentMessage(message);
            } else {
                addNonPersistentMessage(message);
            }
        } else {
            _log.error("Cannot process a null message.");
        }
    }

    /**
     * Add a message to the message manager for the specified destination.
     * Note that this method is called exclusively by the 
     * {@link ResourceManager} and should not be used for any other purpose.
     *
     * @param connection - this is the database connection that is used
     * @param message - the message to add
     * @throws JMSException - thrown if  there is a problem processing msg
     */
    void add(Connection connection, MessageImpl message) throws JMSException {
        if (message != null) {
            // if the message is persistent then process it accordingly,
            // otherwise use the non-persistent quality of service
            if (message.isPersistent()) {
                addPersistentMessage(connection, message);
            } else {
                addNonPersistentMessage(message);
            }
        } else {
            _log.error("Cannot process a null message.");
        }
    }

    /**
     * This method is used to process non-persistent messages.
     *
     * @param message - the message to process
     * @throws JMSException - if the message cannot be processed
     */
    protected void addNonPersistentMessage(MessageImpl message)
        throws JMSException {
        // mark the message as accepted and attach a sequence number
        message.setAcceptedTime((new Date()).getTime());
        message.setSequenceNumber(++_sequenceNumberGenerator);
        message.setReadOnly(true);

        // Use the message to retrieve the corresponding destination object.
        JmsDestination destination =
            (JmsDestination) message.getJMSDestination();
        if (destination != null) {
            // notify all registered listeners that a new message has arrived
            // for the specified destination.
            notifyOnAddMessage(destination, message);
            _messagesProcessed++;
        } else {
            _log.error("Can't locate destination for message");
        }
    }

    /**
     * This method is used to process persistent messages.
     *
     * @param message - the message to process
     * @throws JMSException - if the message cannot be processed
     */
    protected void addPersistentMessage(MessageImpl message)
        throws JMSException {

        // mark the message as accepted and attach a sequence number
        message.setAcceptedTime((new Date()).getTime());
        message.setSequenceNumber(++_sequenceNumberGenerator);
        message.setReadOnly(true);

        JmsDestination destination =
            (JmsDestination) message.getJMSDestination();
        if (destination == null) {
            throw new JMSException(
                "Can't process message - JMSDestination is null");
        }

        Connection connection = null;
        TransactionManager tm = null;

        // do all persistent work in this block
        try {
            // get a database connection
            connection = DatabaseService.getConnection();

            // add the message to the database
            DatabaseService.getAdapter().addMessage(connection, message);

            if (destination instanceof JmsTopic) {
                // let the consumer manager handle this
                ConsumerManager.instance().persistentMessageAdded(
                    connection, message);
            } else {
                // if this message is for a queue then simply create a
                // persistent handle for the destination
                MessageHandleFactory.createPersistentHandle(
                    connection, destination, null, message);
            }

            // commit the persistent message and handle
            connection.commit();

            // Notify all listeners that a persistent message has arrived
            notifyOnAddPersistentMessage(null, destination, message);
            _messagesProcessed++;
        } catch (Exception exception) {
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException ignore) {
                    // no-op
                }
            }
            _log.error("Failed to make message persistent", exception);
            throw new JMSException(
                "Failed to make message persistent: " +
                exception.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -