⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagehandles.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
 */

package org.exolab.jms.persistence;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.message.MessageId;
import org.exolab.jms.messagemgr.PersistentMessageHandle;


/**
 * This class provides persistency for PersistentMessageHandle objects
 * in an RDBMS database
 *
 * @version     $Revision: 1.30 $ $Date: 2004/01/08 05:55:07 $
 * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 * @see         org.exolab.jms.persistence.RDBMSAdapter
 */
class MessageHandles {

    /**
     * SQL for inserting a row into the message_handles table. No column list
     * is given, so the eight placeholders follow the table's column order.
     * addMessageHandle binds them as: message id, destination id,
     * consumer id, priority, accepted time, sequence number, expiry time
     * and delivered flag (1 or 0).
     */
    private static final String INSERT_MSG_HANDLE_STMT =
        "insert into message_handles values " + "(?,?,?,?,?,?,?,?)";

    /**
     * SQL for deleting a single message handle. STMT1 matches on message id
     * and consumer id only, and is used by removeMessageHandle when the
     * destination name cannot be mapped to an id. STMT2 additionally matches
     * on the destination id.
     */
    private static final String DELETE_MSG_HANDLE_STMT1 =
        "delete from message_handles where messageId=? and consumerId=?";
    private static final String DELETE_MSG_HANDLE_STMT2 =
        "delete from message_handles where messageId=? and destinationId=? " +
        "and consumerId=?";

    /**
     * SQL for deleting all message handles with the specified message id
     */
    private static final String DELETE_MSG_HANDLES_STMT =
        "delete from message_handles where messageId=?";

    /**
     * SQL for counting the handles that reference the specified message id.
     * removeMessageHandle uses it to decide whether the message itself can
     * be deleted.
     */
    private static final String GET_MSG_HANDLE_COUNT =
        "select count(messageId) from message_handles where messageId=?";

    /**
     * SQL for deleting a message from the messages table by message id
     */
    private static final String DELETE_MESSAGE =
        "delete from messages where messageId=?";

    /**
     * SQL for updating the delivered flag of the row identified by
     * message id, destination id and consumer id
     */
    private static final String UPDATE_MSG_HANDLE_STMT =
        "update message_handles set delivered=? where messageId=? and " +
        "destinationId=? and consumerId=?";

    /**
     * SQL for deleting all message handles for a destination id
     */
    private static final String DELETE_MSG_HANDLES_FOR_DEST =
        "delete from message_handles where destinationId=?";

    /**
     * SQL for retrieving all message handles for a particular consumer id,
     * ordered by ascending accepted time. Despite the constant's name, the
     * query filters on consumerId rather than destinationId.
     */
    private static final String GET_MSG_HANDLES_FOR_DEST =
        "select * from message_handles where consumerId=? order by " +
        "acceptedTime asc";

    /**
     * SQL for retrieving the distinct message ids of handles whose accepted
     * time lies between the two specified times, bounds inclusive
     */
    private static final String GET_MESSAGE_HANDLES_IN_RANGE =
        "select distinct messageId from message_handles where " +
        " acceptedTime >= ? and acceptedTime <=?";

    /**
     * SQL for retrieving the message id of any handle with the specified
     * message id; it yields a row only if such a handle exists
     */
    private static final String GET_MESSAGE_HANDLE_WITH_ID =
        "select distinct messageId from message_handles where messageId=?";

    /**
     * SQL for counting the message handles for a specified destination id
     * and consumer id
     */
    private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
        "select count(messageId) from message_handles where destinationId=? " +
        "and consumerId=?";

    /**
     * SQL for counting the message handles for a specified consumer id
     */
    private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
        "select count(messageId) from message_handles where consumerId=?";

    /**
     * SQL for deleting a consumer's expired message handles: rows with a
     * non-zero expiry time that is less than the supplied time. Rows with an
     * expiry time of 0 are never matched.
     */
    private static final String DELETE_EXPIRED_MESSAGES =
        "delete from message_handles where consumerId=? and expiryTime != 0 " +
        "and expiryTime<?";

    /**
     * The singleton instance of this class. It is null until initialise()
     * has been called; instance() returns it as-is. Note that the field is
     * not declared volatile.
     */
    private static MessageHandles _instance;

    /**
     * Cache the transaction retry count. Defaults to 10.
     * NOTE(review): not referenced in the visible part of this file -
     * presumably used by retry logic further down; confirm.
     */
    private static int _retryCount = 10;

    /**
     * Cache the retry interval in milliseconds. Defaults to 50.
     * NOTE(review): not referenced in the visible part of this file -
     * presumably used by retry logic further down; confirm.
     */
    private static long _retryInterval = 50;

    /**
     * Monitor object that initialise() synchronizes on, so that only one
     * thread creates the singleton
     */
    private static final Object _block = new Object();

    /**
     * The logger
     */
    private static final Log _log = LogFactory.getLog(MessageHandles.class);


    /**
     * Returns the singleton instance of this class.
     *
     * This is a plain accessor: it never creates the instance. If
     * initialise() has not been invoked beforehand the result is null.
     *
     * @return the singleton MessageHandles, or null if it has not been
     *         initialised yet
     */
    public static MessageHandles instance() {
        return MessageHandles._instance;
    }

    /**
     * Default constructor. It is protected rather than public; within this
     * file instances are only created by initialise().
     */
    protected MessageHandles() {
        super();
    }

    /**
     * Initialise the singleton instance, creating it on the first call.
     * Subsequent calls return the instance created by the first one.
     *
     * The null check and the assignment are both performed while holding
     * the _block monitor. The singleton field is not declared volatile, so
     * testing it outside the lock (double-checked locking) is not guaranteed
     * to be safe under the Java memory model; taking the lock on every call
     * avoids that. This method is expected to be called rarely, so the
     * uncontended lock is not a concern.
     *
     * @return      MessageHandles - the singleton instance, never null
     */
    public static MessageHandles initialise() {
        synchronized (_block) {
            if (_instance == null) {
                _instance = new MessageHandles();
            }
            return _instance;
        }
    }

    /**
     * Store the specified message handle as a new row in the
     * message_handles table.
     *
     * The handle's destination name and consumer name are first mapped to
     * their numeric ids. An id of 0 means the name could not be mapped, in
     * which case the handle is rejected with a PersistenceException. If the
     * insert reports a row count other than 1, an error is logged but no
     * exception is raised.
     *
     * @param connection - the connection to use
     * @param handle - message handle to add
     * @throws PersistenceException - if the destination or consumer cannot
     * be mapped to an id, or if the insert raises an SQLException
     */
    public void addMessageHandle(Connection connection,
                                 PersistentMessageHandle handle)
        throws PersistenceException {

        if (_log.isDebugEnabled()) {
            String trace = "addMessageHandle(handle=[consumer="
                + handle.getConsumerName()
                + ", destination=" + handle.getDestination()
                + ", id=" + handle.getMessageId().getId() + "])";
            _log.debug(trace);
        }

        PreparedStatement statement = null;
        try {
            // resolve the destination name; 0 signals "no such destination"
            final long destinationId = Destinations.instance().getId(
                handle.getDestination().getName());
            if (destinationId == 0) {
                String reason =
                    "Cannot add message handle id=" + handle.getMessageId() +
                    " for destination=" + handle.getDestination().getName() +
                    " and consumer=" + handle.getConsumerName() +
                    " since the destination cannot be mapped to an id";
                throw new PersistenceException(reason);
            }

            // resolve the consumer name; 0 signals "no such consumer"
            final long consumerId = Consumers.instance().getConsumerId(
                handle.getConsumerName());
            if (consumerId == 0) {
                String reason =
                    "Cannot add message handle id=" + handle.getMessageId() +
                    " for destination=" + handle.getDestination().getName() +
                    " and consumer=" + handle.getConsumerName() +
                    " since the consumer cannot be mapped to an id";
                throw new PersistenceException(reason);
            }

            // bind the eight columns in order
            statement = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
            int column = 0;
            statement.setString(++column, handle.getMessageId().getId());
            statement.setLong(++column, destinationId);
            statement.setLong(++column, consumerId);
            statement.setInt(++column, handle.getPriority());
            statement.setLong(++column, handle.getAcceptedTime());
            statement.setLong(++column, handle.getSequenceNumber());
            statement.setLong(++column, handle.getExpiryTime());

            // the delivered flag is stored as 1 (delivered) or 0
            int deliveredFlag = 0;
            if (handle.getDelivered()) {
                deliveredFlag = 1;
            }
            statement.setInt(++column, deliveredFlag);

            // exactly one row should have been inserted
            final int rowsInserted = statement.executeUpdate();
            if (rowsInserted != 1) {
                _log.error(
                    "Failed to execute addMessageHandle for handle="
                    + handle.getMessageId().getId() + ", destination Id="
                    + destinationId);
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to add message handle=" +
                handle, exception);
        } finally {
            SQLHelper.close(statement);
        }
    }

    /**
     * Remove the specified message handle from the database, and remove the
     * message it refers to once no handle references that message any more.
     *
     * Nothing is done if the handle's consumer cannot be mapped to an id.
     * If the destination cannot be mapped to an id (id of 0), the handle is
     * still deleted, matching on message id and consumer id only, so that
     * orphaned handles get cleaned up.
     *
     * @param connection - the connection to use
     * @param  handle - the handle to remove
     * @throws  PersistenceException - if any statement raises an
     * SQLException
     */
    public void removeMessageHandle(Connection connection,
                                    PersistentMessageHandle handle)
        throws PersistenceException {

        if (_log.isDebugEnabled()) {
            String trace = "removeMessageHandle(handle=[consumer="
                + handle.getConsumerName()
                + ", destination=" + handle.getDestination()
                + ", id=" + handle.getMessageId().getId() + "])";
            _log.debug(trace);
        }

        PreparedStatement remover = null;
        PreparedStatement counter = null;
        ResultSet remaining = null;

        try {
            // an unknown consumer (id 0) has no handles to remove
            final long consumerId = Consumers.instance().getConsumerId(
                handle.getConsumerName());
            if (consumerId == 0) {
                return;
            }

            final String id = handle.getMessageId().getId();

            // a destination id of 0 means the destination does not
            // currently exist; orphaned handles are still deleted, using
            // the statement that does not filter on the destination
            final long destinationId = Destinations.instance().getId(
                handle.getDestination().getName());
            final boolean destinationKnown = (destinationId != 0);

            remover = connection.prepareStatement(
                destinationKnown ? DELETE_MSG_HANDLE_STMT2
                                 : DELETE_MSG_HANDLE_STMT1);
            int index = 0;
            remover.setString(++index, id);
            if (destinationKnown) {
                remover.setLong(++index, destinationId);
            }
            remover.setLong(++index, consumerId);

            // a row count other than 1 is only reported if the message
            // hasn't expired, i.e. hasn't been garbage collected
            final int handlesRemoved = remover.executeUpdate();
            if (handlesRemoved != 1 && !handle.hasExpired()) {
                _log.error("Failed to execute removeMessageHandle for "
                    + "handle=" + id + " destination id="
                    + destinationId + " consumer id=" + consumerId);
            }

            // count the handles still referencing this message; stop here
            // unless the count is available and is zero
            counter = connection.prepareStatement(GET_MSG_HANDLE_COUNT);
            counter.setString(1, id);
            remaining = counter.executeQuery();
            if (!remaining.next() || remaining.getInt(1) != 0) {
                return;
            }

            // no handles left: delete the message itself, reusing the
            // delete statement variable after closing the first statement
            remover.close();
            remover = connection.prepareStatement(DELETE_MESSAGE);
            remover.setString(1, id);
            final int messagesRemoved = remover.executeUpdate();

            // can get 2 durable consumers trying to do this
            // simultaneously, so this is logged at debug level rather
            // than as an error. See bug 819212.
            if (messagesRemoved != 1 && !handle.hasExpired()
                && _log.isDebugEnabled()) {
                _log.debug(
                    "Failed to delete the message with id=" + id
                    + " in a call to removeMessageHandle.");
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to remove message handle=" +
                handle, exception);
        } finally {
            SQLHelper.close(remaining);
            SQLHelper.close(remover);
            SQLHelper.close(counter);
        }
    }

    /**
     * Update the specified message handle in the database
     *
     * @param connection - the connection to use
     * @param  handle - the handle to update
     * @throws  PersistenceException - SQL related exception
     */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -