jacobvpu.java

来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 493 行 · 第 1/2 页

JAVA
493
字号
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.jacob.vpu;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.jacob.*;import org.apache.ode.jacob.soup.*;import org.apache.ode.utils.CollectionUtils;import org.apache.ode.utils.ObjectPrinter;import org.apache.ode.utils.msg.MessageBundle;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.util.HashMap;import java.util.Map;import java.util.Stack;/** * The JACOB Virtual Processing Unit ("VPU"). * * @author Maciej Szefler <a href="mailto:mbs@fivesight.com" /> */public final class JacobVPU {    private static final Log __log = LogFactory.getLog(JacobVPU.class);    /**     * Internationalization messages.     */    private static final JacobMessages __msgs = MessageBundle.getMessages(JacobMessages.class);    /**     * Thread-local for associating a thread with a VPU. Needs to be stored in a stack to allow reentrance.     
*/    static final ThreadLocal<Stack<JacobThread>> __activeJacobThread = new ThreadLocal<Stack<JacobThread>>();    private static final Method REDUCE_METHOD;    /**     * Resolve the {@link JacobRunnable#run} method statically     */    static {        try {            REDUCE_METHOD = JacobRunnable.class.getMethod("run", CollectionUtils.EMPTY_CLASS_ARRAY);        } catch (Exception e) {            throw new Error("Cannot resolve 'run' method", e);        }    }    /**     * Persisted cross-VPU state (state of the channels)     */    private ExecutionQueue _executionQueue;    private Map<Class, Object> _extensions = new HashMap<Class, Object>();    /**     * Classloader used for loading object continuations.     */    private ClassLoader _classLoader = getClass().getClassLoader();    private int _cycle;    private Statistics _statistics = new Statistics();    /**     * The fault "register" of the VPU .     */    private RuntimeException _fault;    /**     * Default constructor.     */    public JacobVPU() {    }    /**     * Re-hydration constructor.     *     * @param executionQueue previously saved execution context     */    public JacobVPU(ExecutionQueue executionQueue) {        setContext(executionQueue);    }    /**     * Instantiation constructor; used to initialize context with the concretion     * of a process abstraction.     *     * @param context virgin context object     * @param concretion the process     */    public JacobVPU(ExecutionQueue context, JacobRunnable concretion) {        setContext(context);        inject(concretion);    }    /**     * Execute one VPU cycle.     *     * @return <code>true</code> if the run queue is not empty after this cycle, <code>false</code> otherwise.     
*/
    public boolean execute() {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("execute", CollectionUtils.EMPTY_OBJECT_ARRAY));
        }
        if (_executionQueue == null) {
            throw new IllegalStateException("No state object for VPU!");
        }
        // A fault recorded by an earlier cycle is sticky: keep rethrowing it.
        if (_fault != null) {
            throw _fault;
        }
        if (!_executionQueue.hasReactions()) {
            return false;
        }
        _cycle = _executionQueue.cycle();
        Continuation rqe = _executionQueue.dequeueReaction();
        JacobThreadImpl jt = new JacobThreadImpl(rqe);
        long ctime = System.currentTimeMillis();
        try {
            jt.run();
        } catch (RuntimeException re) {
            // Remember the failure so that later cycles fail the same way.
            _fault = re;
            throw re;
        }
        // Statistics are only recorded for reactions that completed normally.
        long rtime = System.currentTimeMillis() - ctime;
        ++_statistics.numCycles;
        _statistics.totalRunTimeMs += rtime;
        _statistics.incRunTime(jt._targetStr, rtime);
        return true;
    }

    /**
     * Flush the execution queue by delegating to its own <code>flush()</code>.
     *
     * @throws IllegalStateException if no execution queue has been set, the
     *         same condition {@link #execute()} reports
     */
    public void flush() {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("flush", CollectionUtils.EMPTY_OBJECT_ARRAY));
        }
        // Guard like execute() does, instead of failing with a bare NullPointerException.
        if (_executionQueue == null) {
            throw new IllegalStateException("No state object for VPU!");
        }
        _executionQueue.flush();
    }

    /**
     * Set the state of the VPU; this is analogous to loading a CPU with a
     * thread's context (re-hydration).
*     * @param executionQueue     *            process executionQueue (state)     */    public void setContext(ExecutionQueue executionQueue) {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("setContext",                    new Object[] { "executionQueue", executionQueue }));        }        _executionQueue = executionQueue;        _executionQueue.setClassLoader(_classLoader);    }    public void registerExtension(Class extensionClass, Object obj) {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter                    .stringifyMethodEnter("registerExtension", new Object[] {                            "extensionClass", extensionClass, "obj", obj }));        }        _extensions.put(extensionClass, obj);    }    /**     * Add an item to the run queue.     */    public void addReaction(JacobObject jo, Method method, Object[] args, String desc) {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("addReaction",                    new Object[] { "jo", jo, "method", method, "args", args, "desc", desc }));        }        Continuation continuation = new Continuation(jo, method, args);        continuation.setDescription(desc);        _executionQueue.enqueueReaction(continuation);        ++_statistics.runQueueEntries;    }    /**     * Get the active Jacob thread, i.e. the one associated with the current Java thread.     */    public static JacobThread activeJacobThread() {        return __activeJacobThread.get().peek();    }    /**     * Inject a concretion into the process context. This amounts to chaning the     * process context from <code>P</code> to <code>P|Q</code> where     * <code>P</code> is the previous process context and <code>Q</code> is     * the injected process. This method is equivalent to the parallel operator,     * but is intended to be used from outside of an active {@link JacobThread}.     
*/
    public void inject(JacobRunnable concretion) {
        if (__log.isDebugEnabled()) {
            __log.debug("injecting " + concretion);
        }
        // The description string is only built when info logging is enabled.
        addReaction(concretion, REDUCE_METHOD, CollectionUtils.EMPTY_OBJECT_ARRAY,
                (__log.isInfoEnabled() ? concretion.toString() : null));
    }

    /**
     * Render the public methods of a class as a single string of the form
     * <code>name(argType, ...) {...} &amp; name(...) {...}</code>. Methods
     * declared by {@link Object} are left out.
     *
     * @param kind class whose methods are listed
     * @return the rendered method list; empty if there is nothing to list
     */
    static String stringifyMethods(Class kind) {
        // Method-local buffer: the unsynchronized StringBuilder is sufficient.
        StringBuilder buf = new StringBuilder();
        Method[] methods = kind.getMethods();
        boolean found = false;
        for (Method method : methods) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            }
            if (found) {
                buf.append(" & ");
            }
            buf.append(method.getName()).append('(');
            Class<?>[] argTypes = method.getParameterTypes();
            for (int j = 0; j < argTypes.length; ++j) {
                if (j > 0) {
                    buf.append(", ");
                }
                buf.append(argTypes[j].getName());
            }
            buf.append(") {...}");
            found = true;
        }
        return buf.toString();
    }

    static String stringify(Object[] list) {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?