📄 staxjob.java
字号:
{ System.out.println("Event:\n" + "generate type " + fSTAX.getServiceName().toUpperCase() + "/" + fSTAX.getLocalMachineName() + " subtype " + eventSubType + " " + details); } } } public STAFResult log(int logfile, String level, String message) { String serviceName = fSTAX.getServiceName().toUpperCase(); String logName; if (logfile == STAXJob.SERVICE_LOG) logName = serviceName + "_Service"; else if (logfile == STAXJob.JOB_LOG) logName = serviceName + "_Job_" + fJobNumber; else if (logfile == STAXJob.USER_JOB_LOG) logName = serviceName + "_Job_" + fJobNumber + "_User"; else { // Log to STAX_Service log if invalid logfile specified System.out.println("STAX Service Error: Invalid Logfile" + logfile); logName = serviceName + "_Service"; } String logRequest = "LOG MACHINE LOGNAME " + STAFUtil.wrapData(logName) + " LEVEL " + level + " MESSAGE " + STAFUtil.wrapData(message); STAFResult result = submitSync("LOCAL", "LOG", logRequest); // Check if the result was unsuccesful (except ignore RC 2) if (result.rc != 0 && result.rc != 2) { if (logfile != STAXJob.USER_JOB_LOG) { System.out.println("STAX Log failed with RC " + result.rc + " and Result " + result.result + " level: " + level + " logRequest: " + logRequest); } } return result; } public void clearLogs() { String serviceName = fSTAX.getServiceName().toUpperCase(); String jobLogName; String userJobLogName; jobLogName = serviceName + "_Job_" + fJobNumber; userJobLogName = serviceName + "_Job_" + fJobNumber + "_User"; String logRequest = "DELETE MACHINE " + fSTAX.getLocalMachineNickname() + " LOGNAME " + STAFUtil.wrapData(jobLogName) + " CONFIRM"; STAFResult result = submitSync("LOCAL", "LOG", logRequest); logRequest = "DELETE MACHINE " + fSTAX.getLocalMachineNickname() + " LOGNAME " + STAFUtil.wrapData(userJobLogName) + " CONFIRM"; result = submitSync("LOCAL", "LOG", logRequest); } public void clearJobDataDir() { // Delete the job data directory and recreate it File dir = new File(fJobDataDir); if (dir.exists()) { String deleteDirRequest = "DELETE ENTRY " + STAFUtil.wrapData(fJobDataDir) + " RECURSE CONFIRM"; submitSync("local", "FS", deleteDirRequest); } if (!dir.exists()) { dir.mkdirs(); } } // Submit methods public STAFResult submitAsync(String location, String service, String request, STAXSTAFRequestCompleteListener listener) { STAFResult result; synchronized(fRequestMap) { result = fHandle.submit2( STAFHandle.ReqQueue, location, service, request); if (result.rc == STAFResult.Ok) { try { Integer requestNumber = new Integer(result.result); fRequestMap.put(requestNumber, listener); result.result = (new Integer(requestNumber.intValue())).toString(); } catch (NumberFormatException e) { System.out.println(e.toString()); result.result = "0"; } } } return result; } public STAFResult submitSync(String location, String service, String request) { STAFResult result = fHandle.submit2(STAFHandle.ReqSync, location, service, request); return result; } public STAFResult submitAsyncForget(String location, String service, String request) { STAFResult result = fHandle.submit2(STAFHandle.ReqFireAndForget, location, service, request); return result; } // STAXSTAFQueueListener method public void handleQueueMessage(STAXSTAFMessage message, STAXJob job) { int requestNumber = message.getRequestNumber(); STAXSTAFRequestCompleteListener listener = null; synchronized (fRequestMap) { Integer key = new Integer(requestNumber); listener = (STAXSTAFRequestCompleteListener)fRequestMap.get(key); if (listener != null) { fRequestMap.remove(key); } } if (listener != null) { listener.requestComplete(requestNumber, new STAFResult( message.getRequestRC(), message.getRequestResult())); } else { // Log a message in the job log String msg = "STAXJob.handleQueueMessage: " + " No listener found for message:\n" + STAFMarshallingContext.unmarshall( message.getResult()).toString(); job.log(STAXJob.JOB_LOG, "warning", msg); } } // STAXThreadCompleteListener method public void threadComplete(STAXThread thread, int endCode) { // Debug: if (false) { System.out.println("Thread #" + thread.getThreadNumber() + " complete"); } boolean jobComplete = false; synchronized (fThreadMap) { fThreadMap.remove(new Integer(thread.getThreadNumber())); if (fThreadMap.isEmpty()) jobComplete = true; } if (jobComplete == true) { // Perform terminateJob on interested parties fSTAX.visitJobManagementHandlers(new STAXVisitorHelper(this) { public void visit(Object o, Iterator iter) { STAXJobManagementHandler handler = (STAXJobManagementHandler)o; handler.terminateJob((STAXJob)fData); } }); // Get the current date and time and set as job ending date/time fEndTimestamp = new STAXTimestamp(); // Set the result so that it is available upon job completion. try { fResult = thread.pyObjectEval("STAXResult"); } catch (STAXPythonEvaluationException e) { fResult = Py.None; } // Generate job completion event HashMap jobEndMap = new HashMap(); jobEndMap.put("type", "job"); jobEndMap.put("block", "main"); jobEndMap.put("status", "end"); jobEndMap.put("jobID", String.valueOf(fJobNumber)); jobEndMap.put("result", fResult.toString()); generateEvent(STAXJob.STAX_JOB_EVENT, jobEndMap, true); // Log the result from the job in a Status message in the Job log STAFMarshallingContext mc = STAFMarshallingContext. unmarshall(fResult.toString()); log(STAXJob.JOB_LOG, "status", "Job Result: " + mc); // Log a Stop message for the job in the STAX Service and Job logs String msg = "JobID: " + fJobNumber; log(STAXJob.SERVICE_LOG, "stop", msg); log(STAXJob.JOB_LOG, "stop", msg); submitSync("local", "QUEUE", "QUEUE TYPE STAF/Service/STAX/End " + "MESSAGE " + STAFUtil.wrapData("")); // Debug if (COUNT_PYCODE_CACHES && STAX.CACHE_PYTHON_CODE) { System.out.println( "Job " + fJobNumber + ": " + " cacheGets=" + fCompiledPyCodeCacheGets + " cacheAdds=" + fCompiledPyCodeCacheAdds); } while (!fCompletionNotifiees.isEmpty()) { STAXJobCompleteListener listener = (STAXJobCompleteListener)fCompletionNotifiees.removeFirst(); if (listener != null) listener.jobComplete(this); } try { fHandle.unRegister(); } catch (STAFException e) { /* Do Nothing */ } // Clear out job's private variables fScripts = new ArrayList(); fScriptFiles = new ArrayList(); fFunctionMap = new HashMap(); fThreadMap = new HashMap(); fDefaultActions = new LinkedList(); fCompletionNotifiees = new LinkedList(); fDataMap = new HashMap(); fCompiledPyCodeCache = new HashMap(); // Commented out setting the following variables to null because // STAFQueueMonitor.notifyListeners() is running in another // thread and could access these variables and get a NPE. // fQueueListenerMap = null; // fHandle = null; // fSTAFQueueMonitor = null; // fRequestMap = new HashMap(); } } private STAX fSTAX; private Object fNextThreadNumberSynch = new Object(); private int fNextThreadNumber = 1; private int fJobNumber = 0; private Object fNextProcNumberSynch = new Object(); private int fProcNumber = 1; private Object fNextCmdNumberSynch = new Object(); private int fCmdNumber = 1; private Object fNextProcessKeySynch = new Object(); private int fProcessKey = 1; private String fJobDataDir = new String(); private String fJobName = new String(); private String fXmlMachine = new String(); private String fXmlFile = new String(); private String fStartFunction = new String(); private String fStartFuncArgs = "None"; private boolean fClearlogs; private List fScripts = new ArrayList(); private List fScriptFiles = new ArrayList(); private String fScriptFileMachine = new String(); private HashMap fFunctionMap = new HashMap(); // I don't think we need a whole map, just a reference to thread 1 private HashMap fThreadMap = new HashMap(); private STAFHandle fHandle = null; private LinkedList fDefaultActions = new LinkedList(); private STAFQueueMonitor fSTAFQueueMonitor = null; private LinkedList fCompletionNotifiees = new LinkedList(); private boolean fExecuteAndHold = false; private String fSourceMachine = new String(); // Source == Originating private String fSourceHandleName = new String(); private int fSourceHandle; private int fNotifyOnEnd = STAXJob.NO_NOTIFY_ONEND; private PyObject fResult = Py.None; private STAXTimestamp fStartTimestamp; private STAXTimestamp fEndTimestamp; private HashMap fQueueListenerMap = new HashMap(); private HashMap fDataMap = new HashMap(); private HashMap fRequestMap = new HashMap(); // Map of active STAF requests private boolean fLogTCElapsedTime; private boolean fLogTCNumStarts; private boolean fLogTCStartStop; private HashMap fCompiledPyCodeCache = new HashMap(); private long fCompiledPyCodeCacheAdds = 0; private long fCompiledPyCodeCacheGets = 0; class STAFQueueMonitor extends Thread { STAFQueueMonitor(STAXJob job) { fJob = job; } public void run() { for (;;) { STAFResult result = submitSync("local", "QUEUE", "GET WAIT"); if (result.rc != STAFResult.Ok) { // XXX: Maybe we should do something to prevent an infinite // loop continue; } // Debug: if (false) System.out.println("Result: " + result.result); // Break up result STAXSTAFMessage msg = new STAXSTAFMessage(result.result); String type = msg.getType(); if (type != null && type.equalsIgnoreCase("STAF/Service/STAX/End")) { return; } notifyListeners(msg); } } public void notifyListeners(STAXSTAFMessage msg) { // Perform a lookup of registered message handlers for // this message prefix and pass the STAXSTAFMessage to // the registered message handlers. synchronized (fQueueListenerMap) { Iterator mapIter = fQueueListenerMap.keySet().iterator(); int listenersFound = 0; while (mapIter.hasNext()) { String msgType = (String)mapIter.next(); String theType = msg.getType(); if ((msgType == null) || // Messages from 3.x clients that STAX is interested // in will have a type (theType != null && theType.equalsIgnoreCase(msgType)) || // The following is for messages from 2.x clients // which will have a null type and the message will // begin with the type (which for processes will be // STAF/PROCESS/END instead of STAF/Process/End (theType == null && msg.getMessage().toUpperCase(). startsWith(msgType.toUpperCase()))) { TreeSet listenerSet = (TreeSet)fQueueListenerMap.get(msgType); if (listenerSet == null) continue; Iterator iter = listenerSet.iterator(); while (iter.hasNext()) { STAXSTAFQueueListener listener = (STAXSTAFQueueListener)iter.next(); listener.handleQueueMessage(msg, fJob); listenersFound++; } } } if (listenersFound == 0) { // Log a message in the job log String logMsg = "STAXJob.notifyListeners: " + "No listener found for message:\n" + STAFMarshallingContext.unmarshall( msg.getResult()).toString(); fJob.log(STAXJob.JOB_LOG, "warning", logMsg); } } } STAXJob fJob; } // end STAFQueueMonitor}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -