📄 transactedexample.java
字号:
/**
 * Runs the vendor side of the example.
 *
 * <p>Creates two transacted sessions on one connection. The first session
 * synchronously receives retailer orders from the vendor order queue and,
 * within the same transaction, publishes a matching order to the supplier
 * order topic. The second session is handed to a
 * {@link VendorMessageListener} that processes supplier replies
 * asynchronously. After the end-of-stream message has been forwarded to the
 * suppliers, this method blocks until the listener reports completion, then
 * closes the connection.
 *
 * <p>If the connection or sessions cannot be created, the method prints a
 * diagnostic and terminates the JVM with exit status 1.
 *
 * <p>Relies on members declared outside this excerpt:
 * {@code connectionFactory}, {@code vendorOrderQueue},
 * {@code supplierOrderTopic}, {@code vendorConfirmQueue}, {@code rgen} and
 * {@code throwException}.
 */
public void run() {
    Connection connection = null;
    Session session = null;
    Session asyncSession = null;

    try {
        connection = connectionFactory.createConnection();
        // First argument "true" requests a transacted session.
        session = connection.createSession(true, 0);
        asyncSession = connection.createSession(true, 0);
    } catch (Exception e) {
        System.err.println("Connection problem: " + e.toString());
        System.err.println(
            "Program assumes three "
                + "queues named jms/AQueue, jms/BQueue, and jms/CQueue "
                + "and one topic named jms/OTopic");
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Best-effort cleanup: the process exits right below.
            }
        }
        System.exit(1);
    }

    try {
        /*
         * Create receiver for vendor order queue, sender
         * for supplier order topic, and message to send
         * to suppliers.
         */
        MessageConsumer vendorOrderReceiver =
            session.createConsumer(vendorOrderQueue);
        MessageProducer supplierOrderProducer =
            session.createProducer(supplierOrderTopic);
        // A single outgoing message object is reused for every order.
        MapMessage orderMessage = session.createMapMessage();

        /*
         * Configure an asynchronous message listener to
         * process supplier replies to inquiries for
         * parts to fill order. Start delivery.
         */
        MessageConsumer vendorConfirmReceiver =
            asyncSession.createConsumer(vendorConfirmQueue);
        VendorMessageListener listener =
            new VendorMessageListener(asyncSession, 2);
        vendorConfirmReceiver.setMessageListener(listener);
        connection.start();

        /*
         * Process orders in vendor order queue.
         * Use one transaction to receive order from
         * order queue and send message to suppliers'
         * order topic to order components to fulfill
         * the order placed with the vendor.
         */
        while (true) {
            try {
                // Receive an order from a retailer.
                Message inMessage = vendorOrderReceiver.receive();

                if (!(inMessage instanceof MapMessage)) {
                    /*
                     * Message is an end-of-message-stream
                     * message from retailer. Send similar
                     * messages to suppliers, then break out
                     * of processing loop.
                     */
                    Message endOfMessageStream = session.createMessage();
                    endOfMessageStream.setJMSReplyTo(vendorConfirmQueue);
                    supplierOrderProducer.send(endOfMessageStream);
                    session.commit();
                    break;
                }
                MapMessage vendorOrderMessage = (MapMessage) inMessage;

                /*
                 * A real application would check an
                 * inventory database and order only the
                 * quantities needed. Throw an exception
                 * every few times to simulate a database
                 * concurrent-access exception and cause
                 * a rollback.
                 */
                if (rgen.nextInt(3) == throwException) {
                    throw new JMSException(
                        "Simulated database concurrent access exception");
                }

                // Record retailer order as a pending order.
                Order order = new Order(vendorOrderMessage);

                // Set order number and reply queue for outgoing message.
                orderMessage.setInt("VendorOrderNumber", order.orderNumber);
                orderMessage.setJMSReplyTo(vendorConfirmQueue);
                int quantity = vendorOrderMessage.getInt("Quantity");
                System.out.println(
                    "Vendor: Retailer ordered " + quantity + " "
                        + vendorOrderMessage.getString("Item"));

                // Send message to supplier topic.
                // Item is not used by supplier.
                orderMessage.setString("Item", "");
                orderMessage.setInt("Quantity", quantity);
                supplierOrderProducer.send(orderMessage);
                System.out.println(
                    "Vendor: ordered " + quantity
                        + " monitor(s) and hard drive(s)");

                // Commit session: the receive and the send succeed together.
                session.commit();
                System.out.println(" Vendor: committed transaction 1");
            } catch (JMSException e) {
                System.err.println(
                    "Vendor: JMSException occurred: " + e.toString());
                e.printStackTrace();
                // A failing rollback propagates to the outer handler below.
                session.rollback();
                System.err.println(" Vendor: rolled back transaction 1");
            }
        }

        // Wait till suppliers get back with answers.
        listener.monitor.waitTillDone();
    } catch (JMSException e) {
        System.err.println("Vendor: Exception occurred: " + e.toString());
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // Best-effort cleanup: nothing useful can be done here.
            }
        }
    }
}
} // closes the enclosing class whose header precedes this excerpt

/**
 * The VendorMessageListener class processes an order
 * confirmation message from a supplier to the vendor.
 *
 * It demonstrates the use of transactions within message
 * listeners.
 */
public static class VendorMessageListener implements MessageListener {
    /** Latch the vendor waits on until all supplier traffic is processed. */
    final SampleUtilities.DoneLatch monitor =
        new SampleUtilities.DoneLatch();

    /** Count of end-of-stream messages still to be accounted for. */
    int numSuppliers;

    /** Session used to commit or roll back the work done per message. */
    private final Session session;

    /**
     * Constructor. Instantiates the message listener with
     * the session of the consuming class (the vendor).
     *
     * @param s the session of the consumer
     * @param numSuppliers the number of suppliers
     */
    public VendorMessageListener(Session s, int numSuppliers) {
        this.session = s;
        this.numSuppliers = numSuppliers;
    }

    /**
     * Casts the message to a MapMessage and processes the
     * order. A message that is not a MapMessage is
     * interpreted as the end of the message stream, and the
     * message listener sets its monitor state to all done
     * processing messages.
     *
     * Each message received represents a fulfillment message
     * from a supplier.
     *
     * @param message the incoming message
     */
    @Override
    public void onMessage(Message message) {
        /*
         * If message is an end-of-message-stream message and
         * this is the last such message, set monitor status
         * to all done processing messages and commit
         * transaction.
         */
        if (!(message instanceof MapMessage)) {
            // NOTE(review): the counter only moves when no orders are
            // outstanding; an end-of-stream message that arrives while
            // orders are still pending is committed without being
            // counted. Confirm suppliers always send it last.
            if (Order.outstandingOrders() == 0) {
                numSuppliers--;
                if (numSuppliers == 0) {
                    monitor.allDone();
                }
            }
            try {
                session.commit();
            } catch (JMSException je) {
                // Report instead of silently dropping the failure.
                System.err.println(
                    "Vendor: commit of end-of-stream message failed: "
                        + je.toString());
            }
            return;
        }

        /*
         * Message is an order confirmation message from a
         * supplier.
         */
        try {
            MapMessage component = (MapMessage) message;

            /*
             * Process the order confirmation message and
             * commit the transaction.
             */
            int orderNumber = component.getInt("VendorOrderNumber");
            Order order =
                Order.getOrder(orderNumber).processSubOrder(component);
            session.commit();

            /*
             * If this message is the last supplier message,
             * send message to Retailer and commit
             * transaction.
             */
            if (!order.isPending()) {
                System.out.println(
                    "Vendor: Completed processing for order "
                        + order.orderNumber);

                Queue replyQueue = (Queue) order.order.getJMSReplyTo();
                MessageProducer producer =
                    session.createProducer(replyQueue);
                try {
                    MapMessage retailerConfirmMessage =
                        session.createMapMessage();

                    if (order.isFulfilled()) {
                        retailerConfirmMessage.setBoolean(
                            "OrderAccepted", true);
                        System.out.println(
                            "Vendor: sent " + order.quantity
                                + " computer(s)");
                    } else if (order.isCancelled()) {
                        retailerConfirmMessage.setBoolean(
                            "OrderAccepted", false);
                        System.out.println(
                            "Vendor: unable to send " + order.quantity
                                + " computer(s)");
                    }

                    producer.send(retailerConfirmMessage);
                    session.commit();
                    System.out.println(" Vendor: committed transaction 2");
                } finally {
                    // One producer is created per completed order, so
                    // release it here rather than at connection close.
                    closeQuietly(producer);
                }
            }
        } catch (Exception e) {
            // Listener boundary: JMS and unexpected runtime failures
            // alike are reported and the current transaction undone.
            e.printStackTrace();
            rollbackQuietly();
        }
    }

    /** Rolls back the session's transaction, reporting any failure. */
    private void rollbackQuietly() {
        try {
            session.rollback();
        } catch (JMSException je) {
            System.err.println(
                "Vendor: rollback failed: " + je.toString());
        }
    }

    /** Closes the given producer, reporting any failure. */
    private static void closeQuietly(MessageProducer producer) {
        try {
            producer.close();
        } catch (JMSException je) {
            System.err.println(
                "Vendor: could not close reply producer: "
                    + je.toString());
        }
    }
}
} // closes the outer class whose header precedes this excerpt
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -