📄 httpserver.java
字号:
try {
synchronized( this ) {
do {
// we are available for connections
HttpServer.this.unicastPool.availableThread( this );
try {
// We sync on ourself and then wait for someone to wake
// us.
wait();
} catch( InterruptedException woken ) {
Thread.interrupted();
}
// if we are woken without a socket, that means its time to
// quit.
if( null == socket )
break;
HttpServer.this.owner.runReceive(socket);
// we are done with this socket.
socket = null;
} while( true );
}
} catch ( Throwable all ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
}
// we are dying, perhaps another thread can be created to take our place
HttpServer.this.unicastPool.threadDied( this );
}
}
/**
* Manages a connection with a client.
**/
class ClientConnection {
private String clientId = null;
public Socket inputSocket = null;
public InputStream inputStream = null;
public OutputStream outputStream = null;
/**
* Construct a new client connection.
**/
ClientConnection(String clientId,
Socket inputSocket,
InputStream inputStream,
OutputStream outputStream) {
this.clientId = clientId;
this.inputSocket = inputSocket;
this.inputStream = inputStream;
this.outputStream = outputStream;
}
public void sendToClient( Vector buffers, long len ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "sendToClient begins" );
if ((outputStream == null) && (inputSocket != null)) {
try {
outputStream = inputSocket.getOutputStream();
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "failed to get outputstream for socket", e );
}
}
/*
* If we have an available active connection then send it, otherwise
* put it into the spool
*/
if (outputStream == null) {
String dn = owner.HttpSpool + File.separatorChar + clientId;
String fn = owner.cm.createTmpName( dn );
try {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("sendToClient: spooling message ");
Vector streamParts = new Vector();
for (int i = 0; i < buffers.size(); ++i )
streamParts.addElement(
new ByteArrayInputStream( (byte[]) buffers.elementAt(i) ) );
InputStream combined = new SequenceInputStream(
streamParts.elements() );
owner.cm.saveBytes(dn, fn, combined );
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("sendToClient failed spooling a message", e);
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Sending response on waiting socket to " +
inputSocket.getInetAddress().getHostAddress() + ":" +
inputSocket.getPort() );
HttpTransport.sendResponse( outputStream, buffers, len );
close();
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "sendToClient finished" );
}
public void close() {
HttpTransport.closeSocket( this.inputSocket,
this.inputStream,
this.outputStream);
clientId = null;
inputSocket = null;
inputStream = null;
outputStream = null;
}
protected void finalize () {
close();
}
}
/**
* Constructor for the HTTP server
*
* @param inGroup the thread group we should create our threads in.
* @param owner the HTTP transport we are working for
* @param usingInterface the network interface to use.
* @param serverSocketPort the port we will be listening on.
**/
public HttpServer( ThreadGroup inGroup, HttpTransport owner,
InetAddress usingInterface, int serverSocketPort ) {
super( inGroup, (Runnable) null, "Server Listener" );
this.owner = owner;
// Create the incoming thread pool.
unicastPool = new UnicastThreadPool( );
// Create the set of threads
unicastPool.createThreads();
// Open the incoming socket.
// FIXME 20010919 bondolo@jxta.org Since the server is dead if we didn't
// bind, we should be throwing an exception of some sort here
try {
serverSocket = new ServerSocket( serverSocketPort,
MaxCnxBacklog,
usingInterface );
} catch (BindException e0 ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal("[1] Cannot bind ServerSocket on port: " +
serverSocketPort + " because port is in use or disallowed." );
} catch (IOException e1) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal("[1] Canot create ServerSocket on port " +
serverSocketPort, e1 );
} catch (SecurityException e2 ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal("[1] Canot create ServerSocket on port " +
serverSocketPort, e2 );
}
}
/**
* Daemon where we wait for incoming connections.
**/
public void run() {
try {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Server is ready to receive request");
while (true) {
try {
IncomingUnicastThread willUse =
unicastPool.getIncomingThread( 0 );
// Strange that we did not get a thread...
if( null == willUse )
continue;
Socket inputSocket = serverSocket.accept();
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( inputSocket.getInetAddress().getHostAddress() + ":" +
inputSocket.getPort() +
" connected on " + willUse.getName() );
try {
inputSocket.setSoLinger(true, HttpTransport.LingerDelay);
inputSocket.setSoTimeout( HttpTransport.SocketTimeout );
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("setting socket options failed ", e);
}
// wake up the thread when we get a socket
synchronized( willUse ) {
willUse.setSocket( inputSocket ) ;
willUse.notify( );
}
} catch (IOException e1) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("[1] ServerSocket.accept() on port " +
serverSocket.getLocalPort() +
" has failed.", e1 );
continue;
} catch (SecurityException e2) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("[2] ServerSocket.accept() on port " +
serverSocket.getLocalPort() +
" has failed.", e2 );
continue;
}
}
} catch ( Throwable all ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
}
}
/**
* add a new client to the pool of active connections. If there was a
* previous connection recorded for this client then we close and clean it
* up as well
*
* @param clientId identifies the client
* @param socket the socket it is talking to us with.
* @param is inputstream for the socket
* @param os outputstream for the socket
* @return boolean true if a new connection otherwise false.
**/
synchronized boolean addClientConnection(
String clientId, Socket socket, InputStream is, OutputStream os ) {
ClientConnection client = new ClientConnection(clientId, socket, is, os);
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Adding new client connection for " +
socket.getInetAddress().getHostAddress() + ":" +
socket.getPort() +
((null != is) ? " I " : " ") +
((null != os) ? " O " : " ") );
ClientConnection old = (ClientConnection) clientConnections.put(clientId, client);
if( null != old ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Closing previous connection to client" );
old.close( );
}
return( null == old );
}
/**
* locate the connection object for a clientId id and remove it.
*
* @param clientId identifies the client
* @return ClientConnection the connection object for the clientId id or null if
* no connection is known.
**/
synchronized ClientConnection getClientConnection( String clientId ) {
ClientConnection result = (ClientConnection) clientConnections.remove( clientId );
return result;
}
/**
* send a message to a client using an existing client connection if available
*
* @param clientId identifies the client
* @param buffers the message to send.
* @param len the length of the message
**/
void sendMessageToClient(String clientId, Vector buffers, long len ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("sendMessageToClient");
// Check if there is a pending connection to that clientId.
ClientConnection client = getClientConnection(clientId);
if (client != null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" Using client connection");
// we have to add the msg wrapper stuff.
String docstart = "<" + HttpMessage.CodeTag + ">" + Integer.toString(HttpMessage.Content) +
"</" + HttpMessage.CodeTag + ">" + "<" + HttpMessage.ContentTag + ">\n";
String docend = "</" + HttpMessage.ContentTag + ">";
try {
buffers.insertElementAt(docstart.getBytes(), 0);
buffers.addElement(docend.getBytes());
} catch (Exception ex) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("sendMessageToClient failed ", ex);
return;
}
client.sendToClient( buffers, len + docstart.length() + docend.length() );
} else {
// XXX 20010918 bondolo@jxta.org I'm not sure of this step why would
// we do this if we have no connection? The client may not have even
// ever registered.
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn(" Storing into the spooler for " + clientId);
try {
String dn = owner.HttpSpool + File.separatorChar + clientId;
String fn = owner.cm.createTmpName( dn );
HttpMessage httpMsg = new HttpMessage( dn, fn, buffers );
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("sendMessageToClient: failed to spool message", e);
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -