jtjmstopicadapter.java

来自「Java Pattern Oriented Framework (Jt) 是为了」· Java 代码 · 共 482 行

JAVA
482
字号


package Jt.jms;

import Jt.*;
import Jt.jndi.*;

import javax.jms.*;

/**
 * Jt Adapter for the JMS publish/subscribe API. 
 */

public class JtJMSTopicAdapter extends JtJMSAdapter implements MessageListener {


    private static final long serialVersionUID = 1L;
    public static final String JtCLASS_NAME = JtJMSTopicAdapter.class.getName(); 

    private String topic;
    private String connectionFactory;
    private long timeout = 1L; // Receives the next message within the timeout interval
    private Object subject = null;  
    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
    private int priority = Message.DEFAULT_PRIORITY;
    private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; // message never expires

    private transient JtJNDIAdapter jndiAdapter = null;
    private transient boolean initted = false; 

    private transient Topic jmsTopic;
    private transient TopicConnectionFactory tcFactory;
    private transient TopicConnection topicConnection;
    private transient TopicSession topicSession;
    private transient TopicPublisher topicPublisher;
    private transient TopicSubscriber topicSubscriber;

    // Intialize the JMS Adapter

    private void initial () {
        JtMessage msg = new JtMessage (JtJNDIAdapter.JtLOOKUP);


        jndiAdapter = new JtJNDIAdapter ();

        if (connectionFactory == null) {
            handleError ("Attribute value needs to be set (connectionFactory)");
            return;    
        }

        msg.setMsgContent ("TestJMSConnectionFactory");

        tcFactory = (TopicConnectionFactory) sendMessage (jndiAdapter, msg);

        if (tcFactory == null)
            return;

        if (topic == null) {
            handleError ("Attribute value needs to be set (topic)");
            return;    
        }
        msg.setMsgContent (topic); 

        jmsTopic = (Topic) sendMessage (jndiAdapter, msg);


        if (jmsTopic == null)
            return;

        try {
            topicConnection = tcFactory.createTopicConnection ();
            topicSession = topicConnection.createTopicSession (false, 
                    Session.AUTO_ACKNOWLEDGE);

        } catch (Exception e) {
            handleException (e);
        }

    }


    /**
     * Method used by this adapter to consume JMS messages
     * asynchronously.
     * @param message JMS message
     */

    public void onMessage (Message message) {
        JtMessage msg; 
        ObjectMessage omessage;     

        if (message == null)
            return;


        try {

            omessage = (ObjectMessage) message;
            msg =   (JtMessage) omessage.getObject ();

            if (subject == null) {
                handleWarning ("JtJMSAdapter.onMessage: the subject attribute needs to be set");
                return;
            }

            sendMessage (subject, msg);

        } catch (Exception ex) {
            handleException (ex);
        }
    }

    /**
     * Process object messages.
     * <ul>
     * <li> JtPUBLISH - Publish a JtMessage (msgContent).  
     * <li> JtRECEIVE  - Receive a JtMessage from the JMS queue and return it.
     * <li> The message is consumed synchronously.  
     * <li> JtSTART_LISTENING - Start listening and consume messages asynchronously.  
     * </ul>
     */

    public Object processMessage (Object message) {
        //String content;
        //String query;
        JtMessage e = (JtMessage) message;
        Object reply;
        JtMessage msg;


        if (e == null ||  (e.getMsgId() == null))
            return (null);


        if (e.getMsgId().equals (JtObject.JtREMOVE)) {
            return (null);
        }

        if (!initted) {
            initial ();
            initted = true;        
        }


        if (e.getMsgId().equals(JtJMSAdapter.JtPUBLISH)) {
            msg = (JtMessage) e.getMsgContent ();        
            reply = publishJMSMessage (msg);
            return (reply);
        }


        if (e.getMsgId().equals(JtJMSAdapter.JtSTART_LISTENING)) {
            startListening ();
            return (null);
        }

        if (e.getMsgId().equals(JtJMSAdapter.JtRECEIVE)) {     
            reply = receiveJMSMessage ();
            return (reply);
        }

        // Test the Publisher functionality

        if (e.getMsgId().equals(JtJMSAdapter.JtTEST_PUBLISHER)) {
            reply = testPublisher ();
            return (reply);
        }

        // Test the Subscriber functionality

        if (e.getMsgId().equals(JtJMSAdapter.JtTEST_SUBSCRIBER)) {
            reply = testSubscriber ();
            return (reply);
        }


        handleError 
        ("processMessage: invalid message id:"+
                e.getMsgId());
        return (null);
    }



    /**
     * Specifies the JNDI name of the JMS topic.
     * @param topic topic
     */

    public void setTopic (String topic) {
        this.topic = topic;
    }


    /**
     * Returns the JNDI name of the JMS topic.
     */

    public String getTopic () {
        return (topic);
    }

    /**
     * Specifies the timeout interval (refer to javax.jms.MessageConsumer)
     * @param timeout timeout
     */

    public void setTimeout (long timeout) {
        this.timeout = timeout;
    }


    /**
     * Returns timeout (refer to javax.jms.MessageConsumer)
     */

    public long getTimeout () {
        return (timeout);
    }


    /**
     * Sets the delivery mode (persistent or non-persistent).
     * Messages will be published using this delivery mode. 
     * @param deliveryMode delivery mode
     */

    public void setDeliveryMode (int deliveryMode) {
        this.deliveryMode = deliveryMode;
    }


    /**
     * Returns the delivery mode (persistent or non-persistent)
     */

    public long getDeliveryMode () {
        return (deliveryMode);
    }




    /**
     * Sets the message priority. Messages will be published 
     * using this priority. 
     * @param priority message priority
     */

    public void setPriority (int priority) {
        this.priority = priority;
    }


    /**
     * Returns the message priority.
     */

    public long getPriority () {
        return (priority);
    }


    /**
     * Sets the message time to live (in milliseconds). Messages will be published 
     * using this value. 
     * @param timeToLive message time to live
     */

    public void setTimeToLive (long timeToLive) {
        this.timeToLive = timeToLive;
    }


    /**
     * Returns the message time to live (in milliseconds).
     */

    public long getTimeToLive () {
        return (timeToLive);
    }

    /**
     * Specifies the subject (JtObject). Messages received asynchronously are forwarded to
     * this Jt object for processing. 
     * @param subject subject
     */


    public void setSubject (Object subject) {
        this.subject = subject;
    }


    /**
     * Returns the subject. Messages received asynchronously are forwarded to
     * this Jt object for processing. 
     */

    public Object getSubject () {
        return (subject);
    }


    /**
     * Specifies the JNDI name of the connection factory.
     * @param connectionFactory connection factory
     */

    public void setConnectionFactory (String connectionFactory) {
        this.connectionFactory = connectionFactory;
    }


    /**
     * Returns the JNDI name of the connection factory.
     */

    public String getConnectionFactory () {
        return (connectionFactory);
    }




    private Object testSubscriber () {
        String reply = "PASS";
        //TextMessage message;
        //ObjectMessage message;
        JtMessage msg;



        for (;;) {

            msg = (JtMessage) sendMessage (this, new JtMessage (JtJMSAdapter.JtRECEIVE));

            if (msg == null) {
                System.out.println ("no more messages");
                break;
            } 

            System.out.println ("msgId:" + msg.getMsgId ());

        }

        return (reply);
    }


    // Send a Jt message using JMS as the transport layer

    private Object publishJMSMessage (JtMessage msg) {

        ObjectMessage omsg;
        String reply = "PASS";


        if (msg == null) {
            reply = "FAIL";
            return (reply);
        }

        try {

            if (topicPublisher == null)
                topicPublisher = topicSession.createPublisher (jmsTopic);

            omsg = topicSession.createObjectMessage (); 
            omsg.setObject (msg); 


            topicPublisher.publish (omsg, deliveryMode, priority, timeToLive);
        } catch (Exception e) {
            handleException (e);
            reply = "FAIL";
        }
        return (reply);
    }

    private void startListening () {


        try {
            if (topicSubscriber == null)
                topicSubscriber = topicSession.createSubscriber (jmsTopic);


            if (topicConnection == null) {
                handleError ("receiveJMSMessage:topicConnection is null");
                return; 
            }

            // Use the adapter as the message listener

            topicSubscriber.setMessageListener (this);

            topicConnection.start ();
        } catch (Exception ex) {

            handleException (ex);
        }

    }

    private JtMessage receiveJMSMessage () {

        JtMessage msg = null;
        ObjectMessage message;


        try {

            if (topicSubscriber == null)
                topicSubscriber = topicSession.createSubscriber (jmsTopic);


            if (topicConnection == null) {
                handleError ("receiveJMSMessage:topicConnection is null");
                return (null); 
            }
            topicConnection.start ();

            message = (ObjectMessage) topicSubscriber.receive (timeout);
            if (message != null) {
                msg =  (JtMessage) message.getObject ();
            } 

        } catch (Exception e) {
            handleException (e);
        }

        return (msg);

    }



    private Object testPublisher () {
        //String reply = "PASS";
        //TextMessage message;
        //ObjectMessage omsg;
        JtMessage msg = new JtMessage ("JtHELLO");
        JtMessage wrapper = new JtMessage (JtJMSAdapter.JtPUBLISH);

        wrapper.setMsgContent (msg);


        return (sendMessage (this, wrapper));
    }

    /**
     * Demonstrates the messages processed by JtJMSTopicAdapter. 
     */

    public static void main (String[] args) {
        JtFactory main;
        JtJMSTopicAdapter jmsAdapter;

        main = new JtFactory ();


        jmsAdapter = (JtJMSTopicAdapter) main.createObject (JtJMSTopicAdapter.JtCLASS_NAME, "jmsAdapter");

        if (args.length < 1) {
            System.err.println ("Usage: java Jt.jms.JtJMSTopicAdapter -p or java Jt.jms.JtJMSTopicAdapter -s");
            System.exit (1);
        }

        if (args[0].equals ("-p")) {
            main.sendMessage (jmsAdapter, new JtMessage (JtJMSAdapter.JtTEST_PUBLISHER));
            System.exit (0);
        } else if (args[0].equals ("-s")) {
            main.sendMessage (jmsAdapter, new JtMessage (JtJMSAdapter.JtTEST_SUBSCRIBER));
            System.exit (0);
        } else
            System.err.println ("Usage: java Jt.jms.JtJMSTopicAdapter -p or java Jt.jms.JtJMSTopicAdapter -s");

        main.removeObject (jmsAdapter);

    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?