⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 asyncchannelmessenger.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * * $Id: AsyncChannelMessenger.java,v 1.7 2006/05/25 00:41:33 hamada Exp $ * * Copyright (c) 2004-2006 Sun Microsystems, Inc.  All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright *    notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright *    notice, this list of conditions and the following disclaimer in *    the documentation and/or other materials provided with the *    distribution. * * 3. The end-user documentation included with the redistribution, *    if any, must include the following acknowledgment: *       "This product includes software developed by the *       Sun Microsystems, Inc. for Project JXTA." *    Alternately, this acknowledgment may appear in the software itself, *    if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" *    must not be used to endorse or promote products derived from this *    software without prior written permission. For written *    permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", *    nor may "JXTA" appear in their name, without prior written *    permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA.  For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.endpoint;import java.io.IOException;import java.io.InterruptedIOException;import net.jxta.peergroup.PeerGroupID;import net.jxta.impl.util.UnbiasedQueue;import org.apache.log4j.Level;import org.apache.log4j.Logger;/** *  Extends Channel Messenger behaviour to provide asynchronous message sending *  via queuing. */public abstract class AsyncChannelMessenger extends ChannelMessenger {    /**     *  Log4J Logger     */    private final static transient Logger LOG = Logger.getLogger(AsyncChannelMessenger.class.getName());    /**     * {@code true} if we have deliberately closed our one message input queue.     */    private boolean inputClosed = false;    /**     * {@code true} if we have deliberately stopped sending.     */    private boolean outputClosed = false;    /*     * Actions that we defer to after returning from event methods. In other      * words, they cannot be done with the lock held, or they require calling     * more event methods.     */    /**     * No action deferred.     */    private static final int ACTION_NONE = 0;    /**     * Must send the current message.     */    private static final int ACTION_SEND = 1;    /**     * Must report failure to connect.     */    private static final int ACTION_CONNECT = 2;    /**     * The current deferred action.     */    private int deferredAction = ACTION_NONE;    /**     * The messages queue.     */    private final UnbiasedQueue queue;    /**     * State lock and engine.     */    private final AsyncChannelMessengerState stateMachine;    /**     * Our statemachine implementation; just connects the standard MessengerState action methods to     * this object.     */    private class AsyncChannelMessengerState extends MessengerState {        protected AsyncChannelMessengerState(boolean connected) {            super(connected);        }        /*         * The required action methods.         */        /**         *  {@inheritDoc}         */        protected void connectAction() {            deferredAction = ACTION_CONNECT;        }        /**         *  {@inheritDoc}         */        protected void startAction() {            deferredAction = ACTION_SEND;        }        /**         *  {@inheritDoc}         */        protected void closeInputAction() {            // We're synchronized here. (invoked from stateMachine)            inputClosed = true;        }        /**         *  {@inheritDoc}         */        protected void closeOutputAction() {            // We're synchronized here. (invoked from stateMachine)            outputClosed = true;        }        /**         *  {@inheritDoc}         */        protected void failAllAction() {            // The queue is now closed, so we can rest assured that the last            // message is really the last one. This is a synchronous action. The            // state machine assumes that it is done when we return. There is no            // need to signal completion with an idleEvent.            while (true) {                PendingMessage theMsg = null;                synchronized(stateMachine) {                    theMsg = (PendingMessage) queue.pop();                }                if (theMsg == null) {                    return;                }                Message currentMsg = theMsg.msg;                Throwable currentFailure = theMsg.failure;                if (currentFailure == null) {                    currentFailure = new IOException("Messenger unexpectedly closed");                }                OutgoingMessageEvent event =  new OutgoingMessageEvent(currentMsg, currentFailure);                currentMsg.setMessageProperty(Messenger.class, event);            }        }    }    /**     *  The representation of a queued message. It is shared between this      *  abstract class and any implementation.     */    protected static class PendingMessage {        final Message msg;        final String service;        final String param;        Throwable failure;        PendingMessage( Message msg, String service, String param ) {            this.msg = msg;            this.service = service;            this.param = param;            this.failure = null;        }    }    /**     *  Create a new AsyncChannelMessenger.     *     * @param baseAddress The network address messages go to; regardless of      * service, param, or group.     * @param redirection Group to which the messages must be redirected. This      * is used to implement the automatic group segregation which has become a      * de-facto standard. If not null, the unique portion of the specified      * groupID is prepended with {@link #InsertedServicePrefix} and inserted in     * every message's destination address in place of the the original service      * name, which gets shifted into the beginning of the service parameter. The      * opposite is done on arrival to restore the original destination address      * before the message is delivered to the listener in the the specified      * group. Messages that already bear a group redirection are not affected.     * @param origService The default destination service for messages sent      * without specifying a different service.     * @param origServiceParam The default destination service parameter for      * messages sent without specifying a different service parameter.     * @param queueSize the queue size that channels should have.     * @param connected true if the channel is created in the connected state.     */    public AsyncChannelMessenger(EndpointAddress baseAddress, PeerGroupID redirection,                                 String origService, String origServiceParam, int queueSize, boolean connected ) {        super(baseAddress, redirection, origService, origServiceParam);        stateMachine = new AsyncChannelMessengerState(connected);        queue = new UnbiasedQueue(queueSize, false);        // We synchronize our state with the sharedMessenger's stateMachine.        // Logic would dictate that we pass it to super(), but it is not itself        // constructed until super() returns. No way around it.        setStateLock(stateMachine);    }    /**     * {@inheritDoc}     */    public final void close() {        int action;        synchronized(stateMachine) {            stateMachine.closeEvent();            action = eventCalled(true);        }        // We called an event. State may have changed.        notifyChange();        performDeferredAction(action);    }    /**     *  This internal method does the common part of sending the message on      *  behalf of both sendMessageN and sendMessageB.     *     *  <p/>It is not quite possible to implement sendMessageB as a wrapper      *  around sendMessageN without some internal cooperation. At least not in      *  an efficient manner. sendMessageB must not set the message property:      *  either it fails and throws, or it returns successfully and the property      *  is set later. This is required so that messages can be retried when     *  failing synchronously (through a blocking messenger typically, but the      *  semantic has to be uniform).     *     *  <p/>Each of sendMessageB and sendMessageN takes care of status reporting      *  on its own terms.     *     *  @return The outcome from that one attempt. {@code true} means done.      *  {@code false} means saturated.  When {@code true} is returned, it means      *  that the fate of the message will be decided asynchronously, so we do      *  not have any details, yet.     *  @throws IOException is thrown if this messenger is closed.     */    private final boolean sendMessageCommon( Message msg, String rService, String rServiceParam ) throws IOException {        String service = effectiveService(rService);        String serviceParam = effectiveParam(rService, rServiceParam);        boolean queued = true;        boolean change = false;        int action = ACTION_NONE;        synchronized(stateMachine) {            if (inputClosed) {                throw new IOException("This messenger is closed. It cannot be used to send messages.");            }            int inq = queue.getCurrentInQueue();            int qsz = queue.getMaxQueueSize();            if ( inq < qsz - 1 ) {                queue.push(new PendingMessage(msg, service, serviceParam));                // Still not saturated. If we weren't idle either, then nothing                // worth mentionning.                if (inq == 0) {                    change = true;                    stateMachine.msgsEvent();                    action = eventCalled(false);                }            } else if (inq == qsz - 1) {                queue.push(new PendingMessage(msg, service, serviceParam));                // Now saturated.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -