📄 httprelayservlet.java
字号:
/*
* $Id: HttpRelayServlet.java,v 1.24 2002/06/28 00:07:32 kuldeep Exp $ Copyright
* (c) 2001 Sun Microsystems, Inc. All rights reserved. Redistribution and use
* in source and binary forms, with or without modification, are permitted
* provided that the following conditions are met: 1. Redistributions of source
* code must retain the above copyright notice, this list of conditions and the
* following disclaimer. 2. Redistributions in binary form must reproduce the
* above copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the distribution.
* 3. The end-user documentation included with the redistribution, if any, must
* include the following acknowledgment: "This product includes software
* developed by the Sun Microsystems, Inc. for Project JXTA." Alternately, this
* acknowledgment may appear in the software itself, if and wherever such
* third-party acknowledgments normally appear. 4. The names "Sun", "Sun
* Microsystems, Inc.", "JXTA" and "Project JXTA" must not be used to endorse
* or promote products derived from this software without prior written
* permission. For written permission, please contact Project JXTA at
* http://www.jxta.org. 5. Products derived from this software may not be
* called "JXTA", nor may "JXTA" appear in their name, without prior written
* permission of Sun. THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL SUN MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* ==================================================================== This
* software consists of voluntary contributions made by many individuals on
* behalf of Project JXTA. For more information on Project JXTA, please see
* <http://www.jxta.org/>. This license is based on the BSD license adopted by
* the Apache Foundation.
*/
package net.jxta.impl.endpoint.servlethttp;
import java.io.*;
import java.util.HashMap;
import javax.servlet.*;
import javax.servlet.http.*;
import net.jxta.id.IDFactory;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroupID;
import net.jxta.document.MimeMediaType;
import net.jxta.impl.endpoint.*;
import net.jxta.impl.relay.RelayLease;
import net.jxta.impl.relay.RelayLeaseException;
import net.jxta.impl.relay.RelayServer;
import net.jxta.impl.util.BoundedQueue;
import net.jxta.endpoint.EndpointService;
import net.jxta.endpoint.Message;
import javax.servlet.SingleThreadModel;
import org.apache.log4j.Category;
import org.apache.log4j.Priority;
/**
* Http Servlet implementation of the the HTTP Relay Server
*/
public class HttpRelayServlet extends HttpServlet {
/**
* we allow clients to stay blocked before we send a "no msg" response
*
*/
private final static int BLOCK_WAIT_TIMEOUT = 60 * 1000;
private static final MimeMediaType msgType = new MimeMediaType( "application/x-jxta-msg" );
private final static Category LOG = Category.getInstance( HttpRelayServlet.class.getName() );
/** The endpoint that the servlet is receiving messages for **/
private EndpointService endpoint = null;
/** Clients cannot POST more than 10MB **/
private static final int MAX_POST_CONTENT_LEN = 10 * 1024 * 1024;
/**
* the relay server that this http relay is using *
*/
RelayServer relay = null;
/**
* Stores the endpoint from the ServletContext in to the data member
* for easy access.
*/
public void init( ServletConfig config )
throws ServletException {
super.init( config );
try {
relay = (RelayServer) getServletContext().getAttribute( "relayServer" );
if ( relay == null ) {
throw new ServletException("Servlet Context did not contain " +
"'relayServer'" );
}
endpoint = (EndpointService) getServletContext().getAttribute("endpoint");
if ( endpoint == null ){
throw new ServletException( "Servlet Context did not contain "+
"'endpoint'");
}
} catch ( ClassCastException e ) {
throw new ServletException( "'relayServer' attribute was not of " +
"the proper type in the Servlet " +
"Context" );
}
}
/**
* handles all relay get commands, in addition, it checks to see if the
* content-length of the GET request is not zero. In that case, this
* method will validate the message content length and process it.
*
* possible headers in the request
* x-jxta-command=[ obtainLease | releaseLease | poll ]
* x-jxta-client=[ peerId ]
* x-jxta-timeout=[ -1 (do not block) | 0 (block forever) | 1 to MAX_POSITIVE_LONG ]
*
* possible headers in the response
* x-jxta-relay=[ peerId ]
* x-jxta-client=[ peerId ]
* x-jxta-lease=[ 1 to MAX_POSITIVE_LONG ]
*
*/
public void doGet( HttpServletRequest req, HttpServletResponse res )
throws ServletException, IOException {
String command = req.getHeader(HttpUtil.HEADER_PARAM_COMMAND);
if (command == null) {
//DoJa will send parameters as QueryString
command = req.getParameter(HttpUtil.HEADER_PARAM_COMMAND);
}
String clientPeerId = req.getHeader(HttpUtil.HEADER_PARAM_CLIENT);
if (clientPeerId == null) {
// DoJa will send parameters as QueryString
clientPeerId = req.getParameter(HttpUtil.HEADER_PARAM_CLIENT);
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "GET received from " + req.getRemoteAddr() + " command = " + command );
LOG.debug( " clientPeerId = " + clientPeerId );
LOG.debug( " request length = " + req.getContentLength() );
}
// Check if the input stream has any incoming messages
// passsed in the current req content
// Read such messages and queue them for processing
if (req.getContentLength() > 0) {
int numberofMessages = 1;
String numStr = req.getHeader("x-jxta-num-msg");
if (numStr != null) {
try {
numberofMessages = Integer.parseInt(numStr);
} catch (NumberFormatException e) {
LOG.warn( "Could not parse x-jxta-num-msg header", e );
}
}
LOG.debug( " request numberofMessages = " + numberofMessages );
int count = 0;
BufferedInputStream in = new BufferedInputStream(req.getInputStream());
try {
for (count = 0; count < numberofMessages; count++) {
readMessage(in);
}
} catch (IOException e) {
// done reading messages or queue is full
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "IOException during read ", e);
}
}
in.close();
LOG.debug( " numberofMessages read = " + count );
res.setHeader("x-jxta-num-msg", Integer.toString(count));
}
if (HttpUtil.COMMAND_VALUE_OBTAIN_LEASE.equals(command)) {
// check if the client has a peerId
if (clientPeerId == null) {
// generate a peerId
PeerID peerId = IDFactory.newPeerID(PeerGroupID.worldPeerGroupID);
clientPeerId = peerId.getUniqueValue().toString();
}
res.setHeader(HttpUtil.HEADER_PARAM_RELAY,
relay.getPeerID().getUniqueValue().toString());
res.setHeader(HttpUtil.HEADER_PARAM_CLIENT, clientPeerId);
// check if the client has a lease
String leaseId = relay.getLeaseId(clientPeerId);
RelayLease lease = null;
if (leaseId == null) {
lease = relay.issueNewLease(clientPeerId);
} else {
try {
lease = relay.renewLease(leaseId);
} catch (RelayLeaseException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Lease not renewed; lease exception: " + e );
}
}
}
if (lease != null) {
res.setHeader(HttpUtil.HEADER_PARAM_LEASE,
Long.toString(lease.getLeaseLength()));
}
res.setStatus(HttpServletResponse.SC_OK);
res.setContentLength(0);
} else if (HttpUtil.COMMAND_VALUE_RELEASE_LEASE.equals(command)) {
// check if the client gave a peerId
if (clientPeerId == null) {
// check if the client has a lease
String leaseId = relay.getLeaseId(clientPeerId);
if (leaseId == null) {
relay.removeLease(leaseId);
}
}
res.setStatus(HttpServletResponse.SC_OK);
res.setContentLength(0);
} else if (HttpUtil.COMMAND_VALUE_POLL.equals(command)) {
// since getDateHeader() returns -1 if the header does not exist,
// the default value is treated as do not block
long timeout = -1;
String timeoutString = req.getHeader(HttpUtil.HEADER_PARAM_TIMEOUT);
if (timeoutString == null){
// DoJa will send parameters as QueryString
timeoutString = req.getParameter(HttpUtil.HEADER_PARAM_TIMEOUT);
}
try {
timeout = Long.parseLong(timeoutString);
} catch (NumberFormatException e) {
LOG.warn( "Could not parse timeout header", e );
}
LOG.debug( " timeout = " + timeout );
// check if the client has a peerId
if (clientPeerId == null) {
// generate a peerId
PeerID peerId = IDFactory.newPeerID(PeerGroupID.worldPeerGroupID);
clientPeerId = peerId.getUniqueValue().toString();
}
res.setHeader(HttpUtil.HEADER_PARAM_RELAY,
relay.getPeerID().getUniqueValue().toString());
res.setHeader(HttpUtil.HEADER_PARAM_CLIENT, clientPeerId);
// check if the client has a lease
String leaseId = relay.getLeaseId(clientPeerId);
RelayLease lease = null;
if (leaseId == null) {
lease = relay.issueNewLease(clientPeerId);
} else {
try {
lease = relay.renewLease(leaseId);
} catch (RelayLeaseException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Lease not renewed; lease exception: " + e );
}
}
}
if (lease != null) {
res.setHeader(HttpUtil.HEADER_PARAM_LEASE,
Long.toString(lease.getLeaseLength()));
}
BoundedQueue queue = relay.getOutboundMessageQueueByPeerId( clientPeerId );
if ( queue != null ) {
MessageImpl msg = null;
try {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "trying to dequeue message for client " + clientPeerId );
}
msg = (MessageImpl) queue.dequeue(timeout);
} catch ( InterruptedException e ) {
// send back empty response
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No message dequeued; sending blank 200" );
}
}
// if there is a message, send it. Otherwise, respond with
// okay with a 0 content-len
if ( msg == null ) {
res.setStatus(HttpServletResponse.SC_OK);
res.setContentLength(0);
return;
}
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "Relay sending message back for lease " + leaseId );
}
sendMessage( res, msg );
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "No queue associate with lease (probably an " +
"invalid lease for client): " + clientPeerId );
}
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Invalid lease specified");
}
} else {
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Invalid command specified");
}
}
/**
* handles all relay commands
*
*/
public void doPost( HttpServletRequest req, HttpServletResponse res )
throws ServletException, IOException {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug( "POST received from " + req.getRemoteAddr() );
LOG.debug( " request length = " + req.getContentLength() );
}
// When using Java HttpURLConnection, if getOutputStream() is called on the client,
// the request type is set to POST and is out of control of the programmer
// Check if this is a GET with data and needs to be processed by the doGet() method
if (req.getHeader(HttpUtil.HEADER_PARAM_COMMAND) != null ||
// DoJa will send parameters as QueryString
req.getQueryString() != null) {
doGet(req, res);
return;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -