📄 messagemanagerimpl.java
字号:
private void postMessage(MessageImpl msg, boolean oneway) { logCategory.debug("postMessage()++"); int priority = NORM_PRIORITY; MessageImpl reentrantOwner = null; synchronized (message_queue) { if (isDestroyed()) { msg.cancel("MessageManager destroyed : message = " + msg.toString()); return; } else if (isDeactivated()) { if (activationTable != null && activationTable.contains(msg.getKind())) { try { ref.activate(); msg.enableDelegation(); ref.delegateMessage(msg); } catch (Exception ex) { ex.printStackTrace(); msg.cancel("MessageManager cannot activate aglet : message = " + msg.toString()); return; } } else { msg.cancel("MessageManager deactivated : message = " + msg.toString()); } return; } if (msg.getKind() != null) { Object o = priorityTable.get(msg.getKind()); if (o instanceof Integer) { priority = ((Integer)o).intValue(); } } else { priority = msg.priority; } /* * Not Queued */ if (priority < 0) { msg.activate(this); return; } if (isOwner() && oneway == false) { reentrantOwner = owner; // keep the original priority // and set the top priority priority = reentrantOwner.priority; reentrantOwner.priority = REENTRANT_PRIORITY; message_queue.insertAtTop(reentrantOwner); // // Reentrant message has top priority and // must be put on the top of queue. // msg.priority = REENTRANT_PRIORITY; message_queue.insertAtTop(msg); reentrantOwner.setWaiting(); processNextMessage(); } else { // // Normal message is put in the queue in accordance with // its priority // msg.priority = priority; message_queue.insert(msg); processNextMessageIfEmpty(); } } // // @see notifyMessage // if (reentrantOwner != null) { reentrantOwner.doWait(); // restore original priority reentrantOwner.priority = priority; } } /* * This have to be called from synchronized(message_queue) block */ private void processNextMessage() { // don't process any more if destroed // don't process if suspended // but can process next messages even if closed if (isRunning() == false) { return; } if (message_queue.peek() != null) { owner = message_queue.pop(); owner.activate(this); } else { owner = null; } } private void processNextMessageIfEmpty() { if (owner == null) { processNextMessage(); } } void pushMessage(MessageImpl msg) { postMessage(msg, true); } /* * void pushThreadAndExitMonitorIfOwner(AgletThread thread) { * synchronized(message_queue) { * synchronized(threadSpool) { * pushThread(thread); * if (isOwner()) { * processNextMessage(); * } * } * } * } */ void pushThread(AgletThread thread) { synchronized (threadSpool) { if (isDestroyed()) { thread.invalidate(); return; } threadSpool.push(thread); } } void pushThreadAndExitMonitorIfOwner(AgletThread thread) { synchronized (message_queue) { pushThread(thread); if (isOwner()) { processNextMessage(); } } } /* * MessageManagerImpl(java.io.ObjectInput in, LocalAgletRef ref) throws IOException, ClassNotFoundException { * priorityTable = (Hashtable)in.readObject(); * activationTable = (Vector)in.readObject(); * state = in.readInt(); * this.ref = ref; * } */ private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); // state = UNINITIALIZED; threadSpool = new Stack(); message_queue = new MessageQueue(); waiting_queue = new MessageQueue(); } void removeThread(AgletThread thread) { threadSpool.removeElement(thread); } public void resume() { // ThreadGroup group) { synchronized (message_queue) { if (isRunning() || isDestroyed()) { return; } setState(RUNNING); ref.resourceManager.resumeAllThreads(); processNextMessage(); // processNextMessageIfEmpty(); } } void setAgletRef(LocalAgletRef r) { ref = r; } public void setPriority(String kind, int priority) { if ((priority & ACTIVATE_AGLET) == ACTIVATE_AGLET) { if (activationTable == null) { activationTable = new Vector(); } if (activationTable.contains(kind) == false) { activationTable.addElement(kind); } priority = priority & 0xF; if (priority == 0) { // priority = NORM_PRIORITY; return; } } if (priority != NOT_QUEUED && (priority < MIN_PRIORITY || priority > MAX_PRIORITY)) { throw new IllegalArgumentException("illegal priority"); } priorityTable.put(kind, new Integer(priority)); // REMIND: re-sort the messages in the queue } /* synchronized (message_queue) */ void setState(int next) { switch (state) { case UNINITIALIZED: if (next == RUNNING || next == DEACTIVATED) { state = next; return; } break; case RUNNING: if (next == SUSPENDED) { state = next; return; } break; case SUSPENDED: if (next == DESTROYED || next == RUNNING || next == DEACTIVATED) { state = next; return; } break; case DEACTIVATED: if (next == RUNNING) { state = next; return; } break; case DESTROYED: default: // cannot move to any state! break; } throw new IllegalArgumentException("Cannot proceed from " + state_string[state] + " to " + state_string[next]); } /* * Start */ void start() { if (isUninitialized() == false) { throw new IllegalMonitorStateException("MessageManager not valid"); } synchronized (message_queue) { setState(RUNNING); processNextMessageIfEmpty(); } } public void suspend() { // ThreadGroup group) { synchronized (message_queue) { if (isSuspended() || isDestroyed()) { return; } setState(SUSPENDED); // to make sure that no other thread has entered in the // synchronous block. ref.resourceManager.suspendAllThreads(); } } /** * */ public String toString() { int i = 0; StringBuffer buff = new StringBuffer("Active queue\n"); MessageImpl tmp = message_queue.peek(); synchronized (message_queue) { i = 1; while (tmp != null) { buff.append(String.valueOf(i++)); buff.append(":"); buff.append(tmp.toString()); buff.append("\n"); tmp = tmp.next; } tmp = waiting_queue.peek(); buff.append("Waiting queue\n"); i = 1; while (tmp != null) { buff.append(String.valueOf(i++)); buff.append(":"); buff.append(tmp.toString()); buff.append("\n"); tmp = tmp.next; } } return buff.toString(); } public void waitMessage() { waitMessage(0); } public void waitMessage(long timeout) { MessageImpl wait = null; synchronized (message_queue) { if (isOwner() == false) { throw new IllegalMonitorStateException("Current thread is not owner"); } wait = owner; // put the owner message to the waiting queue waiting_queue.append(wait); wait.setWaiting(); processNextMessage(); } // wait outside of synchronized block to avoid dead lock if (timeout == 0) { // short cut wait.doWait(); } else { wait.doWait(timeout); synchronized (message_queue) { if (wait.isWaiting()) { // nobody didn't wake me up! waiting_queue.remove(wait); // REMIND: this must be improved to // consider priority message_queue.insertAtTop(wait); // kick processNextMessageIfEmpty(); // message_queue.notify(); } } // if still waiting, // wait again until the handler loop activates this message wait.doWait(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -