📄 channelniosocket.java
字号:
}
/**
 * Returns the notification descriptors held in <code>notifInfo</code>.
 *
 * @return the <code>notifInfo</code> array itself; no copy is made
 */
public MBeanNotificationInfo[] getNotificationInfo() {
    final MBeanNotificationInfo[] descriptors = notifInfo;
    return descriptors;
}
protected class SocketConnection implements ThreadPoolRunnable {
MsgContext ep;
MsgAjp recv = new MsgAjp(packetSize);
boolean inProgress = false;
SocketConnection(MsgContext ep) {
this.ep=ep;
}
public Object[] getInitData() {
return null;
}
public void runIt(Object perTh[]) {
if(!processConnection(ep)) {
unregister(ep);
}
}
public boolean isRunning() {
return inProgress;
}
public void setFinished() {
inProgress = false;
}
/** Process a single ajp connection.
*/
boolean processConnection(MsgContext ep) {
try {
InputStream sis = (InputStream)ep.getNote(isNote);
boolean haveInput = true;
while(haveInput) {
if( !running || paused ) {
return false;
}
int status= receive( recv, ep );
if( status <= 0 ) {
if( status==-3)
log.debug( "server has been restarted or reset this connection" );
else
log.warn("Closing ajp connection " + status );
return false;
}
ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
ep.setType( 0 );
// Will call next
status= invoke( recv, ep );
if( status != JkHandler.OK ) {
log.warn("processCallbacks status " + status );
return false;
}
synchronized(this) {
synchronized(sis) {
haveInput = sis.available() > 0;
}
if(!haveInput) {
setFinished();
} else {
if(log.isDebugEnabled())
log.debug("KeepAlive: "+sis.available());
}
}
}
} catch( Exception ex ) {
String msg = ex.getMessage();
if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
log.debug( "Server has been restarted or reset this connection");
else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
log.debug( "connection timeout reached");
else
log.error( "Error, processing connection", ex);
return false;
}
return true;
}
synchronized void process(SelectionKey sk) {
if(!sk.isValid()) {
return;
}
if(sk.isReadable()) {
SocketInputStream sis = (SocketInputStream)ep.getNote(isNote);
boolean isok = sis.readAvailable();
if(!inProgress) {
if(isok) {
if(sis.available() > 0 || !nioIsBroken){
inProgress = true;
tp.runIt(this);
}
} else {
unregister(ep);
return;
}
}
}
if(sk.isWritable()) {
Object os = ep.getNote(osNote);
synchronized(os) {
os.notify();
}
}
}
synchronized void unregister(MsgContext ep) {
try{
close(ep);
} catch(Exception e) {
log.error("Error closing connection", e);
}
try{
Request req = (Request)ep.getRequest();
if( req != null ) {
ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
if( roname != null ) {
Registry.getRegistry(null, null).unregisterComponent(roname);
}
req.getRequestProcessor().setGlobalProcessor(null);
}
} catch( Exception ee) {
log.error( "Error, releasing connection",ee);
}
}
void register(MsgContext ep) {
Socket s = (Socket)ep.getNote(socketNote);
try {
s.getChannel().register(selector, SelectionKey.OP_READ, this);
} catch(IOException iex) {
log.error("Unable to register connection",iex);
unregister(ep);
}
}
}
protected class Poller implements ThreadPoolRunnable {
Poller() {
}
public Object[] getInitData() {
return null;
}
public void runIt(Object perTh[]) {
while(running) {
try {
int ns = selector.select(serverTimeout);
if(log.isDebugEnabled())
log.debug("Selecting "+ns+" channels");
if(ns > 0) {
Set sels = selector.selectedKeys();
Iterator it = sels.iterator();
while(it.hasNext()) {
SelectionKey sk = (SelectionKey)it.next();
if(sk.isValid()) {
if(sk.isAcceptable()) {
acceptConnections();
} else {
SocketConnection sc = (SocketConnection)sk.attachment();
sc.process(sk);
}
} else {
sk.cancel();
}
it.remove();
}
}
} catch(ClosedSelectorException cse) {
log.debug("Selector is closed");
return;
} catch(CancelledKeyException cke) {
log.debug("Key Cancelled", cke);
} catch(IOException iex) {
log.warn("IO Error in select",iex);
} catch(Exception ex) {
log.warn("Error processing select",ex);
}
}
}
}
/**
 * InputStream backed by a direct ByteBuffer that is refilled from a
 * SocketChannel. {@link #readAvailable()} is called when the channel is
 * reported readable (see SocketConnection.process); the read methods consume
 * the buffer and, when it runs short, wait on this object's monitor until
 * readAvailable() signals that more data can be fetched.
 * <p>
 * End of stream is not reported by a -1 return value: once the stream is
 * marked closed, reads fail with ClosedChannelException.
 */
protected class SocketInputStream extends InputStream {
    final int BUFFER_SIZE = 8200;
    /** Kept in "read mode": position..limit is the unread data. */
    private ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    private SocketChannel channel;
    /** True while a reader is waiting inside block(). */
    private boolean blocking = false;
    /** Set once fill() reported end of stream or an I/O error. */
    private boolean isClosed = false;
    /** Set by readAvailable() to tell a reader that the channel can be read. */
    private volatile boolean dataAvailable = false;

    SocketInputStream(SocketChannel channel) {
        this.channel = channel;
        // Start with an empty buffer (nothing to read yet).
        buffer.limit(0);
    }

    /** @return number of unread bytes currently held in the buffer */
    public int available() {
        return buffer.remaining();
    }

    /** Marks the current buffer position; refilling the buffer discards the mark. */
    public void mark(int readlimit) {
        buffer.mark();
    }

    public boolean markSupported() {
        return true;
    }

    public void reset() {
        buffer.reset();
    }

    /**
     * Reads one byte, waiting for data when the buffer is empty.
     *
     * @return the byte as an unsigned value in the range 0..255
     * @throws ClosedChannelException if the stream has been marked closed
     * @throws IOException on read failure or read timeout
     */
    public synchronized int read() throws IOException {
        // Loop like the bulk read does: a wake-up may deliver no bytes, and
        // calling get() on an empty buffer would throw BufferUnderflowException.
        while (!checkAvailable(1)) {
            block(1);
        }
        // Mask to 0..255; a raw (signed) byte would turn 0xFF into -1, which
        // callers interpret as end of stream.
        return buffer.get() & 0xFF;
    }

    /**
     * @return true when at least <code>nbyte</code> unread bytes are buffered
     * @throws ClosedChannelException if the stream has been marked closed
     */
    private boolean checkAvailable(int nbyte) throws IOException {
        if (isClosed) {
            throw new ClosedChannelException();
        }
        return buffer.remaining() >= nbyte;
    }

    /**
     * Reads from the channel into the buffer, keeping any unread bytes in
     * front of the new data. Reading stops once <code>nbyte</code> bytes were
     * received, the channel returns nothing, or end of stream is hit.
     *
     * @param nbyte number of bytes wanted
     * @return number of bytes read, or -1 if end of stream was reached
     * @throws IOException if the channel read fails
     */
    private int fill(int nbyte) throws IOException {
        int rem = nbyte;
        int read = 0;
        boolean eof = false;
        if (buffer.hasRemaining() && log.isDebugEnabled())
            log.debug("Saving old buffer: " + buffer.remaining());
        // compact() moves the unread bytes to the start of the buffer and
        // leaves it ready for appending, without a temporary array.
        buffer.compact();
        while (rem > 0) {
            int count = channel.read(buffer);
            if (count < 0) {
                eof = true;
                break;
            } else if (count == 0) {
                log.debug("Failed to recieve signaled read: ");
                break;
            }
            read += count;
            rem -= count;
        }
        // Back to read mode.
        buffer.flip();
        return eof ? -1 : read;
    }

    /**
     * Reacts to the channel having become readable. If a reader is waiting
     * it is woken up and left to fetch the data itself; otherwise the buffer
     * is filled here.
     *
     * @return false when the fill hit end of stream or an I/O error (the
     *         stream is then marked closed), true otherwise
     */
    synchronized boolean readAvailable() {
        if (blocking) {
            dataAvailable = true;
            notify();
        } else if (dataAvailable) {
            log.debug("Race Condition");
        } else {
            int nr = 0;
            try {
                nr = fill(1);
            } catch (ClosedChannelException cce) {
                log.debug("Channel is closed", cce);
                nr = -1;
            } catch (IOException iex) {
                log.warn("Exception processing read", iex);
                nr = -1; // Can't handle this yet
            }
            if (nr < 0) {
                isClosed = true;
                notify();
                return false;
            } else if (nr == 0) {
                if (!nioIsBroken) {
                    dataAvailable = (buffer.remaining() <= 0);
                }
            }
        }
        return true;
    }

    public int read(byte[] data) throws IOException {
        return read(data, 0, data.length);
    }

    /**
     * Reads exactly <code>len</code> bytes into <code>data</code>, waiting
     * for more input as often as needed.
     *
     * @return <code>len</code>
     * @throws ClosedChannelException if the stream is marked closed before
     *         all bytes were read
     * @throws IOException on read failure or read timeout
     */
    public synchronized int read(byte[] data, int offset, int len) throws IOException {
        int olen = len;
        while (!checkAvailable(len)) {
            // Hand over what is buffered, then wait for the remainder.
            int avail = buffer.remaining();
            if (avail > 0) {
                buffer.get(data, offset, avail);
            }
            len -= avail;
            offset += avail;
            block(len);
        }
        buffer.get(data, offset, len);
        return olen;
    }

    /**
     * Waits (at most <code>socketTimeout</code>) for readAvailable() to
     * signal data, then fills the buffer. Must be called with this object's
     * monitor held.
     *
     * @param len number of bytes the caller still needs
     * @throws java.net.SocketTimeoutException if a positive
     *         <code>socketTimeout</code> is configured and the wait ended
     *         without data being signalled (an interrupted wait is reported
     *         the same way)
     * @throws IOException if the channel read fails
     */
    private void block(int len) throws IOException {
        if (len <= 0) {
            return;
        }
        if (!dataAvailable) {
            blocking = true;
            if (log.isDebugEnabled())
                log.debug("Waiting for " + len + " bytes to be available");
            try {
                wait(socketTimeout);
            } catch (InterruptedException iex) {
                log.debug("Interrupted", iex);
            }
            blocking = false;
        }
        if (dataAvailable) {
            dataAvailable = false;
            if (fill(len) < 0) {
                isClosed = true;
            }
        } else if (!isClosed && socketTimeout > 0) {
            // Without this the callers' retry loops would wait forever on a
            // stalled peer. The message matches what processConnection()
            // recognises as a connection timeout.
            throw new java.net.SocketTimeoutException("Read timed out");
        }
    }
}
/**
 * OutputStream that collects bytes in a direct ByteBuffer of
 * <code>bufferSize</code> bytes and writes them to a SocketChannel on
 * {@link #flush()}.
 */
protected class SocketOutputStream extends OutputStream {
    ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
    SocketChannel channel;

    SocketOutputStream(SocketChannel channel) {
        this.channel = channel;
    }

    /** Buffers one byte, flushing first when the buffer is full. */
    public void write(int b) throws IOException {
        if (!checkAvailable(1)) {
            flush();
        }
        buffer.put((byte) b);
    }

    public void write(byte[] data) throws IOException {
        write(data, 0, data.length);
    }

    /**
     * Buffers <code>len</code> bytes of <code>data</code> starting at
     * <code>offset</code>. Pending output is flushed first when the bytes do
     * not fit; a request larger than the whole buffer is sent in
     * buffer-sized slices instead of overflowing the buffer.
     *
     * @throws IOException if flushing to the channel fails
     */
    public void write(byte[] data, int offset, int len) throws IOException {
        if (!checkAvailable(len)) {
            flush();
        }
        // Only entered when len exceeds the (now empty) buffer's capacity.
        while (len > buffer.remaining()) {
            int chunk = buffer.remaining();
            if (chunk == 0) {
                // Zero-capacity buffer: nothing can ever be buffered.
                throw new java.nio.BufferOverflowException();
            }
            buffer.put(data, offset, chunk);
            offset += chunk;
            len -= chunk;
            flush();
        }
        buffer.put(data, offset, len);
    }

    /**
     * Writes all buffered bytes to the channel. When the channel accepts
     * nothing, the key's interest is switched to OP_WRITE and the caller
     * waits on this object's monitor until notified, after which interest is
     * set back to OP_READ and the write is retried.
     * NOTE(review): the wake-up presumably comes from
     * SocketConnection.process() notifying the object stored under osNote;
     * confirm that note holds this stream.
     *
     * @throws IOException if the channel write fails or the channel has no
     *         key for the selector
     */
    public void flush() throws IOException {
        buffer.flip();
        while (buffer.hasRemaining()) {
            int count = channel.write(buffer);
            if (count == 0) {
                synchronized (this) {
                    SelectionKey key = channel.keyFor(selector);
                    if (key == null) {
                        // keyFor() returns null for an unregistered channel;
                        // report it as an I/O failure rather than an NPE.
                        throw new IOException("Channel is not registered with the selector");
                    }
                    key.interestOps(SelectionKey.OP_WRITE);
                    if (log.isDebugEnabled())
                        log.debug("Blocking for channel write: " + buffer.remaining());
                    try {
                        wait();
                    } catch (InterruptedException iex) {
                        // ignore, since can't happen
                    }
                    key.interestOps(SelectionKey.OP_READ);
                }
            }
        }
        buffer.clear();
    }

    /** @return true when the buffer has room for <code>len</code> more bytes */
    private boolean checkAvailable(int len) {
        return buffer.remaining() >= len;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -