📄 transactedexample.java
字号:
/**
 * Constructor. Stores the session used to commit or roll back the
 * listener's work and the number of suppliers being tracked.
 *
 * @param qs            the session on which messages are received and
 *                      transactions are committed
 * @param numSuppliers  the number of suppliers; counted down in
 *                      onMessage as end-of-message-stream messages
 *                      arrive
 */
public VendorMessageListener(QueueSession qs, int numSuppliers) {
    this.session = qs;
    this.numSuppliers = numSuppliers;
}

/**
 * Casts the message to a MapMessage and processes the order.
 * A message that is not a MapMessage is interpreted as the end of the
 * message stream, and the message listener sets its monitor state to
 * all done processing messages.
 * <p>
 * Each message received represents a fulfillment message from
 * a supplier.
 *
 * @param message  the incoming message
 */
public void onMessage(Message message) {

    /*
     * End-of-message-stream message: when no orders are outstanding,
     * count the supplier down and, once the count reaches zero, set
     * the monitor status to all done.  The transaction is committed
     * in every case.
     */
    if (!(message instanceof MapMessage)) {
        if (Order.outstandingOrders() == 0) {
            numSuppliers--;
            if (numSuppliers == 0) {
                monitor.allDone();
            }
        }
        try {
            session.commit();
        } catch (JMSException je) {
            // Report the failed commit instead of hiding it; there is
            // nothing further this listener can do with the message.
            je.printStackTrace();
        }
        return;
    }

    /*
     * Message is an order confirmation message from a supplier.
     */
    try {
        MapMessage component = (MapMessage) message;

        /*
         * Process the order confirmation message and commit the
         * transaction.
         */
        int orderNumber = component.getInt("VendorOrderNumber");
        Order order = Order.getOrder(orderNumber).processSubOrder(component);
        session.commit();

        /*
         * If this message is the last supplier message, send message
         * to Retailer and commit transaction.
         */
        if (!order.isPending()) {
            System.out.println("Vendor: Completed processing for order "
                + order.orderNumber);
            Queue replyQueue = (Queue) order.order.getJMSReplyTo();
            QueueSender qs = session.createSender(replyQueue);
            MapMessage retailerConfirmationMessage = session.createMapMessage();
            if (order.isFulfilled()) {
                retailerConfirmationMessage.setBoolean("OrderAccepted", true);
                System.out.println("Vendor: sent " + order.quantity
                    + " computer(s)");
            } else if (order.isCancelled()) {
                retailerConfirmationMessage.setBoolean("OrderAccepted", false);
                System.out.println("Vendor: unable to send " + order.quantity
                    + " computer(s)");
            }
            qs.send(retailerConfirmationMessage);
            session.commit();
            System.out.println("  Vendor: committed transaction 2");
        }
    } catch (Exception e) {
        // JMS failures and any other exception are handled the same
        // way: report the problem and undo the current transaction.
        e.printStackTrace();
        rollbackAfterFailure();
    }
}

/**
 * Rolls back the session's current transaction, reporting (rather than
 * silently discarding) a failure of the rollback itself.
 */
private void rollbackAfterFailure() {
    try {
        session.rollback();
    } catch (JMSException rollbackFailure) {
        rollbackFailure.printStackTrace();
    }
}
} // closes VendorMessageListener (its header precedes this section)

/**
 * The GenericSupplier class receives an item order from the
 * vendor and sends a message accepting or refusing it.
 *
 * @author Kim Haase
 * @author Joseph Fialli
 * @version 1.3, 08/18/00
 */
public static class GenericSupplier extends Thread {
    final String  PRODUCT_NAME;
    final String  IN_ORDER_QUEUE;
    int           quantity = 0;

    /** Random source for the simulated inventory level. */
    private final Random inventoryRandom = new Random();

    /**
     * Constructor.  Instantiates the supplier as the supplier for the
     * kind of item being ordered.
     *
     * @param itemName  the name of the item being ordered
     * @param inQueue   the queue from which the order is obtained
     */
    public GenericSupplier(String itemName, String inQueue) {
        PRODUCT_NAME = itemName;
        IN_ORDER_QUEUE = inQueue;
    }

    /**
     * Checks to see if there are enough items in inventory.
     * Rather than go to a database, it generates a random number related
     * to the order quantity, so that some of the time there won't be
     * enough in stock.
     * <p>
     * The result lies in the range 0 (inclusive) to five times the
     * current quantity (exclusive).  A quantity that is zero or negative
     * yields 0, because Random.nextInt(int) rejects a non-positive bound.
     *
     * @return  the number of items in inventory
     */
    public int checkInventory() {
        if (quantity <= 0) {
            return 0;
        }
        // Compute the bound in long arithmetic so that a very large
        // quantity cannot overflow into a negative (illegal) bound.
        long bound = Math.min((long) quantity * 5, (long) Integer.MAX_VALUE);
        return inventoryRandom.nextInt((int) bound);
    }

    /**
     * Runs the thread.  Connects to the order queue, then answers each
     * order with a confirmation message until an end-of-message-stream
     * message (any message that is not a MapMessage) is received.
     * Receiving an order and sending its confirmation form one
     * transaction.
     */
    public void run() {
        QueueConnection  queueConnection = null;
        QueueSession     queueSession = null;
        Queue            orderQueue = null;

        try {
            QueueConnectionFactory queueConnectionFactory =
                SampleUtilities.getQueueConnectionFactory();
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(true, 0);
            orderQueue = SampleUtilities.getQueue(IN_ORDER_QUEUE, queueSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            System.out.println("Program assumes five queues named A B C D E");
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException ee) {
                    // The program is exiting with an error status anyway.
                }
            }
            System.exit(1);
        }

        try {
            // Create receiver for order queue and start message delivery.
            QueueReceiver queueReceiver;
            try {
                queueReceiver = queueSession.createReceiver(orderQueue);
                queueConnection.start();
            } catch (JMSException je) {
                // Without a receiver the loop below could only fail on
                // every iteration, so give up here.
                System.out.println(PRODUCT_NAME
                    + " Supplier: Exception occurred: " + je.toString());
                exitResult = 1;
                return;
            }

            /*
             * Keep checking supplier order queue for order request until
             * end-of-message-stream message is received.
             * Receive order and send an order confirmation as one
             * transaction.
             */
            while (true) {
                try {
                    Message inMessage = queueReceiver.receive();

                    if (!(inMessage instanceof MapMessage)) {
                        /*
                         * Message is an end-of-message-stream message.
                         * Send a similar message to reply queue, commit
                         * transaction, then stop processing orders by
                         * breaking out of loop.
                         */
                        QueueSender endSender = queueSession.createSender(
                            (Queue) inMessage.getJMSReplyTo());
                        endSender.send(queueSession.createMessage());
                        queueSession.commit();
                        break;
                    }
                    MapMessage orderMessage = (MapMessage) inMessage;

                    // Extract quantity ordered from order message.
                    quantity = orderMessage.getInt("Quantity");
                    System.out.println(PRODUCT_NAME
                        + " Supplier: Vendor ordered " + quantity + " "
                        + orderMessage.getString("Item") + "(s)");

                    /*
                     * Create sender and message for reply queue.
                     * Set order number and item; check inventory and set
                     * quantity available.
                     * Send message to vendor and commit transaction.
                     */
                    QueueSender queueSender = queueSession.createSender(
                        (Queue) orderMessage.getJMSReplyTo());
                    MapMessage outMessage = queueSession.createMapMessage();
                    outMessage.setInt("VendorOrderNumber",
                        orderMessage.getInt("VendorOrderNumber"));
                    outMessage.setString("Item", PRODUCT_NAME);
                    int numAvailable = checkInventory();
                    if (numAvailable >= quantity) {
                        outMessage.setInt("Quantity", quantity);
                    } else {
                        outMessage.setInt("Quantity", numAvailable);
                    }
                    queueSender.send(outMessage);
                    System.out.println(PRODUCT_NAME + " Supplier: sent "
                        + outMessage.getInt("Quantity") + " "
                        + outMessage.getString("Item") + "(s)");
                    queueSession.commit();
                    System.out.println("  " + PRODUCT_NAME
                        + " Supplier: committed transaction");
                } catch (Exception e) {
                    System.out.println(PRODUCT_NAME
                        + " Supplier: Exception occurred: " + e.toString());
                    e.printStackTrace();
                    exitResult = 1;

                    // Undo the failed transaction so that its partial
                    // work is not committed by a later iteration.
                    try {
                        queueSession.rollback();
                    } catch (JMSException rollbackFailure) {
                        // The session cannot be rolled back; stop
                        // instead of retrying on it indefinitely.
                        rollbackFailure.printStackTrace();
                        break;
                    }
                }
            }
        } finally {
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {
                    exitResult = 1;
                }
            }
        }
    }
}

/**
 * Creates the Retailer and Vendor classes and the two supplier classes,
 * then starts the threads and waits for all of them to finish.
 *
 * @param quantity  the quantity specified on the command line
 */
public static void run_threads(int quantity) {
    Retailer         r = new Retailer(quantity);
    Vendor           v = new Vendor();
    GenericSupplier  ms = new GenericSupplier("Monitor",
                             monitorOrderQueueName);
    GenericSupplier  ss = new GenericSupplier("Hard Drive",
                             storageOrderQueueName);

    r.start();
    v.start();
    ms.start();
    ss.start();
    try {
        r.join();
        v.join();
        ms.join();
        ss.join();
    } catch (InterruptedException e) {
        // Stop waiting, but keep the interrupt visible to the caller.
        Thread.currentThread().interrupt();
    }
}

/**
 * Reads the order quantity from the command line, then
 * calls the run_threads method to execute the program threads.
 *
 * @param args  the quantity of computers being ordered
 */
public static void main(String[] args) {
    TransactedExample  te = new TransactedExample();
    int                quantity = 0;

    if (args.length != 1) {
        System.out.println("Usage: java TransactedExample <integer>");
        System.out.println("Program assumes five queues named A B C D E");
        System.exit(1);
    }
    te.vendorOrderQueueName = "A";
    te.retailerConfirmationQueueName = "B";
    te.monitorOrderQueueName = "C";
    te.storageOrderQueueName = "D";
    te.vendorConfirmationQueueName = "E";

    try {
        quantity = Integer.parseInt(args[0]);
    } catch (NumberFormatException nfe) {
        // A non-numeric argument is a usage error, not a crash.
        System.out.println("Usage: java TransactedExample <integer>");
        System.exit(1);
    }
    System.out.println("Quantity to be ordered is " + quantity);
    if (quantity > 0) {
        te.run_threads(quantity);
    } else {
        System.out.println("Quantity must be positive and nonzero");
        te.exitResult = 1;
    }
    SampleUtilities.exit(te.exitResult);
}
} // closes the enclosing class (its header precedes this section)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -