📄 jmsserversession.java
字号:
// Retrieve the destination from the destination manager and use
// it to create the consumer
ConsumerEndpoint consumer =
ConsumerManager.instance().createQueueBrowserEndpoint(this,
clientId, queue, selector);
// if the session is stopped then we should also stop the
// consumer, so that it doesn't deliver messages and then
// cache it for future reference.
consumer.setStopped(_stopped);
_consumers.put(Long.toString(clientId), consumer);
}
/**
 * Remove the receiver registered under the given identity from this
 * session and hand its consumer endpoint to the consumer manager for
 * deletion.
 *
 * @param clientId the identity of the receiver
 * @throws JMSException if no receiver is registered under the identity
 */
public void deleteReceiver(long clientId) throws JMSException {
    // consumers are keyed by the decimal string form of the client id
    final String key = Long.toString(clientId);

    if (_log.isDebugEnabled()) {
        _log.debug("deleteReceiver(clientId=" + key + ") [sessionId="
                   + _sessionId + "]");
    }

    final ConsumerEndpoint endpoint =
        (ConsumerEndpoint) _consumers.remove(key);
    if (endpoint != null) {
        // destroy the consumer endpoint
        ConsumerManager.instance().deleteConsumerEndpoint(endpoint);
        return;
    }

    throw new JMSException("No receiver with id " + clientId);
}
/**
 * Delete the sender with the specified identity from the session.
 * <p>
 * This implementation is a no-op: the method body is empty, so the
 * argument is ignored and the call always returns normally.
 *
 * @param clientId the identity of the sender (unused)
 * @throws JMSException declared by the signature; never raised by this
 *                      implementation
 */
public void deleteSender(long clientId) throws JMSException {
// no-op: nothing is done here when a sender is deleted
}
/**
 * Remove the queue browser registered under the given identity from this
 * session and hand its consumer endpoint to the consumer manager for
 * deletion.
 *
 * @param clientId the identity of the browser
 * @throws JMSException if no browser is registered under the identity
 */
public void deleteBrowser(long clientId) throws JMSException {
    final String key = Long.toString(clientId);
    final ConsumerEndpoint browser =
        (ConsumerEndpoint) _consumers.remove(key);

    // an unknown identity is reported to the caller
    if (browser == null) {
        throw new JMSException("No browser with id " + clientId);
    }

    // destroy the consumer endpoint backing the browser
    final ConsumerManager manager = ConsumerManager.instance();
    manager.deleteConsumerEndpoint(browser);
}
/**
 * Create a subscriber endpoint for this session. A subscriber is a message
 * consumer specific to the topic message model and is associated with a
 * topic. The endpoint is created through the consumer manager, configured
 * with this session's connection id, transacted flag and acknowledge mode,
 * put into the same stopped state as the session, and finally added to the
 * consumers managed by this session under {@code clientId}.
 * <p>
 * If {@code name} is non-null the subscriber is durable. The durable
 * consumer is registered with the consumer manager if it does not exist
 * yet. If it exists but {@code validSubscription} rejects it for this
 * topic, it is unsubscribed and registered again against {@code topic}.
 * If {@code name} is null a non-durable endpoint is created.
 * <p>
 * Note that the session does not check whether a subscriber already
 * exists for the same destination.
 *
 * @param topic    subscriber destination; must not be null
 * @param name     consumer name; null for a non-durable subscriber,
 *                 otherwise a non-empty durable subscription name
 * @param clientId the client session allocated identifier of the consumer
 * @param selector the selector to filter messages. This may be null.
 * @param noLocal  true to inhibit consumption of messages published on
 *                 this connection.
 * @throws JMSException if topic is null, if name is empty, or if the
 *                      consumer manager fails to create the subscriber
 */
public void createSubscriber(JmsTopic topic, String name, long clientId,
                             String selector, boolean noLocal)
    throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("createSubscriber(topic=" + topic + ", name=" + name
                   + ", clientId=" + clientId + ", selector=" + selector
                   + ", noLocal=" + noLocal + ") [sessionId="
                   + _sessionId + "]");
    }

    // check to ensure that the method's preconditions have been met
    if (topic == null) {
        throw new JMSException("Cannot create subscriber for null topic");
    }
    if (name != null && name.length() == 0) {
        // a durable subscription needs a usable name; report the real
        // cause (the name is empty, not null)
        throw new JMSException("Name in createSubscriber was empty");
    }

    ConsumerManager manager = ConsumerManager.instance();
    ConsumerEndpoint consumer;
    if (name != null) {
        if (manager.durableConsumerExists(name)) {
            // if the durable consumer exists then validate that it was
            // the specified topic that it was registered under. If it is
            // not registered for the topic then delete the existing entry
            // and recreate it against the new topic
            if (!manager.validSubscription(topic.getName(), name)) {
                unsubscribe(name);
                manager.createDurableConsumer(topic, name);
            }
        } else {
            // the durable consumer does not exist yet, so create it
            manager.createDurableConsumer(topic, name);
        }

        // per the original author's note, this call throws if a durable
        // subscriber with the specified name is already active
        consumer = manager.createDurableConsumerEndpoint(this,
            topic, name, clientId, selector);
    } else {
        // create a non-durable subscriber for the specified destination
        // using the required selector
        consumer = manager.createConsumerEndpoint(this,
            clientId, topic, selector);
    }

    // both kinds of endpoint receive the same session-derived settings
    consumer.setConnectionId(_connection.hashCode());
    consumer.setTransacted(_transacted);
    consumer.setAckMode(_ackMode);
    consumer.setNoLocal(noLocal);

    // once the consumer has been created set it to the same state as the
    // session and add it to the list of consumers to manage
    consumer.setStopped(_stopped);
    _consumers.put(Long.toString(clientId), consumer);
}
/**
 * Create a publisher for the specified topic.
 * <p>
 * This implementation is a no-op: the body is empty and no state is
 * recorded for the publisher. Open question carried over from the
 * original author: does the session need to maintain state information
 * that a publisher has been created?
 *
 * @param topic the publisher destination (unused)
 * @throws JMSException declared by the signature; never raised by this
 *                      implementation
 */
public void createPublisher(JmsTopic topic)
throws JMSException {
// no-op: nothing is recorded when a publisher is created
}
/**
 * Remove the subscriber registered under the given identity from this
 * session and hand its consumer endpoint to the consumer manager for
 * deletion.
 * <p>
 * This method itself only drops the session's reference to the endpoint
 * and delegates to the consumer manager. To remove the state of a durable
 * subscription use {@link #unsubscribe(String)}.
 *
 * @param clientId the client identity
 * @throws JMSException if no consumer is registered under the identity
 */
public void deleteSubscriber(long clientId) throws JMSException {
    final String key = Long.toString(clientId);

    if (_log.isDebugEnabled()) {
        _log.debug("deleteSubscriber(clientId=" + key
                   + ") [sessionId=" + _sessionId + "]");
    }

    // detach the endpoint registered for the client id from the session
    final ConsumerEndpoint subscriber =
        (ConsumerEndpoint) _consumers.remove(key);
    if (subscriber != null) {
        ConsumerManager.instance().deleteConsumerEndpoint(subscriber);
        return;
    }

    // nothing was registered under this id
    throw new JMSException("Failed to close consumer with id ["
                           + hashCode() + ":" + clientId + "]");
}
/**
 * Delete the publisher associated with the specified topic from the
 * session.
 * <p>
 * This implementation is a no-op: the method body is empty, so the
 * argument is ignored and the call always returns normally.
 *
 * @param topic the publisher destination (unused)
 * @throws JMSException declared by the signature; never raised by this
 *                      implementation
 */
public void deletePublisher(JmsTopic topic) throws JMSException {
// no-op: nothing is done here when a publisher is deleted
}
/**
 * Unsubscribe a durable subscription. This removes the durable consumer
 * registered under {@code name} from the consumer manager. A durable
 * subscriber is uniquely identifiable and the same subscriber cannot be
 * associated with more than one topic.
 *
 * @param name the name used to uniquely identify the subscription
 * @throws InvalidDestinationException if no durable consumer exists with
 *                                     the given name
 * @throws JMSException if the durable consumer is still active, or the
 *                      subscription cannot be removed
 */
public void unsubscribe(String name) throws JMSException {
    if (_log.isDebugEnabled()) {
        _log.debug("unsubscribe(name=" + name + ") [sessionId="
                   + _sessionId + "]");
    }

    final ConsumerManager registry = ConsumerManager.instance();

    // the name must identify an existing durable consumer
    final boolean known = registry.durableConsumerExists(name);
    if (!known) {
        throw new InvalidDestinationException(name +
            " is not a durable subscriber name");
    }

    // an active durable consumer may not be removed
    if (registry.isDurableConsumerActive(name)) {
        throw new JMSException("Failed to unsubscribe subscriber "
                               + name + " since is still active");
    }

    registry.removeDurableConsumer(name);
}
/**
 * Stop message delivery to this session.
 * <p>
 * Delegates directly to {@code stop()}; no other work is done here.
 *
 * @throws JMSException if there is a problem completing the request
 */
public void stopMessageDelivery() throws JMSException {
stop();
}
/**
 * Start message delivery to this session.
 * <p>
 * Delegates directly to {@code start()}; no other work is done here.
 *
 * @throws JMSException if there is a problem completing the request
 */
public void startMessageDelivery() throws JMSException {
start();
}
/**
 * Check if the specified message handle is in the session's list
 * of unacked messages.
 * <p>
 * The answer is obtained by asking the sent message cache whether it
 * holds the handle.
 *
 * @param handle the handle to query
 * @return {@code true} if the sent message cache reports the handle as
 *         present, {@code false} otherwise
 */
public boolean containsUnackedHandle(MessageHandle handle) {
return _sentMessageCache.handleInCache(handle);
}
// implementation of InternalMessageListener.onMessage
public void onMessage(MessageHandle handle, boolean ignore)
throws Exception {
if ((handle != null) &&
(_listener != null)) {
MessageImpl message = handle.getMessage();
MessageImpl m = null;
if (message != null) {
m = (MessageImpl) message.clone();
m.setClientId(handle.getClientId());
m.setJMSRedelivered(handle.getDelivered());
// if the session is transacted or the acknowledge mode is
// CLIENT_ACKNOWLEDGE then send the handle to the sent message
// cache before we send the message to the listener. This will
// enable clients to ack the message while in the onMessage
// method
if ((_transacted) ||
(_ackMode == Session.CLIENT_ACKNOWLEDGE)) {
_sentMessageCache.process(handle);
}
try {
// send the message to the listener.
_listener.onMessage(m);
// if the session is not transacted and the acknowledge mode
// is not CLIENT_ACKNOWLEDGE then process it through the
// sent message cache now.
if ((!_transacted) &&
(_ackMode != Session.CLIENT_ACKNOWLEDGE)) {
_sentMessageCache.process(handle);
}
} catch (ClientDisconnectionException exception) {
// close all resources and rethrow it
close();
throw exception;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -