messages.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 624 行 · 第 1/2 页
JAVA
624 行
*/
    public Vector getMessages(Connection connection, String destination,
                              int priority, long time)
        throws PersistenceException {

        // Soft cap on the number of rows returned by one call; the loop
        // below shows how it is applied.
        final int batchLimit = 200;

        PreparedStatement select = null;
        ResultSet set = null;
        Vector messages = new Vector();

        try {
            JmsDestination dest = _destinations.get(destination);
            if (dest == null) {
                throw new PersistenceException(
                    "Cannot getMessages for destination=" + destination
                    + ": destination does not exist");
            }

            long destinationId = _destinations.getId(destination);
            if (destinationId == 0) {
                throw new PersistenceException(
                    "Cannot getMessages for destination=" + destination
                    + ": destination does not exist");
            }

            if ((dest instanceof JmsTopic)
                && (((JmsTopic) dest).isWildCard())) {
                // For a wildcard topic the query is not restricted by
                // destinationId: it selects on priority and timestamp
                // only, which faults in any message created at or after
                // the specified timestamp.
                select = connection.prepareStatement(
                    "select createtime,processed,messageblob from messages "
                    + "where priority=? and createTime>=? "
                    + "order by createTime asc");
                select.setInt(1, priority);
                select.setLong(2, time);
            } else {
                // A specific destination allows a narrower query that only
                // faults in messages for that destination.
                select = connection.prepareStatement(
                    "select createtime,processed,messageblob from messages "
                    + "where destinationId=? and priority=? and createTime>=? "
                    + "order by createTime asc");
                select.setLong(1, destinationId);
                select.setInt(2, priority);
                select.setLong(3, time);
            }
            set = select.executeQuery();

            // Iterate through the result set. The first batchLimit rows are
            // always taken; after that the loop stops at the first row whose
            // creation time is later than that of the last row read within
            // the limit.
            // NOTE(review): the row that triggers the break has already been
            // added to the result, so one message with the newer timestamp
            // is returned while others sharing it may not be - confirm this
            // is what callers expect.
            int count = 0;
            long lastTimeStamp = time;
            while (set.next()) {
                MessageImpl m = deserialize(set.getBytes(3));
                m.setProcessed(set.getInt(2) == 1);
                messages.add(m);

                long createTime = set.getLong(1);
                if (++count > batchLimit) {
                    if (createTime > lastTimeStamp) {
                        break;
                    }
                } else {
                    lastTimeStamp = createTime;
                }
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve messages", exception);
        } finally {
            SQLHelper.close(set);
            SQLHelper.close(select);
        }

        return messages;
    }

    /**
     * Retrieve the specified number of message ids from the database with a
     * time greater than that specified. The number of items to retrieve
     * is only a hint and does not reflect the number of messages actually
     * returned: once more than <code>hint</code> rows have been read,
     * reading continues until a row with a later creation time is seen.
     *
     * @param connection - execute on this connection
     * @param time - with timestamp greater than
     * @param hint - an indication of the number of messages to return.
     * @return a map of messageId Strings to their creation time (as Long)
     * @throws PersistenceException - if an SQL error occurs
     */
    public HashMap getMessageIds(Connection connection, long time, int hint)
        throws PersistenceException {

        PreparedStatement select = null;
        ResultSet set = null;
        HashMap messages = new HashMap();

        try {
            select = connection.prepareStatement(
                "select messageId,createTime from messages where createTime>? "
                + "order by createTime asc");
            select.setLong(1, time);
            set = select.executeQuery();

            // NOTE(review): as in getMessages, the row that triggers the
            // break has already been put in the map - confirm that callers
            // paging on the returned creation times cannot skip rows that
            // share that timestamp.
            int count = 0;
            long lastTimeStamp = time;
            while (set.next()) {
                // read each column exactly once per row
                String messageId = set.getString(1);
                long createTime = set.getLong(2);
                messages.put(messageId, new Long(createTime));

                if (++count > hint) {
                    if (createTime > lastTimeStamp) {
                        break;
                    }
                } else {
                    lastTimeStamp = createTime;
                }
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve message identifiers", exception);
        } finally {
            SQLHelper.close(set);
            SQLHelper.close(select);
        }

        return messages;
    }

    /**
     * Retrieve a list of unprocessed messages and return them to the client.
     * An unprocessed message has been accepted by the system but not
     * processed. These are the rows whose <code>processed</code> column
     * is 0.
     *
     * @param connection - execute on this connection
     * @return Vector - zero or more MessageImpl objects, each marked as
     *         not processed
     * @throws PersistenceException - if an SQL error occurs or a message
     *         cannot be de-serialized
     */
    public Vector getUnprocessedMessages(Connection connection)
        throws PersistenceException {

        PreparedStatement select = null;
        ResultSet set = null;
        Vector messages = new Vector();

        try {
            select = connection.prepareStatement(
                "select messageblob from messages where processed=0");
            set = select.executeQuery();

            // now iterate through the result set
            while (set.next()) {
                MessageImpl m = deserialize(set.getBytes(1));
                m.setProcessed(false);
                messages.add(m);
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve unprocessed messages", exception);
        } finally {
            SQLHelper.close(set);
            SQLHelper.close(select);
        }

        return messages;
    }

    /**
     * Retrieve the message handles for all messages of a destination that
     * have an expiry time set (<code>expiryTime</code> greater than zero),
     * ordered by ascending expiry time. The query does not compare the
     * expiry time against the current time.
     *
     * @param connection - execute on this connection
     * @param destination - the destination in question
     * @return Vector - collection of PersistentMessageHandle objects
     * @throws PersistenceException - if the destination is unknown or an
     *         SQL error occurs
     */
    public Vector getNonExpiredMessages(Connection connection,
                                        JmsDestination destination)
        throws PersistenceException {

        Vector result = new Vector();
        PreparedStatement select = null;
        ResultSet set = null;

        try {
            long destinationId = _destinations.getId(destination.getName());
            if (destinationId == 0) {
                throw new PersistenceException(
                    "Cannot getNonExpiredMessages for destination="
                    + destination + ": destination does not exist");
            }

            // column 2 (destinationId) is selected but not read below; the
            // handle is built with the destination passed in by the caller
            select = connection.prepareStatement(
                "select messageId,destinationId,priority,createTime,"
                + "sequenceNumber,expiryTime "
                + "from messages "
                + "where expiryTime>0 and destinationId=? "
                + "order by expiryTime asc");
            select.setLong(1, destinationId);
            set = select.executeQuery();

            while (set.next()) {
                String messageId = set.getString(1);
                int priority = set.getInt(3);
                long acceptedTime = set.getLong(4);
                long sequenceNumber = set.getLong(5);
                long expiryTime = set.getLong(6);
                PersistentMessageHandle handle = new PersistentMessageHandle(
                    messageId, priority, acceptedTime, sequenceNumber,
                    expiryTime, destination);
                result.add(handle);
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to retrieve non-expired messages", exception);
        } finally {
            SQLHelper.close(set);
            SQLHelper.close(select);
        }

        return result;
    }

    /**
     * Delete all expired messages and associated message handles. A row is
     * deleted when its expiry time is set (greater than zero) and earlier
     * than the current system time; the same cut-off time is used for both
     * tables.
     *
     * @param connection - execute on this connection
     * @throws PersistenceException - if an SQL error occurs
     */
    public void removeExpiredMessages(Connection connection)
        throws PersistenceException {

        PreparedStatement delete = null;
        try {
            long time = System.currentTimeMillis();

            // delete from the messages
            delete = connection.prepareStatement(
                "delete from messages where expiryTime > 0 and expiryTime < ?");
            delete.setLong(1, time);
            delete.executeUpdate();
            // Close the first statement before the variable is reused; the
            // finally block only closes whichever statement the variable
            // references at that point.
            delete.close();

            // delete the message handles
            delete = connection.prepareStatement(
                "delete from message_handles where expiryTime > 0 and expiryTime < ?");
            delete.setLong(1, time);
            delete.executeUpdate();
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to remove expired messages", exception);
        } finally {
            SQLHelper.close(delete);
        }
    }

    /**
     * Get the message as a serialized blob
     *
     * @param message the message to serialize
     * @return byte[] the serialized message
     * @throws PersistenceException - if the message cannot be serialized
     */
    public byte[] serialize(MessageImpl message)
        throws PersistenceException {

        byte[] result = null;
        ObjectOutputStream ostream = null;
        try {
            ByteArrayOutputStream bstream = new ByteArrayOutputStream();
            ostream = new ObjectOutputStream(bstream);
            ostream.writeObject(message);
            // push any bytes still buffered by the object stream into the
            // byte array before taking the snapshot
            ostream.flush();
            result = bstream.toByteArray();
        } catch (Exception exception) {
            throw new PersistenceException("Failed to serialize message",
                exception);
        } finally {
            SQLHelper.close(ostream);
        }

        return result;
    }

    /**
     * Set the message from a serialized blob
     *
     * @param blob the serialized message
     * @return the re-constructed message
     * @throws PersistenceException - if the blob is null or cannot be
     *         de-serialized into a MessageImpl
     */
    public MessageImpl deserialize(byte[] blob) throws PersistenceException {
        if (blob == null) {
            throw new PersistenceException(
                "Cannot de-serialize null message blob");
        }

        // NOTE(review): this uses native Java deserialization. That is
        // only safe if the blob was produced by serialize() - confirm that
        // nothing untrusted can write to the messages table.
        MessageImpl message = null;
        ObjectInputStream istream = null;
        try {
            ByteArrayInputStream bstream = new ByteArrayInputStream(blob);
            istream = new ObjectInputStream(bstream);
            message = (MessageImpl) istream.readObject();
        } catch (Exception exception) {
            throw new PersistenceException(
                "Failed to de-serialize message", exception);
        } finally {
            SQLHelper.close(istream);
        }

        return message;
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?