pick.java

来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 344 行 · 第 1/2 页

JAVA
344
字号
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.bpel.runtime;import java.util.Calendar;import java.util.Collection;import java.util.Date;import javax.xml.namespace.QName;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.common.CorrelationKey;import org.apache.ode.bpel.common.FaultException;import org.apache.ode.bpel.evt.VariableModificationEvent;import org.apache.ode.bpel.explang.EvaluationException;import org.apache.ode.bpel.o.OElementVarType;import org.apache.ode.bpel.o.OMessageVarType;import org.apache.ode.bpel.o.OPickReceive;import org.apache.ode.bpel.o.OScope;import org.apache.ode.bpel.o.OMessageVarType.Part;import org.apache.ode.bpel.runtime.channels.FaultData;import org.apache.ode.bpel.runtime.channels.PickResponseChannel;import org.apache.ode.bpel.runtime.channels.PickResponseChannelListener;import org.apache.ode.bpel.runtime.channels.TerminationChannelListener;import org.apache.ode.utils.DOMUtils;import org.apache.ode.utils.xsd.Duration;import org.apache.ode.bpel.evar.ExternalVariableModuleException;import org.apache.ode.bpel.iapi.BpelEngineException;import org.w3c.dom.Element;import org.w3c.dom.Node;/** * Template for the BPEL <code>pick</code> activity. */class PICK extends ACTIVITY {    private static final long serialVersionUID = 1L;    private static final Log __log = LogFactory.getLog(PICK.class);    private OPickReceive _opick;    // if multiple alarms are set, this is the alarm the evaluates to    // the shortest absolute time until firing.    private OPickReceive.OnAlarm _alarm = null;    public PICK(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {        super(self, scopeFrame, linkFrame);        _opick = (OPickReceive) self.o;    }    /**     * @see org.apache.ode.jacob.JacobRunnable#run()     */    public void run() {        PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);        Date timeout;        Selector[] selectors;        try {            selectors = new Selector[_opick.onMessages.size()];            int idx = 0;            for (OPickReceive.OnMessage onMessage : _opick.onMessages) {                CorrelationKey key = null; // this will be the case for the                // createInstance activity                PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(onMessage.partnerLink);                if (onMessage.matchCorrelation == null && !_opick.createInstanceFlag) {                    // Adding a route for opaque correlation. In this case,                    // correlation is on "out-of-band" session-id                    String sessionId = getBpelRuntimeContext().fetchMySessionId(pLinkInstance);                    key = new CorrelationKey(-1, new String[] { sessionId });                } else if (onMessage.matchCorrelation != null) {                    if (!getBpelRuntimeContext().isCorrelationInitialized(                            _scopeFrame.resolve(onMessage.matchCorrelation))) {                        // the following should really test if this is a "join"                        // type correlation...                        if (!_opick.createInstanceFlag)                            throw new FaultException(_opick.getOwner().constants.qnCorrelationViolation,                                    "Correlation not initialized.");                    } else {                        key = getBpelRuntimeContext().readCorrelation(_scopeFrame.resolve(onMessage.matchCorrelation));                        assert key != null;                    }                }                selectors[idx] = new Selector(idx, pLinkInstance, onMessage.operation.getName(), onMessage.operation                        .getOutput() == null, onMessage.messageExchangeId, key);                idx++;            }            timeout = null;            for (OPickReceive.OnAlarm onAlarm : _opick.onAlarms) {                Date dt = onAlarm.forExpr != null ? offsetFromNow(getBpelRuntimeContext().getExpLangRuntime()                        .evaluateAsDuration(onAlarm.forExpr, getEvaluationContext())) : getBpelRuntimeContext()                        .getExpLangRuntime().evaluateAsDate(onAlarm.untilExpr, getEvaluationContext()).getTime();                if (timeout == null || timeout.compareTo(dt) > 0) {                    timeout = dt;                    _alarm = onAlarm;                }            }            getBpelRuntimeContext().select(pickResponseChannel, timeout, _opick.createInstanceFlag, selectors);        } catch (FaultException e) {            __log.error(e);            FaultData fault = createFault(e.getQName(), _opick, e.getMessage());            dpe(_opick.outgoingLinks);            _self.parent.completed(fault, CompensationHandler.emptySet());            return;        } catch (EvaluationException e) {            String msg = "Unexpected evaluation error evaluating alarm.";            __log.error(msg, e);            throw new InvalidProcessException(msg, e);        }        // Dead path all the alarms that have no chace of coming first.        for (OPickReceive.OnAlarm oa : _opick.onAlarms) {            if (!oa.equals(_alarm)) {                dpe(oa.activity);            }        }        instance(new WAITING(pickResponseChannel));    }    /**     * Calculate a duration offset from right now.     *     * @param duration     *            the offset     * @return the resulting date.     */    private static Date offsetFromNow(Duration duration) {        Calendar cal = Calendar.getInstance();        duration.addTo(cal);        return cal.getTime();    }    @SuppressWarnings("unchecked")    private void initVariable(String mexId, OPickReceive.OnMessage onMessage) {        // This is allowed, if there is no parts in the message for example.        if (onMessage.variable == null) return;        Element msgEl;        try {            // At this point, not being able to get the request is most probably            // a mex that hasn't properly replied to (process issue).            msgEl = getBpelRuntimeContext().getMyRequest(mexId);        } catch (BpelEngineException e) {            __log.error("The message exchange seems to be in an unconsistent state, you're " +                "probably missing a reply on a request/response interaction.");            _self.parent.failure(e.toString(), null);            return;        }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?