📄 transactedexample.java
字号:
} catch (JMSException ee) {} } System.exit(1); } try { /* * Create receiver for vendor order queue, sender for * supplier order queues, and message to send to suppliers. */ vendorOrderQueueReceiver = queueSession.createReceiver(vendorOrderQueue); monitorOrderQueueSender = queueSession.createSender(monitorOrderQueue); storageOrderQueueSender = queueSession.createSender(storageOrderQueue); orderMessage = queueSession.createMapMessage(); /* * Configure an asynchronous message listener to process * supplier replies to inquiries for parts to fill order. * Start delivery. */ vendorConfirmationQueueReceiver = asyncQueueSession.createReceiver(vendorConfirmationQueue); listener = new VendorMessageListener(asyncQueueSession, 2); vendorConfirmationQueueReceiver.setMessageListener(listener); queueConnection.start(); /* * Process orders in vendor order queue. * Use one transaction to receive order from order queue * and send messages to suppliers' order queues to order * components to fulfill the order placed with the vendor. */ while (true) { try { // Receive an order from a retailer. inMessage = vendorOrderQueueReceiver.receive(); if (inMessage instanceof MapMessage) { vendorOrderMessage = (MapMessage) inMessage; } else { /* * Message is an end-of-message-stream message from * retailer. Send similar messages to suppliers, * then break out of processing loop. */ endOfMessageStream = queueSession.createMessage(); endOfMessageStream.setJMSReplyTo(vendorConfirmationQueue); monitorOrderQueueSender.send(endOfMessageStream); storageOrderQueueSender.send(endOfMessageStream); queueSession.commit(); break; } /* * A real application would check an inventory database * and order only the quantities needed. Throw an * exception every few times to simulate a database * concurrent-access exception and cause a rollback. */ if (rgen.nextInt(3) == throwException) { throw new JMSException("Simulated database concurrent access exception"); } // Record retailer order as a pending order. order = new Order(vendorOrderMessage); /* * Set order number and reply queue for outgoing * message. */ orderMessage.setInt("VendorOrderNumber", order.orderNumber); orderMessage.setJMSReplyTo(vendorConfirmationQueue); quantity = vendorOrderMessage.getInt("Quantity"); System.out.println("Vendor: Retailer ordered " + quantity + " " + vendorOrderMessage.getString("Item")); // Send message to monitor supplier. orderMessage.setString("Item", "Monitor"); orderMessage.setInt("Quantity", quantity); monitorOrderQueueSender.send(orderMessage); System.out.println("Vendor: ordered " + quantity + " " + orderMessage.getString("Item") + "(s)"); /* * Reuse message to send to storage supplier, changing * only item name. */ orderMessage.setString("Item", "Hard Drive"); storageOrderQueueSender.send(orderMessage); System.out.println("Vendor: ordered " + quantity + " " + orderMessage.getString("Item") + "(s)"); // Commit session. queueSession.commit(); System.out.println(" Vendor: committed transaction 1"); } catch(JMSException e) { System.out.println("Vendor: JMSException occurred: " + e.toString()); e.printStackTrace(); queueSession.rollback(); System.out.println(" Vendor: rolled back transaction 1"); exitResult = 1; } } // Wait till suppliers get back with answers. listener.monitor.waitTillDone(); } catch (JMSException e) { System.out.println("Vendor: Exception occurred: " + e.toString()); e.printStackTrace(); exitResult = 1; } finally { if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { exitResult = 1; } } } } } /** * The Order class represents a Retailer order placed with a Vendor. * It maintains a table of pending orders. * * @author Joseph Fialli * @version 1.3, 08/18/00 */ public static class Order { private static Hashtable pendingOrders = new Hashtable(); private static int nextOrderNumber = 1; private static final int PENDING_STATUS = 1; private static final int CANCELLED_STATUS = 2; private static final int FULFILLED_STATUS = 3; int status; public final int orderNumber; public int quantity; public final MapMessage order; // original order from retailer public MapMessage monitor = null; // reply from supplier public MapMessage storage = null; // reply from supplier /** * Returns the next order number and increments the static variable * that holds this value. * * @return the next order number */ private static int getNextOrderNumber() { int result = nextOrderNumber; nextOrderNumber++; return result; } /** * Constructor. Sets order number; sets order and quantity from * incoming message. Sets status to pending, and adds order to hash * table of pending orders. * * @param order the message containing the order */ public Order(MapMessage order) { this.orderNumber = getNextOrderNumber(); this.order = order; try { this.quantity = order.getInt("Quantity"); } catch (JMSException je) { System.err.println("Unexpected error. Message missing Quantity"); this.quantity = 0; } status = PENDING_STATUS; pendingOrders.put(new Integer(orderNumber), this); } /** * Returns the number of orders in the hash table. * * @return the number of pending orders */ public static int outstandingOrders() { return pendingOrders.size(); } /** * Returns the order corresponding to a given order number. * * @param orderNumber the number of the requested order * @return the requested order */ public static Order getOrder(int orderNumber) { return (Order) pendingOrders.get(new Integer(orderNumber)); } /** * Called by the onMessage method of the VendorMessageListener class * to process a reply from a supplier to the Vendor. * * @param component the message from the supplier * @return the order with updated status information */ public Order processSubOrder(MapMessage component) { String itemName = null; // Determine which subcomponent this is. try { itemName = component.getString("Item"); } catch (JMSException je) { System.err.println("Unexpected exception. Message missing Item"); } if (itemName.compareTo("Monitor") == 0) { monitor = component; } else if (itemName.compareTo("Hard Drive") == 0 ) { storage = component; } /* * If notification for all subcomponents has been received, * verify the quantities to compute if able to fulfill order. */ if ( (monitor != null) && (storage != null) ) { try { if (quantity > monitor.getInt("Quantity")) { status = CANCELLED_STATUS; } else if (quantity > storage.getInt("Quantity")) { status = CANCELLED_STATUS; } else { status = FULFILLED_STATUS; } } catch (JMSException je) { System.err.println("Unexpected exception " + je); status = CANCELLED_STATUS; } /* * Processing of order is complete, so remove it from * pending-order list. */ pendingOrders.remove(new Integer(orderNumber)); } return this; } /** * Determines if order status is pending. * * @return true if order is pending, false if not */ public boolean isPending() { return status == PENDING_STATUS; } /** * Determines if order status is cancelled. * * @return true if order is cancelled, false if not */ public boolean isCancelled() { return status == CANCELLED_STATUS; } /** * Determines if order status is fulfilled. * * @return true if order is fulfilled, false if not */ public boolean isFulfilled() { return status == FULFILLED_STATUS; } } /** * The VendorMessageListener class processes an order confirmation message * from a supplier to the vendor. * <p> * It demonstrates the use of transactions within message listeners. * * @author Joseph Fialli * @version 1.3, 08/18/00 */ public static class VendorMessageListener implements MessageListener { final SampleUtilities.DoneLatch monitor = new SampleUtilities.DoneLatch(); private final QueueSession session; int numSuppliers; /** * Constructor. Instantiates the message listener with the session * of the consuming class (the vendor). * * @param qs the session of the consumer * @param numSuppliers the number of suppliers */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -