📄 channelsocket.java
字号:
/**
 * Registers the request's processor as a JMX component.
 *
 * Does nothing when no JMX domain is configured. Otherwise the request
 * processor is attached to the global processor and registered under the
 * name {@code <domain>:type=RequestProcessor,worker=<channel name>,name=JkRequest<count>}.
 * The generated ObjectName is stored on the context under
 * {@code JMXRequestNote}.
 *
 * Registration is best effort: any failure is logged as a warning
 * (including the cause) and is not propagated to the caller.
 *
 * @param req   request whose processor is registered
 * @param ep    message context that receives the ObjectName note
 * @param count number appended to the MBean name
 */
public void registerRequest(Request req, MsgContext ep, int count) {
    if (this.domain == null) {
        return;
    }
    try {
        RequestInfo rp = req.getRequestProcessor();
        rp.setGlobalProcessor(global);
        ObjectName roname = new ObjectName(
            getDomain() + ":type=RequestProcessor,worker="
            + getChannelName() + ",name=JkRequest" + count);
        ep.setNote(JMXRequestNote, roname);
        Registry.getRegistry(null, null).registerComponent(rp, roname, null);
    } catch (Exception ex) {
        // Keep the cause in the log; without it a failed registration
        // cannot be diagnosed.
        log.warn("Error registering request", ex);
    }
}
/**
 * Does nothing in this implementation: the body is empty.
 *
 * @param ep message context (unused here)
 * @throws IOException declared by the signature; never thrown by this body
 */
public void open(MsgContext ep) throws IOException {
}
/**
 * Closes the socket stored on the context under {@code socketNote}.
 *
 * If the context carries no socket there is nothing to close and the
 * method returns quietly instead of failing with a NullPointerException.
 *
 * @param ep message context holding the socket note
 * @throws IOException if closing the socket fails
 */
public void close(MsgContext ep) throws IOException {
    Socket s = (Socket) ep.getNote(socketNote);
    if (s == null) {
        // No socket was attached to this context.
        return;
    }
    s.close();
}
/**
 * Opens and immediately closes a connection to this channel's own port,
 * in order to unlock a pending accept().
 *
 * Does nothing when the port is 0. When no bind address is set, or it is
 * the wildcard "0.0.0.0", the local host address is used as the target.
 *
 * @throws IOException if the target address cannot be resolved or the
 *                     connection cannot be opened or closed
 */
private void unLockSocket() throws IOException {
    if (port == 0) {
        return;
    }
    InetAddress ladr = inet;
    if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
        ladr = InetAddress.getLocalHost();
    }
    // Need to create a connection to unlock the accept();
    Socket s = new Socket(ladr, port);
    try {
        // setting soLinger to a small value will help shutdown the
        // connection quicker
        s.setSoLinger(true, 0);
    } finally {
        // Always release the temporary socket, even if setSoLinger fails.
        s.close();
    }
}
/**
 * Stops the channel: clears the running flag, shuts down the thread pool,
 * unlocks a pending accept (unless paused), closes the server socket and
 * unregisters the thread-pool and request-group MBeans.
 *
 * Does nothing beyond clearing the flag when the port is 0.
 *
 * Each cleanup stage runs independently, so a failure in an earlier stage
 * (for example the unlock connection being refused) no longer prevents
 * the server socket from being closed or the MBeans from being
 * unregistered. Failures are logged at info level, with the stack trace
 * at debug level, and are not propagated.
 *
 * @throws IOException declared by the signature; failures in this body
 *                     are logged rather than thrown
 */
public void destroy() throws IOException {
    running = false;
    /* If we disabled the channel return */
    if (port == 0) {
        return;
    }
    try {
        tp.shutdown();
        if (!paused) {
            unLockSocket();
        }
    } catch (Exception e) {
        logShutdownError(e);
    }
    try {
        sSocket.close(); // XXX?
    } catch (Exception e) {
        logShutdownError(e);
    }
    try {
        if (tpOName != null) {
            Registry.getRegistry(null, null).unregisterComponent(tpOName);
        }
        if (rgOName != null) {
            Registry.getRegistry(null, null).unregisterComponent(rgOName);
        }
    } catch (Exception e) {
        logShutdownError(e);
    }
}

/** Logs a failure that occurred while shutting down the channel. */
private void logShutdownError(Exception e) {
    log.info("Error shutting down the channel " + port + " " +
             e.toString());
    if (log.isDebugEnabled()) {
        log.debug("Trace", e);
    }
}
/**
 * Finishes the message and writes it to the output stream stored on the
 * context under {@code osNote}.
 *
 * @param msg message to send; {@code end()} is called on it first
 * @param ep  message context holding the output stream note
 * @return the number of bytes written, as reported by {@code msg.getLen()}
 * @throws IOException if the write fails
 */
public int send(Msg msg, MsgContext ep)
    throws IOException {
    // Write the packet header
    msg.end();
    final byte[] packet = msg.getBuffer();
    final int length = msg.getLen();
    if (log.isTraceEnabled()) {
        log.trace("send() " + length + " " + packet[4]);
    }
    final OutputStream out = (OutputStream) ep.getNote(osNote);
    out.write(packet, 0, length);
    return length;
}
/**
 * Flushes the output stream stored on the context under {@code osNote},
 * but only when {@code bufferSize} is positive.
 *
 * @param msg unused by this method
 * @param ep  message context holding the output stream note
 * @return always 0
 * @throws IOException if the flush fails
 */
public int flush(Msg msg, MsgContext ep)
    throws IOException {
    if (bufferSize <= 0) {
        return 0;
    }
    final OutputStream out = (OutputStream) ep.getNote(osNote);
    out.flush();
    return 0;
}
/**
 * Reads one message into the buffer of {@code msg}: first the header,
 * then the body whose length is known once the header is processed.
 *
 * @param msg message whose buffer receives the data
 * @param ep  message context used for reading
 * @return the number of body bytes read on success; the negative value
 *         returned by {@code read()} if the header could not be read;
 *         -1 if a non-empty body could not be read at all; -2 if the
 *         number of body bytes read differs from the expected length
 * @throws IOException if reading fails
 */
public int receive(Msg msg, MsgContext ep)
    throws IOException {
    if (log.isDebugEnabled()) {
        log.debug("receive() ");
    }
    final byte[] buffer = msg.getBuffer();
    final int headerLength = msg.getHeaderLength();

    // XXX If the length in the packet header doesn't agree with the
    // actual number of bytes read, it should probably return an error
    // value. Also, callers of this method never use the length
    // returned -- should probably return true/false instead.
    final int headerStatus = this.read(ep, buffer, 0, headerLength);
    if (headerStatus < 0) {
        // Most likely normal apache restart.
        return headerStatus;
    }

    // The body length is only known after the header has been processed.
    msg.processHeader();
    final int bodyLength = msg.getLen();

    // XXX check if enough space - it's assert()-ed !!!
    final int bodyRead = this.read(ep, buffer, headerLength, bodyLength);
    if (bodyRead <= 0 && bodyLength > 0) {
        log.warn("can't read body, waited #" + bodyLength);
        return -1;
    }
    if (bodyRead != bodyLength) {
        log.warn("incomplete read, waited #" + bodyLength +
                 " got only " + bodyRead);
        return -2;
    }
    return bodyRead;
}
/**
 * Reads exactly {@code len} bytes from the input stream stored on the
 * context under {@code isNote}, looping until all of them have arrived.
 *
 * A single read on a stream may legitimately return fewer bytes than
 * requested (fragmented packets under load, for instance), so the call is
 * repeated until the requested amount has been collected.
 *
 * A SocketException during a read is logged (info level if some bytes
 * were already read, debug level otherwise) and then treated like a
 * closed connection.
 *
 * @param ep     message context holding the input stream note
 * @param b      destination buffer
 * @param offset index in {@code b} where the first byte is stored
 * @param len    number of bytes to read
 * @return the number of bytes read ({@code len}, or 0 if {@code len} is
 *         not positive), or -3 if a read returned no data or failed with
 *         a SocketException
 * @throws IOException if reading fails with any other I/O error
 **/
public int read(MsgContext ep, byte[] b, int offset, int len)
    throws IOException {
    final InputStream in = (InputStream) ep.getNote(isNote);
    int filled = 0;
    while (filled < len) {
        int count;
        try {
            count = in.read(b, filled + offset, len - filled);
        } catch (SocketException sex) {
            if (filled > 0) {
                log.info("Error reading data after "+filled+"bytes",sex);
            } else {
                log.debug("Error reading data", sex);
            }
            count = -1;
        }
        if (log.isTraceEnabled()) {
            log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
                      offset + " " + len + " = " + count );
        }
        if (count <= 0) {
            // Connection just closed by remote. This happens periodically,
            // as apache restarts periodically.
            // It should be more graceful ! - another feature for Ajp14
            return -3;
        }
        filled += count;
    }
    return filled;
}
// Loop flag: set to false by destroy(); checked by the loops in
// acceptConnections() and processConnection().
// NOTE(review): not volatile - presumably written and read on different
// threads; confirm that visibility is adequate.
protected boolean running=true;
/**
 * Accept loop: while the channel is running, creates a fresh message
 * context, waits for an incoming connection and hands it to the thread
 * pool as a SocketConnection.
 *
 * Exceptions raised inside an iteration are logged as warnings while the
 * channel is still running, and silently dropped once it has stopped.
 */
void acceptConnections() {
    if (log.isDebugEnabled()) {
        log.debug("Accepting ajp connections on " + port);
    }
    while (running) {
        try {
            final MsgContext context = createMsgContext(packetSize);
            context.setSource(this);
            context.setWorkerEnv(wEnv);
            this.accept(context);
            if (!running) {
                break;
            }
            // Since this is a long-running connection, we don't care
            // about the small GC
            tp.runIt(new SocketConnection(this, context));
        } catch (Exception ex) {
            if (running) {
                log.warn("Exception executing accept" ,ex);
            }
        }
    }
}
/**
 * Serves a single ajp connection: repeatedly receives a message and
 * passes it to invoke() until the channel stops or is paused, a receive
 * reports a non-positive status, or invoke() returns something other
 * than {@code JkHandler.OK}.
 *
 * Whatever the outcome, the connection is closed afterwards and the
 * request's JMX registration (if any) is removed.
 *
 * @param ep message context of the connection to serve
 */
void processConnection(MsgContext ep) {
    try {
        final MsgAjp incoming = new MsgAjp(packetSize);
        // A paused channel drops the connection.
        while (running && !paused) {
            final int received = this.receive(incoming, ep);
            if (received <= 0) {
                if (received == -3) {
                    log.debug( "server has been restarted or reset this connection" );
                } else {
                    log.warn("Closing ajp connection " + received );
                }
                break;
            }
            ep.setLong(MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
            ep.setType(0);
            // Will call next
            final int handled = this.invoke(incoming, ep);
            if (handled != JkHandler.OK) {
                log.warn("processCallbacks status " + handled );
                break;
            }
        }
    } catch (Exception ex) {
        final String detail = ex.getMessage();
        final boolean hasDetail = detail != null;
        if (hasDetail && detail.indexOf( "Connection reset" ) >= 0) {
            log.debug( "Server has been restarted or reset this connection");
        } else if (hasDetail && detail.indexOf( "Read timed out" ) >= 0) {
            log.debug( "connection timeout reached");
        } else {
            log.error( "Error, processing connection", ex);
        }
    } finally {
        /*
         * Whatever happened to this connection (remote closed it, timeout,
         * read error) the socket SHOULD be closed; otherwise the webserver
         * may keep believing the socket is open and forward requests to
         * tomcat without ever receiving a reply.
         */
        try {
            this.close(ep);
        } catch (Exception e) {
            log.error( "Error, closing connection", e);
        }
        try {
            final Request request = (Request) ep.getRequest();
            if (request != null) {
                final ObjectName registered = (ObjectName) ep.getNote(JMXRequestNote);
                if (registered != null) {
                    Registry.getRegistry(null, null).unregisterComponent(registered);
                }
                request.getRequestProcessor().setGlobalProcessor(null);
            }
        } catch (Exception ee) {
            log.error( "Error, releasing connection",ee);
        }
    }
}
// XXX This should become handleNotification
/**
 * Dispatches on the context's type: receive, send and flush requests are
 * handled by this channel itself; anything else triggers a notification
 * (when listeners have been registered) and is forwarded to the next
 * handler.
 *
 * @param msg message to process
 * @param ep  message context carrying the type
 * @return the result of receive/send/flush or of the next handler;
 *         {@code OK} when there is no next handler
 * @throws IOException if the underlying operation fails
 */
public int invoke(Msg msg, MsgContext ep) throws IOException {
    final int type = ep.getType();

    if (type == JkHandler.HANDLE_RECEIVE_PACKET) {
        if (log.isDebugEnabled()) {
            log.debug("RECEIVE_PACKET ?? ");
        }
        return receive(msg, ep);
    }
    if (type == JkHandler.HANDLE_SEND_PACKET) {
        return send(msg, ep);
    }
    if (type == JkHandler.HANDLE_FLUSH) {
        return flush(msg, ep);
    }

    if (log.isDebugEnabled()) {
        log.debug("Call next " + type + " " + next);
    }

    // Send notification; the Notification object is created once per
    // context and cached on it.
    if (nSupport != null) {
        Notification cached = (Notification) ep.getNote(notifNote);
        if (cached == null) {
            cached = new Notification("channelSocket.message", ep, requestCount );
            ep.setNote(notifNote, cached);
        }
        nSupport.sendNotification(cached);
    }

    if (next == null) {
        log.info("No next ");
        return OK;
    }
    return next.invoke(msg, ep);
}
/**
 * Compares the local and remote addresses of the socket stored on the
 * context under {@code socketNote}.
 *
 * @param ep message context holding the socket note
 * @return the result of the static address comparison for the socket's
 *         local address and remote address
 */
public boolean isSameAddress(MsgContext ep) {
    final Socket connection = (Socket) ep.getNote(socketNote);
    final InetAddress local = connection.getLocalAddress();
    final InetAddress remote = connection.getInetAddress();
    return isSameAddress(local, remote);
}
/**
 * Builds the channel name: {@code jk-<port>} when no specific bind
 * address is set (null or "0.0.0.0"), otherwise
 * {@code jk-<url-encoded address>-<port>}. A leading "/" in the address
 * is dropped before encoding.
 *
 * @return the channel name
 */
public String getChannelName() {
    if (inet == null || "0.0.0.0".equals(inet.getHostAddress())) {
        return "jk-" + port;
    }
    String address = getAddress();
    if (address.startsWith("/")) {
        address = address.substring(1);
    }
    return "jk-" + URLEncoder.encode(address) + "-" + port;
}
/**
 * Return <code>true</code> if the specified client and server addresses
 * are the same, either byte for byte or with the bytes in reversed
 * order. The reversed comparison works around a bug in the IBM 1.1.8 JVM
 * on Linux, where the address bytes are returned reversed in some
 * circumstances.
 *
 * @param server The server's InetAddress
 * @param client The client's InetAddress
 * @return <code>true</code> if the raw addresses have the same length and
 *         match in forward or reversed byte order
 */
public static boolean isSameAddress(InetAddress server, InetAddress client)
{
    final byte[] serverBytes = server.getAddress();
    final byte[] clientBytes = client.getAddress();
    final int length = serverBytes.length;
    if (length != clientBytes.length) {
        return false;
    }

    // Track both comparisons in one pass; stop once both have failed.
    boolean sameForward = true;
    boolean sameReversed = true;
    for (int i = 0; i < length && (sameForward || sameReversed); i++) {
        final byte current = serverBytes[i];
        if (current != clientBytes[i]) {
            sameForward = false;
        }
        if (current != clientBytes[length - 1 - i]) {
            sameReversed = false;
        }
    }
    return sameForward || sameReversed;
}
/**
 * Sends the given notification through the broadcaster support, if one
 * has been created; otherwise does nothing.
 *
 * @param notification notification to send
 */
public void sendNewMessageNotification(Notification notification) {
    if (nSupport == null) {
        return;
    }
    nSupport.sendNotification(notification);
}
// Notification broadcaster; stays null until addNotificationListener()
// creates it, and the sending code skips notifications while it is null.
private NotificationBroadcasterSupport nSupport= null;
/**
 * Registers a notification listener, creating the broadcaster support on
 * first use.
 *
 * @param listener listener to add
 * @param filter   filter passed through to the broadcaster support
 * @param handback handback object passed through to the broadcaster support
 * @throws IllegalArgumentException as raised by the broadcaster support
 */
public void addNotificationListener(NotificationListener listener,
                                    NotificationFilter filter,
                                    Object handback)
    throws IllegalArgumentException
{
    if (nSupport == null) {
        nSupport = new NotificationBroadcasterSupport();
    }
    nSupport.addNotificationListener(listener, filter, handback);
}
/**
 * Removes a notification listener. Does nothing when the broadcaster
 * support has not been created yet.
 *
 * @param listener listener to remove
 * @throws ListenerNotFoundException as raised by the broadcaster support
 */
public void removeNotificationListener(NotificationListener listener)
    throws ListenerNotFoundException
{
    if (nSupport == null) {
        return;
    }
    nSupport.removeNotificationListener(listener);
}
// Notification metadata returned by getNotificationInfo(); starts as an
// empty array and is replaced wholesale by setNotificationInfo().
MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
/**
 * Replaces the notification metadata array. The array is stored as
 * given, without copying.
 *
 * @param info new notification metadata
 */
public void setNotificationInfo(MBeanNotificationInfo info[]) {
    notifInfo = info;
}
/**
 * Returns the notification metadata array currently held by this
 * channel. The internal array itself is returned, not a copy.
 *
 * @return the current notification metadata
 */
public MBeanNotificationInfo[] getNotificationInfo() {
    return this.notifInfo;
}
/**
 * Thread-pool task that runs the accept loop of a channel.
 */
static class SocketAcceptor implements ThreadPoolRunnable {
    /** Channel whose accept loop is run. */
    ChannelSocket wajp;

    SocketAcceptor(ChannelSocket wajp) {
        this.wajp = wajp;
    }

    /** No per-thread data is needed. */
    public Object[] getInitData() {
        return null;
    }

    /** Runs the channel's accept loop; the argument is ignored. */
    public void runIt(Object thD[]) {
        this.wajp.acceptConnections();
    }
}
/**
 * Thread-pool task that serves one accepted connection of a channel.
 */
static class SocketConnection implements ThreadPoolRunnable {
    /** Channel that processes the connection. */
    ChannelSocket wajp;
    /** Context of the connection; cleared once processing has finished. */
    MsgContext ep;

    SocketConnection(ChannelSocket wajp, MsgContext ep) {
        this.wajp = wajp;
        this.ep = ep;
    }

    /** No per-thread data is needed. */
    public Object[] getInitData() {
        return null;
    }

    /** Processes the connection, then drops the context reference. */
    public void runIt(Object perTh[]) {
        this.wajp.processConnection(this.ep);
        this.ep = null;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -