📄 entitysynccontext.java
字号:
/**
 * Decides whether this EntitySync should be considered running. It is running when:
 *  - its runStatusId is ESR_RUNNING or ESR_PENDING
 *  - AND the entitySync record was last updated no more than maxRunningNoUpdateMillis ago;
 *    if it was updated longer ago than that, it is not considered to be running
 *
 * @return boolean representing if the EntitySync should be considered running
 */
public boolean isEntitySyncRunning() {
    // read the status once; either of the two status values counts as "in a running status"
    String runStatusId = this.entitySync.getString("runStatusId");
    boolean isInRunning = "ESR_RUNNING".equals(runStatusId) || "ESR_PENDING".equals(runStatusId);
    if (!isInRunning) {
        return false;
    }

    Timestamp esLastUpdated = this.entitySync.getTimestamp(ModelEntity.STAMP_FIELD);
    if (esLastUpdated == null) {
        // shouldn't ever happen, but just in case; assume is running if we don't know when it was last updated
        return true;
    }

    long esLastUpdatedMillis = esLastUpdated.getTime();
    long nowTimestampMillis = UtilDateTime.nowTimestamp().getTime();
    long timeSinceUpdated = nowTimestampMillis - esLastUpdatedMillis;
    if (timeSinceUpdated > this.maxRunningNoUpdateMillis) {
        // it has been longer than the maxRunningNoUpdateMillis, so don't consider it running
        return false;
    }

    return true;
}

/**
 * Tells whether there is still a time range left to synchronize.
 *
 * @return true while currentRunStartTime is strictly before syncEndStamp
 */
public boolean hasMoreTimeToSync() {
    return currentRunStartTime.before(syncEndStamp);
}

/** Sets currentRunEndTime to the value computed by {@link #getNextRunEndTime()}. */
protected void setCurrentRunEndTime() {
    this.currentRunEndTime = getNextRunEndTime();
}

/**
 * Computes the end of the next run window: currentRunStartTime plus the split size
 * (offlineSyncSplitMillis when isOfflineSync is set, syncSplitMillis otherwise),
 * capped so that it never goes past syncEndStamp.
 *
 * @return the end time of the next run window; the syncEndStamp instance itself when the window was capped
 */
protected Timestamp getNextRunEndTime() {
    long syncSplit = this.isOfflineSync ? offlineSyncSplitMillis : syncSplitMillis;
    Timestamp nextRunEndTime = new Timestamp(this.currentRunStartTime.getTime() + syncSplit);
    if (nextRunEndTime.after(this.syncEndStamp)) {
        nextRunEndTime = this.syncEndStamp;
    }
    return nextRunEndTime;
}

/** Moves the run window forward: the old end time becomes the new start time, then the end time is recomputed. */
public void advanceRunTimes() {
    this.currentRunStartTime = this.currentRunEndTime;
    this.setCurrentRunEndTime();
}

/** Records the current wall-clock time (System.currentTimeMillis()) in splitStartTime. */
public void setSplitStartTime() {
    this.splitStartTime = System.currentTimeMillis();
}

/**
 * Reads a Long field from the given entitySync value, falling back to a default when the field is null.
 * Shared by the four settings getters below, which differ only in field name and default.
 *
 * @param entitySync the value to read the field from
 * @param fieldName name of the Long field to read
 * @param defaultMillis value returned when the field is null
 * @return the field value, or defaultMillis when the field is null
 */
private static long readMillisSetting(GenericValue entitySync, String fieldName, long defaultMillis) {
    Long configuredMillis = entitySync.getLong(fieldName);
    if (configuredMillis != null) {
        return configuredMillis.longValue();
    }
    return defaultMillis;
}

/**
 * @param entitySync the value to read the "syncSplitMillis" field from
 * @return the "syncSplitMillis" field value, or defaultSyncSplitMillis when the field is null
 */
protected static long getSyncSplitMillis(GenericValue entitySync) {
    return readMillisSetting(entitySync, "syncSplitMillis", defaultSyncSplitMillis);
}

/**
 * @param entitySync the value to read the "offlineSyncSplitMillis" field from
 * @return the "offlineSyncSplitMillis" field value, or defaultOfflineSyncSplitMillis when the field is null
 */
protected static long getOfflineSyncSplitMillis(GenericValue entitySync) {
    return readMillisSetting(entitySync, "offlineSyncSplitMillis", defaultOfflineSyncSplitMillis);
}

/**
 * @param entitySync the value to read the "syncEndBufferMillis" field from
 * @return the "syncEndBufferMillis" field value, or defaultSyncEndBufferMillis when the field is null
 */
protected static long getSyncEndBufferMillis(GenericValue entitySync) {
    return readMillisSetting(entitySync, "syncEndBufferMillis", defaultSyncEndBufferMillis);
}

/**
 * @param entitySync the value to read the "maxRunningNoUpdateMillis" field from
 * @return the "maxRunningNoUpdateMillis" field value, or defaultMaxRunningNoUpdateMillis when the field is null
 */
protected static long getMaxRunningNoUpdateMillis(GenericValue entitySync) {
    return readMillisSetting(entitySync, "maxRunningNoUpdateMillis", defaultMaxRunningNoUpdateMillis);
}

/**
 * Create history record, target service should run in own tx.
 * Calls the "createEntitySyncHistory" service synchronously with status ESR_RUNNING and the
 * current run window, then stores the "startDate" entry of the service result in this.startDate.
 *
 * @throws SyncDataErrorException when the service result is an error result
 * @throws SyncServiceErrorException when the service call throws a GenericServiceException
 */
public void createInitialHistory() throws SyncDataErrorException, SyncServiceErrorException {
    String errorMsg = "Not running EntitySync [" + entitySyncId + "], could not create EntitySyncHistory";
    try {
        Map initialHistoryRes = dispatcher.runSync("createEntitySyncHistory",
                UtilMisc.toMap("entitySyncId", entitySyncId,
                        "runStatusId", "ESR_RUNNING",
                        "beginningSynchTime", this.currentRunStartTime,
                        "lastCandidateEndTime", this.currentRunEndTime,
                        "userLogin", userLogin));
        if (ServiceUtil.isError(initialHistoryRes)) {
            throw new SyncDataErrorException(errorMsg, null, null, initialHistoryRes, null);
        }
        this.startDate = (Timestamp) initialHistoryRes.get("startDate");
    } catch (GenericServiceException e) {
        throw new SyncServiceErrorException(errorMsg, e);
    }
}

public ArrayList assembleValuesToCreate() throws SyncDataErrorException { // first grab all values inserted in the date range, then get the updates (leaving out all values inserted in the data range) ArrayList valuesToCreate = new ArrayList(); // make it an ArrayList to easily merge in sorted lists if (this.nextCreateTxTime != null && (this.nextCreateTxTime.equals(currentRunEndTime) || this.nextCreateTxTime.after(currentRunEndTime))) { // this means that for all entities in this pack we found on the last pass that there would be nothing for this one, so just return nothing... return valuesToCreate; } //Debug.logInfo("Getting values to create; currentRunStartTime=" + currentRunStartTime + ", currentRunEndTime=" + currentRunEndTime, module); int entitiesSkippedForKnownNext = 0; // iterate through entities, get all records with tx stamp in the current time range, put all in a single list Iterator entityModelToUseCreateIter = entityModelToUseList.iterator(); while (entityModelToUseCreateIter.hasNext()) { int insertBefore = 0; ModelEntity modelEntity = (ModelEntity) entityModelToUseCreateIter.next(); // first test to see if we know that there are no records for this entity in this time period... 
Timestamp knownNextCreateTime = (Timestamp) this.nextEntityCreateTxTime.get(modelEntity.getEntityName()); if (knownNextCreateTime != null && (knownNextCreateTime.equals(currentRunEndTime) || knownNextCreateTime.after(currentRunEndTime))) { //Debug.logInfo("In assembleValuesToCreate found knownNextCreateTime [" + knownNextCreateTime + "] after currentRunEndTime [" + currentRunEndTime + "], so skipping time per period for entity [" + modelEntity.getEntityName() + "]", module); entitiesSkippedForKnownNext++; continue; } boolean beganTransaction = false; try { beganTransaction = TransactionUtil.begin(7200); } catch (GenericTransactionException e) { throw new SyncDataErrorException("Unable to begin JTA transaction", e); } try { // get the values created within the current time range EntityCondition findValCondition = new EntityConditionList(UtilMisc.toList( new EntityExpr(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunStartTime), new EntityExpr(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.LESS_THAN, currentRunEndTime)), EntityOperator.AND); EntityListIterator eli = delegator.findListIteratorByCondition(modelEntity.getEntityName(), findValCondition, null, UtilMisc.toList(ModelEntity.CREATE_STAMP_TX_FIELD, ModelEntity.CREATE_STAMP_FIELD)); GenericValue nextValue = null; long valuesPerEntity = 0; while ((nextValue = (GenericValue) eli.next()) != null) { // sort by the tx stamp and then the record stamp // find first value in valuesToStore list, starting with the current insertBefore value, that has a CREATE_STAMP_TX_FIELD after the nextValue.CREATE_STAMP_TX_FIELD, then do the same with CREATE_STAMP_FIELD while (insertBefore < valuesToCreate.size() && ((GenericValue) valuesToCreate.get(insertBefore)).getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD).before(nextValue.getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD))) { insertBefore++; } while (insertBefore < valuesToCreate.size() && ((GenericValue) 
valuesToCreate.get(insertBefore)).getTimestamp(ModelEntity.CREATE_STAMP_FIELD).before(nextValue.getTimestamp(ModelEntity.CREATE_STAMP_FIELD))) { insertBefore++; } valuesToCreate.add(insertBefore, nextValue); valuesPerEntity++; } eli.close(); // definately remove this message and related data gathering //long preCount = delegator.findCountByCondition(modelEntity.getEntityName(), findValCondition, null); //long entityTotalCount = delegator.findCountByCondition(modelEntity.getEntityName(), null, null); //if (entityTotalCount > 0 || preCount > 0 || valuesPerEntity > 0) Debug.logInfo("Got " + valuesPerEntity + "/" + preCount + "/" + entityTotalCount + " values for entity " + modelEntity.getEntityName(), module); // if we didn't find anything for this entity, find the next value's Timestamp and keep track of it if (valuesPerEntity == 0) { Timestamp startCheckStamp = new Timestamp(System.currentTimeMillis() - syncEndBufferMillis); EntityCondition findNextCondition = new EntityConditionList(UtilMisc.toList( new EntityExpr(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.NOT_EQUAL, null), new EntityExpr(ModelEntity.CREATE_STAMP_TX_FIELD, EntityOperator.GREATER_THAN_EQUAL_TO, currentRunEndTime)), EntityOperator.AND); EntityListIterator eliNext = delegator.findListIteratorByCondition(modelEntity.getEntityName(), findNextCondition, null, UtilMisc.toList(ModelEntity.CREATE_STAMP_TX_FIELD)); // get the first element and it's tx time value... GenericValue firstVal = (GenericValue) eliNext.next(); eliNext.close(); Timestamp nextTxTime; if (firstVal != null) { nextTxTime = firstVal.getTimestamp(ModelEntity.CREATE_STAMP_TX_FIELD); } else { // no results? 
well, then it's safe to say that up to the pre-querytime (minus the buffer, as usual) we are okay nextTxTime = startCheckStamp; } if (this.nextCreateTxTime == null || nextTxTime.before(this.nextCreateTxTime)) { this.nextCreateTxTime = nextTxTime; Debug.logInfo("EntitySync: Set nextCreateTxTime to [" + nextTxTime + "]", module); } Timestamp curEntityNextTxTime = (Timestamp) this.nextEntityCreateTxTime.get(modelEntity.getEntityName()); if (curEntityNextTxTime == null || nextTxTime.before(curEntityNextTxTime)) { this.nextEntityCreateTxTime.put(modelEntity.getEntityName(), nextTxTime); Debug.logInfo("EntitySync: Set nextEntityCreateTxTime to [" + nextTxTime + "] for the entity [" + modelEntity.getEntityName() + "]", module); } } } catch (GenericEntityException e) { try { TransactionUtil.rollback(beganTransaction, "Entity Engine error in assembleValuesToCreate", e); } catch (GenericTransactionException e2) { Debug.logWarning(e2, "Unable to call rollback()", module); } throw new SyncDataErrorException("Error getting values to create from the datasource", e); } catch (Throwable t) { try { TransactionUtil.rollback(beganTransaction, "Throwable error in assembleValuesToCreate", t); } catch (GenericTransactionException e2) { Debug.logWarning(e2, "Unable to call rollback()", module); } throw new SyncDataErrorException("Caught runtime error while getting values to create", t); } try { TransactionUtil.commit(beganTransaction); } catch (GenericTransactionException e) { throw new SyncDataErrorException("Commit transaction failed", e); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -