📄 jumpmessagedispatcherimpl.java
字号:
listeners.remove(messageType); } } } return listener; } public void cancelRegistration(Object registrationToken) throws IOException { ((RegistrationToken)registrationToken).cancelRegistration(); } private interface RegistrationToken { void cancelRegistration() throws IOException; } private static class HandlerRegistrationToken implements RegistrationToken { private final Listener listener; private final JUMPMessageHandler handler; // Don't allow cancelRegistration() to be called twice. private boolean canceled = false; public HandlerRegistrationToken ( Listener listener, JUMPMessageHandler handler) { this.listener = listener; this.handler = handler; } public void cancelRegistration () throws IOException { synchronized (this) { if (canceled) { throw new IllegalStateException( "Registration has already been canceled."); } canceled = true; } listener.removeHandler(handler); } } private static class DirectRegistrationToken implements RegistrationToken { private final DirectRegistration directRegistration; // Don't allow cancelRegistration() to be called twice. private boolean canceled = false; public DirectRegistrationToken (DirectRegistration directRegistration) { this.directRegistration = directRegistration; } public void cancelRegistration () { synchronized (this) { if (canceled) { throw new IllegalStateException( "Registration has already been canceled."); } canceled = true; } directRegistration.decrementUseCountMaybeClose(); } } private class DirectRegistration { private final String messageType; // useCount is incremented for every direct registration of // messageType and when a message receive begins, and // decremented when the registration is canceled or a message // read is finished. When the count falls to zero, the // low-level resources are freed, and the directRegistrations // mapping is removed, therefore this DirectRegistration can // never be used again to access the (freed) low-level // resources. private int useCount = 0; public DirectRegistration (String messageType) throws IOException { this.messageType = messageType; // Make sure we've got a receive queue for the messageType. jumpMessageQueueInterfaceImpl.reserve(messageType); } // Externally synchronized on lock. public void incrementUseCount () { useCount++; } public void decrementUseCountMaybeClose () { synchronized (lock) { useCount--; if (useCount == 0) { close(); directRegistrations.remove(messageType); } } } public void close () { // Tell the low-level code we're done with the message queue. jumpMessageQueueInterfaceImpl.unreserve(messageType); } } /* * Frequently asked questions: * 1. Why doesn't Listener extend Thread? Extending Thread would * put lots of unnecessary and inappropriate methods into its * API. It should keep control over those things to itself. * 2. How about implementing Runnable then? The fact that Listener * uses a Thread and/or Runnable is an implementation detail * and shouldn't be exposed in its API. The inner class * implementing Runnable keeps the implementation private. * Only those methods intended to be called from outside the * class itself are public. * 3. How can we get the thread to exit when it's blocking in * JUMPMessageReceiveQueue.receiveMessage()? * There are three choices: * 1. Make JUMPMessageReceiveQueue.receiveMessage() interruptible, * and interrupt the thread. We probably don't want to go there. * 2. Send a message that the thread will see and exit on. * This isn't as easy as it sounds since sending messages * may fail, e.g., if the Listener is processing messages * slowly and its queue has filled up. But in that case * it should exit after reading one of the "real" messages * whether our sentinel is sent/received or not. * 3. Periodically time out and check for exit. We do this, * it's simple and effective and doesn't need any extra low-level * support such as interrupt handling, although it doesn't stop * the thread immediately, and requires the thread to wake up * periodically. */ private class Listener { // Guarded by this. private final List handlers = new ArrayList(); private final String messageType; public Listener (String messageType) throws IOException { this.messageType = messageType; // Make sure we've got a receive queue for the messageType. jumpMessageQueueInterfaceImpl.reserve(messageType); } // Externally synchronized on lock. public void addHandler (JUMPMessageHandler handler) { handlers.add(handler); } public void removeHandler (JUMPMessageHandler handler) throws IOException { synchronized (lock) { handlers.remove(handler); if (handlers.isEmpty()) { // Wake up the listening thread so it can exit if // it finds handlers is still empty. jumpMessageQueueInterfaceImpl.unblock(messageType); } } } public void start () { Thread thread = new Thread( new Runnable() { public void run() { try { listen(); } finally { close(); } } }); thread.setName(this.getClass().getName() + ": " + messageType); thread.setDaemon(true); thread.start(); } public void close () { // Tell the low-level code we're done with the message queue. jumpMessageQueueInterfaceImpl.unreserve(messageType); } private void listen () { // FIXME We should either log Errors and RuntimeExceptions // and continue, or cleanup and make sure they're thrown. while (true) { try { JUMPMessage msg = doWaitForMessage(messageType, 0L); dispatchMessage(msg); } catch (JUMPUnblockedException e) { // This is normal. It's time to check for exit. } catch (JUMPTimedOutException e) { // This shouldn't happen. Handle like IOException. } catch (IOException e) { // Unexpected exception. e.printStackTrace(); } synchronized (lock) { if (handlers.isEmpty()) { // Remove ourselves from the map and exit. listeners.remove(messageType); break; } } } } // NOTE: Handlers should not be called while holding our // monitor since it can lead to inadvertent deadlocks. // However, not synchronizing on "this" here can result in // handlers being called even after they've been removed. // This is a generally accepted hazard of patterns like this. private void dispatchMessage(JUMPMessage msg) { JUMPMessageHandler[] handlersSnapshot; // Get a snapsot with the lock held. synchronized (lock) { handlersSnapshot = (JUMPMessageHandler[]) handlers.toArray(new JUMPMessageHandler[handlers.size()]); } // Call handlers with the lock released. for (int i = 0; i < handlersSnapshot.length; i++) { JUMPMessageHandler handler = handlersSnapshot[i]; try { handler.handleMessage(msg); } catch (RuntimeException e) { e.printStackTrace(); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -