BpelEngineImpl.java
    boolean isProcessRegistered(QName pid) {
        return _activeProcesses.containsKey(pid);
    }

    /**
     * Register a process with the engine.
     * @param process the process to register
     */
    void registerProcess(BpelProcess process) {
        _activeProcesses.put(process.getPID(), process);
        for (Endpoint e : process.getServiceNames()) {
            __log.debug("Register process: serviceId=" + e + ", process=" + process);
            _serviceMap.put(e, process);
        }
        process.activate(this);
    }

    /**
     * Route to a process using the service id. Note that we do not need the endpoint name here; we are
     * assuming that two processes would not be registered under the same service qname but different endpoints.
     *
     * @param service target service id
     * @param request request message
     * @return process corresponding to the targeted service, or <code>null</code> if the service identifier is not recognized
     */
    BpelProcess route(QName service, Message request) {
        // TODO: use the message to route to the correct service if more than
        // one service is listening on the same endpoint.
        BpelProcess routed = null;
        for (Endpoint endpoint : _serviceMap.keySet()) {
            if (endpoint.serviceName.equals(service))
                routed = _serviceMap.get(endpoint);
        }
        if (__log.isDebugEnabled())
            __log.debug("Routed: svcQname " + service + " --> " + routed);
        return routed;
    }

    OProcess getOProcess(QName processId) {
        BpelProcess process = _activeProcesses.get(processId);
        if (process == null) return null;
        return process.getOProcess();
    }

    public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
        final WorkEvent we = new WorkEvent(jobInfo.jobDetail);

        // We lock the instance to prevent concurrent transactions and unnecessary rollbacks.
        // Note that we don't want to wait too long here to get our lock, since we are likely holding
        // on to the scheduler's locks of various sorts.
        try {
            _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
            _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
                public void afterCompletion(boolean success) {
                    _instanceLockManager.unlock(we.getIID());
                }

                public void beforeCompletion() {
                }
            });
        } catch (InterruptedException e) {
            // Retry later.
            __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
            throw new Scheduler.JobProcessorException(true);
        } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
            __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
            // TODO: this should really be more like the exponential backoff algorithm used in Ethernet.
            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
                    new Date(System.currentTimeMillis() + Math.min(randomExp(1000), 10000)));
            return;
        }

        // DON'T PUT CODE HERE -- we need this method wrapped in a tight try/catch block, since we have to
        // handle all types of failure here; the scheduler is not going to know how to handle our errors.
        // ALSO, we have to release the lock obtained above (IMPORTANT), lest the whole system come
        // to a grinding halt.
        try {
            BpelProcess process;
            if (we.getProcessId() != null) {
                process = _activeProcesses.get(we.getProcessId());
            } else {
                ProcessInstanceDAO instance;
                if (we.isInMem())
                    instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
                else
                    instance = _contexts.dao.getConnection().getInstance(we.getIID());

                if (instance == null) {
                    __log.debug(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
                    // Nothing we can do; this instance is not in the database, so the job will always fail.
                    // Not exactly an error, since it can occur in the normal course of events.
                    return;
                }
                ProcessDAO processDao = instance.getProcess();
                process = _activeProcesses.get(processDao.getProcessId());
            }

            if (process == null) {
                // If the process is not active, it means that we should not be
                // doing any work on its behalf, therefore we will reschedule the
                // events for some time in the future (1 minute).
                Date future = new Date(System.currentTimeMillis() + (60 * 1000));
                __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
                _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, future);
                return;
            }

            process.handleWorkEvent(jobInfo.jobDetail);
            debuggingDelay();
        } catch (BpelEngineException bee) {
            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
            throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));
        } catch (ContextException ce) {
            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
            throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));
        } catch (RuntimeException rte) {
            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
            throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));
        } catch (Throwable t) {
            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
            throw new Scheduler.JobProcessorException(false);
        }
    }

    private boolean checkRetry(final JobInfo jobInfo, Throwable t) {
        // TODO: better handling of failed jobs (put them in the DB perhaps?)
        if (jobInfo.retryCount < MAX_RETRIES)
            return true;

        __log.error("Job could not be completed after " + MAX_RETRIES + " retries: " + jobInfo, t);

        boolean saveToDisk = false;
        if (jobInfo.jobDetail.get("final") == null) {
            __log.error("Rescheduling problematic job for a bit later: " + jobInfo, t);
            try {
                if (jobInfo.jobDetail.get("inmem") != null)
                    _contexts.scheduler.scheduleVolatileJob(true, jobInfo.jobDetail);
                else
                    _contexts.scheduler.execIsolatedTransaction(new Callable<Void>() {
                        public Void call() throws Exception {
                            jobInfo.jobDetail.put("final", true);
                            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
                                    new Date(System.currentTimeMillis() + 60 * 1000));
                            return null;
                        }
                    });
            } catch (Exception ex) {
                __log.error("Error rescheduling problematic job: " + jobInfo, ex);
                saveToDisk = true;
            }
        } else {
            saveToDisk = true;
        }

        if (saveToDisk)
            try {
                File f = File.createTempFile("ode-bad-job", ".ser", new File(""));
                ObjectOutputStream fos = new ObjectOutputStream(new FileOutputStream(f));
                fos.writeObject(jobInfo);
                fos.close();
                __log.error("Saved problematic job to disk (last resort): " + jobInfo + " in file " + f);
            } catch (Exception ex) {
                __log.error("Could not save bad job; it will be lost: " + jobInfo, ex);
            }

        // No more retries.
        return false;
    }

    /**
     * Block the thread for a random amount of time. Used for testing for races and the like. The delay
     * generated is exponentially distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code>
     * environment variable.
     */
    private void debuggingDelay() {
        // Do a delay for debugging purposes.
        if (_delayMean != 0)
            try {
                long delay = randomExp(_delayMean); // exponentially distributed with mean _delayMean
                __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                ; // ignore
            }
    }

    private long randomExp(double mean) {
        double u = _random.nextDouble(); // uniform on [0, 1)
        long delay = (long) (-Math.log(u) * mean); // exponentially distributed with the given mean
        return delay;
    }

    void fireEvent(BpelEvent event) {
        // Note that the eventListeners list is a copy-on-write array, so there is no need
        // to mess with synchronization.
        for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
            l.onEvent(event);
        }
    }

    /**
     * Get the list of globally-registered message-exchange interceptors.
     *
     * @return list
     */
    List<MessageExchangeInterceptor> getGlobalInterceptors() {
        return _contexts.globalIntereceptors;
    }
}
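
/*
 * The following is a minimal standalone sketch (its own hypothetical file, not part of
 * BpelEngineImpl and not ODE's InstanceLockManager) of the per-instance locking pattern used in
 * onScheduledJob() above: try to take an instance-scoped lock with a very short timeout, and if
 * it cannot be obtained, give up immediately so the caller can reschedule the job instead of
 * blocking while it holds scheduler locks. All names here (InstanceLockSketch, tryLock, unlock)
 * are illustrative assumptions built on java.util.concurrent only.
 */
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class InstanceLockSketch {
    private final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<Long, ReentrantLock>();

    private ReentrantLock lockFor(Long iid) {
        ReentrantLock lock = locks.get(iid);
        if (lock == null) {
            ReentrantLock fresh = new ReentrantLock();
            ReentrantLock prior = locks.putIfAbsent(iid, fresh);
            lock = (prior != null) ? prior : fresh;
        }
        return lock;
    }

    /** Returns true if the instance lock was acquired; false means "busy, reschedule later". */
    boolean tryLock(Long iid) throws InterruptedException {
        return lockFor(iid).tryLock(1, TimeUnit.MICROSECONDS);
    }

    /** Called once the transaction completes, mirroring the afterCompletion() hook above. */
    void unlock(Long iid) {
        lockFor(iid).unlock();
    }
}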
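
/*
 * Minimal standalone sketch of the delay sampling used above when a busy instance forces a job
 * to be rescheduled: an exponentially distributed delay obtained by inverse-transform sampling
 * (-ln(u) * mean), capped at 10 seconds, matching randomExp(1000) and Math.min(..., 10000) in
 * onScheduledJob(). The class and method names are illustrative assumptions.
 */
import java.util.Random;

class BackoffDelaySketch {
    private static final Random RANDOM = new Random();

    /** Sample an exponentially distributed delay with the given mean, in milliseconds. */
    static long randomExp(double mean) {
        double u = RANDOM.nextDouble();      // uniform on [0, 1)
        return (long) (-Math.log(u) * mean); // inverse CDF of the exponential distribution
    }

    /** Reschedule delay: mean of 1 second, never more than 10 seconds. */
    static long rescheduleDelayMs() {
        return Math.min(randomExp(1000), 10000);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println("would reschedule in " + rescheduleDelayMs() + " ms");
        }
    }
}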
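
/*
 * Minimal standalone sketch of the copy-on-write pattern that fireEvent() relies on: readers
 * iterate over an immutable snapshot of the listener list, so listeners can be added or removed
 * concurrently without any synchronization on the dispatch path. The EventListenerSketch and
 * EventDispatchSketch types are illustrative stand-ins, not the ODE BpelEventListener API.
 */
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

interface EventListenerSketch {
    void onEvent(String event);
}

class EventDispatchSketch {
    // CopyOnWriteArrayList copies its backing array on every mutation; iteration never blocks.
    private final List<EventListenerSketch> listeners = new CopyOnWriteArrayList<EventListenerSketch>();

    void register(EventListenerSketch l) {
        listeners.add(l);
    }

    void fireEvent(String event) {
        // No locking needed: the iterator works over a snapshot taken when iteration starts.
        for (EventListenerSketch l : listeners) {
            l.onEvent(event);
        }
    }

    public static void main(String[] args) {
        EventDispatchSketch dispatch = new EventDispatchSketch();
        dispatch.register(new EventListenerSketch() {
            public void onEvent(String event) {
                System.out.println("got event: " + event);
            }
        });
        dispatch.fireEvent("instance-started");
    }
}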