📄 httprelayservlet.java
字号:
String queryString = null;
try {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( " try to read querystring" );
}
BufferedReader reader = new BufferedReader( new InputStreamReader( req.getInputStream() ) );
queryString = reader.readLine();
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( " done reading querystring" );
}
if ( queryString != null ) {
processCommand( queryString, res );
} else {
res.sendError( HttpServletResponse.SC_BAD_REQUEST,
"Problem reading request" );
}
} catch ( IOException e ) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Exception reading client request: " + e );
}
}
}
/**
* processes the command in the querystring and manipulates the
* response object *
*
*/
private void processCommand( String queryString, HttpServletResponse res )
throws IOException {
HashMap map = QueryString.parse( queryString );
Object[] arr = map.keySet().toArray();
String command = (String) map.get( HttpUtil.COMMAND_NAME );
if (command == null) {
throw new IOException("query string did not contain any commands ");
}
if (LOG.isEnabledFor(Priority.DEBUG)){
LOG.debug( " command: " + command );
}
if ( command.equals( HttpUtil.COMMAND_VALUE_BLOCK ) ) {
processBlock( map, res );
}
if ( command.equals( HttpUtil.COMMAND_VALUE_POLL ) ) {
processPoll( map, res );
}
if ( command.equals( HttpUtil.COMMAND_VALUE_OBTAIN_LEASE ) ) {
processObtainLease( map, res );
}
if ( command.equals( HttpUtil.COMMAND_VALUE_RENEW_LEASE ) ) {
processRenewLease( map, res );
}
}
/**
* Processes the request for a new lease
*/
private void processObtainLease( HashMap map, HttpServletResponse res )
throws IOException {
String clientPeerId = (String) map.get( HttpUtil.PARAM_PEER_ID );
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Client trying to obtain lease: " + clientPeerId );
}
if ( clientPeerId == null ) {
PeerID peerId = IDFactory.newPeerID(PeerGroupID.worldPeerGroupID);
clientPeerId = peerId.getUniqueValue().toString();
}
RelayLease lease = relay.issueNewLease( clientPeerId );
QueryString qs = toQueryString( lease );
qs.add( HttpUtil.PARAM_PEER_ID, relay.getPeerID().toString() );
qs.add( HttpUtil.PARAM_CLIENT_PEER_ID, clientPeerId );
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Granting lease to client: " + clientPeerId );
}
sendOK( res, qs.toString() );
}
/**
* Processes the request to renew a lease *
*
*/
private void processRenewLease( HashMap map, HttpServletResponse res )
throws IOException {
String leaseId = (String) map.get( HttpUtil.PARAM_LEASE_ID );
if (leaseId == null) {
throw new IOException("query string did not contain a leaseId ");
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Renewing lease; old lease id: " + leaseId );
}
if ( leaseId != null ) {
try {
RelayLease lease = relay.renewLease( leaseId );
QueryString qs = toQueryString( lease );
qs.add( HttpUtil.PARAM_PEER_ID, relay.getPeerID().toString() );
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Renewed lease; response=" + qs.toString() );
}
sendOK( res, qs.toString() );
} catch ( RelayLeaseException e ) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Lease not renewed; lease exception: " + e );
}
sendBadRequest( res, e.getMessage() );
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Lease not renewed; leaseId not specified" );
}
sendBadRequest( res, "leaseId not specified" );
}
}
/**
* process the command to block, waiting for a message *
*/
private void processBlock( HashMap map, HttpServletResponse res )
throws IOException {
String leaseId = (String) map.get( HttpUtil.PARAM_LEASE_ID );
if (leaseId == null) {
throw new IOException("query string did not contain a leaseId ");
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "processBlock() for lease " + leaseId );
}
if ( leaseId != null ) {
BoundedQueue queue = relay.getOutboundMessageQueueByLeaseId( leaseId );
if ( queue != null ) {
MessageImpl msg = null;
try {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "trying to dequeue message for lease " + leaseId );
}
msg = (MessageImpl) queue.dequeue( BLOCK_WAIT_TIMEOUT );
} catch ( InterruptedException e ) {
// send back empty response
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No message dequeued; sending blank 200" );
}
sendOK( res );
}
// if there is a message, send it. Otherwise, respond with
// okay with a 0 content-len
if ( msg != null ) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Relay sending message back for lease " + leaseId );
}
sendMessage( res, msg );
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Relay dequeued no msg, sending OK for lease " +
leaseId );
}
sendOK( res );
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No queue associate with lease (probably an " +
"invalid lease): " + leaseId );
}
sendBadRequest( res, "Invalid lease specified" );
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No lease specified" );
}
sendBadRequest( res, "No lease specified" );
}
}
/**
* process the request to poll for a message *
*
*/
private void processPoll( HashMap map, HttpServletResponse res )
throws IOException {
String leaseId = (String) map.get( HttpUtil.PARAM_LEASE_ID );
if (leaseId == null) {
throw new IOException("query string did not contain a leaseId ");
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "processBlock() for lease " + leaseId );
}
if ( leaseId != null ) {
BoundedQueue queue = relay.getOutboundMessageQueueByLeaseId( leaseId );
if ( queue != null ) {
MessageImpl msg = null;
try {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "trying to dequeue message for lease " + leaseId );
}
msg = (MessageImpl) queue.dequeue( BoundedQueue.DO_NOT_BLOCK );
} catch ( InterruptedException e ) {
// send back empty response
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No message dequeued; sending blank 200" );
}
sendOK( res );
}
// if there is a message, send it. Otherwise, respond with
// okay with a 0 content-len
if ( msg != null ) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Relay sending message back for lease " + leaseId );
}
sendMessage( res, msg );
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Relay dequeued no msg, sending OK for lease " +
leaseId );
}
sendOK( res );
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No queue associate with lease (probably an " +
"invalid lease): " + leaseId );
}
sendBadRequest( res, "Invalid lease specified" );
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No lease specified" );
}
sendBadRequest( res, "No lease specified" );
}
}
/**
* convenience method for sending SC_BAD_REQUEST *
*/
private void sendBadRequest( HttpServletResponse res, String msg )
throws IOException {
res.sendError( HttpServletResponse.SC_BAD_REQUEST, msg );
}
/**
* Sends the jxta message back in the HTTP response *
*/
private void sendMessage( HttpServletResponse res, MessageImpl msg )
throws IOException {
res.setStatus( HttpServletResponse.SC_OK );
// buffer the message into the memory stream. We only do this so that
// we can tell the full length of the message first. This is bad;
// it should be fixed in the new message implementation.
ByteArrayOutputStream tmpOut = new ByteArrayOutputStream();
MessageWireFormatFactory.newMessageWireFormat( msgType).writeMessage( tmpOut, msg );
res.setContentLength( tmpOut.size() );
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "sendMessage() preparing; content-length = "
+ tmpOut.size() );
}
// now write the input stream from the buffer to the response's out
// stream
InputStream in = new ByteArrayInputStream( tmpOut.toByteArray() );
OutputStream out = res.getOutputStream();
HttpUtil.transfer( in, out );
out.flush();
out.close();
}
/**
* responds with SC_OK and writes body to the body of the response. A
* CRLF pair is appended to body. *
*
*/
private void sendOK( HttpServletResponse res, String body )
throws IOException {
res.setStatus( HttpServletResponse.SC_OK );
byte[] buf = ( body + "\r\n" ).getBytes();
res.setContentLength( buf.length );
OutputStream out = new BufferedOutputStream( res.getOutputStream() );
out.write( buf );
out.flush();
out.close();
}
/**
* Sends a 200 back with no content *
*
*/
private void sendOK( HttpServletResponse res )
throws IOException {
res.setStatus( HttpServletResponse.SC_OK );
res.setContentLength( 0 );
}
/**
* takes a lease and gives back a QueryString *
*
*/
private QueryString toQueryString( RelayLease lease ) {
QueryString query = new QueryString();
query.add( HttpUtil.PARAM_LEASE_LENGTH,
Integer.toString( lease.getLeaseLength() ) );
query.add( HttpUtil.PARAM_LEASE_ID, lease.getLeaseId() );
return query;
}
/**
* Processes an incoming message. Read it, construct a message, hand
* the message to the send queue
**/
private void readMessage(BufferedInputStream in) throws IOException {
// construct the message. Send BAD_REQUEST if the message construction
// fails
if (LOG.isEnabledFor(Priority.DEBUG)){
LOG.debug("Constructing MessageImpl and calling EndpointService.demux()");
}
Message msg = endpoint.newMessage();
MessageWireFormatFactory.newMessageWireFormat(msgType).readMessage(in, msg);
/* Do it straight instead. Demux does a handshake. */
try {
endpoint.demux(msg);
} catch (IOException ex) {
if (LOG.isEnabledFor(Priority.ERROR)){
LOG.error("Error sending message: " + ex);
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -