📄 ackequivexample.java
字号:
/* * Copyright (c) 2006 Sun Microsystems, Inc. All rights reserved. U.S. * Government Rights - Commercial software. Government users are subject * to the Sun Microsystems, Inc. standard license agreement and * applicable provisions of the FAR and its supplements. Use is subject * to license terms. * * This distribution may include materials developed by third parties. * Sun, Sun Microsystems, the Sun logo, Java and J2EE are trademarks * or registered trademarks of Sun Microsystems, Inc. in the U.S. and * other countries. * * Copyright (c) 2006 Sun Microsystems, Inc. Tous droits reserves. * * Droits du gouvernement americain, utilisateurs gouvernementaux - logiciel * commercial. Les utilisateurs gouvernementaux sont soumis au contrat de * licence standard de Sun Microsystems, Inc., ainsi qu'aux dispositions * en vigueur de la FAR (Federal Acquisition Regulations) et des * supplements a celles-ci. Distribue par des licences qui en * restreignent l'utilisation. * * Cette distribution peut comprendre des composants developpes par des * tierces parties. Sun, Sun Microsystems, le logo Sun, Java et J2EE * sont des marques de fabrique ou des marques deposees de Sun * Microsystems, Inc. aux Etats-Unis et dans d'autres pays. */import javax.jms.ConnectionFactory;import javax.jms.Queue;import javax.jms.Topic;import javax.jms.Connection;import javax.jms.Session;import javax.jms.MessageProducer;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Message;import javax.jms.TextMessage;import javax.jms.JMSException;import javax.annotation.Resource;public class AckEquivExample { @Resource(mappedName = "jms/ConnectionFactory") private static ConnectionFactory connectionFactory; @Resource(mappedName = "jms/DurableConnectionFactory") private static ConnectionFactory durableConnectionFactory; @Resource(mappedName = "jms/ControlQueue") private static Queue controlQueue; @Resource(mappedName = "jms/Queue") private static Queue queue; @Resource(mappedName = "jms/Topic") private static Topic topic; final String CONTROL_QUEUE = "jms/ControlQueue"; final String conFacName = "jms/DurableConnectionFactory"; final String queueName = "jms/Queue"; final String topicName = "jms/Topic"; /** * Instantiates the sender, receiver, subscriber, and * publisher classes and starts their threads. * Calls the join method to wait for the threads to die. */ public void run_threads() { SynchSender synchSender = new SynchSender(); SynchReceiver synchReceiver = new SynchReceiver(); AsynchSubscriber asynchSubscriber = new AsynchSubscriber(); MultiplePublisher multiplePublisher = new MultiplePublisher(); synchSender.start(); synchReceiver.start(); try { synchSender.join(); synchReceiver.join(); } catch (InterruptedException e) { } try { asynchSubscriber.start(); Thread.sleep(1000); multiplePublisher.start(); } catch (InterruptedException e) { } try { asynchSubscriber.join(); multiplePublisher.join(); } catch (InterruptedException e) { } } /** * Calls the run_threads method to execute the program * threads. * * @param args the topic used by the example */ public static void main(String[] args) { AckEquivExample aee = new AckEquivExample(); if (args.length != 0) { System.out.println("Program takes no arguments."); System.exit(1); } System.out.println("Queue name is " + aee.CONTROL_QUEUE); System.out.println("Queue name is " + aee.queueName); System.out.println("Topic name is " + aee.topicName); System.out.println("Connection factory name is " + aee.conFacName); aee.run_threads(); System.exit(0); } /** * The AsynchSubscriber class creates a session in * AUTO_ACKNOWLEDGE mode and fetches several messages from a * topic asynchronously, using a message listener, * TextListener. * * Each message is acknowledged after the onMessage method * completes. */ public class AsynchSubscriber extends Thread { /** * Runs the thread. */ public void run() { Connection connection = null; Session session = null; MessageConsumer subscriber = null; TextListener listener = null; try { connection = durableConnectionFactory.createConnection(); session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE); System.out.println( "SUBSCRIBER: Created " + "auto-acknowledge session"); } catch (Exception e) { System.err.println( "Connection problem with subscriber: " + e.toString()); if (connection != null) { try { connection.close(); } catch (JMSException ee) { } } System.exit(1); } /* * Create auto-acknowledge subscriber. * Register message listener (TextListener). * Start message delivery. * Send synchronize message to publisher, then wait * till all messages have arrived. * Listener displays the messages obtained. */ try { subscriber = session.createDurableSubscriber(topic, "AckSub"); listener = new TextListener(); subscriber.setMessageListener(listener); connection.start(); // Let publisher know that subscriber is ready. try { SampleUtilities.sendSynchronizeMessage( "SUBSCRIBER: ", connectionFactory, controlQueue); } catch (Exception e) { System.err.println( "Connection or queue problem with subscriber: " + e.toString()); e.printStackTrace(); if (connection != null) { try { connection.close(); } catch (JMSException ee) { } } System.exit(1); } /* * Asynchronously process messages. * Block until publisher issues a control message * indicating end of publish stream. */ listener.monitor.waitTillDone(); subscriber.close(); session.unsubscribe("AckSub"); } catch (JMSException e) { System.err.println("Exception occurred: " + e.toString()); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { } } } } /** * The TextListener class implements the MessageListener * interface by defining an onMessage method for the * AsynchSubscriber class. */ private class TextListener implements MessageListener { final SampleUtilities.DoneLatch monitor = new SampleUtilities.DoneLatch(); /** * Casts the message to a TextMessage and displays * its text. A non-text message is interpreted as the * end of the message stream, and the message * listener sets its monitor state to all done * processing messages. * * @param message the incoming message */ public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; try { System.out.println( "SUBSCRIBER: " + "Processing message: " + msg.getText());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -