executionqueueimpl.java
来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 821 行 · 第 1/2 页
JAVA
821 行
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.jacob.vpu;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.jacob.Channel;import org.apache.ode.jacob.ChannelListener;import org.apache.ode.jacob.IndexedObject;import org.apache.ode.jacob.JacobObject;import org.apache.ode.jacob.soup.Comm;import org.apache.ode.jacob.soup.CommChannel;import org.apache.ode.jacob.soup.CommGroup;import org.apache.ode.jacob.soup.CommRecv;import org.apache.ode.jacob.soup.CommSend;import org.apache.ode.jacob.soup.Continuation;import org.apache.ode.jacob.soup.ExecutionQueue;import org.apache.ode.jacob.soup.ExecutionQueueObject;import org.apache.ode.jacob.soup.ReplacementMap;import org.apache.ode.utils.CollectionUtils;import org.apache.ode.utils.ObjectPrinter;import java.io.Externalizable;import java.io.IOException;import java.io.InputStream;import java.io.ObjectInput;import java.io.ObjectInputStream;import java.io.ObjectOutput;import java.io.ObjectOutputStream;import java.io.ObjectStreamClass;import java.io.OutputStream;import java.io.PrintStream;import java.io.Serializable;import java.lang.reflect.Field;import java.lang.reflect.Method;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedList;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.zip.GZIPInputStream;import java.util.zip.GZIPOutputStream;/** * A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue} implementation. */public class ExecutionQueueImpl implements ExecutionQueue { /** Class-level logger. */ private static final Log __log = LogFactory.getLog(ExecutionQueueImpl.class); private ClassLoader _classLoader; private static ConcurrentHashMap<String, ObjectStreamClass> _classDescriptors = new ConcurrentHashMap<String, ObjectStreamClass>(); /** * Cached set of enqueued {@link Continuation} objects (i.e. those read using * {@link #enqueueReaction(org.apache.ode.jacob.soup.Continuation)}). * These reactions are "cached"--that is it is not sent directly to the DAO * layer--to minimize unnecessary serialization/deserialization of closures. * This is a pretty useful optimization, as most {@link Continuation}s are * enqueued, and then immediately dequeued in the next cycle. By caching * {@link Continuation}s, we eliminate practically all serialization of * these objects, the only exception being cases where the system decides to * stop processing a particular soup despite the soup being able to make * forward progress; this scenario would occur if a maximum processign * time-per-instance policy were in effect. */ private Set<Continuation> _reactions = new HashSet<Continuation>(); private Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>(); /** * The "expected" cycle counter, use to detect database serialization * issues. */ private int _currentCycle; private int _objIdCounter; private ExecutionQueueStatistics _statistics = new ExecutionQueueStatistics(); private ReplacementMap _replacementMap; private Serializable _gdata; private Map<Object, LinkedList<IndexedObject>> _index = new HashMap<Object, LinkedList<IndexedObject>>(); public ExecutionQueueImpl(ClassLoader classLoader) { _classLoader = classLoader; } public void setClassLoader(ClassLoader classLoader) { _classLoader = classLoader; } public void setReplacementMap(ReplacementMap replacementMap) { _replacementMap = replacementMap; } public Map<Object, LinkedList<IndexedObject>> getIndex() { return _index; } public void add(CommChannel channel) { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("add", new Object[] { "channel", channel })); verifyNew(channel); ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter, channel.getType().getName(), channel .getDescription()); _channels.put(cframe.getId(), cframe); assignId(channel, cframe.getId()); } public void enqueueReaction(Continuation continuation) { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("enqueueReaction", new Object[] { "continuation", continuation })); verifyNew(continuation); _reactions.add(continuation); } public Continuation dequeueReaction() { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter("dequeueReaction", CollectionUtils.EMPTY_OBJECT_ARRAY)); } Continuation continuation = null; if (!_reactions.isEmpty()) { Iterator it = _reactions.iterator(); continuation = (Continuation) it.next(); it.remove(); } return continuation; } public void add(CommGroup group) { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("add", new Object[] { "group", group })); verifyNew(group); CommGroupFrame commGroupFrame = new CommGroupFrame(group.isReplicated()); for (Iterator i = group.getElements(); i.hasNext();) { Comm comm = (Comm) i.next(); ChannelFrame chnlFrame = findChannelFrame(comm.getChannel().getId()); if (comm instanceof CommSend) { if (chnlFrame.replicatedSend) { // TODO: JACOB "bad-process" ex throw new IllegalStateException("Send attempted on channel containing replicated send! Channel= " + comm.getChannel()); } if (group.isReplicated()) chnlFrame.replicatedSend = true; CommSend commSend = (CommSend) comm; MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMethod().getName(), commSend.getArgs()); commGroupFrame.commFrames.add(mframe); chnlFrame.msgFrames.add(mframe); } else if (comm instanceof CommRecv) { if (chnlFrame.replicatedRecv) { // TODO: JACOB "bad-process" ex throw new IllegalStateException( "Receive attempted on channel containing replicated receive! Channel= " + comm.getChannel()); } if (group.isReplicated()) chnlFrame.replicatedRecv = true; CommRecv commRecv = (CommRecv) comm; ObjectFrame oframe = new ObjectFrame(commGroupFrame, chnlFrame, commRecv.getContinuation()); commGroupFrame.commFrames.add(oframe); chnlFrame.objFrames.add(oframe); } } // Match communications. for (Iterator i = group.getElements(); i.hasNext();) { Comm comm = (Comm) i.next(); matchCommunications(comm.getChannel()); } } private ChannelFrame findChannelFrame(Object id) { ChannelFrame chnlFrame = _channels.get(id); if (chnlFrame == null) { throw new IllegalArgumentException("No such channel; id=" + id); } return chnlFrame; } public int cycle() { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter("cycle", CollectionUtils.EMPTY_OBJECT_ARRAY)); } return ++_currentCycle; } public String createExport(CommChannel channel) { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("createExport", new Object[] { "channel", channel })); ChannelFrame cframe = findChannelFrame(channel.getId()); cframe.refCount++; return channel.getId().toString(); } public CommChannel consumeExport(String exportId) { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter("consumeExport", new Object[] { "exportId", exportId })); } Integer id = Integer.valueOf(exportId); ChannelFrame cframe = findChannelFrame(id); cframe.refCount--; CommChannel commChannel = new CommChannel(cframe.type); commChannel.setId(id); commChannel.setDescription("EXPORTED CHANNEL"); return commChannel; } public boolean hasReactions() { return !_reactions.isEmpty(); } public void flush() { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter("flush", CollectionUtils.EMPTY_OBJECT_ARRAY)); } } public void read(InputStream iis) throws IOException, ClassNotFoundException { _channels.clear(); _reactions.clear(); _index.clear(); ExecutionQueueInputStream sis = new ExecutionQueueInputStream(iis); _objIdCounter = sis.readInt(); _currentCycle = sis.readInt(); int reactions = sis.readInt(); for (int i = 0; i < reactions; ++i) { JacobObject closure = (JacobObject) sis.readObject(); String methodName = sis.readUTF(); Method method = closure.getMethod(methodName); int numArgs = sis.readInt(); Object[] args = new Object[numArgs]; for (int j = 0; j < numArgs; ++j) { args[j] = sis.readObject(); } _reactions.add(new Continuation(closure, method, args)); } int numChannels = sis.readInt(); for (int i = 0; i < numChannels; ++i) { int objFrames = sis.readInt(); for (int j = 0; j < objFrames; ++j) { sis.readObject(); } int msgFrames = sis.readInt(); for (int j = 0; j < msgFrames; ++j) { sis.readObject(); } } numChannels = sis.readInt(); for (int i = 0; i < numChannels; ++i) { ChannelFrame cframe = (ChannelFrame) sis.readObject(); _channels.put(cframe.getId(), cframe); } _gdata = (Serializable) sis.readObject(); sis.close(); } private void index(IndexedObject object) { LinkedList<IndexedObject> vals = _index.get(object.getKey()); if (vals == null) { vals = new LinkedList<IndexedObject>(); _index.put(object.getKey(), vals); } vals.add(object); } public void write(OutputStream oos) throws IOException { flush(); ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(oos);// XQXMLOutputStream sos = createObjectOutputStream(new OutputStreamWriter(oos)); sos.writeInt(_objIdCounter); sos.writeInt(_currentCycle); // Write out the reactions. sos.writeInt(_reactions.size()); for (Continuation c : _reactions) { sos.writeObject(c.getClosure()); sos.writeUTF(c.getMethod().getName()); sos.writeInt(c.getArgs() == null ? 0 : c.getArgs().length); for (int j = 0; c.getArgs() != null && j < c.getArgs().length; ++j) sos.writeObject(c.getArgs()[j]); } sos.writeInt(_channels.values().size()); for (Iterator i = _channels.values().iterator(); i.hasNext();) { ChannelFrame cframe = (ChannelFrame) i.next(); sos.writeInt(cframe.objFrames.size()); for (Iterator j = cframe.objFrames.iterator(); j.hasNext();) { sos.writeObject(j.next()); } sos.writeInt(cframe.msgFrames.size()); for (Iterator j = cframe.msgFrames.iterator(); j.hasNext();) { sos.writeObject(j.next()); } } Set referencedChannels = sos.getSerializedChannels(); for (Iterator i = _channels.values().iterator(); i.hasNext();) { ChannelFrame cframe = (ChannelFrame) i.next(); if (referencedChannels.contains(Integer.valueOf(cframe.id)) || cframe.refCount > 0) { // skip } else { if (__log.isDebugEnabled()) __log.debug("GC Channel: " + cframe); i.remove(); } } sos.writeInt(_channels.values().size()); for (Iterator i = _channels.values().iterator(); i.hasNext();) { ChannelFrame cframe = (ChannelFrame) i.next(); if (__log.isDebugEnabled()) { __log.debug("Writing Channel: " + cframe); } sos.writeObject(cframe); } // Write the global data. sos.writeObject(_gdata); sos.close(); } public boolean isComplete() { // If we have more reactions we're not done. if (!_reactions.isEmpty()) { return false; } // If we have no reactions, but there are some channels that have // external references, we are not done. for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) { if (i.next().refCount > 0) { return false; } } return true; } public void dumpState(PrintStream ps) { ps.print(this.toString()); ps.println(" state dump:"); ps.println("-- GENERAL INFO"); ps.println(" Current Cycle : " + _currentCycle); ps.println(" Num. Reactions : " + _reactions.size()); _statistics.printStatistics(ps); if (!_reactions.isEmpty()) { ps.println("-- REACTIONS"); int cnt = 0; for (Iterator i = _reactions.iterator(); i.hasNext();) { Continuation continuation = (Continuation) i.next(); ps.println(" #" + (++cnt) + ": " + continuation.toString()); } } } private void matchCommunications(CommChannel channel) { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter("matchCommunications", new Object[] { "channel", channel })); } ChannelFrame cframe = _channels.get(channel.getId()); while (cframe != null && !cframe.msgFrames.isEmpty() && !cframe.objFrames.isEmpty()) { MessageFrame mframe = cframe.msgFrames.iterator().next(); ObjectFrame oframe = cframe.objFrames.iterator().next(); Continuation continuation = new Continuation(oframe._continuation, oframe._continuation .getMethod(mframe.method), mframe.args); if (__log.isInfoEnabled()) { continuation.setDescription(channel + " ? {...} | " + channel + " ! " + mframe.method + "(...)"); } enqueueReaction(continuation); if (!mframe.commGroupFrame.replicated) { removeCommGroup(mframe.commGroupFrame); }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?