⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messages.java

📁 实现了Jms的服务器源码,支持多种适配器,DB,FTP,支持多种数据库
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                exception);
        } finally {
            SQLHelper.close(delete);
        }

        return result;
    }

    /**
     * Fault in a batch of messages of the given priority whose creation
     * time is greater than or equal to <code>time</code>, ordered by
     * ascending creation time.
     * <p>
     * For a wildcard topic the query is not restricted by destination; for
     * any other destination only rows with the matching destination id are
     * selected.
     * <p>
     * The first 200 rows are always taken. After that, rows continue to be
     * added until one is read whose creation time is later than that of the
     * 200th row; that row is still included in the result, so the batch
     * holds "around 200" messages rather than exactly 200.
     *
     * @param connection - execute on this connection
     * @param destination - the name of the destination
     * @param priority - the priority of the messages
     * @param time - with timestamp greater or equal to this
     * @return Vector - the retrieved MessageImpl objects, possibly empty
     * @throws PersistenceException - if the destination is unknown, a
     * message cannot be de-serialized, or an SQL error occurs
     */
    public Vector getMessages(Connection connection, String destination,
                              int priority, long time)
        throws PersistenceException {

        final int batchSize = 200;
        Vector messages = new Vector();
        PreparedStatement statement = null;
        ResultSet rows = null;

        try {
            // the destination must be registered and must have an identifier
            JmsDestination dest = Destinations.instance().get(destination);
            if (dest == null) {
                throw new PersistenceException(
                    "Cannot getMessages for destination=" + destination
                    + ": destination does not exist");
            }

            long destinationId = Destinations.instance().getId(destination);
            if (destinationId == 0) {
                throw new PersistenceException(
                    "Cannot getMessages for destination=" + destination
                    + ": destination does not exist");
            }

            // a wildcard topic cannot be narrowed down to a single
            // destination id, so it is selected on priority and timestamp
            // only; every other destination uses the more specific query
            boolean wildcard = (dest instanceof JmsTopic)
                && ((JmsTopic) dest).isWildCard();
            String sql;
            if (wildcard) {
                sql = "select * from messages where priority=? and createTime>=? order by createTime asc";
            } else {
                sql = "select * from messages where destinationId=? and priority=? and createTime>=? order by createTime asc";
            }

            statement = connection.prepareStatement(sql);
            int index = 1;
            if (!wildcard) {
                statement.setLong(index++, destinationId);
            }
            statement.setInt(index++, priority);
            statement.setLong(index, time);
            rows = statement.executeQuery();

            // walk the result set, remembering the creation time of the
            // last row that falls inside the batch
            int retrieved = 0;
            long boundary = time;
            while (rows.next()) {
                MessageImpl message =
                    deserialize(rows.getBytes("messageBlob"));
                message.setProcessed(rows.getInt("processed") == 1);
                messages.add(message);

                ++retrieved;
                long created = rows.getLong("createTime");
                if (retrieved <= batchSize) {
                    boundary = created;
                } else if (created > boundary) {
                    // past the batch size and the timestamp has moved on,
                    // so exit the loop
                    break;
                }
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve messages", exception);
        } finally {
            SQLHelper.close(rows);
            SQLHelper.close(statement);
        }

        return messages;
    }

    /**
     * Retrieve message identifiers, together with their creation times,
     * for messages created strictly after <code>time</code>, in ascending
     * order of creation time.
     * <p>
     * <code>hint</code> is only a guide: the first <code>hint</code> rows
     * are always taken, and rows keep being added until one is read whose
     * creation time is later than that of the last row inside the hint.
     * That row is itself included in the result.
     *
     * @param connection - execute on this connection
     * @param time - with timestamp greater than
     * @param hint - an indication of the number of messages to return.
     * @return a map of messageId Strings to their creation time (as Long)
     * @throws PersistenceException - if an SQL error occurs
     */
    public HashMap getMessageIds(Connection connection, long time, int hint)
        throws PersistenceException {

        HashMap ids = new HashMap();
        PreparedStatement statement = null;
        ResultSet rows = null;

        try {
            statement = connection.prepareStatement(
                "select messageId,createTime from messages where createTime>? order by createTime asc");
            statement.setLong(1, time);
            rows = statement.executeQuery();

            // collect identifiers, tracking the creation time of the last
            // row that is still within the hinted amount
            int retrieved = 0;
            long boundary = time;
            while (rows.next()) {
                String id = rows.getString("messageId");
                long created = rows.getLong("createTime");
                ids.put(id, new Long(created));

                ++retrieved;
                if (retrieved <= hint) {
                    boundary = created;
                } else if (created > boundary) {
                    // beyond the hint and the timestamp has changed
                    break;
                }
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve message identifiers", exception);
        } finally {
            SQLHelper.close(rows);
            SQLHelper.close(statement);
        }

        return ids;
    }

    /**
     * Load every message whose <code>processed</code> column is 0, i.e.
     * messages that have been accepted by the system but not yet
     * processed. Each returned message is explicitly flagged as
     * unprocessed.
     *
     * @param connection - execute on this connection
     * @return Vector - the unprocessed MessageImpl objects, possibly empty
     * @throws PersistenceException - if a message cannot be de-serialized
     * or an SQL error occurs
     */
    public Vector getUnprocessedMessages(Connection connection)
        throws PersistenceException {

        Vector unprocessed = new Vector();
        PreparedStatement statement = null;
        ResultSet rows = null;

        try {
            statement = connection.prepareStatement(
                "select * from messages where processed=0");
            rows = statement.executeQuery();

            // rebuild each message from its serialized form
            while (rows.next()) {
                byte[] blob = rows.getBytes("messageBlob");
                MessageImpl message = deserialize(blob);
                message.setProcessed(false);
                unprocessed.add(message);
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve unprocessed messages", exception);
        } finally {
            SQLHelper.close(rows);
            SQLHelper.close(statement);
        }

        return unprocessed;
    }

    /**
     * Retrieve a message handle for every message of the given destination
     * that has a non-zero expiry time, ordered by ascending expiry time.
     * <p>
     * The query only tests <code>expiryTime&gt;0</code>; the expiry time
     * is not compared with the current time here.
     *
     * @param connection - execute on this connection
     * @param destination - the destination in question
     * @return Vector - collection of PersistentMessageHandle objects,
     * possibly empty
     * @throws PersistenceException - if the destination has no identifier
     * or an SQL error occurs
     */
    public Vector getNonExpiredMessages(Connection connection,
                                        JmsDestination destination)
        throws PersistenceException {

        Vector handles = new Vector();
        PreparedStatement statement = null;
        ResultSet rows = null;

        try {
            String name = destination.getName();
            long destinationId = Destinations.instance().getId(name);
            if (destinationId == 0) {
                throw new PersistenceException(
                    "Cannot getMessages for destination=" + destination
                    + ": destination does not exist");
            }

            statement = connection.prepareStatement(
                "select messageId,destinationId,priority,createTime,expiryTime" +
                " from messages where expiryTime>0 and destinationId=? order by expiryTime asc");
            statement.setLong(1, destinationId);
            rows = statement.executeQuery();

            while (rows.next()) {
                // columns are read by position: 1=messageId, 3=priority,
                // 4=createTime, 5=expiryTime (2=destinationId is not read)
                String messageId = rows.getString(1);
                int priority = rows.getInt(3);
                long accepted = rows.getLong(4);
                long expiry = rows.getLong(5);

                PersistentMessageHandle handle = new PersistentMessageHandle();
                handle.setMessageId(new MessageId(messageId));
                handle.setDestination(destination);
                handle.setPriority(priority);
                handle.setAcceptedTime(accepted);
                handle.setExpiryTime(expiry);
                handles.add(handle);
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve non-expired messages", exception);
        } finally {
            SQLHelper.close(rows);
            SQLHelper.close(statement);
        }

        return handles;
    }

    /**
     * Delete all expired messages and their associated message handles.
     * A row counts as expired when its expiry time is non-zero and earlier
     * than the current system time; the same cut-off time is used for both
     * tables.
     *
     * @param connection - execute on this connection
     * @throws PersistenceException - if an SQL error occurs
     */
    public void removeExpiredMessages(Connection connection)
        throws PersistenceException {

        // messages first, then the handles that refer to them
        final String[] deletions = {
            "delete from messages where expiryTime > 0 and expiryTime < ?",
            "delete from message_handles where expiryTime > 0 and expiryTime < ?"
        };

        PreparedStatement delete = null;
        try {
            final long now = System.currentTimeMillis();

            for (int i = 0; i < deletions.length; ++i) {
                if (delete != null) {
                    // release the previous statement before re-using
                    // the variable for the next one
                    delete.close();
                }
                delete = connection.prepareStatement(deletions[i]);
                delete.setLong(1, now);
                delete.executeUpdate();
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to remove expired messages", exception);
        } finally {
            SQLHelper.close(delete);
        }
    }

    /**
     * Reset the instance by clearing the <code>_instance</code> reference.
     * We need to deprecate this method since this class does not contain
     * state information.
     */
    public void close() {
        // NOTE(review): _instance is declared outside this view; presumably
        // it is the shared singleton reference - confirm
        _instance = null;
    }

    /**
     * Default constructor does nothing at the moment. It is protected, so
     * it can only be invoked from this package or from subclasses.
     */
    protected Messages() {
    }

    /**
     * Convert a message into a byte array using Java object serialization.
     *
     * @param message the message to serialize
     * @return the serialized form of the message
     * @throws PersistenceException if the message cannot be serialized;
     * the underlying exception is attached as the cause
     */
    public byte[] serialize(MessageImpl message)
        throws PersistenceException {

        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(buffer);
            out.writeObject(message);
            // closing flushes any buffered object data into the byte buffer
            out.close();
            return buffer.toByteArray();
        } catch (Exception exception) {
            throw new PersistenceException("Failed to serialize message",
                exception);
        }
    }

    /**
     * Re-construct a message from its serialized form.
     *
     * @param blob the serialized message
     * @return the re-constructed message
     * @throws PersistenceException if <code>blob</code> is null, or if the
     * bytes cannot be read back as a MessageImpl; in the latter case the
     * underlying exception is attached as the cause
     */
    public MessageImpl deserialize(byte[] blob) throws PersistenceException {
        if (blob == null) {
            throw new PersistenceException(
                "Cannot de-serialize null message blob");
        }

        try {
            // NOTE(review): this is Java native deserialization - confirm
            // that blobs only ever originate from a trusted store
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(blob));
            MessageImpl message = (MessageImpl) in.readObject();
            in.close();
            return message;
        } catch (Exception exception) {
            throw new PersistenceException(
                "Failed to de-serialize message", exception);
        }
    }

} //-- Messages

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -