📄 jmsinvocationhandler.java

package book.jmx.examples;

import java.util.*;
import java.lang.reflect.*;
import java.rmi.*;
import javax.jms.*;
import javax.naming.*;
import javax.management.*;

public class JMSInvocationHandler 
    implements MessageListener, InvocationHandler, JMSConnectorConstants {
  
  // JMS Topic
  private Topic topic                = null;
  private TopicConnection con        = null;
  private TopicSession session       = null;
  private TopicPublisher publisher   = null;
  private TopicSubscriber subscriber = null;
  private Context ctx                = null;

  // keeps track of the message IDs
  static long messageID = 1;

  // Stores the references to the callback objects. When a reply
  // matching the ID is received, the return value is inserted
  // into the callback instance and any threads waiting on it
  // are notified.
  private Map callbackBuffer = 
      Collections.synchronizedMap(new HashMap());
  
  
  // constructor
  public JMSInvocationHandler(Properties props) 
      throws NamingException, JMSException {
      
    // retrieve connection properties
    String conFactory = 
        props.getProperty(JMS_TOPIC_CONNECTION_FACTORY);
    String topicName  = 
        props.getProperty(JMS_TOPIC);

    // lookup connection factory and create topic connection
    // and topic session
    ctx = new InitialContext();
    TopicConnectionFactory factory =
        (TopicConnectionFactory)ctx.lookup(conFactory);
  
    con = factory.createTopicConnection();
      
    session = con.createTopicSession(
        false,      /* not a transacted session */
        Session.AUTO_ACKNOWLEDGE
    );
      
    // Proxy acts as a publisher of invocations and subscribes
    // for reply messages
    topic = (Topic)ctx.lookup(topicName);
    publisher   = session.createPublisher(topic);
    subscriber  = session.createSubscriber(topic, "JMSType='REPLY'", true);
    
    // invocations are published with non-persistent delivery
    // and without message timestamps
    publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    publisher.setDisableMessageTimestamp(true);
    subscriber.setMessageListener(this);
      
    con.start();     
  }
    
    
  public Object invoke(Object proxy, Method method,
                       Object[] args) throws Throwable {
    
    // close() is handled locally: it is never published, so no
    // message or callback is created for it
    String methodName   = method.getName();
    if (methodName.equals("close")) {
      close();
      return null;
    }

    // initialize MethodInvocation
    MethodInvocation mi = new MethodInvocation(method);
    mi.setParams(args);

    // create message ID and callback object.
    // Map ID to callback in callbackBuffer. messageID is shared by
    // all handler instances, so the increment is synchronized.
    String ID;
    synchronized (JMSInvocationHandler.class) {
      ID = "" + messageID++;
    }
    Callback cb         = new JMSCallback(ID, mi);
    callbackBuffer.put(ID, cb);

    // wrap MethodInvocation in JMS Object message
    ObjectMessage msg   = session.createObjectMessage(mi);
    msg.setJMSType("SEND");
    msg.setStringProperty("ID", ID);
      
    // send
    publisher.publish(msg);
    return cb;
  }
    
  private void close() {
    try {
      con.close();
    }
    catch (JMSException e) {
      e.printStackTrace();   
    }
  }    
  
  
  // subscriber for reply messages
  
  public void onMessage(Message message) {
  
    try {
      // extract the MethodInvocation and message ID
      ObjectMessage msg   = (ObjectMessage)message;
      MethodInvocation mi = (MethodInvocation)msg.getObject();
      String ID           = msg.getStringProperty("ID");
      
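      // find the corresponding callback object (ID is the key) and
      // remove it from the buffer so that completed invocations
      // do not accumulate
      JMSCallback cb      = (JMSCallback)callbackBuffer.remove(ID);
      if (cb == null) {
        // reply to an unknown or already completed invocation
        return;
      }
      
      // setMethodInvocation() in JMSCallback will set the
      // return value and notify all waiting threads
      cb.setMethodInvocation(mi);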
    }
    catch (JMSException e) {
      e.printStackTrace();
    }
  }
  
  
  // JMS callback implementation
  
  class JMSCallback implements Callback {
  
    // volatile: peek() reads status without holding the lock
    private volatile int status = UNKNOWN;
    private String ID           = null;
    private MethodInvocation mi = null;
    
    
    JMSCallback(String ID, MethodInvocation mi) {
      
      status = SENDING;
      
      this.ID = ID;
      this.mi = mi;
    }
    
    // notifies all threads that the answer has arrived
    protected void setMethodInvocation(MethodInvocation mi) {
      synchronized (this) {
        this.mi = mi;
        status = FINISHED;

        notifyAll();
      }
    }
    
    // returns status, won't block
    public int peek() {
      return status;
    }
    
    // blocks on status -- status set to FINISHED after
    // return value has been set and before threads are
    // notified.
    public Object get() throws RemoteException {
    
      synchronized (this) {
        while (status != FINISHED) {
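          try {
              wait();
          } 
          catch (InterruptedException e) {
            // restore the interrupt flag and stop waiting
            Thread.currentThread().interrupt();
            throw new RemoteException("Interrupted while waiting", e);
          }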
        }
      }
      
      // if an exception was thrown, wrap it in a RemoteException
      // and throw it to the client
      if (mi.getStatus() == MethodInvocation.ERROR) {
        Throwable t = (Throwable)mi.getReturnValue();
        
        if (t instanceof RuntimeMBeanException) {
          RuntimeMBeanException e = (RuntimeMBeanException)t;
          throw new RemoteException(e.getMessage(), e.getTargetException());
        }
        
        throw new RemoteException("", t);
      }
      
      // return value
      return mi.getReturnValue();
    }
  }
  
}
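
The handler above correlates each published invocation with its reply through a string ID and a blocking callback. The sketch below isolates that pattern with no JMS dependency, so it can be run on its own. It is an illustration under stated assumptions, not the book's implementation: `ReplyCorrelationSketch`, `PendingReply`, `register`, `onReply` and `pendingCount` are hypothetical names, a plain thread stands in for the JMS reply subscriber, and the code assumes Java 8 or later (generics, lambdas, `java.util.concurrent`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class ReplyCorrelationSketch {

    // Hypothetical stand-in for JMSCallback: holds one reply value
    // and lets callers block until it arrives.
    static final class PendingReply {
        private Object value;
        private boolean finished;

        // Stores the reply and wakes up every waiting thread.
        synchronized void complete(Object v) {
            value = v;
            finished = true;
            notifyAll();
        }

        // Blocks until complete() has been called.
        synchronized Object get() {
            while (!finished) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // keep the interrupt visible to the caller
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            return value;
        }
    }

    // Thread-safe ID source and ID -> callback map.
    private final AtomicLong nextId = new AtomicLong(1);
    private final Map<String, PendingReply> pending = new ConcurrentHashMap<>();

    // Registers a pending reply and returns the ID sent with the request.
    String register(PendingReply reply) {
        String id = Long.toString(nextId.getAndIncrement());
        pending.put(id, reply);
        return id;
    }

    // Called when a reply arrives. The entry is removed before it is
    // completed, so finished invocations do not accumulate in the map.
    // Returns false when the ID is unknown (for example, a reply that
    // belongs to another client on the same topic).
    boolean onReply(String id, Object value) {
        PendingReply reply = pending.remove(id);
        if (reply == null) {
            return false;
        }
        reply.complete(value);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        ReplyCorrelationSketch sketch = new ReplyCorrelationSketch();
        PendingReply reply = new PendingReply();
        String id = sketch.register(reply);

        // A second thread plays the role of the reply subscriber.
        Thread responder = new Thread(() -> sketch.onReply(id, "pong"));
        responder.start();

        System.out.println(reply.get());           // prints "pong"
        responder.join();
        System.out.println(sketch.pendingCount()); // prints 0
    }
}
```

Removing the map entry when the reply arrives is one way to address the `REMOVE FROM QUEUE` note in `onMessage`. The null check matters because every subscriber on a shared topic may also receive replies addressed to other clients.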
  
