jacobvpu.java
来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 493 行 · 第 1/2 页
JAVA
493 行
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.ode.jacob.vpu;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.jacob.*;
import org.apache.ode.jacob.soup.*;
import org.apache.ode.utils.CollectionUtils;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.msg.MessageBundle;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;

/**
 * The JACOB Virtual Processing Unit ("VPU").
 *
 * <p>A VPU works on an <code>ExecutionQueue</code>. Reactions are placed on that queue with
 * <code>addReaction</code> or <code>inject</code>, and each call to <code>execute()</code> dequeues
 * one reaction and runs it.
 *
 * @author Maciej Szefler <a href="mailto:mbs@fivesight.com" />
 */
public final class JacobVPU {
    /** Logger for this class. */
    private static final Log __log = LogFactory.getLog(JacobVPU.class);

    /**
     * Internationalization messages.
     */
    private static final JacobMessages __msgs = MessageBundle.getMessages(JacobMessages.class);

    /**
     * Thread-local for associating a thread with a VPU. Needs to be stored in a stack to allow reentrance.
*/ static final ThreadLocal<Stack<JacobThread>> __activeJacobThread = new ThreadLocal<Stack<JacobThread>>(); private static final Method REDUCE_METHOD; /** * Resolve the {@link JacobRunnable#run} method statically */ static { try { REDUCE_METHOD = JacobRunnable.class.getMethod("run", CollectionUtils.EMPTY_CLASS_ARRAY); } catch (Exception e) { throw new Error("Cannot resolve 'run' method", e); } } /** * Persisted cross-VPU state (state of the channels) */ private ExecutionQueue _executionQueue; private Map<Class, Object> _extensions = new HashMap<Class, Object>(); /** * Classloader used for loading object continuations. */ private ClassLoader _classLoader = getClass().getClassLoader(); private int _cycle; private Statistics _statistics = new Statistics(); /** * The fault "register" of the VPU . */ private RuntimeException _fault; /** * Default constructor. */ public JacobVPU() { } /** * Re-hydration constructor. * * @param executionQueue previously saved execution context */ public JacobVPU(ExecutionQueue executionQueue) { setContext(executionQueue); } /** * Instantiation constructor; used to initialize context with the concretion * of a process abstraction. * * @param context virgin context object * @param concretion the process */ public JacobVPU(ExecutionQueue context, JacobRunnable concretion) { setContext(context); inject(concretion); } /** * Execute one VPU cycle. * * @return <code>true</code> if the run queue is not empty after this cycle, <code>false</code> otherwise. 
*/
    public boolean execute() {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("execute", CollectionUtils.EMPTY_OBJECT_ARRAY));
        }
        if (_executionQueue == null) {
            throw new IllegalStateException("No state object for VPU!");
        }
        // A previous cycle failed: keep reporting that fault instead of running further reactions.
        if (_fault != null) {
            throw _fault;
        }
        if (!_executionQueue.hasReactions()) {
            return false;
        }

        _cycle = _executionQueue.cycle();
        Continuation rqe = _executionQueue.dequeueReaction();
        JacobThreadImpl jt = new JacobThreadImpl(rqe);

        // Elapsed time is measured with System.nanoTime(): unlike System.currentTimeMillis() it is
        // not tied to the wall clock, so clock adjustments cannot skew or negate the duration.
        long startNanos = System.nanoTime();
        try {
            jt.run();
        } catch (RuntimeException re) {
            // Latch the fault; statistics are not updated for a failed cycle.
            _fault = re;
            throw re;
        }
        long rtime = (System.nanoTime() - startNanos) / 1000000L;

        ++_statistics.numCycles;
        _statistics.totalRunTimeMs += rtime;
        _statistics.incRunTime(jt._targetStr, rtime);
        return true;
    }

    /**
     * Flush the execution queue by delegating to its <code>flush()</code> method.
     *
     * @throws IllegalStateException if no execution queue has been set on this VPU
     */
    public void flush() {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("flush", CollectionUtils.EMPTY_OBJECT_ARRAY));
        }
        // Same state check as execute(), instead of failing with a bare NullPointerException.
        if (_executionQueue == null) {
            throw new IllegalStateException("No state object for VPU!");
        }
        _executionQueue.flush();
    }

    /**
     * Set the state of the VPU; this is analogous to loading a CPU with a
     * thread's context (re-hydration). The queue is given this VPU's class loader.
     *
     * @param executionQueue
     *            process executionQueue (state); must not be <code>null</code>
     * @throws NullPointerException if <code>executionQueue</code> is <code>null</code>; the previously
     *             installed context is left untouched in that case
     */
    public void setContext(ExecutionQueue executionQueue) {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("setContext",
                    new Object[] { "executionQueue", executionQueue }));
        }
        // Configure the new queue before installing it, so a null argument fails without
        // overwriting the current context.
        executionQueue.setClassLoader(_classLoader);
        _executionQueue = executionQueue;
    }

    /**
     * Register an extension object under the given class, replacing any object previously
     * registered under the same class.
     *
     * @param extensionClass key under which the extension is stored
     * @param obj the extension object
     */
    public void registerExtension(Class extensionClass, Object obj) {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("registerExtension",
                    new Object[] { "extensionClass", extensionClass, "obj", obj }));
        }
        _extensions.put(extensionClass, obj);
    }

    /**
     * Add an item to the run queue.
*/
    public void addReaction(JacobObject jo, Method method, Object[] args, String desc) {
        if (__log.isTraceEnabled()) {
            __log.trace(ObjectPrinter.stringifyMethodEnter("addReaction",
                    new Object[] { "jo", jo, "method", method, "args", args, "desc", desc }));
        }
        // Same state check as execute(): report a missing context explicitly instead of failing
        // with a bare NullPointerException on the enqueue below.
        if (_executionQueue == null) {
            throw new IllegalStateException("No state object for VPU!");
        }

        Continuation continuation = new Continuation(jo, method, args);
        continuation.setDescription(desc);
        _executionQueue.enqueueReaction(continuation);
        ++_statistics.runQueueEntries;
    }

    /**
     * Get the active Jacob thread, i.e. the one on top of the stack associated with the current Java
     * thread.
     *
     * <p>NOTE(review): the thread-local has no initial value, so if no stack has been installed for
     * the calling thread this fails with a <code>NullPointerException</code>, and with an
     * <code>EmptyStackException</code> if the stack is empty. Presumably this is only called while a
     * reaction is running; confirm against the code that populates the stack.
     */
    public static JacobThread activeJacobThread() {
        return __activeJacobThread.get().peek();
    }

    /**
     * Inject a concretion into the process context. This amounts to changing the
     * process context from <code>P</code> to <code>P|Q</code> where
     * <code>P</code> is the previous process context and <code>Q</code> is
     * the injected process. This method is equivalent to the parallel operator,
     * but is intended to be used from outside of an active {@link JacobThread}.
     *
     * @param concretion the process to add to the run queue
     */
    public void inject(JacobRunnable concretion) {
        if (__log.isDebugEnabled()) {
            __log.debug("injecting " + concretion);
        }
        // The description is only built when info logging is on; otherwise null is passed.
        addReaction(concretion, REDUCE_METHOD, CollectionUtils.EMPTY_OBJECT_ARRAY,
                (__log.isInfoEnabled() ? concretion.toString() : null));
    }

    /**
     * Render the public methods of a class as <code>name(argType, ...) {...}</code> entries joined
     * with <code>" &amp; "</code>. Methods declared by <code>Object</code> are left out.
     *
     * @param kind the class whose methods are listed
     * @return the rendered method list; empty if there is nothing to list
     */
    static String stringifyMethods(Class kind) {
        // Method-local buffer: no need for the synchronized StringBuffer.
        StringBuilder buf = new StringBuilder();
        boolean found = false;
        for (Method method : kind.getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            }
            if (found) {
                buf.append(" & ");
            }
            buf.append(method.getName()).append('(');
            Class<?>[] argTypes = method.getParameterTypes();
            for (int j = 0; j < argTypes.length; ++j) {
                if (j > 0) {
                    buf.append(", ");
                }
                buf.append(argTypes[j].getName());
            }
            buf.append(") {...}");
            found = true;
        }
        return buf.toString();
    }

    static String stringify(Object[] list) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?