📄 staxjob.java
字号:
else { codeObject = __builtin__.compile( codeString, "<pyExec string>", "exec"); } fCompiledPyCodeCache.put(codeString, codeObject); } else { if (COUNT_PYCODE_CACHES) fCompiledPyCodeCacheGets++; } return codeObject; } } // Queue listener methods public void registerSTAFQueueListener(String msgType, STAXSTAFQueueListener listener) { synchronized (fQueueListenerMap) { TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType); if (listenerSet == null) { listenerSet = new TreeSet(new STAXObjectComparator()); fQueueListenerMap.put(msgType, listenerSet); } listenerSet.add(listener); } } public void unregisterSTAFQueueListener(String msgType, STAXSTAFQueueListener listener) { synchronized (fQueueListenerMap) { TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType); if (listenerSet != null) listenerSet.remove(listener); } } // // Data management functions // public boolean setData(String dataName, Object data) { // Return true if added successfully, else return false. synchronized (fDataMap) { if (!fDataMap.containsKey(dataName)) { fDataMap.put(dataName, data); return true; } } return false; } public Object getData(String dataName) { synchronized (fDataMap) { return fDataMap.get(dataName); } } public boolean removeData(String dataName) { // Return true if removed successfully, else return false. synchronized (fDataMap) { if (fDataMap.containsKey(dataName)) { fDataMap.remove(dataName); return true; } } return false; } // // Execution methods // public void startExecution() throws STAFException, STAXException { fHandle = new STAFHandle("STAX/Job/" + fJobNumber); if (fClearlogs) clearLogs(); // Assign the name of the directory that the job can store data in fJobDataDir = fSTAX.getDataDir() + STAX.fileSep + "job" + STAX.fileSep + "Job" + fJobNumber; clearJobDataDir(); fSTAFQueueMonitor = new STAFQueueMonitor(this); fSTAFQueueMonitor.start(); // Initialize data for the Job fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this) { public void visit(Object o, Iterator iter) { STAXJobManagementHandler handler = (STAXJobManagementHandler)o; handler.initJob((STAXJob)fData); } }); // Register to listen for messages that STAF requests have completed registerSTAFQueueListener("STAF/RequestComplete", this); // Generate job start event HashMap jobStartMap = new HashMap(); jobStartMap.put("type", "job"); jobStartMap.put("block", "main"); jobStartMap.put("status", "begin"); jobStartMap.put("jobID", String.valueOf(fJobNumber)); jobStartMap.put("startFunction", fStartFunction); jobStartMap.put("jobName", fJobName); generateEvent(STAXJob.STAX_JOB_EVENT, jobStartMap, true); // Get the current date and time and set as the starting date/time fStartTimestamp = new STAXTimestamp(); STAXThread thread = (STAXThread)fThreadMap.get(new Integer(1)); STAXAction action = (STAXAction)fFunctionMap.get(fStartFunction); if (action == null) { throw new STAXInvalidStartFunctionException(fStartFunction); } else { // Call the main function passing any arguments action = new STAXCallAction("'" + fStartFunction + "'", fStartFuncArgs); } ArrayList actionList = new ArrayList(); // If HOLD was specified on EXECUTE request, add hold action as // the first action in the actionList. if (fExecuteAndHold) actionList.add(new STAXHoldAction("'main'")); // Add additional Python variables about the Job thread.pySetVar("STAXJobID", new Integer(fJobNumber)); thread.pySetVar("STAXJobName", fJobName); thread.pySetVar("STAXJobXMLFile", fXmlFile); thread.pySetVar("STAXJobXMLMachine", fXmlMachine); thread.pySetVar("STAXJobStartDate", fStartTimestamp.getDateString()); thread.pySetVar("STAXJobStartTime", fStartTimestamp.getTimeString()); thread.pySetVar("STAXJobSourceMachine", fSourceMachine); thread.pySetVar("STAXJobSourceHandleName", fSourceHandleName); thread.pySetVar("STAXJobSourceHandle", new Integer(fSourceHandle)); thread.pySetVar("STAXJobStartFunctionName", fStartFunction); thread.pySetVar("STAXJobStartFunctionArgs", fStartFuncArgs); thread.pySetVar("STAXCurrentFunction", Py.None); if (!fScriptFileMachine.equals("")) thread.pySetVar("STAXJobScriptFileMachine", fScriptFileMachine); else thread.pySetVar("STAXJobScriptFileMachine", fSourceMachine); thread.pySetVar("STAXJobScriptFiles", fScriptFiles.toArray()); thread.pySetVar("STAXJobHandle", fHandle); thread.pySetVar("STAXServiceName", fSTAX.getServiceName()); thread.pySetVar("STAXServiceMachine", fSTAX.getLocalMachineName()); thread.pySetVar("STAXServiceMachineNickname", fSTAX.getLocalMachineNickname()); thread.pySetVar("STAXEventServiceName", fSTAX.getEventServiceName()); thread.pySetVar("STAXEventServiceMachine", fSTAX.getEventServiceMachine()); thread.pySetVar("STAXJobUserLog", new STAFLog( STAFLog.MACHINE, fSTAX.getServiceName().toUpperCase() + "_Job_" + fJobNumber + "_User", fHandle, 0)); thread.pySetVar("STAXJobLogName", fSTAX.getServiceName().toUpperCase() + "_Job_" + fJobNumber); thread.pySetVar("STAXJobUserLogName", fSTAX.getServiceName().toUpperCase() + "_Job_" + fJobNumber + "_User"); thread.pySetVar("STAXJobWriteLocation", fJobDataDir); thread.pySetVar("STAXMessageLog", new Integer(0)); thread.pySetVar("STAXLogMessage", new Integer(0)); if (fLogTCElapsedTime) thread.pySetVar("STAXLogTCElapsedTime", new Integer(1)); else thread.pySetVar("STAXLogTCElapsedTime", new Integer(0)); if (fLogTCNumStarts) thread.pySetVar("STAXLogTCNumStarts", new Integer(1)); else thread.pySetVar("STAXLogTCNumStarts", new Integer(0)); if (fLogTCStartStop) thread.pySetVar("STAXLogTCStartStop", new Integer(1)); else thread.pySetVar("STAXLogTCStartStop", new Integer(0)); // Add default signal handlers to the actionList for the main block. addDefaultSignalHandlers(actionList); // Put the "main" function/block at the bottom of the stack, // with its action being a sequence group that contains the // default actions (scripts and signalhandlers) in the <stax> // element and then the call of the main function. while (!fDefaultActions.isEmpty()) { actionList.add((STAXAction)fDefaultActions.removeLast()); } actionList.add(action); // Add call of main function action = new STAXSequenceAction(actionList); thread.pushAction(new STAXBlockAction("main", action)); // Log a "start" job message in the Service and Job logs String msg = "JobID: " + fJobNumber + ", File: " + fXmlFile + ", Machine: " + fXmlMachine + ", Function: " + fStartFunction + ", Args: " + fStartFuncArgs + ", JobName: "; if (fJobName.equals("")) msg += "<N/A>"; else msg += fJobName; Iterator it = fScriptFiles.iterator(); while (it.hasNext()) msg += ", ScriptFile: " + (String)it.next(); if (! fScriptFileMachine.equals("")) msg += ", ScriptFileMachine: " + fScriptFileMachine; it = fScripts.iterator(); while (it.hasNext()) msg += ", Script: " + (String)it.next(); log(STAXJob.SERVICE_LOG, "start", msg); log(STAXJob.JOB_LOG, "start", msg); thread.pySetVar("STAXJob", this); // Schedule the thread to run thread.schedule(); } public void addDefaultSignalHandlers(ArrayList actionList) { // Array of default SignalHandlers. Each signal is described by // a signal name, an action, and a signal message variable name String[][] defaultSHArray = { { "STAXPythonEvaluationError", "terminate", "STAXPythonEvalMsg" }, { "STAXProcessStartError", "continue", "STAXProcessStartErrorMsg" }, { "STAXProcessStartTimeout", "continue", "STAXProcessStartTimeoutMsg" }, { "STAXCommandStartError", "terminate", "STAXCommandStartErrorMsg" }, { "STAXFunctionDoesNotExist", "terminate", "STAXFunctionDoesNotExistMsg" }, { "STAXInvalidBlockName", "terminate", "STAXInvalidBlockNameMsg" }, { "STAXBlockDoesNotExist", "continue", "STAXBlockDoesNotExistMsg" }, { "STAXLogError", "continue", "STAXLogMsg" }, { "STAXTestcaseMissingError", "continue", "STAXTestcaseMissingMsg" }, { "STAXInvalidTcStatusResult", "continue", "STAXInvalidTcStatusResultMsg" }, { "STAXInvalidTimerValue", "terminate", "STAXInvalidTimerValueMsg" }, { "STAXNoSuchSignalHandler", "continue", "STAXNoSuchSignalHandlerMsg" }, { "STAXEmptyList", "continue", "STAXEmptyListMsg" }, { "STAXFunctionArgValidate", "terminate", "STAXFunctionArgValidateMsg" }, { "STAXImportError", "terminate", "STAXImportErrorMsg" }, { "STAXInvalidTestcaseMode", "continue", "STAXInvalidTestcaseModeMsg" } }; // Add default SignalHandlers to actionList for (int i = 0; i < defaultSHArray.length; i++) { ArrayList signalHandlerActionList = new ArrayList(); String signalName = defaultSHArray[i][0]; String signalAction = defaultSHArray[i][1]; String signalMsgVarName = defaultSHArray[i][2]; String signalMsgText = ""; if (signalAction.equals("terminate")) { signalMsgText = "'" + signalName + " signal raised. " + "Terminating job. '" + " + " + signalMsgVarName; } else if (signalAction.equals("continue")) { signalMsgText = "'" + signalName + " signal raised. " + "Continuing job. '" + " + " + signalMsgVarName; } // Log the error message in the STAX job log and send to the STAX // job monitor. signalHandlerActionList.add(new STAXLogAction( signalMsgText, "'error'", "1", "1", STAXJob.JOB_LOG)); if (signalAction.equals("terminate")) { // Add a Terminate Job Action to the action list signalHandlerActionList.add( new STAXTerminateAction("'main'")); } // Add the signalhandler action to the action list actionList.add( new STAXSignalHandlerAction( "'" + signalName + "'", new STAXSequenceAction(signalHandlerActionList))); } } // // Event methods // public void generateEvent(String eventSubType, Map details) { generateEvent(eventSubType, details, false); } public void generateEvent(String eventSubType, Map details, boolean notifyAll) { StringBuffer detailsString = new StringBuffer(); String key = ""; String value = ""; Iterator keyIter = details.keySet().iterator(); while (keyIter.hasNext()) { key = (String)keyIter.next(); value = (String)details.get(key); if (value == null) { // do nothing } else { detailsString.append("property ").append( STAFUtil.wrapData(key + "=" + value)).append(" "); } } // The type machine should always be the local machine // The details parm must already be in the :length: format STAFResult result = submitSync( fSTAX.getEventServiceMachine(), fSTAX.getEventServiceName(), "generate type " + fSTAX.getServiceName().toUpperCase() + "/" + fSTAX.getLocalMachineName() + "/" + fJobNumber + " subtype " + eventSubType + " " + detailsString.toString()); // Debug if (false) { System.out.println("Event:\n" + "generate type " + fSTAX.getServiceName().toUpperCase() + "/" + fSTAX.getLocalMachineName() + "/" + fJobNumber + " subtype " + eventSubType + " " + details); } if (notifyAll) { submitSync(fSTAX.getEventServiceMachine(), fSTAX.getEventServiceName(), "generate type " + fSTAX.getServiceName().toUpperCase() + "/" + fSTAX.getLocalMachineName() + " subtype " + eventSubType + " " + detailsString.toString()); // Debug if (false)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -