📄 transactedexample.java
字号:
* confirmation message from a supplier to the vendor. * * It demonstrates the use of transactions within message * listeners. */ public static class VendorMessageListener implements MessageListener { final SampleUtilities.DoneLatch monitor = new SampleUtilities.DoneLatch(); private final Session session; int numSuppliers; /** * Constructor. Instantiates the message listener with * the session of the consuming class (the vendor). * * @param s the session of the consumer * @param numSuppliers the number of suppliers */ public VendorMessageListener(Session s, int numSuppliers) { this.session = s; this.numSuppliers = numSuppliers; } /** * Casts the message to a MapMessage and processes the * order. A message that is not a MapMessage is * interpreted as the end of the message stream, and the * message listener sets its monitor state to all done * processing messages. * * Each message received represents a fulfillment message * from a supplier. * * @param message the incoming message */ public void onMessage(Message message) { /* * If message is an end-of-message-stream message and * this is the last such message, set monitor status * to all done processing messages and commit * transaction. */ if (!(message instanceof MapMessage)) { if (Order.outstandingOrders() == 0) { numSuppliers--; if (numSuppliers == 0) { monitor.allDone(); } } try { session.commit(); } catch (JMSException je) { } return; } /* * Message is an order confirmation message from a * supplier. */ int orderNumber = -1; try { MapMessage component = (MapMessage) message; /* * Process the order confirmation message and * commit the transaction. */ orderNumber = component.getInt("VendorOrderNumber"); Order order = Order.getOrder(orderNumber) .processSubOrder(component); session.commit(); /* * If this message is the last supplier message, * send message to Retailer and commit * transaction. */ if (!order.isPending()) { System.out.println("Vendor: Completed " + "processing for order " + order.orderNumber); javax.jms.Queue replyQueue = (javax.jms.Queue) order.order.getJMSReplyTo(); MessageProducer producer = session.createProducer(replyQueue); MapMessage retailerConfirmMessage = session.createMapMessage(); if (order.isFulfilled()) { retailerConfirmMessage.setBoolean("OrderAccepted", true); System.out.println("Vendor: sent " + order.quantity + " computer(s)"); } else if (order.isCancelled()) { retailerConfirmMessage.setBoolean("OrderAccepted", false); System.out.println("Vendor: unable to " + "send " + order.quantity + " computer(s)"); } producer.send(retailerConfirmMessage); session.commit(); System.out.println(" Vendor: committed " + "transaction 2"); } } catch (JMSException je) { je.printStackTrace(); try { session.rollback(); } catch (JMSException je2) { } } catch (Exception e) { e.printStackTrace(); try { session.rollback(); } catch (JMSException je2) { } } } } /** * The GenericSupplier class receives an item order from the * vendor and sends a message accepting or refusing it. */ public static class GenericSupplier extends Thread { final String PRODUCT_NAME; final String IN_ORDER_TOPIC; int quantity = 0; /** * Constructor. Instantiates the supplier as the * supplier for the kind of item to be ordered. * * @param itemName the name of the item being ordered * @param inTopic the topic from which the order is * obtained */ public GenericSupplier(String itemName, String inTopic) { PRODUCT_NAME = itemName; IN_ORDER_TOPIC = inTopic; } /** * Checks to see if there are enough items in inventory. * Rather than go to a database, it generates a random * number related to the order quantity, so that some of * the time there won't be enough in stock. * * @return the number of items in inventory */ public int checkInventory() { Random rgen = new Random(); return (rgen.nextInt(quantity * 5)); } /** * Runs the thread. */ public void run() { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Topic orderTopic = null; MessageConsumer receiver = null; Message inMessage = null; MapMessage orderMessage = null; MapMessage outMessage = null; try { connectionFactory = SampleUtilities.getConnectionFactory(); connection = connectionFactory.createConnection(); session = connection.createSession(true, 0); orderTopic = SampleUtilities.getTopic(IN_ORDER_TOPIC, session); } catch (Exception e) { System.err.println("Connection problem: " + e.toString()); System.err.println("Program assumes three " + "queues named jms/AQueue, jms/BQueue, and jms/CQueue " + "and one topic named jms/OTopic"); if (connection != null) { try { connection.close(); } catch (JMSException ee) { } } System.exit(1); } /* * Create receiver for order topic and start message * delivery. */ try { receiver = session.createConsumer(orderTopic); connection.start(); } catch (JMSException je) { } /* * Keep checking supplier order topic for order * request until end-of-message-stream message is * received. Receive order and send an order * confirmation as one transaction. */ while (true) { try { inMessage = receiver.receive(); if (inMessage instanceof MapMessage) { orderMessage = (MapMessage) inMessage; } else { /* * Message is an end-of-message-stream * message. Send a similar message to * reply queue, commit transaction, then * stop processing orders by breaking out * of loop. */ MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); producer.send(session.createMessage()); session.commit(); break; } /* * Extract quantity ordered from order * message. */ quantity = orderMessage.getInt("Quantity"); System.out.println(PRODUCT_NAME + " Supplier: Vendor ordered " + quantity + " " + PRODUCT_NAME + "(s)"); /* * Create sender and message for reply queue. * Set order number and item; check inventory * and set quantity available. * Send message to vendor and commit * transaction. */ MessageProducer producer = session.createProducer((javax.jms.Queue) orderMessage.getJMSReplyTo()); outMessage = session.createMapMessage(); outMessage.setInt("VendorOrderNumber", orderMessage.getInt("VendorOrderNumber")); outMessage.setString("Item", PRODUCT_NAME); int numAvailable = checkInventory(); if (numAvailable >= quantity) { outMessage.setInt("Quantity", quantity); } else { outMessage.setInt("Quantity", numAvailable); } producer.send(outMessage); System.out.println(PRODUCT_NAME + " Supplier: sent " + outMessage.getInt("Quantity") + " " + outMessage.getString("Item") + "(s)"); session.commit(); System.out.println(" " + PRODUCT_NAME + " Supplier: committed transaction"); } catch (Exception e) { System.err.println(PRODUCT_NAME + " Supplier: Exception occurred: " + e.toString()); e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -