📄 channelniosocket.java
字号:
sSocket = ssc.socket();
sSocket.bind(iddr);
port=i;
break;
} catch( IOException ex ) {
if(log.isInfoEnabled())
log.info("Port busy " + i + " " + ex.toString());
sSocket = null;
}
}
if( sSocket==null ) {
log.error("Can't find free port " + startPort + " " + maxPort );
return;
}
if(log.isInfoEnabled())
log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
// If this is not the base port and we are the 'main' channleSocket and
// SHM didn't already set the localId - we'll set the instance id
if( "channelNioSocket".equals( name ) &&
port != startPort &&
(wEnv.getLocalId()==0) ) {
wEnv.setLocalId( port - startPort );
}
// XXX Reverse it -> this is a notification generator !!
if( next==null && wEnv!=null ) {
if( nextName!=null )
setNext( wEnv.getHandler( nextName ) );
if( next==null )
next=wEnv.getHandler( "dispatch" );
if( next==null )
next=wEnv.getHandler( "request" );
}
JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
running = true;
// Run a thread that will accept connections.
// XXX Try to find a thread first - not sure how...
if( this.domain != null ) {
try {
tpOName=new ObjectName(domain + ":type=ThreadPool,name=" +
getChannelName());
Registry.getRegistry(null, null)
.registerComponent(tp, tpOName, null);
rgOName = new ObjectName
(domain+":type=GlobalRequestProcessor,name=" + getChannelName());
Registry.getRegistry(null, null)
.registerComponent(global, rgOName, null);
} catch (Exception e) {
log.error("Can't register threadpool" );
}
}
tp.start();
Poller pollAjp = new Poller();
tp.runIt(pollAjp);
}
/** JMX name used to register the thread pool during channel initialisation (only when a domain is set); destroy() unregisters it when non-null. */
ObjectName tpOName;
/** JMX name used to register {@code global} during channel initialisation (only when a domain is set); destroy() unregisters it when non-null. */
ObjectName rgOName;
/** Request-group object that registerRequest() installs as the global processor of every request processor it registers. */
RequestGroupInfo global=new RequestGroupInfo();
/** Note id obtained from the worker environment during initialisation; registerRequest() stores the request's ObjectName on the MsgContext under this id. */
int JMXRequestNote;
/**
 * Starts the channel: runs {@code init()} first when no server socket
 * exists yet, then always calls {@code resume()}.
 *
 * @throws IOException if initialisation or resuming fails with an I/O error
 */
public void start() throws IOException {
    final boolean needsInit = (sSocket == null);
    if (needsInit) {
        init();
    }
    resume();
}
/**
 * Stops the channel by delegating to {@link #destroy()}.
 *
 * @throws IOException declared for callers; see {@link #destroy()}
 */
public void stop() throws IOException {
destroy();
}
/**
 * Registers the request processor of {@code req} as a JMX component and
 * remembers its name on the endpoint.
 * <p>
 * Nothing happens when no JMX domain is configured. Registration is
 * best-effort: a failure is logged (with its cause) and otherwise ignored.
 *
 * @param req   request whose {@code RequestInfo} is registered
 * @param ep    endpoint that receives the generated {@code ObjectName}
 *              under the {@code JMXRequestNote} note
 * @param count number appended to the MBean name ({@code JkRequest<count>})
 */
public void registerRequest(Request req, MsgContext ep, int count) {
    if (this.domain == null) {
        return;
    }
    try {
        RequestInfo rp = req.getRequestProcessor();
        rp.setGlobalProcessor(global);
        ObjectName roname = new ObjectName(
            getDomain() + ":type=RequestProcessor,worker=" +
            getChannelName() + ",name=JkRequest" + count);
        ep.setNote(JMXRequestNote, roname);
        Registry.getRegistry(null, null).registerComponent(rp, roname, null);
    } catch (Exception ex) {
        // Pass the exception along so the log shows why registration failed.
        log.warn("Error registering request", ex);
    }
}
/**
 * No-op: this implementation performs no work when an endpoint is opened.
 *
 * @param ep the endpoint being opened (unused)
 * @throws IOException declared in the signature; never thrown by this body
 */
public void open(MsgContext ep) throws IOException {
}
/**
 * Closes the socket attached to the given endpoint and cancels its
 * registration with this channel's selector, if there is one.
 *
 * @param ep endpoint whose {@code socketNote} note holds the socket
 * @throws IOException if closing the socket fails
 */
public void close(MsgContext ep) throws IOException {
    Socket s = (Socket) ep.getNote(socketNote);
    if (s == null) {
        // No socket was attached to this endpoint, so there is nothing to release.
        return;
    }
    try {
        // Socket.getChannel() is null for sockets not created through a channel.
        if (selector != null && s.getChannel() != null) {
            SelectionKey key = s.getChannel().keyFor(selector);
            if (key != null) {
                key.cancel();
            }
        }
    } finally {
        // Always release the socket, even when cancelling the key failed.
        s.close();
    }
}
/**
 * Shuts the channel down: clears the running flag, stops the thread pool,
 * closes the selector and the server socket, and unregisters the JMX
 * components that were registered for this channel.
 * <p>
 * Each step runs in isolation, so a failure in one step (which is logged)
 * does not prevent the remaining resources from being released.
 *
 * @throws IOException declared for interface compatibility; failures of
 *         the individual steps are logged rather than propagated
 */
public void destroy() throws IOException {
    running = false;
    /* If we disabled the channel return */
    if (port == 0) {
        return;
    }
    try {
        tp.shutdown();
    } catch (Exception e) {
        logShutdownError(e);
    }
    try {
        // The selector is absent when initialisation stopped before opening it.
        if (selector != null) {
            selector.wakeup().close();
        }
    } catch (Exception e) {
        logShutdownError(e);
    }
    try {
        if (sSocket != null) {
            sSocket.close(); // XXX?
        }
    } catch (Exception e) {
        logShutdownError(e);
    }
    try {
        if (tpOName != null) {
            Registry.getRegistry(null, null).unregisterComponent(tpOName);
        }
        if (rgOName != null) {
            Registry.getRegistry(null, null).unregisterComponent(rgOName);
        }
    } catch (Exception e) {
        logShutdownError(e);
    }
}

/** Logs a failure of one shutdown step without interrupting the rest of the shutdown. */
private void logShutdownError(Exception e) {
    log.info("Error shutting down the channel " + port + " " +
             e.toString());
    if (log.isDebugEnabled()) {
        log.debug("Trace", e);
    }
}
/**
 * Finishes the message with {@code msg.end()} and writes the first
 * {@code msg.getLen()} bytes of its buffer to the output stream stored
 * on the endpoint under {@code osNote}.
 *
 * @param msg message to send
 * @param ep  endpoint carrying the output stream
 * @return the number of bytes handed to the stream
 * @throws IOException if the write fails
 */
public int send(Msg msg, MsgContext ep)
    throws IOException {
    msg.end(); // Write the packet header
    final byte[] packet = msg.getBuffer();
    final int length = msg.getLen();
    if (log.isTraceEnabled()) {
        log.trace("send() " + length + " " + packet[4]);
    }
    ((OutputStream) ep.getNote(osNote)).write(packet, 0, length);
    return length;
}
/**
 * Flushes the output stream stored on the endpoint under {@code osNote}.
 *
 * @param msg not used by this method
 * @param ep  endpoint carrying the output stream
 * @return always 0
 * @throws IOException if flushing fails
 */
public int flush(Msg msg, MsgContext ep)
    throws IOException {
    ((OutputStream) ep.getNote(osNote)).flush();
    return 0;
}
/**
 * Reads one packet into the buffer of {@code msg}: first the header
 * ({@code msg.getHeaderLength()} bytes), then, after
 * {@code msg.processHeader()}, the body ({@code msg.getLen()} bytes).
 * <p>
 * Known gaps carried over from the original notes: there is no check that
 * the buffer can hold the announced body length, and callers reportedly
 * do not use the returned length.
 *
 * @param msg message whose buffer receives the packet
 * @param ep  endpoint to read from
 * @return the number of body bytes read; the negative result of
 *         {@link #read} if the header could not be read; -1 if a non-empty
 *         body could not be read; -2 if the body length read differs from
 *         the announced length
 * @throws IOException if the underlying read fails
 */
public int receive(Msg msg, MsgContext ep)
    throws IOException {
    if (log.isTraceEnabled()) {
        log.trace("receive() ");
    }
    final byte[] buffer = msg.getBuffer();
    final int headerLength = msg.getHeaderLength();

    final int headerRead = this.read(ep, buffer, 0, headerLength);
    if (headerRead < 0) {
        // Most likely a normal apache restart; deliberately not logged.
        return headerRead;
    }

    // The body length is only known once the header has been processed.
    msg.processHeader();
    final int bodyLength = msg.getLen();

    final int bodyRead = this.read(ep, buffer, headerLength, bodyLength);
    if (bodyLength > 0 && bodyRead <= 0) {
        log.warn("can't read body, waited #" + bodyLength);
        return -1;
    }
    if (bodyRead != bodyLength) {
        log.warn("incomplete read, waited #" + bodyLength +
                 " got only " + bodyRead);
        return -2;
    }
    return bodyRead;
}
/**
 * Reads exactly {@code len} bytes from the input stream stored on the
 * endpoint under {@code isNote} into {@code b}, starting at
 * {@code offset}.
 * <p>
 * A single stream read may deliver fewer bytes than requested (for
 * example when packets arrive fragmented under load), so the read is
 * repeated until the requested amount has been collected.
 *
 * @param ep     endpoint carrying the input stream
 * @param b      destination buffer
 * @param offset index in {@code b} of the first byte to fill
 * @param len    number of bytes wanted
 * @return {@code len} once all bytes were read (0 when {@code len} is not
 *         positive), or -3 as soon as a read returns no data or the
 *         channel turns out to be closed
 * @throws IOException if the stream fails for another reason
 */
public int read(MsgContext ep, byte[] b, int offset, int len)
    throws IOException
{
    final InputStream in = (InputStream) ep.getNote(isNote);
    int filled = 0;
    while (filled < len) {
        int got;
        try {
            got = in.read(b, offset + filled, len - filled);
        } catch (ClosedChannelException closed) {
            if (filled > 0) {
                log.info("Error reading data after "+filled+"bytes",closed);
            } else {
                log.debug("Error reading data", closed);
            }
            got = -1;
        }
        if (log.isTraceEnabled()) {
            log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
                      offset + " " + len + " = " + got );
        }
        if (got <= 0) {
            // The remote end closed the connection. This happens
            // periodically as apache restarts, so it is not logged.
            return -3;
        }
        filled += got;
    }
    return filled;
}
/** Run flag: set to true during channel initialisation and to false by destroy(); acceptConnections() does nothing while it is false. */
protected boolean running=true;
/**
 * Accepts one incoming connection, provided the channel is running:
 * creates a fresh MsgContext, passes it to {@code accept}, then wraps it
 * in a SocketConnection and registers it. Exceptions are logged only
 * while the channel is still running.
 */
void acceptConnections() {
    if (!running) {
        return;
    }
    try {
        MsgContext ep = createMsgContext();
        ep.setSource(this);
        ep.setWorkerEnv(wEnv);
        this.accept(ep);
        if (running) {
            // The connection is long-running, so the cost of this small
            // allocation does not matter.
            SocketConnection ajpConn = new SocketConnection(ep);
            ajpConn.register(ep);
        }
    } catch (Exception ex) {
        if (running) {
            log.warn("Exception executing accept" ,ex);
        }
    }
}
// XXX This should become handleNotification
/**
 * Dispatches on the endpoint's type: receive, send and flush requests are
 * handled by this channel; any other type triggers a notification (when
 * listeners were added) and is then forwarded to the next handler.
 *
 * @param msg message being processed
 * @param ep  endpoint whose type selects the action
 * @return the result of receive/send/flush or of the next handler;
 *         {@code OK} when there is no next handler
 * @throws IOException if the selected operation fails
 */
public int invoke(Msg msg, MsgContext ep) throws IOException {
    final int type = ep.getType();
    if (type == JkHandler.HANDLE_RECEIVE_PACKET) {
        if (log.isDebugEnabled()) {
            log.debug("RECEIVE_PACKET ?? ");
        }
        return receive(msg, ep);
    }
    if (type == JkHandler.HANDLE_SEND_PACKET) {
        return send(msg, ep);
    }
    if (type == JkHandler.HANDLE_FLUSH) {
        return flush(msg, ep);
    }

    if (log.isTraceEnabled()) {
        log.trace("Call next " + type + " " + next);
    }

    // Send notification, creating and caching it on the endpoint on first use
    if (nSupport != null) {
        Notification cached = (Notification) ep.getNote(notifNote);
        if (cached == null) {
            cached = new Notification("channelNioSocket.message", ep, requestCount);
            ep.setNote(notifNote, cached);
        }
        nSupport.sendNotification(cached);
    }

    if (next == null) {
        log.info("No next ");
        return OK;
    }
    return next.invoke(msg, ep);
}
/**
 * Tells whether the local and the remote address of the socket attached
 * to the endpoint (note {@code socketNote}) are the same, using
 * {@link #isSameAddress(InetAddress, InetAddress)}.
 *
 * @param ep endpoint carrying the socket
 * @return {@code true} if both addresses are considered the same
 */
public boolean isSameAddress(MsgContext ep) {
    final Socket socket = (Socket) ep.getNote(socketNote);
    final InetAddress local = socket.getLocalAddress();
    final InetAddress remote = socket.getInetAddress();
    return isSameAddress(local, remote);
}
/**
 * Builds the channel name in the form {@code jk-<port>}, or
 * {@code jk-<address>-<port>} when the channel is bound to a specific
 * address (anything other than 0.0.0.0). A leading '/' is stripped from
 * the address, which is then URL-encoded.
 *
 * @return the channel name
 */
public String getChannelName() {
    String encodedAddr = "";
    if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
        encodedAddr = getAddress();
        if (encodedAddr.startsWith("/")) {
            encodedAddr = encodedAddr.substring(1);
        }
        try {
            // Name the charset explicitly: the one-argument overload is
            // deprecated because it uses the platform default encoding.
            encodedAddr = URLEncoder.encode(encodedAddr, "UTF-8") + "-";
        } catch (java.io.UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8.
            throw new IllegalStateException("UTF-8 encoding is not available", e);
        }
    }
    return "jk-" + encodedAddr + port;
}
/**
 * Decides whether two addresses are the same by comparing their raw
 * bytes. Addresses of different length never match. Besides a plain
 * byte-for-byte match, a match against the byte-reversed client address
 * is accepted as well; this works around a bug in the IBM 1.1.8 JVM on
 * Linux, where the address bytes are returned reversed in some
 * circumstances.
 *
 * @param server The server's InetAddress
 * @param client The client's InetAddress
 * @return <code>true</code> if the addresses match directly or reversed
 */
public static boolean isSameAddress(InetAddress server, InetAddress client)
{
    final byte[] serverBytes = server.getAddress();
    final byte[] clientBytes = client.getAddress();
    final int size = serverBytes.length;
    if (size != clientBytes.length) {
        return false;
    }

    // Track both comparisons in one pass over the server bytes.
    boolean sameForward = true;
    boolean sameReversed = true;
    for (int idx = 0; idx < size && (sameForward || sameReversed); idx++) {
        final byte current = serverBytes[idx];
        if (current != clientBytes[idx]) {
            sameForward = false;
        }
        if (current != clientBytes[size - 1 - idx]) {
            sameReversed = false;
        }
    }
    return sameForward || sameReversed;
}
/**
 * Broadcasts the given notification to the registered listeners; does
 * nothing when no listener has ever been added.
 *
 * @param notification the notification to send
 */
public void sendNewMessageNotification(Notification notification) {
    if (nSupport == null) {
        return;
    }
    nSupport.sendNotification(notification);
}
/** Notification broadcaster, created on the first addNotificationListener() call; while null, no notifications are sent. */
private NotificationBroadcasterSupport nSupport= null;
/**
 * Adds a notification listener, creating the underlying broadcaster on
 * first use.
 *
 * @param listener listener to add
 * @param filter   filter passed through to the broadcaster
 * @param handback handback object passed through to the broadcaster
 * @throws IllegalArgumentException as thrown by the broadcaster
 */
public void addNotificationListener(NotificationListener listener,
                                    NotificationFilter filter,
                                    Object handback)
    throws IllegalArgumentException
{
    if (nSupport == null) {
        nSupport = new NotificationBroadcasterSupport();
    }
    final NotificationBroadcasterSupport broadcaster = nSupport;
    broadcaster.addNotificationListener(listener, filter, handback);
}
/**
 * Removes a notification listener; does nothing when no listener has
 * ever been added.
 *
 * @param listener listener to remove
 * @throws ListenerNotFoundException as thrown by the broadcaster
 */
public void removeNotificationListener(NotificationListener listener)
    throws ListenerNotFoundException
{
    if (nSupport == null) {
        return;
    }
    nSupport.removeNotificationListener(listener);
}
/** Notification metadata; starts as an empty array and is replaced by setNotificationInfo(). */
MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
public void setNotificationInfo( MBeanNotificationInfo info[]) {
this.notifInfo=info;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -