httpcoreniosender.java

来自「开源的axis2框架的源码。用于开发WEBSERVER」· Java 代码 · 共 463 行 · 第 1/2 页

JAVA
463
字号
     * copied from the request messages
     * @param msgContext the Axis2 Message context from which these headers should be removed
     */
    private void removeUnwantedHeaders(MessageContext msgContext) {
        Map transportHeaders = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
        if (transportHeaders == null || transportHeaders.isEmpty()) {
            // no transport headers are present, so there is nothing to strip
            return;
        }

        // header names that are removed from the transport header map of this message
        Object[] unwanted = {
            HTTP.CONN_DIRECTIVE,
            HTTP.TRANSFER_ENCODING,
            HTTP.DATE_DIRECTIVE,
            HTTP.SERVER_DIRECTIVE,
            HTTP.CONTENT_TYPE,
            HTTP.CONTENT_LEN,
            HTTP.USER_AGENT
        };
        for (int i = 0; i < unwanted.length; i++) {
            transportHeaders.remove(unwanted[i]);
        }
    }

    /**
     * Dispatches the request message asynchronously to the endpoint addressed by the EPR.
     * A connection obtained from the ConnectionPool for the target host and port is reused
     * when one is returned; otherwise a connect is requested from the I/O reactor with the
     * request passed along as the attachment. The message contents are streamed afterwards.
     * @param epr the destination EPR for the message
     * @param msgContext the message being sent
     * @throws AxisFault if the EPR address is not a valid URL, or on an I/O error
     */
    private void sendAsyncRequest(EndpointReference epr, MessageContext msgContext) throws AxisFault {
        try {
            URL target = new URL(epr.getAddress());
            String scheme = target.getProtocol();
            String hostName = target.getHost();

            // when the URL names no port, fall back to the default of the scheme
            int port = target.getPort();
            if (port == -1) {
                if ("http".equals(scheme)) {
                    port = 80;
                } else if ("https".equals(scheme)) {
                    port = 443;
                }
                // NOTE(review): any other scheme leaves the port at -1 - confirm that
                // only http/https EPRs can reach this method
            }

            HttpHost httpHost = new HttpHost(hostName, port, scheme);
            Axis2HttpRequest axis2Req = new Axis2HttpRequest(epr, httpHost, msgContext);

            NHttpClientConnection pooled = ConnectionPool.getConnection(hostName, port);
            if (pooled != null) {
                // hand the request to the connection obtained from the pool
                ((ClientHandler) handler).submitRequest(pooled, axis2Req);
                log.debug("An existing connection reused");
            } else {
                // nothing pooled for this host and port: ask the reactor to connect
                InetSocketAddress remote = new InetSocketAddress(hostName, port);
                ioReactor.connect(remote, null, axis2Req, sessionRequestCallback);
                log.debug("A new connection established");
            }

            // must follow the submit/connect above, as in the original ordering
            axis2Req.streamMessageContents();

        } catch (MalformedURLException e) {
            handleException("Malformed destination EPR : " + epr.getAddress(), e);
        } catch (IOException e) {
            handleException("IO Error while submiting request message for sending", e);
        }
    }

    /**
     * Sends the passed in response message, asynchronously, through the ServerWorker that
     * is stored in the message context under Constants.OUT_TRANSPORT_INFO.
     * The steps are, in order: strip unwanted headers, set the content type and status code
     * on the HTTP response, copy the String transport headers onto it, commit the response,
     * write the body to the worker's output stream, and close the worker's streams.
     * @param msgContext the message context to be sent
     * @throws AxisFault on error
     */
    private void sendAsyncResponse(MessageContext msgContext) throws AxisFault {

        // drop the HTTP headers of the current message that must not be sent back
        removeUnwantedHeaders(msgContext);

        ServerWorker serverWorker =
            (ServerWorker) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
        HttpResponse httpResponse = serverWorker.getResponse();

        OMOutputFormat outputFormat = Util.getOMOutputFormat(msgContext);
        MessageFormatter formatter = TransportUtils.getMessageFormatter(msgContext);
        String contentType =
            formatter.getContentType(msgContext, outputFormat, msgContext.getSoapAction());
        httpResponse.setHeader(HTTP.CONTENT_TYPE, contentType);

        // a SOAP fault is reported to the client as HTTP 500
        if (msgContext.getEnvelope().getBody().hasFault()) {
            httpResponse.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
        }

        // a dummy message that handles the HTTP 202 case with non-blocking IO gets the
        // status code 202 here, and an empty byte array as its body further below
        if (Utils.isExplicitlyTrue(msgContext, NhttpConstants.SC_ACCEPTED)) {
            // key is org.apache.sandesha2.Sandesha2Constants.MessageContextProperties.SEQUENCE_ID
            if (msgContext.getProperty("WSRMSequenceId") == null) {
                httpResponse.setStatusCode(HttpStatus.SC_ACCEPTED);
            }
        }

        // copy the transport headers; only String names with String values are applied
        Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS);
        if (headerMap != null && !headerMap.isEmpty()) {
            for (Iterator entries = headerMap.entrySet().iterator(); entries.hasNext();) {
                Map.Entry entry = (Map.Entry) entries.next();
                Object name = entry.getKey();
                Object headerValue = entry.getValue();
                if (name instanceof String && headerValue instanceof String) {
                    httpResponse.setHeader((String) name, (String) headerValue);
                }
            }
        }

        // the response is committed before any of the body is written
        serverWorker.getServiceHandler().commitResponse(serverWorker.getConn(), httpResponse);

        OutputStream bodyStream = serverWorker.getOutputStream();
        try {
            // same test as the one that selected the 202 status code above
            boolean emptyAccepted =
                Utils.isExplicitlyTrue(msgContext, NhttpConstants.SC_ACCEPTED) &&
                    msgContext.getProperty("WSRMSequenceId") == null;
            if (!emptyAccepted) {
                formatter.writeTo(msgContext, outputFormat, bodyStream, true);
            } else {
                // the 202 dummy message carries an empty body
                bodyStream.write(new byte[0]);
            }
            bodyStream.close();
        } catch (IOException e) {
            handleException("IO Error sending response message", e);
        }

        try {
            serverWorker.getIs().close();
        } catch (IOException ignore) {
            // a failure to close the worker's input stream is deliberately ignored
        }
    }

    /**
     * Writes the message onto the output stream stored in the message context under
     * MessageContext.TRANSPORT_OUT, and closes that stream afterwards. For a server side
     * message, the content type is first set on the OutTransportInfo found under
     * Constants.OUT_TRANSPORT_INFO.
     * @param msgContext the message context to be written out
     * @throws AxisFault if the message is server side and no OutTransportInfo has been set,
     *         or on an I/O error while writing or closing the stream
     */
    private void sendUsingOutputStream(MessageContext msgContext) throws AxisFault {
        OMOutputFormat outputFormat = Util.getOMOutputFormat(msgContext);
        MessageFormatter formatter = TransportUtils.getMessageFormatter(msgContext);
        OutputStream target = (OutputStream) msgContext.getProperty(MessageContext.TRANSPORT_OUT);

        if (msgContext.isServerSide()) {
            OutTransportInfo info =
                (OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);

            // fail fast when there is nothing to announce the content type on
            if (info == null) {
                throw new AxisFault(Constants.OUT_TRANSPORT_INFO + " has not been set");
            }
            String contentType =
                formatter.getContentType(msgContext, outputFormat, msgContext.getSoapAction());
            info.setContentType(contentType);
        }

        try {
            formatter.writeTo(msgContext, outputFormat, target, true);
            target.close();
        } catch (IOException e) {
            handleException("IO Error sending response message", e);
        }
    }


    /**
     * Intentionally a no-op: this sender does nothing when asked to clean up
     * after a message.
     * @param msgContext the message context (not used)
     * @throws AxisFault declared by the signature; this implementation never throws it
     */
    public void cleanup(MessageContext msgContext) throws AxisFault {
        // do nothing
    }

    /**
     * Shuts down the I/O reactor used by this sender. A failure during the shutdown
     * is logged as a warning and is not propagated to the caller.
     */
    public void stop() {
        try {
            ioReactor.shutdown();
        } catch (IOException e) {
            log.warn("Error shutting down IOReactor", e);
            return;
        }
        // reached only when the reactor shut down without an IOException
        log.info("Sender shut down");
    }

    /**
     * Return a SessionRequestCallback which gets notified of a connection failure
     * or a connection timeout. On either event, the Axis2HttpRequest attached to the
     * session request is used to find the Axis2 message context of the outgoing request;
     * a fault message context, flagged with NhttpConstants.SENDING_FAULT, is then handed
     * to the message receiver of that message's operation. Completed and cancelled
     * session requests are ignored.
     * @return a Session request callback
     */
    private static SessionRequestCallback getSessionRequestCallback() {
        return new SessionRequestCallback() {
            public void completed(SessionRequest request) {
                // nothing to do once the connection is up
            }

            public void failed(SessionRequest request) {
                handleError(request,
                    "Connection refused or failed for : " + request.getRemoteAddress());
            }

            public void timeout(SessionRequest request) {
                handleError(request,
                    "Timeout connecting to : " + request.getRemoteAddress());
            }

            public void cancelled(SessionRequest sessionRequest) {
                // nothing to report for a cancelled session request
            }

            /**
             * Reports a sending failure back to the message receiver of the request
             * that is attached to the given session request.
             * @param request the session request that failed or timed out
             * @param fallbackReason the fault reason to use when the session request
             *        carries no exception
             */
            private void handleError(SessionRequest request, String fallbackReason) {
                Object attachment = request.getAttachment();
                if (!(attachment instanceof Axis2HttpRequest)) {
                    // covers a missing attachment too: instanceof is false for null
                    return;
                }

                Axis2HttpRequest axis2Request = (Axis2HttpRequest) attachment;
                MessageContext mc = axis2Request.getMsgContext();
                MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();

                try {
                    // this fault is NOT caused by the endpoint while processing. so we have to
                    // inform that this is a sending error (e.g. endpoint failure) and handle it
                    // differently at the message receiver.

                    // getException() is documented as possibly null (e.g. a timeout reports
                    // no exception), so it must not be dereferenced unconditionally
                    Exception exception = request.getException();
                    AxisFault fault;
                    if (exception != null) {
                        // this is not a mistake: toString() is wanted here, NOT getMessage()
                        fault = new AxisFault(exception.toString(), exception);
                    } else {
                        fault = new AxisFault(fallbackReason);
                    }

                    MessageContext nioFaultMessageContext =
                        MessageContextBuilder.createFaultMessageContext(mc, fault);
                    nioFaultMessageContext.setProperty(NhttpConstants.SENDING_FAULT, Boolean.TRUE);
                    mr.receive(nioFaultMessageContext);

                } catch (AxisFault af) {
                    log.error("Unable to report back failure to the message receiver", af);
                }
            }
        };
    }

    // -------------- utility methods -------------
    /**
     * Logs the message and its cause at error level, then raises an AxisFault that
     * wraps both. This method never returns normally.
     * @param msg the error message to log and to carry in the fault
     * @param e the underlying cause
     * @throws AxisFault always
     */
    private void handleException(String msg, Exception e) throws AxisFault {
        log.error(msg, e);
        AxisFault fault = new AxisFault(msg, e);
        throw fault;
    }

    /**
     * Logs the message at error level, then raises an AxisFault that carries it.
     * This method never returns normally.
     * @param msg the error message to log and to carry in the fault
     * @throws AxisFault always
     */
    private void handleException(String msg) throws AxisFault {
        log.error(msg);
        AxisFault fault = new AxisFault(msg);
        throw fault;
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?