📄 ipcjmsmessagelistener.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
*
* $Id: IpcJmsMessageListener.java,v 1.8 2003/08/07 13:32:54 tanderson Exp $
*
* Date Author Changes
* $Date jimm Created
*/
package org.exolab.jms.client.mipc;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import javax.jms.Message;
import org.exolab.core.mipc.MultiplexConnectionIfc;
import org.exolab.core.mipc.ObjectChannel;
import org.exolab.jms.client.JmsMessageListener;
/**
 * All callbacks for ipc message delivery are registered with the single
 * instance of this object, keyed by their session ids.
 * Arriving messages also contain a session id, which is used to look up
 * the callback the message is destined for.
 * <p>
 * The listener registry is a synchronized map, and the receiver thread's
 * life cycle ({@link #start}/{@link #stop}) is guarded by an internal lock.
 *
 * @version $Revision: 1.8 $ $Date: 2003/08/07 13:32:54 $
 * @author <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
 * @see org.exolab.jms.client.mipc.IpcJmsSessionStub
 */
class IpcJmsMessageListener implements Runnable {

    /**
     * The set of MessageListener instances, keyed on session id
     */
    private final Map _listeners = Collections.synchronizedMap(new HashMap(10));

    /**
     * The channel to receive messages on
     */
    private final ObjectChannel _channel;

    /**
     * Thread for receiving and dispatching messages. Only read or written
     * while holding {@link #_lock}.
     */
    private Thread _thread;

    /**
     * If <code>true</code> indicates that the thread should stop.
     * Declared <code>volatile</code> because it is set by {@link #stop}
     * on the caller's thread but polled by {@link #run} on the receiver
     * thread without holding {@link #_lock}; without it the loop is not
     * guaranteed to ever observe the stop request.
     */
    private volatile boolean _interrupted = false;

    /**
     * Synchronization helper
     */
    private final Object _lock = new Object();

    /**
     * Construct a new <code>IpcJmsMessageListener</code>
     *
     * @param connection the connection to the server
     * @throws IllegalArgumentException if <code>connection</code> is null
     */
    public IpcJmsMessageListener(MultiplexConnectionIfc connection) {
        if (connection == null) {
            throw new IllegalArgumentException(
                "Argument 'connection' is null");
        }
        // the channel is created while holding the connection's monitor
        synchronized (connection) {
            _channel = new ObjectChannel("message", connection);
        }
    }

    /**
     * Set a new callback for a session. Any callback previously registered
     * under the same session id is replaced.
     *
     * @param sessionId the unique session id
     * @param listener the listener callback to call when new messages arrive.
     */
    public void setListener(String sessionId, JmsMessageListener listener) {
        _listeners.put(sessionId, listener);
    }

    /**
     * A session has been closed. Remove its callback.
     *
     * @param sessionId the unique session id
     */
    public void closeSession(String sessionId) {
        _listeners.remove(sessionId);
    }

    /**
     * Close all session callbacks
     */
    public void closeAllSessions() {
        _listeners.clear();
    }

    /**
     * This method is called back by the server whenever it has a message for
     * this session. The message is silently dropped if no callback is
     * registered for the session.
     *
     * @param sessionId the session this message is for
     * @param message the message being sent to this session.
     */
    public void onMessage(String sessionId, Message message) {
        JmsMessageListener listener =
            (JmsMessageListener) _listeners.get(sessionId);
        if (listener != null) {
            listener.onMessage(message);
        }
    }

    /**
     * This method is called back by the server whenever it has one or more
     * messages for this session. The messages are silently dropped if no
     * callback is registered for the session.
     *
     * @param sessionId the session this message is for.
     * @param messages the messages
     */
    public void onMessages(String sessionId, Vector messages) {
        JmsMessageListener listener =
            (JmsMessageListener) _listeners.get(sessionId);
        if (listener != null) {
            listener.onMessages(messages);
        }
    }

    /**
     * This method is called to notify the client that a message is available.
     * The notification is silently dropped if no callback is registered for
     * the session.
     *
     * @param sessionId the session this message is for.
     * @param clientId the client id to notify
     */
    public void onMessageAvailable(String sessionId, long clientId) {
        JmsMessageListener listener =
            (JmsMessageListener) _listeners.get(sessionId);
        if (listener != null) {
            listener.onMessageAvailable(clientId);
        }
    }

    /**
     * Start a thread to receive messages. Does nothing if a receiver
     * thread is already registered.
     */
    public void start() {
        synchronized (_lock) {
            if (_thread == null) {
                _interrupted = false;
                _thread = new Thread(this);
                _thread.start();
            }
        }
    }

    /**
     * Stop the message receiver thread. All session callbacks are removed
     * first, then the receiver thread (if any) is flagged to stop and
     * interrupted. This method does not wait for the thread to terminate.
     */
    public void stop() {
        closeAllSessions();
        synchronized (_lock) {
            if (_thread != null) {
                _interrupted = true;
                _thread.interrupt();
            }
        }
    }

    /**
     * This is called when we start up an inbound message pump in a
     * a separate thread. The method sits in a loop and consumes messages
     * coming from the server.
     * <p>
     * Each received packet is a <code>Vector</code> whose first element is
     * the session id and whose second element is the payload: a single
     * <code>Message</code>, a <code>Vector</code> of messages, or a
     * <code>Long</code> client id. Payloads of any other type are not
     * dispatched. An empty <code>Vector</code> is sent back on the channel
     * after each packet has been processed.
     * <p>
     * Any exception ends the loop. On exit the channel is closed and the
     * thread reference is cleared.
     */
    public void run() {
        try {
            while (!_interrupted) {
                Vector packet = (Vector) _channel.receive();
                String sessionId = (String) packet.get(0);
                Object payload = packet.get(1);
                if (payload instanceof Message) {
                    onMessage(sessionId, (Message) payload);
                } else if (payload instanceof Vector) {
                    onMessages(sessionId, (Vector) payload);
                } else if (payload instanceof Long) {
                    onMessageAvailable(sessionId,
                                       ((Long) payload).longValue());
                }
                // reply to the sender once the packet has been handled
                _channel.send(new Vector());
            }
        } catch (Exception ignored) {
            // Deliberately best-effort: any failure (channel error, malformed
            // packet, listener exception) simply terminates the pump; clean
            // up happens in the finally block below.
        } finally {
            try {
                _channel.close();
            } catch (Exception ignored) {
                // nothing useful can be done if the close itself fails
            }
            synchronized (_lock) {
                _thread = null;
            }
        }
    }

} //-- IpcJmsMessageListener
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -