📄 multiplexedmanagedconnection.java
字号:
/**
 * Destroys the physical connection.
 * <p/>
 * If the thread pool is not shared, it is asked to shut down via
 * <code>shutdownAfterProcessingCurrentlyQueuedTasks()</code>. If a
 * multiplexer is present it is closed and, unless this method is being
 * called from the multiplexer thread itself, the calling thread waits for
 * the multiplexer thread to terminate. If there is no multiplexer, the
 * endpoint (if any) is closed directly.
 * <p/>
 * The multiplexer, multiplexer thread and endpoint references are always
 * cleared, even if closing fails.
 * <p/>
 * If the calling thread is interrupted while waiting for the multiplexer
 * thread, the wait is abandoned and the thread's interrupt status is
 * restored so that callers can observe it.
 *
 * @throws ResourceException for any error
 */
public void destroy() throws ResourceException {
    if (!_sharedPool && _pool != null) {
        _pool.shutdownAfterProcessingCurrentlyQueuedTasks();
    }

    // snapshot the shared state under the lock, so the potentially
    // blocking close/join below runs without holding the monitor
    Multiplexer multiplexer;
    Thread thread;
    Endpoint endpoint;
    synchronized (this) {
        multiplexer = _multiplexer;
        thread = _multiplexThread;
        endpoint = _endpoint;
    }

    try {
        if (multiplexer != null) {
            // multiplexer handles endpoint closure
            multiplexer.close();
            // a null thread would previously have caused a
            // NullPointerException here; joining the current thread
            // would block forever
            if (thread != null && thread != Thread.currentThread()) {
                try {
                    // wait for the multiplexer thread to terminate
                    thread.join();
                } catch (InterruptedException exception) {
                    _log.debug(exception, exception);
                    // don't swallow the interrupt: restore the status
                    // for the benefit of the caller
                    Thread.currentThread().interrupt();
                }
            }
        } else if (endpoint != null) {
            try {
                endpoint.close();
            } catch (IOException exception) {
                throw new ResourceException("Failed to close endpoint",
                                            exception);
            }
        }
    } finally {
        synchronized (this) {
            _multiplexer = null;
            _multiplexThread = null;
            _endpoint = null;
        }
    }
}
/**
 * Checks whether the supplied security principal matches the one that owns
 * this connection.
 * <p/>
 * Two <code>null</code> principals are considered to match.
 * <p/>
 * NOTE: If this is a server-side instance, the principal is only available
 * once the connection has been established, by {@link
 * #setInvocationHandler}
 *
 * @param principal the principal to compare. May be <code>null</code>.
 * @return <code>true</code> if the principal that owns this connection is
 *         the same as <code>principal</code>, otherwise <code>false</code>
 */
public boolean hasPrincipal(Principal principal) {
    if (_principal == null) {
        // no owner: only matches when no principal is supplied either
        return principal == null;
    }
    return _principal.equals(principal);
}
/**
 * Callback invoked when an invocation request arrives on a channel.
 * <p/>
 * Wraps the channel and the current caller in a
 * <code>ChannelInvocation</code> and hands it to the invoker.
 *
 * @param channel the channel the invocation is on
 */
public void request(Channel channel) {
    Caller caller = getCaller();
    ChannelInvocation invocation = new ChannelInvocation(channel, caller);
    _invoker.invoke(invocation);
}
/**
 * Callback invoked when the peer closes the connection.
 * <p/>
 * Delegates to <code>notifyClosed()</code>.
 */
public void closed() {
    this.notifyClosed();
}
/**
 * Callback invoked when the multiplexer encounters an error.
 * <p/>
 * Delegates to <code>notifyError()</code>, passing the error through
 * unchanged.
 *
 * @param error the error
 */
public void error(Throwable error) {
    this.notifyError(error);
}
/**
 * Invokes a method on a remote object.
 * <p/>
 * The request is sent over a channel obtained from the multiplexer. On
 * success the channel is released; if anything fails, the exception is
 * logged at debug level, wrapped in the returned response, and the channel
 * (if one was obtained) is destroyed. If there is no multiplexer, a
 * response wrapping a "Connection lost" <code>ResourceException</code> is
 * returned.
 *
 * @param connection the connection invoking the request
 * @param request    the request
 * @return the response
 */
protected Response invoke(Connection connection, Request request) {
    // take a snapshot of the multiplexer under the lock
    final Multiplexer active;
    synchronized (this) {
        active = _multiplexer;
    }

    if (active == null) {
        return new Response(new ResourceException("Connection lost"));
    }

    Channel channel = null;
    try {
        channel = active.getChannel();
        Response reply = channel.invoke(request);
        channel.release();
        return reply;
    } catch (Exception failure) {
        _log.debug(failure, failure);
        Response reply = new Response(failure);
        if (channel != null) {
            // the channel may be in an unknown state: discard it
            channel.destroy();
        }
        return reply;
    }
}
/**
 * Creates the endpoint to multiplex data over.
 * <p/>
 * Abstract: each concrete subclass must supply its own implementation.
 *
 * @return the endpoint to multiplex data over
 * @throws IOException for any I/O error
 */
protected abstract Endpoint createEndpoint() throws IOException;
/**
 * Creates a new client-side multiplexer.
 * <p/>
 * This implementation constructs a <code>Multiplexer</code> that uses this
 * connection as its first constructor argument, along with the supplied
 * endpoint, principal and thread pool.
 *
 * @param endpoint  the endpoint to multiplex messages over
 * @param principal the security principal
 * @param pool      thread pool for handling invocation requests
 * @return a new client-side multiplexer
 * @throws IOException       if an I/O error occurs
 * @throws SecurityException if connection is refused by the server
 */
protected Multiplexer createMultiplexer(Endpoint endpoint,
                                        Principal principal,
                                        PooledExecutor pool)
        throws IOException, SecurityException {
    final Multiplexer clientMultiplexer =
            new Multiplexer(this, endpoint, principal, pool);
    return clientMultiplexer;
}
/**
 * Creates a new server-side multiplexer.
 * <p/>
 * This implementation constructs a <code>Multiplexer</code> that uses this
 * connection as its first constructor argument, along with the supplied
 * endpoint, authenticator and thread pool.
 *
 * @param endpoint      the endpoint to multiplex messages over
 * @param authenticator the connection authenticator
 * @param pool          thread pool for handling invocation requests
 * @return a new server-side multiplexer
 * @throws IOException       if an I/O error occurs
 * @throws ResourceException if the authenticator cannot authenticate
 */
protected Multiplexer createMultiplexer(Endpoint endpoint,
                                        Authenticator authenticator,
                                        PooledExecutor pool)
        throws IOException, ResourceException {
    final Multiplexer serverMultiplexer =
            new Multiplexer(this, endpoint, authenticator, pool);
    return serverMultiplexer;
}
/**
 * Returns the thread pool used to handle invocation requests.
 * <p/>
 * If no pool has been set, one is created on first use, using the thread
 * group from {@link #getThreadGroup}, the name from {@link
 * #getDisplayName} and <code>THREAD_POOL_SIZE</code>.
 *
 * @return the thread pool to handle invocation requests
 */
protected synchronized PooledExecutor getThreadPool() {
    if (_pool != null) {
        return _pool;
    }
    _pool = new ThreadPool(getThreadGroup(), getDisplayName(),
                           THREAD_POOL_SIZE);
    return _pool;
}
/**
 * Helper to determine if this is a client-side or server-side instance.
 * <p/>
 * An instance is treated as client-side when it has no authenticator.
 *
 * @return <code>true</code> if this is a client-side instance, otherwise
 *         <code>false</code>
 */
protected boolean isClient() {
    return null == _authenticator;
}
/**
 * Helper to return the {@link Caller} instance denoting the client
 * performing a method invocation.
 * <p/>
 * Only applicable for server-side instances, and only after the
 * multiplexer has been created.
 *
 * @return the caller instance, or <code>null</code> if it hasn't been
 *         initialised
 */
protected Caller getCaller() {
    return this._caller;
}
/**
 * Returns the thread group to associate with allocated threads.
 * <p/>
 * This implementation creates the group on first use, naming it after
 * {@link #getDisplayName}, and returns the same group thereafter.
 *
 * @return the thread group to associate with allocated threads, or
 *         <code>null</code> to use the default thread group.
 */
protected synchronized ThreadGroup getThreadGroup() {
    if (_group != null) {
        return _group;
    }
    _group = new ThreadGroup(getDisplayName());
    return _group;
}
/**
 * Helper to generate a descriptive name, for display purposes.
 * <p/>
 * This implementation returns the remote URI, followed by "[client]" if
 * this is a client connection, or "[server]" if it is a server connection.
 * If the remote URI is unavailable, or cannot be determined, "&lt;unknown&gt;"
 * is used in its place; a failure to determine it is logged at debug
 * level.
 *
 * @return the display name
 */
protected String getDisplayName() {
    URI remote = null;
    try {
        remote = getRemoteURI();
    } catch (ResourceException failure) {
        // best effort only: fall back to the placeholder below
        if (_log.isDebugEnabled()) {
            _log.debug("Failed to determine remote URI", failure);
        }
    }

    String location = (remote != null) ? remote.toString() : "<unknown>";
    String role = isClient() ? "[client]" : "[server]";
    return location + role;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -