messagehandles.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 696 行 · 第 1/2 页
JAVA
696 行
long consumerId = _consumers.getConsumerId( handle.getConsumerPersistentId()); if (consumerId == 0) { throw new PersistenceException( "Cannot update message handle id=" + handle.getMessageId() + " for destination=" + handle.getDestination().getName() + " and consumer=" + handle.getConsumerPersistentId() + " since the consumer cannot be mapped to an id"); } update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT); update.setInt(1, handle.getDelivered() ? 1 : 0); update.setString(2, id); update.setLong(3, destinationId); update.setLong(4, consumerId); // execute the delete if (update.executeUpdate() != 1 && !handle.hasExpired()) { // only log if the message hasn't been garbage collected _log.error( "Failed to execute updateMessageHandle for handle=" + id + ", destination id=" + destinationId + ", consumer id=" + consumerId); } } catch (SQLException exception) { throw new PersistenceException("Failed to update message handle=" + handle, exception); } finally { SQLHelper.close(update); } } /** * Remove all the message handles associated with the specified destination * * @param connection - the connection to use * @param destination the name of the destination * @throws PersistenceException - sql releated exception */ public void removeMessageHandles(Connection connection, String destination) throws PersistenceException { PreparedStatement delete = null; try { // map the destination name to an actual identity long destinationId = _destinations.getId(destination); if (destinationId == 0) { throw new PersistenceException( "Cannot remove message handles for destination=" + destination + " since the destination cannot be " + "mapped to an id"); } delete = connection.prepareStatement(DELETE_MSG_HANDLES_FOR_DEST); delete.setLong(1, destinationId); delete.executeUpdate(); } catch (SQLException exception) { throw new PersistenceException( "Failed to remove message handles for destination=" + destination, exception); } finally { SQLHelper.close(delete); } } /** * Remove all 
the message handles for the specified messageid * * @param connection - the connection to use * @param messageId the message identity * @throws PersistenceException - sql releated exception */ public void removeMessageHandles(Connection connection, long messageId) throws PersistenceException { PreparedStatement delete = null; try { delete = connection.prepareStatement(DELETE_MSG_HANDLES_STMT); delete.setLong(1, messageId); delete.executeUpdate(); } catch (SQLException exception) { throw new PersistenceException( "Failed to remove message handles for message id=" + messageId, exception); } finally { SQLHelper.close(delete); } } /** * Retrieve the message handle for the specified desitation and consumer * name * * @param connection - the connection to use * @param destination - destination name * @param name - consumer name * @return Vector - collection of MessageHandle objects * @throws PersistenceException - sql releated exception */ public Vector getMessageHandles(Connection connection, String destination, String name) throws PersistenceException { Vector result = new Vector(); PreparedStatement select = null; ResultSet set = null; // if the consumer and/or destination cannot be mapped then // return an empty vector long destinationId = _destinations.getId(destination); long consumerId = _consumers.getConsumerId(name); if ((consumerId == 0) || (destinationId == 0)) { return result; } // all preprequisites have been met so continue processing the // request. 
try { select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST); select.setLong(1, consumerId); // iterate through the result set and construct the corresponding // MessageHandles set = select.executeQuery(); while (set.next()) { // Attempt to retrieve the corresponding destination JmsDestination dest = _destinations.get(set.getLong(2)); if (dest == null) { throw new PersistenceException( "Cannot create persistent handle, because " + "destination mapping failed for " + set.getLong(2)); } String consumer = _consumers.getConsumerName(set.getLong(3)); if (name == null) { throw new PersistenceException( "Cannot create persistent handle because " + "consumer mapping failed for " + set.getLong(3)); } String messageId = set.getString(1); int priority = set.getInt(4); long acceptedTime = set.getLong(5); long sequenceNumber = set.getLong(6); long expiryTime = set.getLong(7); boolean delivered = (set.getInt(8) == 0) ? false : true; MessageHandle handle = new PersistentMessageHandle( messageId, priority, acceptedTime, sequenceNumber, expiryTime, dest, consumer); handle.setDelivered(delivered); result.add(handle); } } catch (SQLException exception) { throw new PersistenceException( "Failed to get message handles for destination=" + destination + ", consumer=" + name, exception); } finally { SQLHelper.close(set); SQLHelper.close(select); } return result; } /** * Retrieve a distint list of message ids, in this table, between the min * and max times inclusive. 
* * @param connection - the connection to use * @param min - the minimum time in milliseconds * @param max - the maximum time in milliseconds * @return Vector - collection of String objects * @throws PersistenceException - sql related exception */ public Vector getMessageIds(Connection connection, long min, long max) throws PersistenceException { Vector result = new Vector(); PreparedStatement select = null; ResultSet set = null; try { select = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE); select.setLong(1, min); select.setLong(2, max); // iterate through the result set and construct the corresponding // MessageHandles set = select.executeQuery(); while (set.next()) { result.add(set.getString(1)); } } catch (SQLException exception) { throw new PersistenceException("Failed to retrieve message ids", exception); } finally { SQLHelper.close(set); SQLHelper.close(select); } return result; } /** * Check if a message with the specified messageId exists in the * table * * @param connection - the connection to use * @param messageId the message Identifier * @return Vector - collection of MessageHandle objects * @throws PersistenceException - sql releated exception */ public boolean messageExists(Connection connection, long messageId) throws PersistenceException { boolean result = false; PreparedStatement select = null; ResultSet set = null; try { select = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID); select.setLong(1, messageId); set = select.executeQuery(); if (set.next()) { result = true; } } catch (SQLException exception) { throw new PersistenceException( "Failed to determine if message exists, id=" + messageId, exception); } finally { SQLHelper.close(set); SQLHelper.close(select); } return result; } /** * Returns the number of messages for the specified destination and * consumer * * @param connection - the connection to use * @param destination - destination name * @param name - consumer name * @return Vector - collection of MessageHandle 
objects
     *         (NOTE(review): the return tag text is inaccurate - the value
     *         returned is an int: the count read from the first column of
     *         the query, or -1 if the query produced no row)
     * @throws PersistenceException - if the destination is not a wildcard
     *                                and cannot be mapped to an id, if the
     *                                consumer cannot be mapped to an id, or
     *                                on a SQL error
     */
    public int getMessageCount(Connection connection, String destination,
                               String name)
        throws PersistenceException {

        int result = -1;
        boolean destinationIsWildCard = false;

        // map the destination name to an actual identity; an unmapped name
        // is only acceptable when it is a wildcard
        long destinationId = _destinations.getId(destination);
        if (destinationId == 0) {
            if (JmsTopic.isWildCard(destination)) {
                destinationIsWildCard = true;
            } else {
                throw new PersistenceException(
                    "Cannot get message handle count for destination=" +
                    destination + " and consumer=" + name +
                    " since the destination cannot be mapped to an id");
            }
        }

        // map the consumer name to an identity
        long consumerId = _consumers.getConsumerId(name);
        if (consumerId == 0) {
            throw new PersistenceException(
                "Cannot get message handle count for destination=" +
                destination + " and consumer=" + name +
                " since the consumer cannot be mapped to an id");
        }

        PreparedStatement select = null;
        ResultSet set = null;
        try {
            if (destinationIsWildCard) {
                // a wildcard has no destination id of its own (it is 0
                // here), so count by consumer only
                select = connection.prepareStatement(
                    GET_MSG_HANDLE_COUNT_FOR_CONSUMER);
                select.setLong(1, consumerId);
            } else {
                // concrete destination: restrict the count to it
                select = connection.prepareStatement(
                    GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER);
                select.setLong(1, destinationId);
                select.setLong(2, consumerId);
            }

            set = select.executeQuery();
            if (set.next()) {
                result = set.getInt(1);
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to count messages for destination=" + destination +
                ", consumer=" + name, exception);
        } finally {
            SQLHelper.close(set);
            SQLHelper.close(select);
        }

        return result;
    }

    /**
     * Remove all expired handles for the specified consumer.
     * <p>
     * If the consumer name cannot be mapped to an id the call does nothing.
     * The current system time is passed to the delete statement as its
     * second parameter.
     *
     * @param connection - the connection to use
     * @param consumer - consumer name
     * @throws PersistenceException - sql related exception
     */
    public void removeExpiredMessageHandles(Connection connection,
                                            String consumer)
        throws PersistenceException {

        // map the consumer name to an identity; nothing to do if unmapped
        long consumerId = _consumers.getConsumerId(consumer);
        if (consumerId == 0) {
            return;
        }

        PreparedStatement delete = null;
        try {
            delete = connection.prepareStatement(DELETE_EXPIRED_MESSAGES);
            delete.setLong(1, consumerId);
            delete.setLong(2, System.currentTimeMillis());
            delete.executeUpdate();
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to remove expired message handles", exception);
        } finally {
            SQLHelper.close(delete);
        }
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?