axisengine.java
来自「开源的axis2框架的源码。用于开发WEBSERVER」· Java 代码 · 共 541 行 · 第 1/2 页
JAVA
541 行
//message will be resumed again, but perhaps we need to unwind back to
//the point at which the message was resumed and provide another API
//to allow the full unwind if the message is going to be discarded.
//invoke the phases
InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
//invoking the MR
if (pi.equals(InvocationResponse.CONTINUE)) {
checkMustUnderstand(msgContext);
if (msgContext.isServerSide()) {
// invoke the Message Receivers
MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
if (receiver == null) {
throw new AxisFault(Messages.getMessage(
"nomessagereciever",
msgContext.getAxisOperation().getName().toString()));
}
receiver.receive(msgContext);
}
flowComplete(msgContext);
}
return pi;
}
/**
 * Resumes a previously paused invocation on the send path. A dedicated entry point is
 * needed here because the TransportSender has to be called once the remaining phases
 * have run.
 *
 * @param msgContext the message context whose execution is being resumed
 * @return An InvocationResponse allowing the invoker to perhaps determine
 *         whether or not the message processing will ever succeed.
 * @throws AxisFault if the resumed phases or the transport sender fail, or if no
 *                   outbound transport has been set on the message context
 */
public static InvocationResponse resumeSend(MessageContext msgContext) throws AxisFault {
    if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
        log.trace(msgContext.getLogIDString() + " resumeSend:" + msgContext.getMessageID());
    }

    //REVIEW: This name is a little misleading, as it seems to indicate that there should
    //be a resumeSendFault as well, when, in fact, this does both
    //REVIEW: Unlike with send, there is no wrapping try/catch clause which would
    //fire off the flowComplete on an error, as we have to assume that the
    //message will be resumed again, but perhaps we need to unwind back to
    //the point at which the message was resumed and provide another API
    //to allow the full unwind if the message is going to be discarded.

    // Run the remaining phases of the execution chain.
    InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);

    // Only hand the message to the transport when the phases ran to completion.
    if (pi.equals(InvocationResponse.CONTINUE)) {
        TransportOutDescription transportOut = msgContext.getTransportOut();
        // Same guard as send() and sendFault(): report a missing transport as an
        // AxisFault instead of failing with a NullPointerException.
        if (transportOut == null) {
            throw new AxisFault("Transport out has not been set");
        }
        TransportSender sender = transportOut.getSender();
        // write the Message to the Wire
        sender.invoke(msgContext);
        flowComplete(msgContext);
    }
    return pi;
}
/**
 * Resumes processing of a paused message. The paused flag on the context is cleared and
 * the work is delegated to {@code resumeReceive} when the context is in the IN_FLOW, or
 * to {@code resumeSend} for every other flow.
 *
 * @param msgctx the message context to resume
 * @return An InvocationResponse allowing the invoker to perhaps determine
 *         whether or not the message processing will ever succeed.
 * @throws AxisFault if the resumed receive or send processing fails
 */
public static InvocationResponse resume(MessageContext msgctx) throws AxisFault {
    if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
        log.trace(msgctx.getLogIDString() + " resume:" + msgctx.getMessageID());
    }

    // The context is no longer paused once we start resuming it.
    msgctx.setPaused(false);

    // Inbound messages continue on the receive path; everything else goes out via send.
    boolean inbound = msgctx.getFLOW() == MessageContext.IN_FLOW;
    return inbound ? resumeReceive(msgctx) : resumeSend(msgctx);
}
/**
 * Runs the out flow of Axis for a message; this may happen on either the server side or
 * the client side. The execution chain is assembled from the out-flow phases of the
 * message's operation followed by the globally configured out-flow phases. The handlers
 * inside each phase are ordered at deployment time by the deployment module.
 * <p/>
 * When the phases finish with CONTINUE the message is handed to the transport sender,
 * either directly or, if the TRANSPORT_NON_BLOCKING property is set to true, through a
 * worker submitted to the configuration context's thread pool.
 *
 * @param msgContext the message context to send
 * @throws AxisFault if a phase or the transport fails, if no outbound transport has been
 *                   set, or if the phases return an unrecognized InvocationResponse
 * @see MessageContext
 * @see Phase
 * @see Handler
 */
public static void send(MessageContext msgContext) throws AxisFault {
    if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
        log.trace(msgContext.getLogIDString() + " send:" + msgContext.getMessageID());
    }

    // Build one chain instead of running two: operation-level phases first, global ones after.
    OperationContext opCtx = msgContext.getOperationContext();
    ArrayList operationPhases = opCtx.getAxisOperation().getPhasesOutFlow();
    ArrayList combinedChain = new ArrayList();
    combinedChain.addAll(operationPhases);
    combinedChain.addAll(
            msgContext.getConfigurationContext().getAxisConfiguration().getOutFlowPhases());
    msgContext.setExecutionChain(combinedChain);
    msgContext.setFLOW(MessageContext.OUT_FLOW);

    try {
        InvocationResponse outcome = invoke(msgContext, NOT_RESUMING_EXECUTION);

        if (outcome.equals(InvocationResponse.CONTINUE)) {
            // Phases completed: write the message to the wire.
            TransportOutDescription outDescription = msgContext.getTransportOut();
            if (outDescription == null) {
                throw new AxisFault("Transport out has not been set");
            }
            TransportSender transportSender = outDescription.getSender();

            // Only used for client-side fireAndForget invocations: when somebody has set
            // this property to true, the transport sender is invoked on a different thread.
            Object nonBlockingFlag =
                    msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
            boolean sendOnWorkerThread =
                    nonBlockingFlag != null && ((Boolean) nonBlockingFlag).booleanValue();
            if (sendOnWorkerThread) {
                msgContext.getConfigurationContext().getThreadPool().execute(
                        new TransportNonBlockingInvocationWorker(msgContext, transportSender));
            } else {
                transportSender.invoke(msgContext);
            }

            //REVIEW: In the case of the TransportNonBlockingInvocationWorker, does this
            //need to wait until that finishes?
            flowComplete(msgContext);
            return;
        }

        if (outcome.equals(InvocationResponse.SUSPEND)) {
            // Suspended: nothing to do here, the flow is not complete yet.
            return;
        }

        if (outcome.equals(InvocationResponse.ABORT)) {
            flowComplete(msgContext);
            return;
        }

        String errorMsg = "Unrecognized InvocationResponse encountered in AxisEngine.send()";
        log.error(msgContext.getLogIDString() + " " + errorMsg);
        throw new AxisFault(errorMsg);
    } catch (AxisFault e) {
        // Record why the flow failed and complete it before propagating the fault.
        msgContext.setFailureReason(e);
        flowComplete(msgContext);
        throw e;
    }
}
/**
 * Sends the SOAP Fault to another SOAP node.
 * <p/>
 * The fault is processed in two passes. If the message has an operation context, the
 * operation's out-fault phases are run first; afterwards the globally configured
 * out-fault phases are run and, when they finish with CONTINUE, the message is handed to
 * the transport sender.
 *
 * @param msgContext the message context carrying the fault
 * @throws AxisFault if a phase or the transport fails, if no outbound transport has been
 *                   set, or if the phases return an unrecognized InvocationResponse
 */
public static void sendFault(MessageContext msgContext) throws AxisFault {
    if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
        log.trace(msgContext.getLogIDString() + " sendFault:" + msgContext.getMessageID());
    }

    OperationContext operationContext = msgContext.getOperationContext();

    //FIXME: If this gets paused in the operation-specific phases, the resume is not going
    //to function correctly as the phases will not have all been set

    // First pass: the operation-specific fault out flow handlers, if there is an operation.
    if (operationContext != null) {
        ArrayList operationFaultPhases =
                operationContext.getAxisOperation().getPhasesOutFaultFlow();

        // Work on copies so that the operation's own phase list is never handed out.
        // Only the operation-specific phases go into the chain at this point; the global
        // out fault phases are run separately further down.
        ArrayList chainCopy = new ArrayList();
        chainCopy.addAll((ArrayList) operationFaultPhases.clone());
        msgContext.setExecutionChain((ArrayList) chainCopy.clone());
        msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);

        try {
            InvocationResponse operationOutcome = invoke(msgContext, NOT_RESUMING_EXECUTION);

            if (operationOutcome.equals(InvocationResponse.SUSPEND)) {
                log.warn(msgContext.getLogIDString() +
                        " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
                return;
            }
            if (operationOutcome.equals(InvocationResponse.ABORT)) {
                flowComplete(msgContext);
                return;
            }
            if (!operationOutcome.equals(InvocationResponse.CONTINUE)) {
                String errorMsg =
                        "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
                log.error(msgContext.getLogIDString() + " " + errorMsg);
                throw new AxisFault(errorMsg);
            }
        } catch (AxisFault e) {
            // Record why the flow failed and complete it before propagating the fault.
            msgContext.setFailureReason(e);
            flowComplete(msgContext);
            throw e;
        }
    }

    // Second pass: the global out fault flow phases, again on a copy of the configured list.
    ArrayList globalFaultPhases = (ArrayList) msgContext.getConfigurationContext()
            .getAxisConfiguration().getOutFaultFlowPhases().clone();
    msgContext.setExecutionChain(globalFaultPhases);
    msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
    InvocationResponse globalOutcome = invoke(msgContext, NOT_RESUMING_EXECUTION);

    if (globalOutcome.equals(InvocationResponse.CONTINUE)) {
        // Actually send the SOAP Fault
        TransportOutDescription outDescription = msgContext.getTransportOut();
        if (outDescription == null) {
            throw new AxisFault("Transport out has not been set");
        }
        TransportSender transportSender = outDescription.getSender();
        transportSender.invoke(msgContext);
        flowComplete(msgContext);
        return;
    }

    if (globalOutcome.equals(InvocationResponse.SUSPEND)) {
        // Suspended: nothing to do here, the flow is not complete yet.
        return;
    }

    if (globalOutcome.equals(InvocationResponse.ABORT)) {
        flowComplete(msgContext);
        return;
    }

    String errorMsg = "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
    log.error(msgContext.getLogIDString() + " " + errorMsg);
    throw new AxisFault(errorMsg);
}
/**
 * Worker used when a service invocation is made over two transports.
 * <p/>
 * If the send were performed on the calling thread, that thread would block until the
 * transport sender returns. With HTTP, for example, the transport sender blocks until it
 * receives the HTTP 200, so the caller would block as well and the invocation would not
 * really be non-blocking. To avoid that, the request is sent from a separate thread.
 * <p/>
 * The whole purpose of this class is therefore to send the request via another thread.
 * If the send fails, the error is logged and, unless
 * DISABLE_ASYNC_CALLBACK_ON_TRANSPORT_ERROR is set on the message context, reported to
 * the callback registered for the message.
 */
private static class TransportNonBlockingInvocationWorker implements Runnable {
    /** Message to be sent; never reassigned after construction. */
    private final MessageContext msgctx;
    /** Transport sender that performs the actual send; never reassigned after construction. */
    private final TransportSender sender;

    /**
     * @param msgctx the message context to hand to the transport sender
     * @param sender the transport sender to invoke from {@link #run()}
     */
    public TransportNonBlockingInvocationWorker(MessageContext msgctx,
                                                TransportSender sender) {
        this.msgctx = msgctx;
        this.sender = sender;
    }

    /**
     * Invokes the transport sender. Any exception is logged (with its stack trace) and,
     * unless callbacks on transport errors have been disabled through the message
     * context, forwarded to the callback registered for this message.
     */
    public void run() {
        try {
            sender.invoke(msgctx);
        } catch (Exception e) {
            // Pass the exception itself to the logger so the stack trace is not lost.
            log.info(msgctx.getLogIDString() + " " + e.getMessage(), e);
            if (msgctx.getProperty(MessageContext.DISABLE_ASYNC_CALLBACK_ON_TRANSPORT_ERROR) ==
                    null) {
                notifyCallbackOfError(e);
            }
        }
    }

    /** Reports a transport error to the callback registered for this message, if any. */
    private void notifyCallbackOfError(Exception e) {
        AxisOperation axisOperation = msgctx.getAxisOperation();
        if (axisOperation == null) {
            return;
        }
        MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
        // instanceof is false for null, so no separate null check is required.
        if (!(msgReceiver instanceof CallbackReceiver)) {
            return;
        }
        Object callback = ((CallbackReceiver) msgReceiver)
                .lookupCallback(msgctx.getMessageID());
        if (callback == null) {
            if (log.isDebugEnabled()) {
                log.debug(msgctx.getLogIDString() +
                        " No callback registered for message " + msgctx.getMessageID());
            }
            return;
        }
        if (callback instanceof Callback) {
            ((Callback) callback).onError(e);
        } else if (callback instanceof AxisCallback) {
            ((AxisCallback) callback).onError(e);
        } else {
            // Previously an unconditional cast to AxisCallback, which would have thrown a
            // ClassCastException on the worker thread for any other callback type.
            log.warn(msgctx.getLogIDString() + " Unsupported callback type " +
                    callback.getClass().getName() + "; transport error not delivered");
        }
    }
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?