📄 jmsinvocationhandler.java
字号:
package book.jmx.examples;
import java.util.*;
import java.lang.reflect.*;
import java.rmi.*;
import javax.jms.*;
import javax.naming.*;
import javax.management.*;
public class JMSInvocationHandler
implements MessageListener, InvocationHandler, JMSConnectorConstants {
// JMS Topic
private Topic topic = null;
private TopicConnection con = null;
private TopicSession session = null;
private TopicPublisher publisher = null;
private TopicSubscriber subscriber = null;
private Context ctx = null;
// keeps track of the message IDs
static long messageID = 1;
// Stores the references to the callback objects. When a reply
// matching to the ID is received the return value is inserted
// to the callback instance and any threads waiting on it will
// be notified.
private Map callbackBuffer =
Collections.synchronizedMap(new HashMap());
// constructor
public JMSInvocationHandler(Properties props)
throws NamingException, JMSException {
// retrieve connection properties
String conFactory =
props.getProperty(JMS_TOPIC_CONNECTION_FACTORY);
String topicName =
props.getProperty(JMS_TOPIC);
// lookup connection factory and create topic connection
// and topic session
ctx = new InitialContext();
TopicConnectionFactory factory =
(TopicConnectionFactory)ctx.lookup(conFactory);
con = factory.createTopicConnection();
session = con.createTopicSession(
false, /* not a transacted session */
Session.AUTO_ACKNOWLEDGE
);
// Proxy acts as a publisher of invocations and subscribes
// for reply messages
topic = (Topic)ctx.lookup(topicName);
publisher = session.createPublisher(topic);
subscriber = session.createSubscriber(topic, "JMSType='REPLY'", true);
// topic is non persistent.
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
publisher.setDisableMessageTimestamp(true);
subscriber.setMessageListener(this);
con.start();
}
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
// initialize MethodInvocation
String methodName = method.getName();
MethodInvocation mi = new MethodInvocation(method);
mi.setParams(args);
// create message ID and callback object.
// Map ID to callback in callbackBuffer.
String ID = "" + messageID++;
Callback cb = new JMSCallback(ID, mi);
callbackBuffer.put(ID, cb);
// wrap MethodInvocation in JMS Object message
ObjectMessage msg = session.createObjectMessage(mi);
msg.setJMSType("SEND");
msg.setStringProperty("ID", ID);
if (methodName.equals("close")) {
close();
return null;
}
// send
publisher.publish(msg);
return cb;
}
private void close() {
try {
con.close();
}
catch (JMSException e) {
e.printStackTrace();
}
}
// subscriber for reply messages
public void onMessage(Message message) {
try {
// extract the MethodInvocation and message ID
ObjectMessage msg = (ObjectMessage)message;
MethodInvocation mi = (MethodInvocation)msg.getObject();
String ID = msg.getStringProperty("ID");
// find the corresponding callback object (ID is the key)
JMSCallback cb = (JMSCallback)callbackBuffer.get(ID);
// JPL: REMOVE FROM QUEUE!
// setMI() implementation in JMS Callback will set the
// return value and notify all waiting threads
cb.setMethodInvocation(mi);
}
catch (JMSException e) {
e.printStackTrace();
}
}
// JMS callback implementation
class JMSCallback implements Callback {
private int status = UNKNOWN;
private String ID = null;
private MethodInvocation mi = null;
JMSCallback(String ID, MethodInvocation mi) {
status = SENDING;
this.ID = ID;
this.mi = mi;
}
// notifies all threads that the answer has arrived
protected void setMethodInvocation(MethodInvocation mi) {
synchronized (this) {
this.mi = mi;
status = FINISHED;
notifyAll();
}
}
// returns status, won't block
public int peek() {
return status;
}
// blocks on status -- status set to FINISHED after
// return value has been set and before threads are
// notified.
public Object get() throws RemoteException {
synchronized (this) {
while (status != FINISHED) {
try {
wait();
}
catch (InterruptedException e) {}
}
}
// if an exception was thrown, wrap it in RemoteExc.
// and throw to the client
if (mi.getStatus() == MethodInvocation.ERROR) {
Throwable t = (Throwable)mi.getReturnValue();
if (t instanceof RuntimeMBeanException) {
RuntimeMBeanException e = (RuntimeMBeanException)t;
throw new RemoteException(e.getMessage(), e.getTargetException());
}
throw new RemoteException("", (Throwable)mi.getReturnValue());
}
// return value
return mi.getReturnValue();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -