📄 dirpoll.java
字号:
/* * jPOS Project [http://jpos.org] * Copyright (C) 2000-2008 Alejandro P. Revilla * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */package org.jpos.util;import java.io.*;import java.text.SimpleDateFormat;import java.util.*;import org.jpos.core.*;import org.jpos.iso.ISOException;import org.jpos.util.*;/** * DirPoll operates on a set of directories which defaults to * <ul> * <li>request * <li>response * <li>tmp * <li>run * <li>bad * <li>archive * </ul> * scanning for incoming requests (of varying priorities) * on the request directory and processing them by means of * DirPoll.Processor or DirPoll.FileProcessor * * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a> * @author <a href="mailto:mmilliss@moneyswitch.net">Matthew Milliss</a> * @since jPOS 1.2.7 * @version $Revision: 2594 $ $Date: 2008-01-22 14:41:31 -0200 (Tue, 22 Jan 2008) $ */public class DirPoll extends SimpleLogSource implements Runnable, FilenameFilter, ReConfigurable, Destroyable{ private long pollInterval; private File requestDir; private File responseDir; private File tmpDir; private File badDir; private File runDir; private File archiveDir; private Vector prio; private int currentPriority; private String basePath; private String responseSuffix; private ThreadPool pool; private Object processor; private Configuration cfg; private boolean shutdown; private boolean paused = false; private boolean shouldArchive; private boolean shouldTimestampArchive; private String archiveDateFormat; //------------------------------------ Constructor/setters/getters, etc. /** * @param basePath base path of DirPoll tree * @param pool ThreadPoll (may be null) */ public DirPoll () { prio = new Vector(); setPollInterval(1000); setPath ("."); pool = null; cfg = null; } public synchronized void setPath(String base) { this.basePath = base; requestDir = new File(base, "request"); responseDir = new File(base, "response"); tmpDir = new File(base, "tmp"); badDir = new File(base, "bad"); runDir = new File(base, "run"); archiveDir = new File(base, "archive"); } public void setShouldTimestampArchive(boolean shouldTimestampArchive) { this.shouldTimestampArchive = shouldTimestampArchive; } public void setArchiveDateFormat(String dateFormat) { this.archiveDateFormat = dateFormat; } public void setShouldArchive(boolean shouldArchive) { this.shouldArchive = shouldArchive; } public String getPath() { return basePath; } public void setRequestDir (String dir) { requestDir = new File (basePath, dir); } public void setResponseDir (String dir) { responseDir = new File (basePath, dir); } public void setTmpDir (String dir) { tmpDir = new File (basePath, dir); } public void setBadDir (String dir) { badDir = new File (basePath, dir); } public void setRunDir (String dir) { runDir = new File (basePath, dir); } public void setArchiveDir (String dir) { archiveDir = new File (basePath, dir); } public void setPollInterval(long pollInterval) { this.pollInterval = pollInterval; } public void setResponseSuffix (String suffix) { this.responseSuffix = suffix; } public long getPollInterval() { return pollInterval; } public void setProcessor (Object processor) { this.processor = processor; } /** * DirPool is not really Configurable, it uses QSPConfig instead * but anyway it receives Configuration and ReConfiguration requests * and pass along them to the underlying processor. * @param cfg Configuration object */ public void setConfiguration (Configuration cfg) throws ConfigurationException { if (processor != null) { if ( (processor instanceof ReConfigurable) || ((cfg == null) && (processor instanceof Configurable)) ) { ((Configurable) processor).setConfiguration (cfg); } } this.cfg = cfg; setRequestDir (cfg.get ("request.dir", "request")); setResponseDir (cfg.get ("response.dir", "response")); setTmpDir (cfg.get ("tmp.dir", "tmp")); setRunDir (cfg.get ("run.dir", "run")); setBadDir (cfg.get ("bad.dir", "bad")); setArchiveDir (cfg.get ("archive.dir", "archive")); setResponseSuffix (cfg.get ("response.suffix", null)); setShouldArchive (cfg.getBoolean ("archive", false)); setArchiveDateFormat ( cfg.get ("archive.dateformat", "yyyyMMddHHmmss") ); setShouldTimestampArchive (cfg.getBoolean ("archive.timestamp", false)); } /** * @param priorities blank separated list of extensions */ public void setPriorities (String priorities) { StringTokenizer st = new StringTokenizer (priorities); Vector v = new Vector(); while (st.hasMoreTokens()) { String ext = st.nextToken(); v.addElement (ext.equals ("*") ? "" : ext); } if (v.size() == 0) v.addElement (""); synchronized (this) { prio = v; } } public synchronized void setThreadPool (ThreadPool pool) { this.pool = pool; } //--------------------------------------- FilenameFilter implementation public boolean accept(File dir, String name) { String ext = currentPriority >= 0 ? ((String) prio.elementAt(currentPriority)) : null; if (ext != null && !name.endsWith(ext)) return false; File f = new File (dir, name); return f.isFile() && (f.length() > 0); } //--------------------------------------------- Runnable implementation public void run() { Thread.currentThread().setName ("DirPoll-"+basePath); if (prio.size() == 0) addPriority(""); while (!shutdown) { synchronized (this) { if (paused) { try { wait(); paused = false; } catch (InterruptedException e) { } } } try { File f; synchronized (this) { f = scan(); } if (f != null) { getPool().execute (new ProcessorRunner (f)); Thread.yield(); // be nice } else Thread.sleep(pollInterval); } catch (InterruptedException e) { } catch (Throwable e) { Logger.log (new LogEvent (this, "dirpoll", e)); try { Thread.sleep(pollInterval * 10); // anti hog } catch (InterruptedException ex) { } } } } public void destroy () { shutdown = true; } //----------------------------------------------------- public helpers public void createDirs() { requestDir.mkdirs(); responseDir.mkdirs(); tmpDir.mkdirs(); badDir.mkdirs(); runDir.mkdirs(); archiveDir.mkdirs(); } public void addPriority(String fileExtension) { prio.addElement (fileExtension); } //----------------------------------------------------- private helpers private byte[] readRequest (File f) throws IOException { byte[] b = new byte[(int) f.length()]; FileInputStream in = new FileInputStream(f); in.read(b); in.close(); return b; } private void writeResponse (String requestName, byte[] b) throws IOException { if (responseSuffix != null) { int pos = requestName.lastIndexOf ('.'); if (pos > 0) requestName = requestName.substring (0, pos) + responseSuffix; } File tmp = new File(tmpDir, requestName); FileOutputStream out = new FileOutputStream(tmp); out.write(b); out.close(); moveTo (tmp, responseDir); } private File moveTo(File f, File dir) throws IOException { File destination = new File(dir, f.getName()); if (!f.renameTo(destination)) throw new IOException("Unable to move"+f.getName()); return destination; } private void store(File f, File destinationDirectory) throws IOException { String storedFilename = f.getName(); if (shouldTimestampArchive) storedFilename = f.getName() + "." + new SimpleDateFormat(archiveDateFormat).format(new Date()); File destination = new File(destinationDirectory, storedFilename); if (!f.renameTo(destination)) throw new IOException("Unable to archive " + "'" + f.getName() + "' in directory " + destinationDirectory); } private File scan() { for (currentPriority=0; currentPriority < prio.size(); currentPriority++) { String files[] = requestDir.list(this); if (files != null && files.length > 0) return new File(requestDir, files[0]); } return null; } private synchronized ThreadPool getPool() { if (pool == null) pool = new ThreadPool (1, 10); return pool; } // ------------------------------------------------ inner interfaces public interface Processor { /** * @param name request name * @param request request image * @return response (or null) */ public byte[] process(String name, byte[] request) throws DirPollException; } public interface FileProcessor { /** * @param name request File * @exception should something go wrong */ public void process (File name) throws DirPollException; } public class ProcessorRunner implements Runnable { File request; LogEvent logEvent; public ProcessorRunner (File request) throws IOException { this.request = moveTo (request, runDir); this.logEvent = null; } public void run() { LogEvent evt = new LogEvent ( (LogSource) DirPoll.this, "dirpoll", request.getName() ); try { if (processor == null) throw new DirPollException ("null processor - nothing to do"); else if (processor instanceof Processor) { byte[] resp = ((Processor) processor).process ( request.getName(), readRequest (request) ); if (resp != null) writeResponse (request.getName(), resp); } else if (processor instanceof FileProcessor) ((FileProcessor) processor).process (request); if (shouldArchive) { store(request, archiveDir); } else { if (!request.delete ()) throw new DirPollException ("error: can't unlink request " + request.getName()); } } catch (Throwable e) { logEvent = evt; evt.addMessage (e); try { store (request, badDir); } catch (IOException _e) { evt.addMessage ("Can't move to "+badDir.getPath()); evt.addMessage (_e); } } finally { if (logEvent != null) Logger.log (logEvent); } } } public static class DirPollException extends ISOException { public DirPollException () { super(); } public DirPollException (String detail) { super(detail); } public DirPollException (Exception nested) { super(nested); } public DirPollException (String detail, Exception nested) { super(detail, nested); } } public void pause() { synchronized (this) { if (!paused) { paused = true; // Wake up the run() method from sleeping and tell it to pause notify(); } } } public void unpause() { synchronized (this) { if (paused) { paused = false; // Wake up the wait()ing thread from being paused notify(); // The run() method will reset the paused flag } } } public boolean isPaused() { synchronized (this) { return paused; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -