📄 bpelserverimpl.java
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.ode.bpel.engine;import java.util.ArrayList;import java.util.HashSet;import java.util.List;import java.util.Properties;import java.util.Set;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;import javax.xml.namespace.QName;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.ode.bpel.dao.BpelDAOConnection;import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;import org.apache.ode.bpel.dao.ProcessDAO;import org.apache.ode.bpel.evt.BpelEvent;import org.apache.ode.bpel.iapi.BindingContext;import org.apache.ode.bpel.iapi.BpelEngine;import org.apache.ode.bpel.iapi.BpelEngineException;import org.apache.ode.bpel.iapi.BpelEventListener;import org.apache.ode.bpel.iapi.BpelServer;import org.apache.ode.bpel.iapi.EndpointReferenceContext;import org.apache.ode.bpel.iapi.MessageExchangeContext;import org.apache.ode.bpel.iapi.ProcessConf;import org.apache.ode.bpel.iapi.Scheduler;import org.apache.ode.bpel.iapi.Scheduler.JobInfo;import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;import org.apache.ode.bpel.o.OProcess;import org.apache.ode.utils.msg.MessageBundle;import org.apache.ode.utils.stl.CollectionsX;import org.apache.ode.utils.stl.MemberOfFunction;import org.apache.ode.bpel.evar.ExternalVariableModule;/** * <p> * The BPEL server implementation. * </p> * * <p> * This implementation is intended to be thread safe. The key concurrency * mechanism is a "management" read/write lock that synchronizes all management * operations (they require "write" access) and prevents concurrent management * operations and processing (processing requires "read" access). Write access * to the lock is scoped to the method, while read access is scoped to a * transaction. * </p> * * @author Maciej Szefler <mszefler at gmail dot com> * @author Matthieu Riou <mriou at apache dot org> */public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor { private static final Log __log = LogFactory.getLog(BpelServerImpl.class); private static final Messages __msgs = MessageBundle.getMessages(Messages.class); /** Maximum age of a process before it is quiesced */ private static Long __processMaxAge; /** * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. * Guarded by _mngmtLock.writeLock(). */ private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>(); private State _state = State.SHUTDOWN; private Contexts _contexts = new Contexts(); private DehydrationPolicy _dehydrationPolicy; private Properties _configProperties; BpelEngineImpl _engine; BpelDatabase _db; /** * Management lock for synchronizing management operations and preventing * processing (transactions) from occuring while management operations are * in progress. */ private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock(); static { // TODO Clean this up and factorize engine configuration try { String processMaxAge = System.getProperty("ode.process.maxage"); if (processMaxAge != null && processMaxAge.length() > 0) { __processMaxAge = Long.valueOf(processMaxAge); __log.info("Process definition max age adjusted. Max age = " + __processMaxAge + "ms."); } } catch (Throwable t) { if (__log.isDebugEnabled()) { __log.debug("Could not parse ode.process.maxage environment variable.", t); } else { __log.info("Could not parse ode.process.maxage environment variable; reaping disabled."); } } } private enum State { SHUTDOWN, INIT, RUNNING } public BpelServerImpl() { } public void start() { _mngmtLock.writeLock().lock(); try { if (!checkState(State.INIT, State.RUNNING)) { __log.debug("start() ignored -- already started"); return; } __log.debug("BPEL SERVER starting."); _contexts.scheduler.start(); _state = State.RUNNING; __log.info(__msgs.msgServerStarted()); if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start(); } finally { _mngmtLock.writeLock().unlock(); } } public void registerExternalVariableEngine(ExternalVariableModule eve) { _contexts.externalVariableEngines.put(eve.getName(), eve); } /** * Register a global listener to receive {@link BpelEvent}s froom all * processes. * @param listener */ public void registerBpelEventListener(BpelEventListener listener) { // Do not synchronize, eventListeners is copy-on-write array. listener.startup(_configProperties); _contexts.eventListeners.add(listener); } /** * Unregister a global listener from receive {@link BpelEvent}s from all * processes. * @param listener */ public void unregisterBpelEventListener(BpelEventListener listener) { // Do not synchronize, eventListeners is copy-on-write array. try { listener.shutdown(); } catch (Exception e) { __log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e); } finally { _contexts.eventListeners.remove(listener); } } private void unregisterBpelEventListeners() { for (BpelEventListener l : _contexts.eventListeners) { unregisterBpelEventListener(l); } } public void stop() { _mngmtLock.writeLock().lock(); try { if (!checkState(State.RUNNING, State.INIT)) { __log.debug("stop() ignored -- already stopped"); return; } __log.debug("BPEL SERVER STOPPING"); _contexts.scheduler.stop(); _engine = null; _state = State.INIT; __log.info(__msgs.msgServerStopped()); } finally { _mngmtLock.writeLock().unlock(); } } public void init() throws BpelEngineException { _mngmtLock.writeLock().lock(); try { if (!checkState(State.SHUTDOWN, State.INIT)) return; __log.debug("BPEL SERVER initializing "); _db = new BpelDatabase(_contexts.dao, _contexts.scheduler); _state = State.INIT; _engine = new BpelEngineImpl(_contexts); } finally { _mngmtLock.writeLock().unlock(); } } public void shutdown() throws BpelEngineException { _mngmtLock.writeLock().lock(); try { stop(); unregisterBpelEventListeners(); _db = null;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -