AsyncChannelMessenger.java

JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
/*
 * Copyright (c) 2004-2007 Sun Microsystems, Inc.  All rights reserved.
 *
 *  The Sun Project JXTA(TM) Software License
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *  3. The end-user documentation included with the redistribution, if any, must
 *     include the following acknowledgment: "This product includes software
 *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."
 *     Alternately, this acknowledgment may appear in the software itself, if
 *     and wherever such third-party acknowledgments normally appear.
 *
 *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *     not be used to endorse or promote products derived from this software
 *     without prior written permission. For written permission, please contact
 *     Project JXTA at http://www.jxta.org.
 *
 *  5. Products derived from this software may not be called "JXTA", nor may
 *     "JXTA" appear in their name, without prior written permission of Sun.
 *
 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
 *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 *  IN NO EVENT SHALL SUN
 *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United
 *  States and other countries.
 *
 *  Please see the license information page at :
 *  <http://www.jxta.org/project/www/license.html> for instructions on use of
 *  the license in source files.
 *
 *  ====================================================================
 *
 *  This software consists of voluntary contributions made by many individuals
 *  on behalf of Project JXTA. For more information on Project JXTA, please see
 *  http://www.jxta.org.
 *
 *  This license is based on the BSD license adopted by the Apache Foundation.
 */

package net.jxta.endpoint;

import net.jxta.peergroup.PeerGroupID;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Extends Channel Messenger behaviour to provide asynchronous message sending
 * via queuing.
 */
public abstract class AsyncChannelMessenger extends ChannelMessenger {

    /*
     *  Logger
     * private final static transient Logger LOG = Logger.getLogger(AsyncChannelMessenger.class.getName());
     */

    /**
     * {@code true} if we have deliberately closed our one message input queue.
     */
    private boolean inputClosed = false;

    /**
     * {@code true} if we have deliberately stopped sending.
     */
    private boolean outputClosed = false;

    /**
     * Actions that we defer to after returning from event methods. In other
     * words, they cannot be done with the lock held, or they require calling
     * more event methods.
     */
    private enum DeferredAction {

        /**
         * No action deferred.
         */
        ACTION_NONE,
        /**
         * Must send the current message.
         */
        ACTION_SEND,
        /**
         * Must report failure to connect.
         */
        ACTION_CONNECT
    }

    /**
     * The current deferred action.
     */
    private DeferredAction deferredAction = DeferredAction.ACTION_NONE;

    /**
     * The messages queue.
     */
    private final BlockingQueue<PendingMessage> queue;

    /**
     * State lock and engine.
     */
    private final AsyncChannelMessengerState stateMachine;

    /**
     * Our statemachine implementation; just connects the standard MessengerState action methods to
     * this object.
     */
    private class AsyncChannelMessengerState extends MessengerState {

        protected AsyncChannelMessengerState(boolean connected) {
            super(connected);
        }

        /*
         * The required action methods.
         */

        /**
         * {@inheritDoc}
         */
        @Override
        protected void connectAction() {
            deferredAction = DeferredAction.ACTION_CONNECT;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        protected void startAction() {
            deferredAction = DeferredAction.ACTION_SEND;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        protected void closeInputAction() {
            // We're synchronized here. (invoked from stateMachine)
            inputClosed = true;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        protected void closeOutputAction() {
            // We're synchronized here.
            // (invoked from stateMachine)
            outputClosed = true;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        protected void failAllAction() {

            // The queue is now closed, so we can rest assured that the last
            // message is really the last one. This is a synchronous action. The
            // state machine assumes that it is done when we return. There is no
            // need to signal completion with an idleEvent.
            PendingMessage theMsg;

            while (true) {
                theMsg = null;

                synchronized (stateMachine) {
                    theMsg = queue.poll();
                }

                if (theMsg == null) {
                    return;
                }

                Message currentMsg = theMsg.msg;
                Throwable currentFailure = theMsg.failure;

                if (currentFailure == null) {
                    currentFailure = new IOException("Messenger unexpectedly closed");
                }

                OutgoingMessageEvent event = new OutgoingMessageEvent(currentMsg, currentFailure);

                currentMsg.setMessageProperty(Messenger.class, event);
            }
        }
    }

    /**
     * The representation of a queued message. It is shared between this
     * abstract class and any implementation.
     */
    protected static class PendingMessage {
        final Message msg;
        final String service;
        final String param;
        Throwable failure;

        PendingMessage(Message msg, String service, String param) {
            this.msg = msg;
            this.service = service;
            this.param = param;
            this.failure = null;
        }
    }

    /**
     * Create a new AsyncChannelMessenger.
     *
     * @param baseAddress      The network address messages go to; regardless of
     *                         service, param, or group.
     * @param redirection      Group to which the messages must be redirected.
     *                         This is used to implement the automatic group segregation which has become a
     *                         de-facto standard. If not null, the unique portion of the specified
     *                         groupID is prepended with {@link #InsertedServicePrefix} and inserted in
     *                         every message's destination address in place of the original service
     *                         name, which gets shifted into the beginning of the service parameter. The
     *                         opposite is done on arrival to restore the original destination address
     *                         before the message is delivered to the listener in the specified
     *                         group. Messages that already bear a group redirection are not affected.
     * @param origService      The default destination service for messages sent
     *                         without specifying a different service.
     * @param origServiceParam The default destination service parameter for
     *                         messages sent without specifying a different service parameter.
     * @param queueSize        the queue size that channels should have.
     * @param connected        true if the channel is created in the connected state.
     */
    public AsyncChannelMessenger(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam, int queueSize, boolean connected) {

        super(baseAddress, redirection, origService, origServiceParam);

        stateMachine = new AsyncChannelMessengerState(connected);

        queue = new ArrayBlockingQueue<PendingMessage>(queueSize);

        // We synchronize our state with the sharedMessenger's stateMachine.
        // Logic would dictate that we pass it to super(), but it is not itself
        // constructed until super() returns. No way around it.
        setStateLock(stateMachine);
    }

    /**
     * {@inheritDoc}
     */
    public final void close() {

        DeferredAction action;

        synchronized (stateMachine) {
            stateMachine.closeEvent();
            action = eventCalled(true);
        }

        // We called an event. State may have changed.
        notifyChange();

        performDeferredAction(action);
    }

    /**
     * This internal method does the common part of sending the message on
     * behalf of both sendMessageN and sendMessageB.
     * <p/>It is not quite possible to implement sendMessageB as a wrapper
     * around sendMessageN without some internal cooperation. At least not in
     * an efficient manner. sendMessageB must not set the message property:
     * either it fails and throws, or it returns successfully and the property
     * is set later. This is required so that messages can be retried when
     * failing synchronously (through a blocking messenger typically, but the
     * semantic has to be uniform).
     * <p/>Each of sendMessageB and sendMessageN takes care of status reporting
     * on its own terms.
     *
     * @param msg           the message to send
     * @param rService      destination service
     * @param rServiceParam destination param
     * @return The outcome from that one attempt. {@code true} means done.
     *         {@code false} means saturated.  When {@code true} is returned, it means
     *         that the fate of the message will be decided asynchronously, so we do
     *         not have any details, yet.
     * @throws IOException          is thrown if this messenger is closed.
     * @throws InterruptedException if interrupted
     */
    private boolean sendMessageCommon(Message msg, String rService, String rServiceParam) throws IOException, InterruptedException {

        String service = effectiveService(rService);
        String serviceParam = effectiveParam(rService, rServiceParam);
        boolean queued = true;
        boolean change = false;
        DeferredAction action = DeferredAction.ACTION_NONE;

        synchronized (stateMachine) {
            if (inputClosed) {
                throw new IOException("This messenger is closed. It cannot be used to send messages.");
            }

            boolean wasEmpty = queue.isEmpty();

            if (queue.remainingCapacity() > 1) {
                queue.put(new PendingMessage(msg, service, serviceParam));

                // Still not saturated. If we weren't idle either, then nothing worth mentioning.
                if (wasEmpty) {
                    change = true;
                    stateMachine.msgsEvent();
                    action = eventCalled(false);
                }
            } else if (1 == queue.remainingCapacity()) {
                queue.put(new PendingMessage(msg, service, serviceParam));

                // Now saturated.
                stateMachine.saturatedEvent();
                action = eventCalled(false);
                change = true;
            } else {
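
The listing stops partway through sendMessageCommon. Its three-way test on queue.remainingCapacity() can be looked at in isolation. What follows is a minimal standalone sketch, not the JXTA implementation: SaturationSketch, Outcome, and offerWithSaturation are hypothetical names introduced here for illustration. The last branch, which refuses the item once no capacity is left, is an assumption based on the Javadoc remark that {@code false} means saturated, since the original else branch is not shown in this excerpt.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of a capacity check done under a lock before enqueueing.
class SaturationSketch {

    // Hypothetical outcome names; the original method returns a boolean instead.
    enum Outcome { QUEUED, QUEUED_NOW_SATURATED, REJECTED_SATURATED }

    // Enqueues the item if room remains and reports whether this insertion
    // used up the last free slot. All producers must go through this method
    // (and therefore the same lock) for the capacity check to stay valid.
    static <T> Outcome offerWithSaturation(BlockingQueue<T> queue, T item) {
        synchronized (queue) {
            int remaining = queue.remainingCapacity();
            if (remaining > 1) {
                queue.add(item); // cannot fail: capacity was checked under the lock
                return Outcome.QUEUED;
            } else if (remaining == 1) {
                queue.add(item); // takes the last free slot
                return Outcome.QUEUED_NOW_SATURATED;
            } else {
                // Assumption: a full queue refuses the item rather than blocking.
                return Outcome.REJECTED_SATURATED;
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println(offerWithSaturation(queue, "a")); // QUEUED
        System.out.println(offerWithSaturation(queue, "b")); // QUEUED_NOW_SATURATED
        System.out.println(offerWithSaturation(queue, "c")); // REJECTED_SATURATED
    }
}
```

Checking the capacity before inserting, with the lock held, means the insertion itself never has to wait. That is presumably why the original can call the blocking put() inside its synchronized block without stalling other threads on the state lock.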
