📄 multiplexer.java
字号:
/** * Redistribution and use of this software and associated documentation * ("Software"), with or without modification, are permitted provided * that the following conditions are met: * * 1. Redistributions of source code must retain copyright * statements and notices. Redistributions must also contain a * copy of this document. * * 2. Redistributions in binary form must reproduce the * above copyright notice, this list of conditions and the * following disclaimer in the documentation and/or other * materials provided with the distribution. * * 3. The name "Exolab" must not be used to endorse or promote * products derived from this Software without prior written * permission of Exoffice Technologies. For written permission, * please contact info@exolab.org. * * 4. Products derived from this Software may not be called "Exolab" * nor may "Exolab" appear in their names without prior written * permission of Exoffice Technologies. Exolab is a registered * trademark of Exoffice Technologies. * * 5. Due credit should be given to the Exolab Project * (http://www.exolab.org/). * * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED * OF THE POSSIBILITY OF SUCH DAMAGE. * * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved. * * $Id: Multiplexer.java,v 1.9 2006/12/16 12:37:17 tanderson Exp $ */package org.exolab.jms.net.multiplexer;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.exolab.jms.common.security.BasicPrincipal;import org.exolab.jms.net.connector.Authenticator;import org.exolab.jms.net.connector.ResourceException;import org.exolab.jms.net.connector.SecurityException;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.IOException;import java.net.ProtocolException;import java.security.Principal;import java.util.HashMap;import java.util.LinkedList;/** * This class multiplexes data over a physical connection. * * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a> * @version $Revision: 1.9 $ $Date: 2006/12/16 12:37:17 $ */public class Multiplexer implements Constants, Runnable { /** * The listener to notify. */ private MultiplexerListener _listener; /** * If <code>true</code>, indicates that the multiplexer has been closed. */ private volatile boolean _closed; /** * The endpoint. */ private Endpoint _endpoint; /** * The endpoint's output stream. */ private DataOutputStream _out; /** * The endpoint's input stream. */ private DataInputStream _in; /** * The set of channels managed by this, keyed on channel identifier. */ private final HashMap _channels = new HashMap(); /** * The set of free channels, keyed on channel identifier. */ private final LinkedList _free = new LinkedList(); /** * If <code>true</code>, indicates that the physical connection was opened * (client), rather than accepted (server). This is used in channel * identifier generation */ private boolean _client = false; /** * The channel identifier seed. */ private int _seed = 0; /** * The principal that owns the connection, or <code>null</code>, * if this is an unauthenticated connection. */ private Principal _principal; /** * The sending and receiving buffer size, in bytes. */ private static final int BUFFER_SIZE = 2048; /** * The logger. */ private static final Log _log = LogFactory.getLog(Multiplexer.class); /** * Construct a new client-side <code>Multiplexer</code>. * * @param listener the multiplexer listener * @param endpoint the endpoint to multiplex messages over * @param principal the security principal * @throws IOException if an I/O error occurs * @throws SecurityException if connection is refused by the server */ public Multiplexer(MultiplexerListener listener, Endpoint endpoint, Principal principal) throws IOException, SecurityException { initialise(listener, endpoint, true); authenticate(principal); } /** * Construct a new server-side <code>Multiplexer</code>. * * @param listener the multiplexer listener * @param endpoint the endpoint to multiplex messages over * @param authenticator the connection authenticator * @throws IOException if an I/O error occurs * @throws ResourceException if the authenticator cannot authenticate */ public Multiplexer(MultiplexerListener listener, Endpoint endpoint, Authenticator authenticator) throws IOException, ResourceException { initialise(listener, endpoint, false); authenticate(authenticator); } /** * Construct a new <code>Multiplexer</code>. * <p/> * This constructor is provided for subclasses that must perform setup * work prior to invoking {@link #initialise} */ protected Multiplexer() { } /** * Start multiplexing. */ public void run() { while (!_closed) { multiplex(); } } /** * Returns a free channel from the pool, opening a new one if none are * available. * * @return a free channel * @throws IOException if an I/O error occurs */ public Channel getChannel() throws IOException { Channel channel = null; synchronized (_free) { if (!_free.isEmpty()) { channel = (Channel) _free.removeFirst(); } } if (channel == null) { channel = open(); } return channel; } /** * Releases a channel back to the pool. * * @param channel the channel to release */ public void release(Channel channel) { synchronized (_free) { _free.add(channel); } } /** * Close a channel. * * @param channel the channel to close * @throws IOException if an I/O error occurs */ public void close(Channel channel) throws IOException { int channelId = channel.getId(); synchronized (_channels) { _channels.remove(new Integer(channelId)); } send(CLOSE, channelId); } /** * Send a message. * * @param type the packet type * @throws IOException if an I/O error occurs */ public void send(byte type) throws IOException { synchronized (_out) { _out.writeByte(type); _out.flush(); if (_log.isDebugEnabled()) { _log.debug("send(type=0x" + Integer.toHexString(type) + ")"); } } } /** * Send a message. * * @param type the packet type * @param channelId the identifier of the channel sending the message * @throws IOException if an I/O error occurs */ public void send(byte type, int channelId) throws IOException { synchronized (_out) { _out.writeByte(type); _out.writeShort(channelId); _out.flush(); if (_log.isDebugEnabled()) { _log.debug("send(type=0x" + Integer.toHexString(type) + ", channel=" + channelId + ")"); } } } /** * Send a message. * * @param type the packet type * @param channelId the identifier of the channel sending the message * @param data the data to send * @throws IOException if an I/O error occurs */ public void send(byte type, int channelId, int data) throws IOException { synchronized (_out) { _out.writeByte(type); _out.writeShort(channelId); _out.writeInt(data); _out.flush(); if (_log.isDebugEnabled()) { _log.debug("send(type=" + type + ", channel=" + channelId + ", data=" + Integer.toHexString(data) + ")"); } } } /** * Send a message. * * @param type the packet type * @param channelId the identifier of the channel sending the message * @param data the data to send * @param offset the offset into the data * @param length the length of data * @throws IOException if an I/O error occurs */ public void send(byte type, int channelId, byte[] data, int offset, int length) throws IOException { synchronized (_out) { _out.writeByte(type); _out.writeShort(channelId); _out.writeInt(length); _out.write(data, offset, length); _out.flush(); } } /** * Ping the connection. * * @param token the token to be returned in the reply * @throws IOException if an I/O error occurs */ public void ping(int token) throws IOException { synchronized (_out) { _out.writeByte(PING_REQUEST); _out.writeInt(token); _out.flush(); if (_log.isDebugEnabled()) { _log.debug("ping(token=" + token + ")"); } } } /** * Close the multiplexer, releasing any resources. This closes the socket * and waits for the thread to terminate. */ public void close() { if (!_closed) { _closed = true; try { send(SHUTDOWN); } catch (IOException exception) { _log.debug(exception); } try { _endpoint.close(); } catch (IOException exception) { _log.debug(exception); } // _pool.shutdownAfterProcessingCurrentlyQueuedTasks(); // @todo - as the pool is shared, need to block for // tasks queued by this } } /** * Determines if the multiplexer is closed. * * @return <code>true</code> if the multiplexer is closed */ public boolean isClosed() { return _closed; } /** * Determines if this is a client-side instance. * * @return <code>true</code> if this is a client-side instance, * <code>false</code> if it is a server=side instance */ public boolean isClient() { return _client; } /** * Returns the principal that owns the connection. * * @return the principal that owns the connection, or <code>null<code> * if this is an unauthenticated connection */ public Principal getPrincipal() { return _principal; } /** * Initialise the multiplexer. * * @param listener the multiplexer listener * @param endpoint the endpoint to multiplex messages over * @param client determines if this is a client-side or server-side * instance * @throws IOException if an I/O error occurs */ protected void initialise(MultiplexerListener listener, Endpoint endpoint, boolean client) throws IOException { if (listener == null) { throw new IllegalArgumentException("Argument 'listener' is null"); } if (endpoint == null) { throw new IllegalArgumentException("Argument 'endpoint' is null"); } if (_log.isDebugEnabled()) { _log.debug("Multiplexer(uri=" + endpoint.getURI() + ", client=" + client); } _listener = listener; _endpoint = endpoint; _out = new DataOutputStream(endpoint.getOutputStream()); _in = new DataInputStream(endpoint.getInputStream()); _client = client;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -