⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bpelengineimpl.java

📁 bpel执行引擎用来执行bpel业务流程
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    boolean isProcessRegistered(QName pid) {        return _activeProcesses.containsKey(pid);    }    /**     * Register a process with the engine.     * @param process the process to register     */    void registerProcess(BpelProcess process) {        _activeProcesses.put(process.getPID(), process);        for (Endpoint e : process.getServiceNames()) {            __log.debug("Register process: serviceId=" + e + ", process=" + process);            _serviceMap.put(e, process);        }        process.activate(this);    }    /**     * Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes     * would not be registered under the same service qname but different endpoint.     *     * @param service     *            target service id     * @param request     *            request message     * @return process corresponding to the targetted service, or <code>null</code> if service identifier is not recognized.     */    BpelProcess route(QName service, Message request) {        // TODO: use the message to route to the correct service if more than        // one service is listening on the same endpoint.        BpelProcess routed = null;        for (Endpoint endpoint : _serviceMap.keySet()) {            if (endpoint.serviceName.equals(service))                routed = _serviceMap.get(endpoint);        }        if (__log.isDebugEnabled())            __log.debug("Routed: svcQname " + service + " --> " + routed);        return routed;    }    OProcess getOProcess(QName processId) {        BpelProcess process = _activeProcesses.get(processId);        if (process == null) return null;        return process.getOProcess();    }    public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {        final WorkEvent we = new WorkEvent(jobInfo.jobDetail);        // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,        // Note that we don't want to wait too long here to get our lock, since we are likely holding        // on to scheduler's locks of various sorts.        try {            _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);            _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {                public void afterCompletion(boolean success) {                    _instanceLockManager.unlock(we.getIID());                }                public void beforeCompletion() { }            });        } catch (InterruptedException e) {            // Retry later.            __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);            throw new Scheduler.JobProcessorException(true);        } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {            __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");            // TODO: This should really be more of something like the exponential backoff algorithm in ethernet.            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()                    + Math.min(randomExp(1000), 10000)));            return;        }        // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle        // all types of failure here, the scheduler is not going to know how to handle our errors,        // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come        // to a grinding halt.        try {            BpelProcess process;            if (we.getProcessId() != null) {                process = _activeProcesses.get(we.getProcessId());            } else {                ProcessInstanceDAO instance;                if (we.isInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());                else instance = _contexts.dao.getConnection().getInstance(we.getIID());                if (instance == null) {                    __log.debug(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));                    // nothing we can do, this instance is not in the database, it will always fail, not                     // exactly an error since can occur in normal course of events.                    return;                }                ProcessDAO processDao = instance.getProcess();                process = _activeProcesses.get(processDao.getProcessId());            }            if (process == null) {                // If the process is not active, it means that we should not be                // doing any work on its behalf, therefore we will reschedule the                // events for some time in the future (1 minute).                Date future = new Date(System.currentTimeMillis() + (60 * 1000));                __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));                _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, future);                return;            }            process.handleWorkEvent(jobInfo.jobDetail);            debuggingDelay();        } catch (BpelEngineException bee) {            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);            throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));        } catch (ContextException ce) {            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);            throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));        } catch (RuntimeException rte) {            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);            throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));        } catch (Throwable t) {            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);            throw new Scheduler.JobProcessorException(false);        }    }    private boolean checkRetry(final JobInfo jobInfo, Throwable t) {        // TODO, better handling of failed jobs (put them in the DB perhaps?)        if (jobInfo.retryCount < MAX_RETRIES)            return true;        __log.error("Job could not be completed after " + MAX_RETRIES + ": " + jobInfo, t);        boolean saveToDisk = false;        if (jobInfo.jobDetail.get("final") == null) {            __log.error("Rescheduling problematic job for a bit later: " + jobInfo, t);            try {                if (jobInfo.jobDetail.get("inmem") != null)                    _contexts.scheduler.scheduleVolatileJob(true, jobInfo.jobDetail);                else                    _contexts.scheduler.execIsolatedTransaction(new Callable<Void>() {                        public Void call() throws Exception {                            jobInfo.jobDetail.put("final", true);                            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,                                    new Date(System.currentTimeMillis() + 60 * 1000));                            return null;                        }                    });            } catch (Exception ex) {                __log.error("Error rescheduling problematic job: " + jobInfo,ex);                saveToDisk = true;            }        } else {            saveToDisk = true;        }        if (saveToDisk)            try {                File f = File.createTempFile("ode-bad-job", ".ser", new File(""));                ObjectOutputStream fos = new ObjectOutputStream(new FileOutputStream(f));                fos.writeObject(jobInfo);                fos.close();                __log.error("Saved problematic job to disk (last resort): " + jobInfo +" in file " + f);            } catch (Exception ex) {                __log.error("Could not save bad job; it will be lost: " + jobInfo, ex);            }        // No more retries.        return false;    }    /**     * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially     * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.     */    private void debuggingDelay() {        // Do a delay for debugging purposes.        if (_delayMean != 0)            try {                long delay = randomExp(_delayMean);                // distribution                // with mean                // _delayMean                __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");                Thread.sleep(delay);            } catch (InterruptedException e) {                ; // ignore            }    }    private long randomExp(double mean) {        double u = _random.nextDouble(); // Uniform        long delay = (long) (-Math.log(u) * mean); // Exponential        return delay;    }    void fireEvent(BpelEvent event) {        // Note that the eventListeners list is a copy-on-write array, so need        // to mess with synchronization.        for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {   			l.onEvent(event);        }    }    /**     * Get the list of globally-registered message-exchange interceptors.     *     * @return list     */    List<MessageExchangeInterceptor> getGlobalInterceptors() {        return _contexts.globalIntereceptors;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -