📄 threadedmessenger.java
字号:
} } catch (Throwable any) { if (LOG.isEnabledFor(Level.FATAL)) { LOG.fatal("Uncaught throwable in background thread", any); // Hope the next thread has more luck. It'll need it. } bgThread = null; } } private void deferAction(int action) { deferredAction = action; if (bgThread == null) { bgThread = new Thread( myThreadGroup, this, "ThreadedMessenger for " + getDestinationAddress() ); bgThread.setDaemon( true ); bgThread.start(); } } private int nextAction() { long quitAt = TimeUtils.toAbsoluteTimeMillis(THREAD_IDLE_DEAD); synchronized(stateMachine) { while (deferredAction == ACTION_NONE) { // Still nothing to do. Is it time to quit, or where we just awakened for nothing ? if (TimeUtils.toRelativeTimeMillis(quitAt) < 0) { // Ok. Time to quit. bgThread = null; return ACTION_NONE; } // We do not need to wakeup exactly on the dead line, so there's no need to // recompute the dead line. THREAD_IDLE_DEAD is comparatively short. try { stateMachine.wait(THREAD_IDLE_DEAD); } catch (InterruptedException ie) { // Only shutdown can force termination. Thread.interrupted(); // Whatever the urban legend says about this. } } int action = deferredAction; deferredAction = ACTION_NONE; return action; } } /** * Performs the ACTION_SEND deferred action: sends the messages in our channel queues until there's none left or * we are forced to stop by connection breakage. */ private void send() { ThreadedMessengerChannel theChannel = null; synchronized(stateMachine) { theChannel = (ThreadedMessengerChannel) activeChannels.peek(); if (theChannel == null) { // No notifyChange: this is defensive code. NotifyChange() should have been called already. stateMachine.idleEvent(); stateMachine.notifyAll(); return; } } while (true) { AsyncChannelMessenger.PendingMessage theMsg = theChannel.peek(); if (theMsg == null) { // done with that channel for now. (And it knows it). Move to the next channel. Actually // it should have been removed when we poped the last message, except if we went down upon sending it. // In that later case, we leave the channel queue as is so that we cannot have to report, idle // in the same time than down. synchronized(stateMachine) { activeChannels.pop(); theChannel = (ThreadedMessengerChannel) activeChannels.peek(); if (theChannel != null) { continue; // Nothing changes; we do not call msgsEvent because we never call saturatedEvent either. } // Done with all channels. We're now idle. stateMachine.idleEvent(); stateMachine.notifyAll(); } notifyChange(); return; } Message currentMsg = theMsg.msg; String currentService = theMsg.service; String currentParam = theMsg.param; try { sendMessageBImpl(currentMsg, currentService, currentParam); } catch(Throwable any) { // When the current message fails, we leave it in there. sendMessageBImpl does not report failures. So that we can retry if // applicable. It is up to us to report failures. See failall in AsyncChannel. However, there is a risk that a bad // message causes this messenger to go down repeatedly. We need some kind of safeguard. So, if there's already a failure // recorded for this message, we bounce it. synchronized(stateMachine) { if (theMsg.failure != null) { theChannel.pop(); currentMsg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(currentMsg, theMsg.failure) ); } else { theMsg.failure = any; } stateMachine.downEvent(); stateMachine.notifyAll(); } notifyChange(); return; } // Worked. Remove the message, Rotate the queues, get the next message from the next queue. synchronized(stateMachine) { theChannel.pop(); // Things are quite a bit simpler if there's a single still active channel, and it's frequent, so it's worth checking. boolean empty = (theChannel.peek() == null); if ((activeChannels.getCurrentInQueue() != 1) || empty) { activeChannels.pop(); if (! empty) { // We're not done with that channel. Put it back at the end activeChannels.push(theChannel); } // Get the next channel. theChannel = (ThreadedMessengerChannel) activeChannels.peek(); if (theChannel == null) { // Done with all channels. We're now idle. stateMachine.idleEvent(); stateMachine.notifyAll(); } } // else, just stick to that channel } if (theChannel == null) { notifyChange(); Thread.yield(); // We're about to go wait(). Yielding is a good bet. It is // very inexpenssive and may be all it takes to get a new job // queued. return; } } } /** * Performs the ACTION_CONNECT deferred action. Generates a down event if it does not work. */ private void connect() { boolean worked = connectImpl(); ThreadedMessengerChannel[] channels = null; synchronized(stateMachine) { if (worked) { // we can now get the logical destination from the underlying implementation (likely obtained from a transport // messenger) EndpointAddress effectiveLogicalDest = getLogicalDestinationImpl(); if (logicalDestination == null) { // We did not know what was supposed to be on the other side. Anything will do. logicalDestination = effectiveLogicalDest; stateMachine.upEvent(); channels = (ThreadedMessengerChannel[]) resolvingChannels.keySet().toArray(new ThreadedMessengerChannel[0]); resolvingChannels.clear(); } else if (logicalDestination.equals(effectiveLogicalDest)) { // Good. It's what we expected. stateMachine.upEvent(); channels = (ThreadedMessengerChannel[]) resolvingChannels.keySet().toArray(new ThreadedMessengerChannel[0]); resolvingChannels.clear(); } else { // Ooops, not what we wanted. Can't connect then. (force close the underlying cnx). closeImpl(); stateMachine.downEvent(); } } else { stateMachine.downEvent(); } stateMachine.notifyAll(); } // If it worked, we need to tell all the channels that were waiting for resolution. // If it did not work, the outcome depends upon what will happen after the down event. // It's ok to do that outside of sync. Channel.up may synchronize, but it never calls // this class while synchronized. if (channels != null) { int i = channels.length; while (i-->0) { channels[i].up(); } channels = null; } notifyChange(); } /* * Messenger API top level methods. */ /** * The endpoint service may call this to cause an orderly closure of its messengers. */ protected final void shutdown() { synchronized(stateMachine) { stateMachine.shutdownEvent(); stateMachine.notifyAll(); } notifyChange(); } /** * {@inheritDoc} */ public EndpointAddress getLogicalDestinationAddress() { // If it's not resolved, we can't know what the logical destination is, unless we had an expectation. // And if we had, the messenger will fail as soon as we discover that the expectation is wrong. // In most if not all cases, either we have an expectation, or the messenger comes already resolved. // Otherwise, if you need the logical destination, you must resolve first. We do not want this method // to be blocking. return logicalDestination; } /** * {@inheritDoc} */ public void close() { synchronized(stateMachine) { stateMachine.closeEvent(); stateMachine.notifyAll(); } notifyChange(); } /** * {@inheritDoc} * * <p/> In this case, this method is here out of principle but is not really expected to be invoked. The normal way * of using a ThreadedMessenger is through its channels. We do provide a default channel that all invokers that go around * channels will share. That could be usefull to send rare out of band messages for example. */ public final boolean sendMessageN( Message msg, String service, String serviceParam ) { // Need a default channel. synchronized(stateMachine) { // Can't do dbl check with current java memory model if (defaultChannel == null) { defaultChannel = new ThreadedMessengerChannel(getDestinationAddress(),null,null,null,channelQueueSize, false); } } return defaultChannel.sendMessageN( msg, service, serviceParam ); } /** * {@inheritDoc} * /** */ public final void sendMessageB( Message msg, String service, String serviceParam ) throws IOException { // Need a default channel. synchronized(stateMachine) { // Can't do dbl check with current java memory model if (defaultChannel == null) { defaultChannel = new ThreadedMessengerChannel(getDestinationAddress(),null,null,null,channelQueueSize, false); } } defaultChannel.sendMessageB( msg, service, serviceParam ); } private final boolean addToActiveChannels( ThreadedMessengerChannel channel ) { synchronized(stateMachine) { if (inputClosed) { return false; } activeChannels.push(channel); // There are items in the queue now. stateMachine.msgsEvent(); // We called an event. The state may have changed. Notify waiters. stateMachine.notifyAll(); } notifyChange(); return true; } private final void strongRefResolvingChannel( ThreadedMessengerChannel channel ) { // If, and only if, this channel is already among the resolving channels, add a strong ref // to it. This is invoked when a message is queued to that channel while it is still // resolving. However we must verify its presence in the resolvingChannels map: this method // may be called while the channel has been removed from the list, but has not been told // yet. synchronized(stateMachine) { if (resolvingChannels.containsKey(channel)) { resolvingChannels.put(channel, channel); } } } private final boolean addToResolvingChannels( ThreadedMessengerChannel channel ) { synchronized(stateMachine) { // If we're in a state where no resolution event will ever occur, we must not add anything to the list. if ((stateMachine.getState() & (RESOLVED | TERMINAL)) != 0) { return false; } // We use the weak map only for the weak part, not for the map part. resolvingChannels.put(channel, null); stateMachine.resolveEvent(); stateMachine.notifyAll(); } notifyChange(); return true; } /** * {@inheritDoc} */ public final void resolve() { synchronized(stateMachine) { stateMachine.resolveEvent(); stateMachine.notifyAll(); } notifyChange(); } /** * {@inheritDoc} */ public final int getState() { return stateMachine.getState(); } /** * {@inheritDoc} */ public Messenger getChannelMessenger( PeerGroupID redirection, String service, String serviceParam ) { // Our transport is always in the same group. If the channel's target group is the same, no group // redirection is ever needed. return new ThreadedMessengerChannel( getDestinationAddress(), homeGroupID.equals(redirection) ? null : redirection, service, serviceParam, channelQueueSize, (stateMachine.getState() & (RESOLVED & USABLE)) != 0 ); // are we happily resolved ? } /* * Abstract methods to be provided by implementor. These are fully expected * to be blocking and may be implemented by invoking transport blocking * methods, such as EndpointServiceImpl.getLocalTransportMessenger() or * <em>whateverTransportMessengerWasObtained</em>.sendMessageB(). Should the * underlying code be non-blocking, these impl methods must simulate it. If * it's not obvious to do, then this base class is not a good choice. */ /** * Close underlying connection. May fail current send. */ protected abstract void closeImpl(); /** * Make underlying connection. */ protected abstract boolean connectImpl(); /** * Sends message through underlying connection. */ protected abstract void sendMessageBImpl(Message msg, String service, String param) throws IOException; /** * Obtain the logical destination address from the implementer (which likely gets it from the transport messenger). * Might not work if unresolved, so use with care. */ protected abstract EndpointAddress getLogicalDestinationImpl();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -