messages.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 624 行 · 第 1/2 页

JAVA
624
字号
     */    public Vector getMessages(Connection connection, String destination,                              int priority, long time)        throws PersistenceException {        PreparedStatement select = null;        ResultSet set = null;        Vector messages = new Vector();        try {            JmsDestination dest = _destinations.get(destination);            if (dest == null) {                throw new PersistenceException(                    "Cannot getMessages for destination=" + destination                    + ": destination does not exist");            }            long destinationId = _destinations.getId(destination);            if (destinationId == 0) {                throw new PersistenceException(                    "Cannot getMessages for destination=" + destination                    + ": destination does not exist");            }            if ((dest instanceof JmsTopic) &&                (((JmsTopic) dest).isWildCard())) {                // if the destination is a wildcard then we can't only select                // on timestamp. This will fault in any message greater than                // or equal to the specified timestamp.                select = connection.prepareStatement(                    "select createtime,processed,messageblob from messages "                    + "where priority=? and createTime>=? "                    + "order by createTime asc");                select.setInt(1, priority);                select.setLong(2, time);            } else {                // if the destination is more specific then we can execute a                // more specialized query and fault in other messages for                // the same destination.                select = connection.prepareStatement(                    "select createtime,processed,messageblob from messages "                    + "where destinationId=? and priority=? and createTime>=? 
"                    + "order by createTime asc");                select.setLong(1, destinationId);                select.setInt(2, priority);                select.setLong(3, time);            }            set = select.executeQuery();            // now iterate through the result set            int count = 0;            long lastTimeStamp = time;            while (set.next()) {                MessageImpl m = deserialize(set.getBytes(3));                m.setProcessed((set.getInt(2) == 1 ? true : false));                messages.add(m);                if (++count > 200) {                    // if there are more than two hundred rows then exist                    // the loop after 200 messages have been retrieved                    // and the timestamp has changed.                    if (set.getLong(1) > lastTimeStamp) {                        break;                    }                } else {                    lastTimeStamp = set.getLong(1);                }            }        } catch (SQLException exception) {            throw new PersistenceException(                "Failed to retrieve messages", exception);        } finally {            SQLHelper.close(set);            SQLHelper.close(select);        }        return messages;    }    /**     * Retrieve the specified number of message ids from the database with a     * time greater than that specified. The number of items to retrieve     * is only a hint and does not reflect the number of messages actually     * returned.     *     * @param connection - execute on this connection     * @param time - with timestamp greater than     * @param hint - an indication of the number of messages to return.     
* @return a map of messageId Strings to their creation time     * @throws PersistenceException - if an SQL error occurs     */    public HashMap getMessageIds(Connection connection, long time, int hint)        throws PersistenceException {        PreparedStatement select = null;        ResultSet set = null;        HashMap messages = new HashMap();        try {            select = connection.prepareStatement(                "select messageId,createTime from messages where createTime>? "                + "order by createTime asc");            select.setLong(1, time);            set = select.executeQuery();            // now iterate through the result set            int count = 0;            long lastTimeStamp = time;            while (set.next()) {                messages.put(set.getString(1), new Long(set.getLong(2)));                if (++count > hint) {                    if (set.getLong(2) > lastTimeStamp) {                        break;                    }                } else {                    lastTimeStamp = set.getLong("createTime");                }            }        } catch (SQLException exception) {            throw new PersistenceException(                "Failed to retrieve message identifiers", exception);        } finally {            SQLHelper.close(set);            SQLHelper.close(select);        }        return messages;    }    /**     * Retrieve a list of unprocessed messages and return them to the client.     * An unprocessed message has been accepted by the system but not     * processed.     
*     * @param connection - execute on this connection     * @return Vector - one or more MessageImpl objects     * @throws PersistenceException - if an SQL error occurs     */    public Vector getUnprocessedMessages(Connection connection)        throws PersistenceException {        PreparedStatement select = null;        ResultSet set = null;        Vector messages = new Vector();        try {            select = connection.prepareStatement(                "select messageblob from messages where processed=0");            set = select.executeQuery();            // now iterate through the result set            while (set.next()) {                MessageImpl m = deserialize(set.getBytes(1));                m.setProcessed(false);                messages.add(m);            }        } catch (SQLException exception) {            throw new PersistenceException(                "Failed to retrieve unprocessed messages", exception);        } finally {            SQLHelper.close(set);            SQLHelper.close(select);        }        return messages;    }    /**     * Retrieve the message handle for all unexpired messages     *     * @param connection - execute on this connection     * @param destination - the destination in question     * @return Vector - collection of PersistentMessageHandle objects     * @throws  PersistenceException - sql releated exception     */    public Vector getNonExpiredMessages(Connection connection,                                        JmsDestination destination)        throws PersistenceException {        Vector result = new Vector();        PreparedStatement select = null;        ResultSet set = null;        try {            long destinationId = _destinations.getId(destination.getName());            if (destinationId == 0) {                throw new PersistenceException(                    "Cannot getMessages for destination=" + destination                    + ": destination does not exist");            }            select = 
connection.prepareStatement(                "select messageId,destinationId,priority,createTime,"                + "sequenceNumber,expiryTime "                + "from messages "                + "where expiryTime>0 and destinationId=? "                + "order by expiryTime asc");            select.setLong(1, destinationId);            set = select.executeQuery();            while (set.next()) {                String messageId = set.getString(1);                int priority = set.getInt(3);                long acceptedTime = set.getLong(4);                long sequenceNumber = set.getLong(5);                long expiryTime = set.getLong(6);                PersistentMessageHandle handle = new PersistentMessageHandle(                        messageId, priority, acceptedTime, sequenceNumber,                        expiryTime, destination);                result.add(handle);            }        } catch (SQLException exception) {            throw new PersistenceException(                "Failed to retrieve non-expired messages", exception);        } finally {            SQLHelper.close(set);            SQLHelper.close(select);        }        return result;    }    /**     * Delete all expired messages and associated message handles.     
*     * @param connection - execute on this connection     * @throws PersistenceException - if an SQL error occurs     */    public void removeExpiredMessages(Connection connection)        throws PersistenceException {        PreparedStatement delete = null;        try {            long time = System.currentTimeMillis();            // delete from the messages            delete = connection.prepareStatement(                "delete from messages where expiryTime > 0 and expiryTime < ?");            delete.setLong(1, time);            delete.executeUpdate();            delete.close();            // delete the message handles            delete = connection.prepareStatement(                "delete from message_handles where expiryTime > 0 and expiryTime < ?");            delete.setLong(1, time);            delete.executeUpdate();        } catch (SQLException exception) {            throw new PersistenceException(                "Failed to remove expired messages", exception);        } finally {            SQLHelper.close(delete);        }    }    /**     * Get the message as a serialized blob     *     * @param       message             the message to serialize     * @return      byte[]              the serialized message     */    public byte[] serialize(MessageImpl message)        throws PersistenceException {        byte[] result = null;        ObjectOutputStream ostream = null;        try {            ByteArrayOutputStream bstream = new ByteArrayOutputStream();            ostream = new ObjectOutputStream(bstream);            ostream.writeObject(message);            result = bstream.toByteArray();        } catch (Exception exception) {            throw new PersistenceException("Failed to serialize message",                exception);        } finally {            SQLHelper.close(ostream);        }        return result;    }    /**     * Set the message from a serialized blob     *     * @param blob the serialized message     * @return the re-constructed message     
*/
    public MessageImpl deserialize(byte[] blob) throws PersistenceException {
        // A missing blob is rejected up front.
        if (blob == null) {
            throw new PersistenceException(
                "Cannot de-serialize null message blob");
        }

        ObjectInputStream in = null;
        try {
            in = new ObjectInputStream(new ByteArrayInputStream(blob));
            return (MessageImpl) in.readObject();
        } catch (Exception exception) {
            // Any failure (I/O, class resolution, bad cast) is reported as
            // a persistence error with the original cause attached.
            throw new PersistenceException(
                "Failed to de-serialize message", exception);
        } finally {
            SQLHelper.close(in);
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?