📄 processexecutionengineimpl.java
字号:
// The MIT License
//
// Copyright (c) 2004 Evren Sirin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
/*
* Created on Dec 29, 2003
*
*/
package org.mindswap.owls.process.execution.impl;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.mindswap.owls.OWLSFactory;
import org.mindswap.owls.process.AtomicProcess;
import org.mindswap.owls.process.CompositeProcess;
import org.mindswap.owls.process.ControlConstruct;
import org.mindswap.owls.process.DataFlow;
import org.mindswap.owls.process.Process;
import org.mindswap.owls.process.ProcessComponentList;
import org.mindswap.owls.process.Sequence;
import org.mindswap.owls.process.Split;
import org.mindswap.owls.process.SplitJoin;
import org.mindswap.owls.process.Unordered;
import org.mindswap.owls.process.ValueMap;
import org.mindswap.owls.process.execution.ProcessExecutionEngine;
import org.mindswap.owls.process.execution.ProcessExecutionListener;
/**
* @author Evren Sirin
*
*/
public class ProcessExecutionEngineImpl implements ProcessExecutionEngine {
protected List executionListeners = new ArrayList();
/**
*
*/
public ProcessExecutionEngineImpl() {
}
public void notifyListeners(String msg) {
Iterator i = executionListeners.iterator();
while(i.hasNext()) {
ProcessExecutionListener l = (ProcessExecutionListener) i.next();
l.printMessage(msg);
}
}
public void setCurrentExecuteService(Process p) {
Iterator i = executionListeners.iterator();
while (i.hasNext()) {
ProcessExecutionListener l = (ProcessExecutionListener) i.next();
l.setCurrentExecuteService(p);
}
}
public void finishExecution(int retCode) {
Iterator i = executionListeners.iterator();
while (i.hasNext())
{
ProcessExecutionListener l = (ProcessExecutionListener) i.next();
l.finishExecution(retCode);
}
}
/* (non-Javadoc)
* @see org.mindswap.owls.process.execution.ProcessExecutionEngine#addExecutionListener(org.mindswap.owls.process.execution.ProcessExecutionListener)
*/
public void addExecutionListener(ProcessExecutionListener listener) {
executionListeners.add(listener);
}
public ValueMap execute(Process p) throws Exception { return execute(p, OWLSFactory.createValueMap()); }
public ValueMap execute(Process p, ValueMap values) throws Exception {
ValueMap valueMap = OWLSFactory.createValueMap();
setCurrentExecuteService(p);
valueMap.addMap(values);
valueMap.addMap(p.getDefaultValues());
valueMap.retainValues(p.getInputs());
if(p instanceof AtomicProcess) {
valueMap = executeAtomic((AtomicProcess) p, valueMap);
}
else if(p instanceof CompositeProcess) {
valueMap = executeConstruct(((CompositeProcess) p).getComposedOf(), valueMap, p.getDataFlow());
}
valueMap.retainValues(p.getOutputs());
notifyListeners("[DONE]");
return valueMap;
}
private ValueMap executeAtomic(AtomicProcess p, ValueMap values) throws Exception {
return p.getGrounding().invoke(values);
}
private ValueMap executeConstruct(ControlConstruct c, ValueMap values, DataFlow dataFlow) throws Exception {
if(c instanceof Sequence)
return executeOrdered(c, values, dataFlow);
else if(c instanceof Unordered)
// Unordered says it doesn't matter in which orders subelements
// are executed so the initial order is also ok. By checking
// preconditions We could also
// check different orderings and find the one that is
return executeOrdered(c, values, dataFlow);
else if(c instanceof Split)
return executeParallel(c, values, dataFlow, false);
else if(c instanceof SplitJoin)
return executeParallel(c, values, dataFlow, true);
return null;
}
private ValueMap executeOrdered(ControlConstruct s, ValueMap values, DataFlow dataFlow) throws Exception {
ValueMap allValues = OWLSFactory.createValueMap();
allValues.addMap(values);
ProcessComponentList processList = s.getComponents();
for(int i = 0; i < processList.size(); i++) {
Process process = (Process) processList.get(i);
allValues.applyDataFlow(dataFlow);
ValueMap result = execute(process, allValues);
boolean success = (result != null);
if(!success) {
notifyListeners("[ERROR]\n");
notifyListeners("Execution Stopped");
finishExecution(ProcessExecutionListener.EXEC_ERROR); // done
return null;
}
else {
allValues.addMap(result);
allValues.applyDataFlow(dataFlow);
}
}
return allValues;
}
class ProcessExecutionThread extends Thread {
Process process;
ValueMap values;
ValueMap result;
ProcessExecutionThread(Process process, ValueMap values) {
this.process = process;
this.values = values;
}
public void run() {
try {
result = execute(process, values);
} catch (Exception e) {
notifyListeners("A thread in parallel execution failed for process " + process);
}
}
public boolean isSuccess() {
return (result != null);
}
public ValueMap getResult() {
return result;
}
}
private ValueMap executeParallel(ControlConstruct s, ValueMap values, DataFlow dataFlow, boolean join) {
ValueMap allValues = OWLSFactory.createValueMap();
allValues.addMap(values);
ProcessComponentList processList = s.getComponents();
ProcessExecutionThread[] threads = new ProcessExecutionThread[processList.size()];
for(int i = 0; i < processList.size(); i++) {
DataFlow df = OWLSFactory.createDataFlow();
df.addAll(dataFlow);
df.addAll(s.getDataFlow());
Process process = (Process) processList.get(i);
threads[i] = new ProcessExecutionThread(process, allValues);
setCurrentExecuteService(process);
threads[i].run();
}
if(join) {
for(int i = 0; i < processList.size(); i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
notifyListeners("[ERROR]\n");
notifyListeners("Execution Stopped\n");
finishExecution(ProcessExecutionListener.EXEC_ERROR); // done
return null;
}
}
}
notifyListeners("Execution completed\n");
return allValues;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -