⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadedmessenger.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            }        }        catch (Throwable any) {            if (LOG.isEnabledFor(Level.FATAL)) {                LOG.fatal("Uncaught throwable in background thread", any);                // Hope the next thread has more luck. It'll need it.            }            bgThread = null;        }    }    private void deferAction(int action) {        deferredAction = action;        if (bgThread == null) {            bgThread = new Thread( myThreadGroup, this,                                   "ThreadedMessenger for " + getDestinationAddress() );            bgThread.setDaemon( true );            bgThread.start();        }    }    private int nextAction() {        long quitAt = TimeUtils.toAbsoluteTimeMillis(THREAD_IDLE_DEAD);        synchronized(stateMachine) {            while (deferredAction == ACTION_NONE) {                // Still nothing to do. Is it time to quit, or where we just awakened for nothing ?                if (TimeUtils.toRelativeTimeMillis(quitAt) < 0) {                    // Ok. Time to quit.                    bgThread = null;                    return ACTION_NONE;                }                // We do not need to wakeup exactly on the dead line, so there's no need to                // recompute the dead line. THREAD_IDLE_DEAD is comparatively short.                try {                    stateMachine.wait(THREAD_IDLE_DEAD);                } catch (InterruptedException ie) {                    // Only shutdown can force termination.                    Thread.interrupted(); // Whatever the urban legend says about this.                }            }            int action = deferredAction;            deferredAction = ACTION_NONE;            return action;        }    }    /**     * Performs the ACTION_SEND deferred action: sends the messages in our channel queues until there's none left or     * we are forced to stop by connection breakage.     */    private void send() {        ThreadedMessengerChannel theChannel = null;        synchronized(stateMachine) {            theChannel = (ThreadedMessengerChannel) activeChannels.peek();            if (theChannel == null) {                // No notifyChange: this is defensive code. NotifyChange() should have been called already.                stateMachine.idleEvent();                stateMachine.notifyAll();                return;            }        }        while (true) {            AsyncChannelMessenger.PendingMessage theMsg = theChannel.peek();            if (theMsg == null) {                // done with that channel for now. (And it knows it). Move to the next channel. Actually                // it should have been removed when we poped the last message, except if we went down upon sending it.                // In that later case, we leave the channel queue as is so that we cannot have to report, idle                // in the same time than down.                synchronized(stateMachine) {                    activeChannels.pop();                    theChannel = (ThreadedMessengerChannel) activeChannels.peek();                    if (theChannel != null) {                        continue; // Nothing changes; we do not call msgsEvent because we never call saturatedEvent either.                    }                    // Done with all channels. We're now idle.                    stateMachine.idleEvent();                    stateMachine.notifyAll();                }                notifyChange();                return;            }            Message currentMsg = theMsg.msg;            String currentService = theMsg.service;            String currentParam = theMsg.param;            try {                sendMessageBImpl(currentMsg, currentService, currentParam);            } catch(Throwable any) {                // When the current message fails, we leave it in there. sendMessageBImpl does not report failures. So that we can retry if                // applicable. It is up to us to report failures. See failall in AsyncChannel. However, there is a risk that a bad                // message causes this messenger to go down repeatedly. We need some kind of safeguard. So, if there's already a failure                // recorded for this message, we bounce it.                synchronized(stateMachine) {                    if (theMsg.failure != null) {                        theChannel.pop();                        currentMsg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(currentMsg, theMsg.failure) );                    } else {                        theMsg.failure = any;                    }                    stateMachine.downEvent();                    stateMachine.notifyAll();                }                notifyChange();                return;            }            // Worked. Remove the message, Rotate the queues, get the next message from the next queue.            synchronized(stateMachine) {                theChannel.pop();                // Things are quite a bit simpler if there's a single still active channel, and it's frequent, so it's worth checking.                boolean empty = (theChannel.peek() == null);                if ((activeChannels.getCurrentInQueue() != 1) || empty) {                    activeChannels.pop();                    if (! empty) {                        // We're not done with that channel. Put it back at the end                        activeChannels.push(theChannel);                    }                    // Get the next channel.                    theChannel = (ThreadedMessengerChannel) activeChannels.peek();                    if (theChannel == null) {                        // Done with all channels. We're now idle.                        stateMachine.idleEvent();                        stateMachine.notifyAll();                    }                } // else, just stick to that channel            }            if (theChannel == null) {                notifyChange();                Thread.yield(); // We're about to go wait(). Yielding is a good bet. It is                // very inexpenssive and may be all it takes to get a new job                // queued.                return;            }        }    }    /**     * Performs the ACTION_CONNECT deferred action. Generates a down event if it does not work.     */    private void connect() {        boolean worked = connectImpl();        ThreadedMessengerChannel[] channels = null;        synchronized(stateMachine) {            if (worked) {                // we can now get the logical destination from the underlying implementation (likely obtained from a transport                // messenger)                EndpointAddress effectiveLogicalDest = getLogicalDestinationImpl();                if (logicalDestination == null) {                    // We did not know what was supposed to be on the other side. Anything will do.                    logicalDestination = effectiveLogicalDest;                    stateMachine.upEvent();                    channels = (ThreadedMessengerChannel[]) resolvingChannels.keySet().toArray(new ThreadedMessengerChannel[0]);                    resolvingChannels.clear();                } else if (logicalDestination.equals(effectiveLogicalDest)) {                    // Good. It's what we expected.                    stateMachine.upEvent();                    channels = (ThreadedMessengerChannel[]) resolvingChannels.keySet().toArray(new ThreadedMessengerChannel[0]);                    resolvingChannels.clear();                } else {                    // Ooops, not what we wanted. Can't connect then. (force close the underlying cnx).                    closeImpl();                    stateMachine.downEvent();                }            } else {                stateMachine.downEvent();            }            stateMachine.notifyAll();        }        // If it worked, we need to tell all the channels that were waiting for resolution.        // If it did not work, the outcome depends upon what will happen after the down event.        // It's ok to do that outside of sync. Channel.up may synchronize, but it never calls        // this class while synchronized.        if (channels != null) {            int i = channels.length;            while (i-->0) {                channels[i].up();            }            channels = null;        }        notifyChange();    }    /*     * Messenger API top level methods.     */    /**     * The endpoint service may call this to cause an orderly closure of its messengers.     */    protected final void shutdown() {        synchronized(stateMachine) {            stateMachine.shutdownEvent();            stateMachine.notifyAll();        }        notifyChange();    }    /**     * {@inheritDoc}     */    public EndpointAddress getLogicalDestinationAddress() {        // If it's not resolved, we can't know what the logical destination is, unless we had an expectation.        // And if we had, the messenger will fail as soon as we discover that the expectation is wrong.        // In most if not all cases, either we have an expectation, or the messenger comes already resolved.        // Otherwise, if you need the logical destination, you must resolve first. We do not want this method        // to be blocking.        return logicalDestination;    }    /**     * {@inheritDoc}     */    public void close() {        synchronized(stateMachine) {            stateMachine.closeEvent();            stateMachine.notifyAll();        }        notifyChange();    }    /**     * {@inheritDoc}     *     * <p/> In this case, this method is here out of principle but is not really expected to be invoked.  The normal way     * of using a ThreadedMessenger is through its channels. We do provide a default channel that all invokers that go around     * channels will share. That could be usefull to send rare out of band messages for example.     */    public final boolean sendMessageN( Message msg, String service, String serviceParam ) {        // Need a default channel.        synchronized(stateMachine) { // Can't do dbl check with current java memory model            if (defaultChannel == null) {                defaultChannel = new ThreadedMessengerChannel(getDestinationAddress(),null,null,null,channelQueueSize,                                 false);            }        }        return defaultChannel.sendMessageN( msg, service, serviceParam );    }    /**     *  {@inheritDoc}     * /**     */    public final void sendMessageB( Message msg, String service, String serviceParam ) throws IOException {        // Need a default channel.        synchronized(stateMachine) { // Can't do dbl check with current java memory model            if (defaultChannel == null) {                defaultChannel = new ThreadedMessengerChannel(getDestinationAddress(),null,null,null,channelQueueSize,                                 false);            }        }        defaultChannel.sendMessageB( msg, service, serviceParam );    }    private final boolean addToActiveChannels( ThreadedMessengerChannel channel ) {        synchronized(stateMachine) {            if (inputClosed) {                return false;            }            activeChannels.push(channel);            // There are items in the queue now.            stateMachine.msgsEvent();            // We called an event. The state may have changed. Notify waiters.            stateMachine.notifyAll();        }        notifyChange();        return true;    }    private final void strongRefResolvingChannel( ThreadedMessengerChannel channel ) {        // If, and only if, this channel is already among the resolving channels, add a strong ref        // to it. This is invoked when a message is queued to that channel while it is still        // resolving. However we must verify its presence in the resolvingChannels map: this method        // may be called while the channel has been removed from the list, but has not been told        // yet.        synchronized(stateMachine) {            if (resolvingChannels.containsKey(channel)) {                resolvingChannels.put(channel, channel);            }        }    }    private final boolean addToResolvingChannels( ThreadedMessengerChannel channel ) {        synchronized(stateMachine) {            // If we're in a state where no resolution event will ever occur, we must not add anything to the list.            if ((stateMachine.getState() & (RESOLVED | TERMINAL)) != 0) {                return false;            }            // We use the weak map only for the weak part, not for the map part.            resolvingChannels.put(channel, null);            stateMachine.resolveEvent();            stateMachine.notifyAll();        }        notifyChange();        return true;    }    /**     *  {@inheritDoc}     */    public final void resolve() {        synchronized(stateMachine) {            stateMachine.resolveEvent();            stateMachine.notifyAll();        }        notifyChange();    }    /**     *  {@inheritDoc}     */    public final int getState() {        return stateMachine.getState();    }    /**     *  {@inheritDoc}     */    public Messenger getChannelMessenger( PeerGroupID redirection, String service, String serviceParam ) {        // Our transport is always in the same group. If the channel's target group is the same, no group        // redirection is ever needed.        return new ThreadedMessengerChannel( getDestinationAddress(), homeGroupID.equals(redirection) ? null : redirection,                                             service, serviceParam, channelQueueSize,                                             (stateMachine.getState() & (RESOLVED & USABLE)) != 0 ); // are we happily resolved ?    }    /*     * Abstract methods to be provided by implementor. These are fully expected     * to be blocking and may be implemented by invoking transport blocking     * methods, such as EndpointServiceImpl.getLocalTransportMessenger() or     * <em>whateverTransportMessengerWasObtained</em>.sendMessageB(). Should the     * underlying code be non-blocking, these impl methods must simulate it. If     * it's not obvious to do, then this base class is not a good choice.     */    /**     * Close underlying connection. May fail current send.     */    protected abstract void closeImpl();    /**     * Make underlying connection.     */    protected abstract boolean connectImpl();    /**     * Sends message through underlying connection.     */    protected abstract void sendMessageBImpl(Message msg, String service, String param) throws IOException;    /**     * Obtain the logical destination address from the implementer (which likely gets it from the transport messenger).     * Might not work if unresolved, so use with care.     */    protected abstract EndpointAddress getLogicalDestinationImpl();}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -