⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagehandles.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * Redistribution and use of this software and associated documentation
 * ("Software"), with or without modification, are permitted provided
 * that the following conditions are met:
 *
 * 1. Redistributions of source code must retain copyright
 *    statements and notices.  Redistributions must also contain a
 *    copy of this document.
 *
 * 2. Redistributions in binary form must reproduce the
 *    above copyright notice, this list of conditions and the
 *    following disclaimer in the documentation and/or other
 *    materials provided with the distribution.
 *
 * 3. The name "Exolab" must not be used to endorse or promote
 *    products derived from this Software without prior written
 *    permission of Exoffice Technologies.  For written permission,
 *    please contact info@exolab.org.
 *
 * 4. Products derived from this Software may not be called "Exolab"
 *    nor may "Exolab" appear in their names without prior written
 *    permission of Exoffice Technologies. Exolab is a registered
 *    trademark of Exoffice Technologies.
 *
 * 5. Due credit should be given to the Exolab Project
 *    (http://www.exolab.org/).
 *
 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
 */
package org.exolab.jms.persistence;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.client.JmsTopic;
import org.exolab.jms.messagemgr.PersistentMessageHandle;
import org.exolab.jms.messagemgr.MessageHandle;


/**
 * This class provides persistency for MessageHandle objects
 * in an RDBMS database
 *
 * @version     $Revision: 1.3 $ $Date: 2005/06/09 14:39:51 $
 * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 */
class MessageHandles {

    /**
     * prepared statement for inserting a message handle
     */
    private static final String INSERT_MSG_HANDLE_STMT =
            "insert into message_handles (messageid, destinationid, consumerid, "
            + "priority, acceptedtime, sequencenumber, expirytime, delivered) "
            + "values (?,?,?,?,?,?,?,?)";

    /**
     * prepared statements for deleting message handle
     */
    private static final String DELETE_MSG_HANDLE_STMT1 =
        "delete from message_handles where messageId=? and consumerId=?";
    private static final String DELETE_MSG_HANDLE_STMT2 =
        "delete from message_handles where messageId=? and destinationId=? " +
        "and consumerId=?";

    /**
     * Delete all message handles with the specified message id
     */
    private static final String DELETE_MSG_HANDLES_STMT =
        "delete from message_handles where messageId=?";

    /**
     * Update a row in the message handles table
     */
    private static final String UPDATE_MSG_HANDLE_STMT =
        "update message_handles set delivered=? where messageId=? and " +
        "destinationId=? and consumerId=?";

    /**
     * Delete all message handles for a destination
     */
    private static final String DELETE_MSG_HANDLES_FOR_DEST =
        "delete from message_handles where destinationId=?";

    /**
     * Retrieve all message handles for a particular consumer
     */
    private static final String GET_MSG_HANDLES_FOR_DEST =
        "select messageid, destinationid, consumerid, priority, acceptedtime, "
        + "sequencenumber, expirytime, delivered from message_handles "
        + "where consumerId=? order by acceptedTime asc";

    /**
     * Retrieve a range of message handles between the specified times
     */
    private static final String GET_MESSAGE_HANDLES_IN_RANGE =
        "select distinct messageId from message_handles where " +
        " acceptedTime >= ? and acceptedTime <=?";

    /**
     * Retrieve a handle with the specified id
     */
    private static final String GET_MESSAGE_HANDLE_WITH_ID =
        "select distinct messageId from message_handles where messageId=?";

    /**
     * Return the number of messages and a specified destination and cousmer
     */
    private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
        "select count(messageId) from message_handles where destinationId=? " +
        "and consumerId=?";

    /**
     * Return the number of messages and a specified consumer
     */
    private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
        "select count(messageId) from message_handles where consumerId=?";

    /**
     * Delete all expired messages
     */
    private static final String DELETE_EXPIRED_MESSAGES =
        "delete from message_handles where consumerId=? and expiryTime != 0 " +
        "and expiryTime<?";

    /**
     * Singleton to this class
     */
    private static MessageHandles _instance;

    /**
     * Used to ensure that only one thread initialises the class
     */
    private static final Object _block = new Object();

    /**
     * The logger
     */
    private static final Log _log = LogFactory.getLog(MessageHandles.class);


    /**
     * Returns the singleton instance.
     *
     * Note that initialise() must have been invoked first for this
     * to return a valid instance.
     *
     * @return MessageHandles
     */
    public static MessageHandles instance() {
        return _instance;
    }

    /**
     * Constructor
     */
    protected MessageHandles() {
    }

    /**
     * Initialise the singleton _instance
     *
     * @return      MessageHandles
     */
    public static MessageHandles initialise() {
        if (_instance == null) {
            synchronized (_block) {
                if (_instance == null) {
                    _instance = new MessageHandles();
                }
            }
        }

        return _instance;
    }

    /**
     * Add the specified message handle to the database
     *
     * @param connection - the connection to use
     * @param handle - message handle to add
     * @throws PersistenceException - if add does not complete
     */
    public void addMessageHandle(Connection connection,
                                 MessageHandle handle)
        throws PersistenceException {

        if (_log.isDebugEnabled()) {
            _log.debug("addMessageHandle(handle=[consumer="
                       + handle.getConsumerPersistentId()
                       + ", destination=" + handle.getDestination() 
                       + ", id=" + handle.getMessageId() + "])");
        }

        PreparedStatement insert = null;
        try {
            // map the destination name to an actual identity
            long destinationId = Destinations.instance().getId(
                handle.getDestination().getName());
            if (destinationId == 0) {
                throw new PersistenceException(
                    "Cannot add message handle id=" + handle.getMessageId() +
                    " for destination=" + handle.getDestination().getName() +
                    " and consumer=" + handle.getConsumerPersistentId() +
                    " since the destination cannot be mapped to an id");
            }

            // map the consumer name ot an identity
            long consumerId = Consumers.instance().getConsumerId(
                handle.getConsumerPersistentId());
            if (consumerId == 0) {
                throw new PersistenceException(
                    "Cannot add message handle id=" + handle.getMessageId() +
                    " for destination=" + handle.getDestination().getName() +
                    " and consumer=" + handle.getConsumerPersistentId() +
                    " since the consumer cannot be mapped to an id");
            }

            insert = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
            insert.setString(1, handle.getMessageId());
            insert.setLong(2, destinationId);
            insert.setLong(3, consumerId);
            insert.setInt(4, handle.getPriority());
            insert.setLong(5, handle.getAcceptedTime());
            insert.setLong(6, handle.getSequenceNumber());
            insert.setLong(7, handle.getExpiryTime());
            insert.setInt(8, (handle.getDelivered()) ? 1 : 0);

            // execute the insert
            if (insert.executeUpdate() != 1) {
                _log.error(
                    "Failed to execute addMessageHandle for handle="
                    + handle.getMessageId() + ", destination Id="
                    + destinationId);
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to add message handle=" +
                handle, exception);
        } finally {
            SQLHelper.close(insert);
        }
    }

    /**
     * Remove the specified message handle from the database. Once the handle
     * has been removed check to see whether there are any more message handles
     * referencing the same message. If there are not then remove the
     * corresponding message from the messages tables.
     *
     * @param connection - the connection to use
     * @param  handle - the handle to remove
     * @throws  PersistenceException - sql releated exception
     */
    public void removeMessageHandle(Connection connection,
                                    MessageHandle handle)
        throws PersistenceException {

        if (_log.isDebugEnabled()) {
            _log.debug("removeMessageHandle(handle=[consumer="
                       + handle.getConsumerPersistentId()
                       + ", destination=" + handle.getDestination() 
                       + ", id=" + handle.getMessageId() + "])");
        }

        PreparedStatement delete = null;
        PreparedStatement select = null;
        ResultSet rs = null;

        try {
            // first check to see that the consumer exists and only
            // proceed if it non-zero.
            long consumerId = Consumers.instance().getConsumerId(
                handle.getConsumerPersistentId());
            if (consumerId != 0) {
                // get the message id
                String id = handle.getMessageId();

                // map the destination name to an actual identity. If it is
                // null then the destination does not currently exist but we
                // may need to delete orphaned handles
                long destinationId = Destinations.instance().getId(
                    handle.getDestination().getName());

                if (destinationId == 0) {
                    delete = connection.prepareStatement(
                        DELETE_MSG_HANDLE_STMT1);
                    delete.setString(1, id);
                    delete.setLong(2, consumerId);

                } else {
                    delete = connection.prepareStatement(
                        DELETE_MSG_HANDLE_STMT2);
                    delete.setString(1, id);
                    delete.setLong(2, destinationId);
                    delete.setLong(3, consumerId);
                }

                // execute the delete
                if (delete.executeUpdate() != 1 && !handle.hasExpired()) {
                    // only log if the message hasn't been garbage
                    // collected
                    _log.error("Failed to execute removeMessageHandle for "
                        + "handle=" + id + " destination id="
                        + destinationId + " consumer id=" + consumerId);
                }
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to remove message handle=" +
                handle, exception);
        } finally {
            SQLHelper.close(rs);
            SQLHelper.close(delete);
            SQLHelper.close(select);
        }
    }

    /**
     * Update the specified message handle from the database
     *
     * @param connection - the connection to use
     * @param  handle - the handle to update
     * @throws  PersistenceException - sql releated exception
     */
    public void updateMessageHandle(Connection connection,
                                    MessageHandle handle)
        throws PersistenceException {
        PreparedStatement update = null;

        if (_log.isDebugEnabled()) {
            _log.debug("updateMessageHandle(handle=[consumer="
                       + handle.getConsumerPersistentId()
                       + ", destination=" + handle.getDestination() 
                       + ", id=" + handle.getMessageId() + "])");
        }

        try {
            // get the message id
            String id = handle.getMessageId();

            // map the destination name to an actual identity
            long destinationId = Destinations.instance().getId(
                handle.getDestination().getName());
            if (destinationId == 0) {
                throw new PersistenceException(
                    "Cannot update message handle id=" +

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -