executionqueueimpl.java

来自「bpel执行引擎用来执行bpel业务流程」· Java 代码 · 共 821 行 · 第 1/2 页

JAVA
821
字号
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.jacob.vpu;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.jacob.Channel;import org.apache.ode.jacob.ChannelListener;import org.apache.ode.jacob.IndexedObject;import org.apache.ode.jacob.JacobObject;import org.apache.ode.jacob.soup.Comm;import org.apache.ode.jacob.soup.CommChannel;import org.apache.ode.jacob.soup.CommGroup;import org.apache.ode.jacob.soup.CommRecv;import org.apache.ode.jacob.soup.CommSend;import org.apache.ode.jacob.soup.Continuation;import org.apache.ode.jacob.soup.ExecutionQueue;import org.apache.ode.jacob.soup.ExecutionQueueObject;import org.apache.ode.jacob.soup.ReplacementMap;import org.apache.ode.utils.CollectionUtils;import org.apache.ode.utils.ObjectPrinter;import java.io.Externalizable;import java.io.IOException;import java.io.InputStream;import java.io.ObjectInput;import java.io.ObjectInputStream;import java.io.ObjectOutput;import java.io.ObjectOutputStream;import java.io.ObjectStreamClass;import java.io.OutputStream;import java.io.PrintStream;import java.io.Serializable;import java.lang.reflect.Field;import java.lang.reflect.Method;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedList;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.zip.GZIPInputStream;import java.util.zip.GZIPOutputStream;/** * A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue} implementation. */public class ExecutionQueueImpl implements ExecutionQueue {    /** Class-level logger. */    private static final Log __log = LogFactory.getLog(ExecutionQueueImpl.class);    private ClassLoader _classLoader;    private static ConcurrentHashMap<String, ObjectStreamClass> _classDescriptors        = new ConcurrentHashMap<String, ObjectStreamClass>();    /**     * Cached set of enqueued {@link Continuation} objects (i.e. those read using     * {@link #enqueueReaction(org.apache.ode.jacob.soup.Continuation)}).     * These reactions are "cached"--that is it is not sent directly to the DAO     * layer--to minimize unnecessary serialization/deserialization of closures.     * This is a pretty useful optimization, as most {@link Continuation}s are     * enqueued, and then immediately dequeued in the next cycle. By caching     * {@link Continuation}s, we eliminate practically all serialization of     * these objects, the only exception being cases where the system decides to     * stop processing a particular soup despite the soup being able to make     * forward progress; this scenario would occur if a maximum processign     * time-per-instance policy were in effect.     */    private Set<Continuation> _reactions = new HashSet<Continuation>();    private Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>();    /**     * The "expected" cycle counter, use to detect database serialization     * issues.     */    private int _currentCycle;    private int _objIdCounter;    private ExecutionQueueStatistics _statistics = new ExecutionQueueStatistics();    private ReplacementMap _replacementMap;    private Serializable _gdata;    private Map<Object, LinkedList<IndexedObject>> _index = new HashMap<Object, LinkedList<IndexedObject>>();    public ExecutionQueueImpl(ClassLoader classLoader) {        _classLoader = classLoader;    }    public void setClassLoader(ClassLoader classLoader) {        _classLoader = classLoader;    }    public void setReplacementMap(ReplacementMap replacementMap) {        _replacementMap = replacementMap;    }    public Map<Object, LinkedList<IndexedObject>> getIndex() {        return _index;    }    public void add(CommChannel channel) {        if (__log.isTraceEnabled())            __log.trace(ObjectPrinter.stringifyMethodEnter("add", new Object[] { "channel", channel }));        verifyNew(channel);        ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter, channel.getType().getName(), channel                .getDescription());        _channels.put(cframe.getId(), cframe);        assignId(channel, cframe.getId());    }    public void enqueueReaction(Continuation continuation) {        if (__log.isTraceEnabled())            __log.trace(ObjectPrinter.stringifyMethodEnter("enqueueReaction", new Object[] { "continuation",                    continuation }));        verifyNew(continuation);        _reactions.add(continuation);    }    public Continuation dequeueReaction() {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("dequeueReaction", CollectionUtils.EMPTY_OBJECT_ARRAY));        }        Continuation continuation = null;        if (!_reactions.isEmpty()) {            Iterator it = _reactions.iterator();            continuation = (Continuation) it.next();            it.remove();        }        return continuation;    }    public void add(CommGroup group) {        if (__log.isTraceEnabled())            __log.trace(ObjectPrinter.stringifyMethodEnter("add", new Object[] { "group", group }));        verifyNew(group);        CommGroupFrame commGroupFrame = new CommGroupFrame(group.isReplicated());        for (Iterator i = group.getElements(); i.hasNext();) {            Comm comm = (Comm) i.next();            ChannelFrame chnlFrame = findChannelFrame(comm.getChannel().getId());            if (comm instanceof CommSend) {                if (chnlFrame.replicatedSend) {                    // TODO: JACOB "bad-process" ex                    throw new IllegalStateException("Send attempted on channel containing replicated send! Channel= "                            + comm.getChannel());                }                if (group.isReplicated())                    chnlFrame.replicatedSend = true;                CommSend commSend = (CommSend) comm;                MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMethod().getName(),                        commSend.getArgs());                commGroupFrame.commFrames.add(mframe);                chnlFrame.msgFrames.add(mframe);            } else if (comm instanceof CommRecv) {                if (chnlFrame.replicatedRecv) {                    // TODO: JACOB "bad-process" ex                    throw new IllegalStateException(                            "Receive attempted on channel containing replicated receive! Channel= " + comm.getChannel());                }                if (group.isReplicated())                    chnlFrame.replicatedRecv = true;                CommRecv commRecv = (CommRecv) comm;                ObjectFrame oframe = new ObjectFrame(commGroupFrame, chnlFrame, commRecv.getContinuation());                commGroupFrame.commFrames.add(oframe);                chnlFrame.objFrames.add(oframe);            }        }        // Match communications.        for (Iterator i = group.getElements(); i.hasNext();) {            Comm comm = (Comm) i.next();            matchCommunications(comm.getChannel());        }    }    private ChannelFrame findChannelFrame(Object id) {        ChannelFrame chnlFrame = _channels.get(id);        if (chnlFrame == null) {            throw new IllegalArgumentException("No such channel; id=" + id);        }        return chnlFrame;    }    public int cycle() {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("cycle", CollectionUtils.EMPTY_OBJECT_ARRAY));        }        return ++_currentCycle;    }    public String createExport(CommChannel channel) {        if (__log.isTraceEnabled())            __log.trace(ObjectPrinter.stringifyMethodEnter("createExport", new Object[] { "channel", channel }));        ChannelFrame cframe = findChannelFrame(channel.getId());        cframe.refCount++;        return channel.getId().toString();    }    public CommChannel consumeExport(String exportId) {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("consumeExport", new Object[] { "exportId", exportId }));        }        Integer id = Integer.valueOf(exportId);        ChannelFrame cframe = findChannelFrame(id);        cframe.refCount--;        CommChannel commChannel = new CommChannel(cframe.type);        commChannel.setId(id);        commChannel.setDescription("EXPORTED CHANNEL");        return commChannel;    }    public boolean hasReactions() {        return !_reactions.isEmpty();    }    public void flush() {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("flush", CollectionUtils.EMPTY_OBJECT_ARRAY));        }    }    public void read(InputStream iis) throws IOException, ClassNotFoundException {        _channels.clear();        _reactions.clear();        _index.clear();        ExecutionQueueInputStream sis = new ExecutionQueueInputStream(iis);        _objIdCounter = sis.readInt();        _currentCycle = sis.readInt();        int reactions = sis.readInt();        for (int i = 0; i < reactions; ++i) {            JacobObject closure = (JacobObject) sis.readObject();            String methodName = sis.readUTF();            Method method = closure.getMethod(methodName);            int numArgs = sis.readInt();            Object[] args = new Object[numArgs];            for (int j = 0; j < numArgs; ++j) {                args[j] = sis.readObject();            }            _reactions.add(new Continuation(closure, method, args));        }        int numChannels = sis.readInt();        for (int i = 0; i < numChannels; ++i) {            int objFrames = sis.readInt();            for (int j = 0; j < objFrames; ++j) {                sis.readObject();            }            int msgFrames = sis.readInt();            for (int j = 0; j < msgFrames; ++j) {                sis.readObject();            }        }        numChannels = sis.readInt();        for (int i = 0; i < numChannels; ++i) {            ChannelFrame cframe = (ChannelFrame) sis.readObject();            _channels.put(cframe.getId(), cframe);        }        _gdata = (Serializable) sis.readObject();        sis.close();    }    private void index(IndexedObject object) {        LinkedList<IndexedObject> vals = _index.get(object.getKey());        if (vals == null) {            vals = new LinkedList<IndexedObject>();            _index.put(object.getKey(), vals);        }        vals.add(object);    }    public void write(OutputStream oos) throws IOException {        flush();        ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(oos);//        XQXMLOutputStream sos = createObjectOutputStream(new OutputStreamWriter(oos));        sos.writeInt(_objIdCounter);        sos.writeInt(_currentCycle);        // Write out the reactions.        sos.writeInt(_reactions.size());        for (Continuation c : _reactions) {            sos.writeObject(c.getClosure());            sos.writeUTF(c.getMethod().getName());            sos.writeInt(c.getArgs() == null ? 0 : c.getArgs().length);            for (int j = 0; c.getArgs() != null && j < c.getArgs().length; ++j)                sos.writeObject(c.getArgs()[j]);        }        sos.writeInt(_channels.values().size());        for (Iterator i = _channels.values().iterator(); i.hasNext();) {            ChannelFrame cframe = (ChannelFrame) i.next();            sos.writeInt(cframe.objFrames.size());            for (Iterator j = cframe.objFrames.iterator(); j.hasNext();) {                sos.writeObject(j.next());            }            sos.writeInt(cframe.msgFrames.size());            for (Iterator j = cframe.msgFrames.iterator(); j.hasNext();) {                sos.writeObject(j.next());            }        }        Set referencedChannels = sos.getSerializedChannels();        for (Iterator i = _channels.values().iterator(); i.hasNext();) {            ChannelFrame cframe = (ChannelFrame) i.next();            if (referencedChannels.contains(Integer.valueOf(cframe.id)) || cframe.refCount > 0) {                // skip            } else {                if (__log.isDebugEnabled())                    __log.debug("GC Channel: " + cframe);                i.remove();            }        }        sos.writeInt(_channels.values().size());        for (Iterator i = _channels.values().iterator(); i.hasNext();) {            ChannelFrame cframe = (ChannelFrame) i.next();            if (__log.isDebugEnabled()) {                __log.debug("Writing Channel: " + cframe);            }            sos.writeObject(cframe);        }        // Write the global data.        sos.writeObject(_gdata);        sos.close();    }    public boolean isComplete() {        // If we have more reactions we're not done.        if (!_reactions.isEmpty()) {            return false;        }        // If we have no reactions, but there are some channels that have        // external references, we are not done.        for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {            if (i.next().refCount > 0) {                return false;            }        }        return true;    }    public void dumpState(PrintStream ps) {        ps.print(this.toString());        ps.println(" state dump:");        ps.println("-- GENERAL INFO");        ps.println("   Current Cycle          : " + _currentCycle);        ps.println("   Num. Reactions  : " + _reactions.size());        _statistics.printStatistics(ps);        if (!_reactions.isEmpty()) {            ps.println("-- REACTIONS");            int cnt = 0;            for (Iterator i = _reactions.iterator(); i.hasNext();) {                Continuation continuation = (Continuation) i.next();                ps.println("   #" + (++cnt) + ":  " + continuation.toString());            }        }    }    private void matchCommunications(CommChannel channel) {        if (__log.isTraceEnabled()) {            __log.trace(ObjectPrinter.stringifyMethodEnter("matchCommunications", new Object[] { "channel", channel }));        }        ChannelFrame cframe = _channels.get(channel.getId());        while (cframe != null && !cframe.msgFrames.isEmpty() && !cframe.objFrames.isEmpty()) {            MessageFrame mframe = cframe.msgFrames.iterator().next();            ObjectFrame oframe = cframe.objFrames.iterator().next();            Continuation continuation = new Continuation(oframe._continuation, oframe._continuation                    .getMethod(mframe.method), mframe.args);            if (__log.isInfoEnabled()) {                continuation.setDescription(channel + " ? {...} | " + channel + " ! " + mframe.method + "(...)");            }            enqueueReaction(continuation);            if (!mframe.commGroupFrame.replicated) {                removeCommGroup(mframe.commGroupFrame);            }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?