📄 messagemanagerimpl.java
字号:
package com.ibm.aglets;/* * @(#)MessageManagerImpl.java * * IBM Confidential-Restricted * * OCO Source Materials * * 03L7246 (c) Copyright IBM Corp. 1996, 1998 * * The source code for this program is not published or otherwise * divested of its trade secrets, irrespective of what has been * deposited with the U.S. Copyright Office. */import com.ibm.aglet.Aglet;import com.ibm.aglet.Message;import com.ibm.aglet.MessageManager;import com.ibm.aglet.FutureReply;import com.ibm.aglet.ReplySet;import java.util.Vector;import java.util.Hashtable;import java.util.Stack;import java.io.ObjectOutputStream;import java.io.ObjectInputStream;import java.io.IOException;import org.aglets.log.*;// import com.ibm.awb.misc.Debug;/* * Priority * DONT_QUEUE -1 * REENTRANT_MESSAGE 12 * SYSTEM_MESSAGE 11 (onCreation, etc) * REQUEST_MESSAGE 10 *//** * The <tt>MessageManagerReplyImpl</tt> class is an implementation of * com.ibm.aglet.MessageManager interface. * * @version 1.30 $Date: 2002/01/19 22:10:43 $ * @author Mitsuru Oshima */final class MessageManagerImpl implements MessageManager, java.io.Serializable { static private LogCategory logCategory = LogInitializer.getCategory("com.ibm.aglets.MessageManagerImpl"); public static final int REENTRANT_PRIORITY = 12; public static final int SYSTEM_PRIORITY = 11; public static final int REQUEST_PRIORITY = 10; /* * Status */ static final int UNINITIALIZED = 0; static final int RUNNING = 1; static final int SUSPENDED = 2; static final int DEACTIVATED = 3; static final int DESTROYED = 4; /* * Status String */ static private String[] state_string = { "UNINITIALIZED", "RUNNING", "SUSPENDED", "DEACTIVATED", "DESTRYOED" }; transient private MessageQueue message_queue = new MessageQueue(); transient private MessageQueue waiting_queue = new MessageQueue(); static private Hashtable defaultPriorityTable = null; static { defaultPriorityTable = new Hashtable(); defaultPriorityTable.put(Message.CLONE, new Integer(REQUEST_PRIORITY)); defaultPriorityTable.put(Message.DISPOSE, new Integer(REQUEST_PRIORITY)); defaultPriorityTable.put(Message.DISPATCH, new Integer(REQUEST_PRIORITY)); defaultPriorityTable.put(Message.DEACTIVATE, new Integer(REQUEST_PRIORITY)); defaultPriorityTable.put(Message.REVERT, new Integer(REQUEST_PRIORITY)); } transient private MessageImpl owner = null; transient private Stack threadSpool = new Stack(); transient private LocalAgletRef ref; private Hashtable priorityTable = null; private Vector activationTable = null; int state = UNINITIALIZED; /* * public void close() { * synchronized(message_queue) { * closed = true; * } * } * * public boolean isEmpty() { * return message_queue.peek() == null; * } */ MessageManagerImpl(LocalAgletRef ref) { priorityTable = (Hashtable)defaultPriorityTable.clone(); this.ref = ref; } /* * Cancel all messages in the message queue */ void cancelMessagesInMessageQueue() { for (MessageImpl msg = message_queue.peek(); msg != null; msg = msg.next) { if (owner != msg) { msg.cancel("handler destroyed : message = " + msg.toString()); msg.destroy(); } } message_queue.removeAll(); } /* * Cancel all messages in the waiting queue * void cancelMiscMessages() { * int size = misc.size(); * MessageImpl msg; * for(int i=0; i<size; i++) { * msg = ((MessageImpl)misc.elementAt(i)); * msg.cancel("handler destroyed : message = " + msg.toString()); * msg.destroy(); * } * } */ /* * Cancel all messages in the waiting queue */ void cancelMessagesInWaitingQueue() { for (MessageImpl msg = waiting_queue.peek(); msg != null; msg = msg.next) { msg.cancel("handler destroyed : message = " + msg.toString()); msg.destroy(); } waiting_queue.removeAll(); } /* * Cancel the owner message */ void cancelOwnerMessage() { if (owner != null && isOwner() == false) { owner.cancel("handler destroyed : message = " + owner.toString()); owner.destroy(); owner = null; } } void deactivate() { synchronized (message_queue) { if (isSuspended() == false) { throw new IllegalArgumentException("Cannot deactivate"); } setState(DEACTIVATED); cancelOwnerMessage(); cancelMessagesInWaitingQueue(); } cancelMessagesInMessageQueue(); invalidateSpooledThreads(); } // // This have to be improved. // public void destroy() { MessageImpl msg; // Debug.check(); synchronized (message_queue) { if (isDestroyed()) { return; } setState(DESTROYED); ref = null; // Debug.check(); cancelOwnerMessage(); // invalidate all waiting messages // This must be in the synchronized(message_queue) block // @see notifyMessage // @see notifyAllMessages // Debug.check(); cancelMessagesInWaitingQueue(); } // invalidate all messages in the queue // This doesn't have to be in synchronized block because // no one can activate queued message // Debug.check(); cancelMessagesInMessageQueue(); // invalidate all threads // Debug.check(); invalidateSpooledThreads(); // Debug.check(); } public void exitMonitor() { synchronized (message_queue) { if (isOwner() == false) { throw new IllegalMonitorStateException("Current thread is not owner " + Thread.currentThread() + " != " + owner); } processNextMessage(); } } void exitMonitorIfOwner() { synchronized (message_queue) { if (isOwner()) { processNextMessage(); } } } LocalAgletRef getAgletRef() { return ref; } private void invalidateSpooledThreads() { while (threadSpool.empty() == false) { ((AgletThread)threadSpool.pop()).invalidate(); } } public boolean isDeactivated() { return state == DEACTIVATED; } public boolean isDestroyed() { return state == DESTROYED; } boolean isOwner() { // REMIND: owner may become null after the check if (owner != null && owner.thread == Thread.currentThread()) { return true; } else { return false; } } public boolean isRunning() { return state == RUNNING; } public boolean isSuspended() { return state == SUSPENDED; } public boolean isUninitialized() { return state == UNINITIALIZED; } /* package protected */ void kill() { setState(DESTROYED); ref = null; // Debug.check(); while (threadSpool.empty() == false) { ((AgletThread)threadSpool.pop()).stop(); } // Debug.check(); } public void notifyAllMessages() { MessageImpl notifier = null; synchronized (message_queue) { if (isOwner() == false) { throw new IllegalMonitorStateException("Current thread is not owner"); } notifier = owner; if (waiting_queue.peek() != null) { message_queue.insertAtTop(notifier); message_queue.insertAtTop(waiting_queue); waiting_queue.removeAll(); notifier.setWaiting(); processNextMessage(); } } notifier.doWait(); } public void notifyMessage() { MessageImpl notifier = null; MessageImpl waiting = null; synchronized (message_queue) { if (isOwner() == false) { throw new IllegalMonitorStateException("Current thread is not owner"); } notifier = owner; // remove waiting message from queue. waiting = waiting_queue.pop(); if (waiting != null) { // set the waiting message to the top of the queue // and put the notifier on the next of the queue message_queue.insertAtTop(notifier); message_queue.insertAtTop(waiting); notifier.setWaiting(); processNextMessage(); } } notifier.doWait(); } /* * Thread Management */ AgletThread popThread() { if (isDestroyed()) { System.out.println("should not happen"); return null; } synchronized (threadSpool) { if (threadSpool.empty()) { return ref.resourceManager.newAgletThread(this); // due to Fiji // return tm.newAgletThread(ref.threadGroup, this); } return (AgletThread)threadSpool.pop(); } } void postMessage(MessageImpl msg) { postMessage(msg, false); } /* * Post a message */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -