defaultconnectionpool.java
来自「一个java方面的消息订阅发送的源码」· Java 代码 · 共 644 行 · 第 1/2 页
JAVA
644 行
public void close() throws ResourceException {
ManagedConnectionAcceptor[] acceptors =
(ManagedConnectionAcceptor[]) _acceptors.toArray(
new ManagedConnectionAcceptor[0]);
_acceptors.clear();
for (int i = 0; i < acceptors.length; ++i) {
acceptors[i].close();
}
ManagedConnection[] connections =
(ManagedConnection[]) _entries.keySet().toArray(
new ManagedConnection[0]);
for (int i = 0; i < connections.length; ++i) {
connections[i].destroy();
}
_entries.clear();
_accepted.clear();
_connections.clear();
stopReaper();
}
/**
* Invoked when an acceptor receives an error.
*
* @param acceptor the acceptor which received the error
* @param throwable the error
*/
public void error(ManagedConnectionAcceptor acceptor,
Throwable throwable) {
_acceptors.remove(acceptor);
String uri = "<unknown>";
try {
uri = acceptor.getURI().toString();
} catch (ResourceException ignore) {
}
_log.error("Failed to accept connections on URI=" + uri,
throwable);
try {
acceptor.close();
} catch (ResourceException exception) {
if (_log.isDebugEnabled()) {
_log.debug("Failed to close acceptor, URI=" + uri, exception);
}
}
}
/**
* Sets the listener for caller events.
*
* @param listener the listener
*/
public void setCallerListener(CallerListener listener) {
_listener = listener;
}
/**
* Adds a connection to the pool. If the connection was created, a {@link
* ManagedConnectionHandle} will be returned, wrapping the supplied
* connection.
*
* @param connection the connection to add
* @param accepted if <code>true</code> the connection was accepted via an
* {@link ManagedConnectionAcceptor}, otherwise it was
* created via
* {@link ManagedConnectionFactory#createManagedConnection}
* @return the (possibly wrapped) connection
* @throws ResourceException if the connection cannot be added
*/
protected ManagedConnection add(ManagedConnection connection,
boolean accepted) throws ResourceException {
ManagedConnection result;
PoolEntry entry = new PoolEntry(connection, accepted);
_entries.put(connection, entry);
if (accepted) {
_accepted.add(connection);
result = connection;
} else {
_connections.add(connection);
ManagedConnection handle = new ManagedConnectionHandle(
connection, _resolver);
_handles.put(connection, handle);
result = handle;
}
ContextInvocationHandler handler = new ContextInvocationHandler(
_handler, _resolver);
try {
connection.setInvocationHandler(handler);
connection.setConnectionEventListener(this);
handler.setConnection(connection.getConnection());
} catch (ResourceException exception) {
try {
_log.debug("Failed to initialise connection, destroying",
exception);
connection.destroy();
} catch (ResourceException nested) {
_log.debug("Failed to destroy connection", nested);
} finally {
_entries.remove(connection);
if (accepted) {
_accepted.remove(connection);
} else {
_connections.remove(connection);
_handles.remove(connection);
}
}
// propagate the exception
throw exception;
}
// mark the connection as initialised and therefore available for
// reaping
entry.setInitialised();
startReaper();
return result;
}
/**
* Remove a connection from the pool.
*
* @param connection the connection to remove
*/
protected void remove(ManagedConnection connection) {
PoolEntry entry = (PoolEntry) _entries.remove(connection);
if (entry != null) {
if (entry.getAccepted()) {
_accepted.remove(connection);
} else {
_connections.remove(connection);
_handles.remove(connection);
}
URI remoteURI = null;
URI localURI = null;
try {
remoteURI = connection.getRemoteURI();
localURI = connection.getLocalURI();
} catch (ResourceException exception) {
_log.debug("Failed to get connection URIs", exception);
}
try {
connection.destroy();
} catch (ResourceException exception) {
_log.debug("Failed to destroy connection", exception);
}
if (remoteURI != null && localURI != null) {
notifyDisconnection(remoteURI, localURI);
}
} else {
_log.debug("ManagedConnection not found");
}
if (_entries.isEmpty()) {
stopReaper();
}
}
/**
* Notify of a disconnection.
*
* @param remoteURI the remote address that the client is calling from
* @param localURI the local address that the client is calling to
*/
private void notifyDisconnection(URI remoteURI, URI localURI) {
CallerListener listener = _listener;
if (listener != null) {
listener.disconnected(new CallerImpl(remoteURI, localURI));
}
}
/**
* Starts the reaper for dead/idle connections, if needed.
*/
private synchronized void startReaper() {
if (_daemon == null && _reapInterval > 0) {
_daemon = new ClockDaemon();
ThreadFactory creator =
new ThreadFactory(null, "ManagedConnectionReaper", false);
_daemon.setThreadFactory(creator);
_daemon.executePeriodically(_reapInterval, new Reaper(), false);
}
}
/**
* Stops the reaper for dead/idle connections, if needed.
*/
private synchronized void stopReaper() {
if (_daemon != null) {
_daemon.shutDown();
_daemon = null;
}
}
/**
* Helper class for reaping idle and dead connections.
*/
private class Reaper implements Runnable {
/**
* The time that the reaper last ran, in milliseconds.
*/
private long _lastReapTimestamp = 0;
/**
* Construct a new <code>Reaper</code>.
*/
public Reaper() {
_lastReapTimestamp = System.currentTimeMillis();
}
/**
* Run the reaper.
*/
public void run() {
try {
reapIdleConnections();
if (!done()) {
reapDeadConnections();
}
} catch (Throwable exception) {
_log.error(exception, exception);
}
_lastReapTimestamp = System.currentTimeMillis();
}
/**
* Reap idle connections.
*/
private void reapIdleConnections() {
Map.Entry[] entries = (Map.Entry[]) _handles.entrySet().toArray(
new Map.Entry[0]);
for (int i = 0; i < entries.length && !done(); ++i) {
Map.Entry entry = entries[i];
ManagedConnection connection =
(ManagedConnection) entry.getKey();
PoolEntry pooled = (PoolEntry) _entries.get(connection);
if (pooled != null && pooled.isInitialised()) {
ManagedConnectionHandle handle =
(ManagedConnectionHandle) entry.getValue();
if (handle.canDestroy() && idle(handle)) {
if (_log.isDebugEnabled()) {
try {
_log.debug("Reaping idle connection, URI="
+ connection.getRemoteURI()
+ ", local URI="
+ connection.getLocalURI());
} catch (ResourceException ignore) {
// do nothing
}
}
remove(connection);
}
}
}
}
/**
* Determines if a connection is idle.
*
* @param handle the handle to the underlying managed connection
* @return <code>true</code> if the underlying connection is idle,
* otherwise <code>false</code>
*/
private boolean idle(ManagedConnectionHandle handle) {
boolean result = false;
long timestamp = handle.getLastUsedTimestamp();
if (timestamp == 0) {
// connection not used yet. Update it, but don't reap it on this
// invocation.
handle.updateLastUsedTimestamp();
} else if (timestamp < _lastReapTimestamp) {
result = true;
}
return result;
}
/**
* Reap dead connections.
*/
private void reapDeadConnections() {
PoolEntry[] entries = (PoolEntry[]) _entries.values().toArray(
new PoolEntry[0]);
for (int i = 0; i < entries.length && !done(); ++i) {
PoolEntry entry = entries[i];
if (entry.isInitialised()) {
ManagedConnection connection = entry.getManagedConnection();
if (!connection.isAlive()) {
if (_log.isDebugEnabled()) {
try {
_log.debug("Reaping dead connection, URI="
+ connection.getRemoteURI()
+ ", local URI="
+ connection.getLocalURI());
} catch (ResourceException ignore) {
// do nothing
}
}
remove(connection);
}
}
}
}
/**
* Determines if the reaper should terminate, by chaecking the interrupt
* status of the current thread.
*
* @return <code>true</code> if the reaper should terminate
*/
private boolean done() {
return Thread.currentThread().isInterrupted();
}
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?