httpcoreniosender.java
来自「开源的axis2框架的源码。用于开发WEBSERVER」· Java 代码 · 共 463 行 · 第 1/2 页
JAVA
463 行
* copied from the request messages
* @param msgContext the Axis2 Message context from which these headers should be removed
*/
private void removeUnwantedHeaders(MessageContext msgContext) {
Map headers = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
if (headers != null && !headers.isEmpty()) {
headers.remove(HTTP.CONN_DIRECTIVE);
headers.remove(HTTP.TRANSFER_ENCODING);
headers.remove(HTTP.DATE_DIRECTIVE);
headers.remove(HTTP.SERVER_DIRECTIVE);
headers.remove(HTTP.CONTENT_TYPE);
headers.remove(HTTP.CONTENT_LEN);
headers.remove(HTTP.USER_AGENT);
}
}
/**
* Send the request message asynchronously to the given EPR
* @param epr the destination EPR for the message
* @param msgContext the message being sent
* @throws AxisFault on error
*/
private void sendAsyncRequest(EndpointReference epr, MessageContext msgContext) throws AxisFault {
try {
URL url = new URL(epr.getAddress());
int port = url.getPort();
if (port == -1) {
// use default
if ("http".equals(url.getProtocol())) {
port = 80;
} else if ("https".equals(url.getProtocol())) {
port = 443;
}
}
HttpHost httpHost = new HttpHost(url.getHost(), port, url.getProtocol());
Axis2HttpRequest axis2Req = new Axis2HttpRequest(epr, httpHost, msgContext);
NHttpClientConnection conn = ConnectionPool.getConnection(url.getHost(), port);
if (conn == null) {
ioReactor.connect(new InetSocketAddress(url.getHost(), port),
null, axis2Req, sessionRequestCallback);
log.debug("A new connection established");
} else {
((ClientHandler) handler).submitRequest(conn, axis2Req);
log.debug("An existing connection reused");
}
axis2Req.streamMessageContents();
} catch (MalformedURLException e) {
handleException("Malformed destination EPR : " + epr.getAddress(), e);
} catch (IOException e) {
handleException("IO Error while submiting request message for sending", e);
}
}
/**
* Send the passed in response message, asynchronously
* @param msgContext the message context to be sent
* @throws AxisFault on error
*/
private void sendAsyncResponse(MessageContext msgContext) throws AxisFault {
// remove unwanted HTTP headers (if any from the current message)
removeUnwantedHeaders(msgContext);
ServerWorker worker = (ServerWorker) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
HttpResponse response = worker.getResponse();
OMOutputFormat format = Util.getOMOutputFormat(msgContext);
MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
response.setHeader(
HTTP.CONTENT_TYPE,
messageFormatter.getContentType(msgContext, format, msgContext.getSoapAction()));
// return http 500 when a SOAP fault is returned
if (msgContext.getEnvelope().getBody().hasFault()) {
response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
}
// if this is a dummy message to handle http 202 case with non-blocking IO
// set the status code to 202 and the message body to an empty byte array (see below)
if (Utils.isExplicitlyTrue(msgContext, NhttpConstants.SC_ACCEPTED) &&
msgContext.getProperty(
//org.apache.sandesha2.Sandesha2Constants.MessageContextProperties.SEQUENCE_ID
"WSRMSequenceId") == null) {
response.setStatusCode(HttpStatus.SC_ACCEPTED);
}
// set any transport headers
Map transportHeaders = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
if (transportHeaders != null && !transportHeaders.values().isEmpty()) {
Iterator iter = transportHeaders.keySet().iterator();
while (iter.hasNext()) {
Object header = iter.next();
Object value = transportHeaders.get(header);
if (value != null && header instanceof String && value instanceof String) {
response.setHeader((String) header, (String) value);
}
}
}
worker.getServiceHandler().commitResponse(worker.getConn(), response);
OutputStream out = worker.getOutputStream();
try {
if (Utils.isExplicitlyTrue(msgContext, NhttpConstants.SC_ACCEPTED) &&
msgContext.getProperty(
//Sandesha2Constants.MessageContextProperties.SEQUENCE_ID
"WSRMSequenceId") == null) {
// see comment above on the reasoning
out.write(new byte[0]);
} else {
messageFormatter.writeTo(msgContext, format, out, true);
}
out.close();
} catch (IOException e) {
handleException("IO Error sending response message", e);
}
try {
worker.getIs().close();
} catch (IOException ignore) {}
}
private void sendUsingOutputStream(MessageContext msgContext) throws AxisFault {
OMOutputFormat format = Util.getOMOutputFormat(msgContext);
MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(msgContext);
OutputStream out = (OutputStream) msgContext.getProperty(MessageContext.TRANSPORT_OUT);
if (msgContext.isServerSide()) {
OutTransportInfo transportInfo =
(OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
if (transportInfo != null) {
transportInfo.setContentType(
messageFormatter.getContentType(msgContext, format, msgContext.getSoapAction()));
} else {
throw new AxisFault(Constants.OUT_TRANSPORT_INFO + " has not been set");
}
}
try {
messageFormatter.writeTo(msgContext, format, out, true);
out.close();
} catch (IOException e) {
handleException("IO Error sending response message", e);
}
}
public void cleanup(MessageContext msgContext) throws AxisFault {
// do nothing
}
public void stop() {
try {
ioReactor.shutdown();
log.info("Sender shut down");
} catch (IOException e) {
log.warn("Error shutting down IOReactor", e);
}
}
/**
* Return a SessionRequestCallback which gets notified of a connection failure
* or an error during a send operation. This method finds the corresponding
* Axis2 message context for the outgoing request, and find the message receiver
* and sends a fault message back to the message receiver that is marked as
* related to the outgoing request
* @return a Session request callback
*/
private static SessionRequestCallback getSessionRequestCallback() {
return new SessionRequestCallback() {
public void completed(SessionRequest request) {
}
public void failed(SessionRequest request) {
handleError(request);
}
public void timeout(SessionRequest request) {
handleError(request);
}
public void cancelled(SessionRequest sessionRequest) {
}
private void handleError(SessionRequest request) {
if (request.getAttachment() != null &&
request.getAttachment() instanceof Axis2HttpRequest) {
Axis2HttpRequest axis2Request = (Axis2HttpRequest) request.getAttachment();
MessageContext mc = axis2Request.getMsgContext();
MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
try {
// this fault is NOT caused by the endpoint while processing. so we have to
// inform that this is a sending error (e.g. endpoint failure) and handle it
// differently at the message receiver.
Exception exception = request.getException();
MessageContext nioFaultMessageContext =
MessageContextBuilder.createFaultMessageContext(
/** this is not a mistake I do NOT want getMessage()*/
mc, new AxisFault(exception.toString(), exception));
nioFaultMessageContext.setProperty(NhttpConstants.SENDING_FAULT, Boolean.TRUE);
mr.receive(nioFaultMessageContext);
} catch (AxisFault af) {
log.error("Unable to report back failure to the message receiver", af);
}
}
}
};
}
// -------------- utility methods -------------
private void handleException(String msg, Exception e) throws AxisFault {
log.error(msg, e);
throw new AxisFault(msg, e);
}
private void handleException(String msg) throws AxisFault {
log.error(msg);
throw new AxisFault(msg);
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?