📄 jobmanager.java
字号:
/* * $Id: JobManager.java 5462 2005-08-05 18:35:48Z jonesde $ * * Copyright (c) 2001, 2002 The Open For Business Project - www.ofbiz.org * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included * in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR * THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */package org.ofbiz.service.job;import java.io.IOException;import java.util.ArrayList;import java.util.Collection;import java.util.Date;import java.util.Iterator;import java.util.List;import java.util.Map;import java.sql.Timestamp;import org.ofbiz.base.util.Debug;import org.ofbiz.base.util.UtilDateTime;import org.ofbiz.base.util.UtilMisc;import org.ofbiz.base.util.UtilProperties;import org.ofbiz.base.util.UtilValidate;import org.ofbiz.entity.GenericDelegator;import org.ofbiz.entity.GenericEntityException;import org.ofbiz.entity.GenericValue;import org.ofbiz.entity.condition.EntityCondition;import org.ofbiz.entity.condition.EntityConditionList;import org.ofbiz.entity.condition.EntityExpr;import org.ofbiz.entity.condition.EntityOperator;import org.ofbiz.entity.serialize.SerializeException;import org.ofbiz.entity.serialize.XmlSerializer;import org.ofbiz.entity.transaction.GenericTransactionException;import org.ofbiz.entity.transaction.TransactionUtil;import org.ofbiz.service.DispatchContext;import org.ofbiz.service.GenericDispatcher;import org.ofbiz.service.GenericServiceException;import org.ofbiz.service.LocalDispatcher;import org.ofbiz.service.calendar.RecurrenceInfo;import org.ofbiz.service.calendar.RecurrenceInfoException;import org.ofbiz.service.config.ServiceConfigUtil;/** * JobManager * * @author <a href="mailto:jaz@ofbiz.org">Andy Zeneski</a> * @version $Rev: 5462 $ * @since 2.0 */public class JobManager { public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); public static final Map updateFields = UtilMisc.toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); public static final String module = JobManager.class.getName(); public static final String dispatcherName = "JobDispatcher"; protected GenericDelegator delegator; protected JobPoller jp; /** Creates a new JobManager object. */ public JobManager(GenericDelegator delegator) { this.delegator = delegator; jp = new JobPoller(this); } /** Queues a Job to run now. */ public void runJob(Job job) throws JobManagerException { if (job.isValid()) jp.queueNow(job); } /** Returns the ServiceDispatcher. */ public LocalDispatcher getDispatcher() { LocalDispatcher thisDispatcher = null; try { thisDispatcher = GenericDispatcher.getLocalDispatcher(dispatcherName, delegator); } catch (GenericServiceException e) { Debug.logError(e, module); } return thisDispatcher; } /** Returns the GenericDelegator. */ public GenericDelegator getDelegator() { return this.delegator; } public synchronized Iterator poll() { List poll = new ArrayList(); Collection jobEnt = null; // sort the results by time List order = UtilMisc.toList("runTime"); // basic query List expressions = UtilMisc.toList(new EntityExpr("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()), new EntityExpr("startDateTime", EntityOperator.EQUALS, null), new EntityExpr("cancelDateTime", EntityOperator.EQUALS, null), new EntityExpr("runByInstanceId", EntityOperator.EQUALS, null)); // limit to just defined pools List pools = ServiceConfigUtil.getRunPools(); List poolsExpr = UtilMisc.toList(new EntityExpr("poolId", EntityOperator.EQUALS, null)); if (pools != null) { Iterator poolsIter = pools.iterator(); while (poolsIter.hasNext()) { String poolName = (String) poolsIter.next(); poolsExpr.add(new EntityExpr("poolId", EntityOperator.EQUALS, poolName)); } } // make the conditions EntityCondition baseCondition = new EntityConditionList(expressions, EntityOperator.AND); EntityCondition poolCondition = new EntityConditionList(poolsExpr, EntityOperator.OR); EntityCondition mainCondition = new EntityConditionList(UtilMisc.toList(baseCondition, poolCondition), EntityOperator.AND); // we will loop until we have no more to do boolean pollDone = false; while (!pollDone) { boolean beganTransaction; try { beganTransaction = TransactionUtil.begin(); } catch (GenericTransactionException e) { Debug.logError(e, "Unable to start transaction; not polling for jobs", module); return null; } if (!beganTransaction) { Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); return null; } try { // first update the jobs w/ this instance running information delegator.storeByCondition("JobSandbox", updateFields, mainCondition); // now query all the 'queued' jobs for this instance jobEnt = delegator.findByAnd("JobSandbox", updateFields, order); //jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order); } catch (GenericEntityException ee) { Debug.logError(ee, "Cannot load jobs from datasource.", module); } catch (Exception e) { Debug.logError(e, "Unknown error.", module); } if (jobEnt != null && jobEnt.size() > 0) { Iterator i = jobEnt.iterator(); while (i.hasNext()) { GenericValue v = (GenericValue) i.next(); DispatchContext dctx = getDispatcher().getDispatchContext(); if (dctx == null) { Debug.logError("Unable to locate DispatchContext object; not running job!", module); continue; } Job job = new PersistedServiceJob(dctx, v, null); // todo fix the requester try { job.queue(); poll.add(job); } catch (InvalidJobException e) { Debug.logError(e, module); } } } else { pollDone = true; } // finished this run; commit the transaction try { TransactionUtil.commit(beganTransaction); } catch (GenericTransactionException e) { Debug.logError(e, module); } } return poll.iterator(); } public synchronized void reloadCrashedJobs() { String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); List toStore = new ArrayList(); List crashed = null; List exprs = UtilMisc.toList(new EntityExpr("finishDateTime", EntityOperator.EQUALS, null)); exprs.add(new EntityExpr("cancelDateTime", EntityOperator.EQUALS, null)); exprs.add(new EntityExpr("runByInstanceId", EntityOperator.EQUALS, instanceId)); try { crashed = delegator.findByAnd("JobSandbox", exprs, UtilMisc.toList("startDateTime")); } catch (GenericEntityException e) { Debug.logError(e, "Unable to load crashed jobs", module); } if (crashed != null && crashed.size() > 0) { Iterator i = crashed.iterator(); while (i.hasNext()) { GenericValue job = (GenericValue) i.next(); long runtime = job.getTimestamp("runTime").getTime(); RecurrenceInfo ri = JobManager.getRecurrenceInfo(job); if (ri != null) { long next = ri.next(); if (next <= runtime) { Timestamp now = UtilDateTime.nowTimestamp(); // only re-schedule if there is no new recurrences since last run Debug.log("Scheduling Job : " + job, module); String newJobId = job.getDelegator().getNextSeqId("JobSandbox").toString(); String pJobId = job.getString("parentJobId"); if (pJobId == null) { pJobId = job.getString("jobId"); } GenericValue newJob = GenericValue.create(job); newJob.set("statusId", "SERVICE_PENDING"); newJob.set("runTime", now); newJob.set("jobId", newJobId); newJob.set("previousJobId", job.getString("jobId")); newJob.set("parentJobId", pJobId); newJob.set("startDateTime", null); newJob.set("runByInstanceId", null); toStore.add(newJob); // set the cancel time on the old job to the same as the re-schedule time
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -