📄 transactedexample.java
字号:
System.exit(1); } try { /* * Create receiver for vendor order queue, sender * for supplier order topic, and message to send * to suppliers. */ vendorOrderReceiver = session.createConsumer(vendorOrderQueue); supplierOrderProducer = session.createProducer(supplierOrderTopic); orderMessage = session.createMapMessage(); /* * Configure an asynchronous message listener to * process supplier replies to inquiries for * parts to fill order. Start delivery. */ vendorConfirmReceiver = asyncSession.createConsumer(vendorConfirmQueue); listener = new VendorMessageListener(asyncSession, 2); vendorConfirmReceiver.setMessageListener(listener); connection.start(); /* * Process orders in vendor order queue. * Use one transaction to receive order from * order queue and send message to suppliers' * order topic to order components to fulfill * the order placed with the vendor. */ while (true) { try { // Receive an order from a retailer. inMessage = vendorOrderReceiver.receive(); if (inMessage instanceof MapMessage) { vendorOrderMessage = (MapMessage) inMessage; } else { /* * Message is an end-of-message- * stream message from retailer. * Send similar messages to * suppliers, then break out of * processing loop. */ endOfMessageStream = session.createMessage(); endOfMessageStream.setJMSReplyTo(vendorConfirmQueue); supplierOrderProducer.send(endOfMessageStream); session.commit(); break; } /* * A real application would check an * inventory database and order only the * quantities needed. Throw an exception * every few times to simulate a database * concurrent-access exception and cause * a rollback. */ if (rgen.nextInt(3) == throwException) { throw new JMSException("Simulated " + "database concurrent access " + "exception"); } /* * Record retailer order as a pending * order. */ order = new Order(vendorOrderMessage); /* * Set order number and reply queue for * outgoing message. */ orderMessage.setInt("VendorOrderNumber", order.orderNumber); orderMessage.setJMSReplyTo(vendorConfirmQueue); quantity = vendorOrderMessage.getInt("Quantity"); System.out.println("Vendor: Retailer " + "ordered " + quantity + " " + vendorOrderMessage.getString("Item")); // Send message to supplier topic. // Item is not used by supplier. orderMessage.setString("Item", ""); orderMessage.setInt("Quantity", quantity); supplierOrderProducer.send(orderMessage); System.out.println("Vendor: ordered " + quantity + " monitor(s) and hard drive(s)"); // Commit session. session.commit(); System.out.println(" Vendor: " + "committed transaction 1"); } catch (JMSException e) { System.err.println("Vendor: " + "JMSException occurred: " + e.toString()); e.printStackTrace(); session.rollback(); System.err.println(" Vendor: rolled " + "back transaction 1"); } } // Wait till suppliers get back with answers. listener.monitor.waitTillDone(); } catch (JMSException e) { System.err.println("Vendor: Exception " + "occurred: " + e.toString()); e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { } } } } } /** * The Order class represents a Retailer order placed with a * Vendor. It maintains a table of pending orders. */ public static class Order { private static Hashtable pendingOrders = new Hashtable(); private static int nextOrderNumber = 1; private static final int PENDING_STATUS = 1; private static final int CANCELLED_STATUS = 2; private static final int FULFILLED_STATUS = 3; int status; public final int orderNumber; public int quantity; // Original order from retailer public final MapMessage order; // Reply from supplier public MapMessage monitor = null; // Reply from supplier public MapMessage storage = null; /** * Constructor. Sets order number; sets order and * quantity from incoming message. Sets status to * pending, and adds order to hash table of pending * orders. * * @param order the message containing the order */ public Order(MapMessage order) { this.orderNumber = getNextOrderNumber(); this.order = order; try { this.quantity = order.getInt("Quantity"); } catch (JMSException je) { System.err.println("Unexpected error. Message " + "missing Quantity"); this.quantity = 0; } status = PENDING_STATUS; pendingOrders.put(new Integer(orderNumber), this); } /** * Returns the next order number and increments the * static variable that holds this value. * * @return the next order number */ private static int getNextOrderNumber() { int result = nextOrderNumber; nextOrderNumber++; return result; } /** * Returns the number of orders in the hash table. * * @return the number of pending orders */ public static int outstandingOrders() { return pendingOrders.size(); } /** * Returns the order corresponding to a given order * number. * * @param orderNumber the number of the requested order * @return the requested order */ public static Order getOrder(int orderNumber) { return (Order) pendingOrders.get(new Integer(orderNumber)); } /** * Called by the onMessage method of the * VendorMessageListener class to process a reply from * a supplier to the Vendor. * * @param component the message from the supplier * @return the order with updated status * information */ public Order processSubOrder(MapMessage component) { String itemName = null; // Determine which subcomponent this is. try { itemName = component.getString("Item"); } catch (JMSException je) { System.err.println("Unexpected exception. " + "Message missing Item"); } if (itemName.compareTo("Monitor") == 0) { monitor = component; } else if (itemName.compareTo("Hard Drive") == 0) { storage = component; } /* * If notification for all subcomponents has been * received, verify the quantities to compute if able * to fulfill order. */ if ((monitor != null) && (storage != null)) { try { if (quantity > monitor.getInt("Quantity")) { status = CANCELLED_STATUS; } else if (quantity > storage.getInt("Quantity")) { status = CANCELLED_STATUS; } else { status = FULFILLED_STATUS; } } catch (JMSException je) { System.err.println("Unexpected exception: " + je.toString()); status = CANCELLED_STATUS; } /* * Processing of order is complete, so remove it * from pending-order list. */ pendingOrders.remove(new Integer(orderNumber)); } return this; } /** * Determines if order status is pending. * * @return true if order is pending, false if not */ public boolean isPending() { return status == PENDING_STATUS; } /** * Determines if order status is cancelled. * * @return true if order is cancelled, false if not */ public boolean isCancelled() { return status == CANCELLED_STATUS; } /** * Determines if order status is fulfilled. * * @return true if order is fulfilled, false if not */ public boolean isFulfilled() { return status == FULFILLED_STATUS; } } /** * The VendorMessageListener class processes an order
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -