📄 multiplexer.java
字号:
/**
* Redistribution and use of this software and associated documentation
* ("Software"), with or without modification, are permitted provided
* that the following conditions are met:
*
* 1. Redistributions of source code must retain copyright
* statements and notices. Redistributions must also contain a
* copy of this document.
*
* 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* 3. The name "Exolab" must not be used to endorse or promote
* products derived from this Software without prior written
* permission of Exoffice Technologies. For written permission,
* please contact info@exolab.org.
*
* 4. Products derived from this Software may not be called "Exolab"
* nor may "Exolab" appear in their names without prior written
* permission of Exoffice Technologies. Exolab is a registered
* trademark of Exoffice Technologies.
*
* 5. Due credit should be given to the Exolab Project
* (http://www.exolab.org/).
*
* THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
* NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
* OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
*
* $Id: Multiplexer.java,v 1.6 2005/06/04 14:36:14 tanderson Exp $
*/
package org.exolab.jms.net.multiplexer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.security.Principal;
import java.util.HashMap;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.exolab.jms.common.security.BasicPrincipal;
import org.exolab.jms.net.connector.Authenticator;
import org.exolab.jms.net.connector.ResourceException;
import org.exolab.jms.net.connector.SecurityException;
/**
* This class multiplexes data over a physical connection.
*
* @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
* @version $Revision: 1.6 $ $Date: 2005/06/04 14:36:14 $
*/
public class Multiplexer implements Constants, Runnable {
/**
* The listener to notify.
*/
private MultiplexerListener _listener;
/**
* If <code>true</code>, indicates that the multiplexer has been closed.
*/
private volatile boolean _closed;
/**
* The endpoint.
*/
private Endpoint _endpoint;
/**
* The endpoint's output stream.
*/
private DataOutputStream _out;
/**
* The endpoint's input stream.
*/
private DataInputStream _in;
/**
* The set of channels managed by this, keyed on channel identifier.
*/
private HashMap _channels = new HashMap();
/**
* The set of free channels, keyed on channel identifier.
*/
private LinkedList _free = new LinkedList();
/**
* If <code>true</code>, indicates that the physical connection was opened
* (client), rather than accepted (server). This is used in channel
* identifier generation
*/
private boolean _client = false;
/**
* The channel identifier seed.
*/
private int _seed = 0;
/**
* The thread pool for scheduling invocation requests.
*/
private PooledExecutor _pool;
/**
* The principal that owns the connection, or <code>null</code>,
* if this is an unauthenticated connection.
*/
private Principal _principal;
/**
* The sending and receiving buffer size, in bytes.
*/
private static final int BUFFER_SIZE = 2048;
/**
* The logger.
*/
private static final Log _log = LogFactory.getLog(Multiplexer.class);
/**
* Construct a new client-side <code>Multiplexer</code>.
*
* @param listener the multiplexer listener
* @param endpoint the endpoint to multiplex messages over
* @param principal the security principal
* @param pool thread pool for handling invocation requests
* @throws IOException if an I/O error occurs
* @throws SecurityException if connection is refused by the server
*/
public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
Principal principal, PooledExecutor pool)
throws IOException, SecurityException {
initialise(listener, endpoint, pool, true);
authenticate(principal);
}
/**
* Construct a new server-side <code>Multiplexer</code>.
*
* @param listener the multiplexer listener
* @param endpoint the endpoint to multiplex messages over
* @param authenticator the connection authenticator
* @param pool thread pool for handling invocation requests
* @throws IOException if an I/O error occurs
* @throws ResourceException if the authenticator cannot authenticate
*/
public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
Authenticator authenticator,
PooledExecutor pool)
throws IOException, ResourceException {
initialise(listener, endpoint, pool, false);
authenticate(authenticator);
}
/**
* Construct a new <code>Multiplexer</code>.
* <p/>
* This constructor is provided for subclasses that must perform setup
* work prior to invoking {@link #initialise}
*/
protected Multiplexer() {
}
/**
* Start multiplexing.
*/
public void run() {
while (!_closed) {
multiplex();
}
}
/**
* Returns a free channel from the pool, opening a new one if none are
* available.
*
* @return a free channel
* @throws IOException if an I/O error occurs
*/
public Channel getChannel() throws IOException {
Channel channel = null;
synchronized (_free) {
if (!_free.isEmpty()) {
channel = (Channel) _free.removeFirst();
}
}
if (channel == null) {
channel = open();
}
return channel;
}
/**
* Releases a channel back to the pool.
*
* @param channel the channel to release
*/
public void release(Channel channel) {
synchronized (_free) {
_free.add(channel);
}
}
/**
* Close a channel.
*
* @param channel the channel to close
* @throws IOException if an I/O error occurs
*/
public void close(Channel channel) throws IOException {
int channelId = channel.getId();
synchronized (_channels) {
_channels.remove(new Integer(channelId));
}
send(CLOSE, channelId);
}
/**
* Send a message.
*
* @param type the packet type
* @throws IOException if an I/O error occurs
*/
public void send(byte type) throws IOException {
synchronized (_out) {
_out.writeByte(type);
_out.flush();
if (_log.isDebugEnabled()) {
_log.debug("send(type=0x" + Integer.toHexString(type) + ")");
}
}
}
/**
* Send a message.
*
* @param type the packet type
* @param channelId the identifier of the channel sending the message
* @throws IOException if an I/O error occurs
*/
public void send(byte type, int channelId) throws IOException {
synchronized (_out) {
_out.writeByte(type);
_out.writeShort(channelId);
_out.flush();
if (_log.isDebugEnabled()) {
_log.debug("send(type=0x" + Integer.toHexString(type)
+ ", channel=" + channelId + ")");
}
}
}
/**
* Send a message.
*
* @param type the packet type
* @param channelId the identifier of the channel sending the message
* @param data the data to send
* @throws IOException if an I/O error occurs
*/
public void send(byte type, int channelId, int data) throws IOException {
synchronized (_out) {
_out.writeByte(type);
_out.writeShort(channelId);
_out.writeInt(data);
_out.flush();
if (_log.isDebugEnabled()) {
_log.debug("send(type=" + type + ", channel=" + channelId
+ ", data=" + Integer.toHexString(data) + ")");
}
}
}
/**
* Send a message.
*
* @param type the packet type
* @param channelId the identifier of the channel sending the message
* @param data the data to send
* @param offset the offset into the data
* @param length the length of data
* @throws IOException if an I/O error occurs
*/
public void send(byte type, int channelId, byte[] data, int offset,
int length) throws IOException {
synchronized (_out) {
_out.writeByte(type);
_out.writeShort(channelId);
_out.writeInt(length);
_out.write(data, offset, length);
_out.flush();
}
}
/**
* Close the multiplexer, releasing any resources. This closes the socket
* and waits for the thread to terminate.
*/
public void close() {
if (!_closed) {
_closed = true;
try {
send(SHUTDOWN);
} catch (IOException exception) {
_log.debug(exception);
}
try {
_endpoint.close();
} catch (IOException exception) {
_log.debug(exception);
}
// _pool.shutdownAfterProcessingCurrentlyQueuedTasks();
// @todo - as the pool is shared, need to block for
// tasks queued by this
}
}
/**
* Determines if the multiplexer is closed.
*
* @return <code>true</code> if the multiplexer is closed
*/
public boolean isClosed() {
return _closed;
}
/**
* Determines if this is a client-side instance.
*
* @return <code>true</code> if this is a client-side instance,
* <code>false</code> if it is a server=side instance
*/
public boolean isClient() {
return _client;
}
/**
* Returns the principal that owns the connection.
*
* @return the principal that owns the connection, or <code>null<code>
* if this is an unauthenticated connection
*/
public Principal getPrincipal() {
return _principal;
}
/**
* Initialise the multiplexer.
*
* @param listener the multiplexer listener
* @param endpoint the endpoint to multiplex messages over
* @param pool thread pool for handling ivocation requests
* @param client determines if this is a client-side or server-side
* instance
* @throws IOException if an I/O error occurs
*/
protected void initialise(MultiplexerListener listener, Endpoint endpoint,
PooledExecutor pool, boolean client)
throws IOException {
if (listener == null) {
throw new IllegalArgumentException("Argument 'listener' is null");
}
if (endpoint == null) {
throw new IllegalArgumentException("Argument 'endpoint' is null");
}
if (pool == null) {
throw new IllegalArgumentException("Argument 'pool' is null");
}
if (_log.isDebugEnabled()) {
_log.debug("Multiplexer(uri=" + endpoint.getURI()
+ ", client=" + client);
}
_listener = listener;
_endpoint = endpoint;
_pool = pool;
_out = new DataOutputStream(endpoint.getOutputStream());
_in = new DataInputStream(endpoint.getInputStream());
_client = client;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -