📄 virtualchannelselector.java
字号:
}
else {
selector_impl.pauseSelects( channel );
}
}
/**
 * Resume selection operations for the given channel.
 * <p>
 * When safe selector mode is off, the request is handed straight to
 * {@code selector_impl}. When it is on, the per-selector channel lists in
 * {@code selectors} are searched while holding {@code selectors_mon}; the
 * first selector whose list contains the channel receives the resume
 * request. If no list contains the channel, a diagnostic line is printed
 * to standard output and nothing else happens.
 *
 * @param channel channel to resume
 */
public void resumeSelects( AbstractSelectableChannel channel ) {
  if( !SAFE_SELECTOR_MODE_ENABLED ) {
    selector_impl.resumeSelects( channel );
    return;
  }

  try{
    selectors_mon.enter();

    Iterator entries = selectors.entrySet().iterator();
    while( entries.hasNext() ) {
      Map.Entry mapping = (Map.Entry)entries.next();
      VirtualChannelSelectorImpl owner = (VirtualChannelSelectorImpl)mapping.getKey();
      ArrayList registered = (ArrayList)mapping.getValue();

      if( !registered.contains( channel ) ) {
        continue;
      }

      owner.resumeSelects( channel );
      return;
    }

    // fell through every selector without a match
    System.out.println( "resumeSelects():: channel not found!" );
  }
  finally{
    selectors_mon.exit();
  }
}
/**
 * Cancel the selection operations for the given channel.
 * <p>
 * When safe selector mode is off, the request goes to {@code selector_impl},
 * provided it is non-null. When it is on, the per-selector channel lists in
 * {@code selectors} are walked while holding {@code selectors_mon}; the
 * channel is removed from the first list that contains it and the cancel is
 * forwarded to the selector owning that list. If no list contains the
 * channel, the call does nothing.
 *
 * @param channel channel originally registered
 */
public void cancel( AbstractSelectableChannel channel ) {
  if( !SAFE_SELECTOR_MODE_ENABLED ) {
    if( selector_impl != null ) {
      selector_impl.cancel( channel );
    }
    return;
  }

  try{
    selectors_mon.enter();

    Iterator entries = selectors.entrySet().iterator();
    while( entries.hasNext() ) {
      Map.Entry mapping = (Map.Entry)entries.next();
      VirtualChannelSelectorImpl owner = (VirtualChannelSelectorImpl)mapping.getKey();
      ArrayList registered = (ArrayList)mapping.getValue();

      // remove() doubles as the membership test: true only if the channel was tracked here
      boolean was_tracked = registered.remove( channel );
      if( was_tracked ) {
        owner.cancel( channel );
        return;
      }
    }
  }
  finally{
    selectors_mon.exit();
  }
}
/**
 * Run a virtual select() operation, with the given selection timeout value;
 * (1) cancellations are processed (2) the select operation is performed; (3)
 * listener notification of completed selects (4) new registrations are processed.
 * <p>
 * When safe selector mode is off, this is a single call on {@code selector_impl}.
 * When it is on, every selector in {@code selectors_keyset_cow} is selected in
 * turn with the same timeout and the individual results are added together.
 * If the {@code destroyed} flag was already set when the call started, the
 * selector bookkeeping ({@code selectors} and {@code selectors_keyset_cow}) is
 * emptied under {@code selectors_mon} once the select pass has finished.
 *
 * @param timeout in ms, passed unchanged to each underlying select call
 *                (per the original documentation: if zero, block indefinitely)
 * @return number of sockets selected; in safe selector mode, the sum over all selectors
 */
public int select(long timeout) {
  if( !SAFE_SELECTOR_MODE_ENABLED ) {
    return selector_impl.select( timeout );
  }

  // sample the flag up front: only a select that *started* after destroy() triggers the cleanup
  boolean destroyed_on_entry = destroyed;

  try{
    int total = 0;

    Iterator it = selectors_keyset_cow.iterator();
    while( it.hasNext() ){
      VirtualChannelSelectorImpl current = (VirtualChannelSelectorImpl)it.next();
      total += current.select( timeout );
    }

    return total;
  }
  finally{
    if ( destroyed_on_entry ){
      // destruction process requires select op after destroy...
      try{
        selectors_mon.enter();

        selectors.clear();
        selectors_keyset_cow = new HashSet();
      }
      finally{
        selectors_mon.exit();
      }
    }
  }
}
/**
 * Flags this selector as destroyed and destroys the underlying selector
 * implementation(s): every selector in {@code selectors_keyset_cow} when safe
 * selector mode is on, otherwise the single {@code selector_impl}.
 */
public void destroy() {
  destroyed = true;

  if ( !SAFE_SELECTOR_MODE_ENABLED ){
    selector_impl.destroy();
    return;
  }

  Iterator it = selectors_keyset_cow.iterator();
  while( it.hasNext() ) {
    VirtualChannelSelectorImpl current = (VirtualChannelSelectorImpl)it.next();
    current.destroy();
  }
}
/**
 * Reports the current value of the {@code destroyed} flag.
 *
 * @return the value of the {@code destroyed} flag, which {@link #destroy()} sets to {@code true}
 */
public boolean isDestroyed() {
  return destroyed;
}
/**
 * Reports whether safe selector mode is currently switched on.
 *
 * @return the current value of {@code SAFE_SELECTOR_MODE_ENABLED}
 */
public boolean isSafeSelectionModeEnabled() {
  return SAFE_SELECTOR_MODE_ENABLED;
}
/**
 * Switches safe selector mode on if it is not on already: sets the
 * {@code SAFE_SELECTOR_MODE_ENABLED} flag, stores {@code true} under the
 * {@code network.tcp.enable_safe_selector_mode} configuration parameter and
 * then runs {@code initSafeMode()}. Does nothing when the mode is already on.
 */
public void enableSafeSelectionMode() {
  if( SAFE_SELECTOR_MODE_ENABLED ) {
    return;   // already enabled
  }

  SAFE_SELECTOR_MODE_ENABLED = true;
  COConfigurationManager.setParameter( "network.tcp.enable_safe_selector_mode", true );
  initSafeMode();
}
/**
 * Forwards a successful selection to the listener, casting listener and
 * channel to the concrete types that match this selector's operation:
 * accept listener / server socket channel when {@code op} is
 * {@code OP_ACCEPT}, plain selector listener / socket channel otherwise.
 *
 * @param listener   listener to notify
 * @param sc         selected channel
 * @param attachment attachment handed on to the listener unchanged
 * @return whatever the listener's {@code selectSuccess} returns
 */
public boolean selectSuccess(
    VirtualAbstractSelectorListener listener,
    AbstractSelectableChannel sc,
    Object attachment )
{
  if ( op != OP_ACCEPT ){
    VirtualSelectorListener socket_listener = (VirtualSelectorListener)listener;
    return socket_listener.selectSuccess( VirtualChannelSelector.this, (SocketChannel)sc, attachment );
  }

  VirtualAcceptSelectorListener accept_listener = (VirtualAcceptSelectorListener)listener;
  return accept_listener.selectSuccess( VirtualChannelSelector.this, (ServerSocketChannel)sc, attachment );
}
/**
 * Forwards a failed selection to the listener, casting listener and channel
 * to the concrete types that match this selector's operation: accept
 * listener / server socket channel when {@code op} is {@code OP_ACCEPT},
 * plain selector listener / socket channel otherwise.
 *
 * @param listener   listener to notify
 * @param sc         channel whose selection failed
 * @param attachment attachment handed on to the listener unchanged
 * @param msg        failure cause handed on to the listener unchanged
 */
public void selectFailure(
    VirtualAbstractSelectorListener listener,
    AbstractSelectableChannel sc,
    Object attachment,
    Throwable msg )
{
  if ( op != OP_ACCEPT ){
    VirtualSelectorListener socket_listener = (VirtualSelectorListener)listener;
    socket_listener.selectFailure( VirtualChannelSelector.this, (SocketChannel)sc, attachment, msg );
    return;
  }

  VirtualAcceptSelectorListener accept_listener = (VirtualAcceptSelectorListener)listener;
  accept_listener.selectFailure( VirtualChannelSelector.this, (ServerSocketChannel)sc, attachment, msg );
}
/**
 * Common supertype of {@link VirtualSelectorListener} and
 * {@link VirtualAcceptSelectorListener}; declares no methods of its own.
 */
public interface VirtualAbstractSelectorListener {
}
/**
 * Listener for notification upon socket channel selection.
 */
public interface VirtualSelectorListener extends VirtualAbstractSelectorListener {

  /**
   * Called when a channel is successfully selected for readiness.
   *
   * @param selector   selector reporting the event
   * @param sc         the selected socket channel
   * @param attachment originally given with the channel's registration
   * @return indicator of whether or not any 'progress' was made due to this select,
   *         e.g. read-select -> read > 0 bytes, write-select -> wrote > 0 bytes
   */
  boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment );

  /**
   * Called when a channel selection fails.
   *
   * @param selector   selector reporting the event
   * @param sc         the socket channel whose selection failed
   * @param attachment originally given with the channel's registration
   * @param msg        failure message
   */
  void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg );
}
/**
 * Listener for notification upon server socket channel selection.
 */
public interface VirtualAcceptSelectorListener extends VirtualAbstractSelectorListener {

  /**
   * Called when a channel is successfully selected for readiness.
   *
   * @param selector   selector reporting the event
   * @param sc         the selected server socket channel
   * @param attachment originally given with the channel's registration
   * @return indicator of whether or not any 'progress' was made due to this select
   */
  boolean selectSuccess( VirtualChannelSelector selector, ServerSocketChannel sc, Object attachment );

  /**
   * Called when a channel selection fails.
   *
   * @param selector   selector reporting the event
   * @param sc         the server socket channel whose selection failed
   * @param attachment originally given with the channel's registration
   * @param msg        failure message
   */
  void selectFailure( VirtualChannelSelector selector, ServerSocketChannel sc, Object attachment, Throwable msg );
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -