📄 filewritercallback.java
字号:
/*------------------------------------------------------------------------------Name: FileWriterCallback.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.contrib.filewriter;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.Iterator;import java.util.Map;import java.util.TreeMap;import java.util.logging.Logger;import org.xmlBlaster.contrib.ContribConstants;import org.xmlBlaster.contrib.I_Update;import org.xmlBlaster.contrib.filewatcher.FilenameFilter;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.jms.XBMessage;import org.xmlBlaster.util.qos.ClientProperty;/** * FileWriterCallback stores messages to the file system. * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class FileWriterCallback implements I_Update, ContribConstants { private static Logger log = Logger.getLogger(FileWriterCallback.class.getName()); private final static int BUF_SIZE = 300000; private String dirName; private String lockExtention; private File directory; private boolean overwrite; private String tmpDirName; private File tmpDirectory; /** if true it cleans up the chunks after processing, otherwise it leaves them. */ private boolean keepDumpFiles; /** * Creates a callback * @param dirName The name of the directory where to store the files. * @param tmpDirName the name of the directory where to store the temporary files. * @param lockExtention The extention to use to lock the reading of the file. This is used if the * entries have to be retrieved by the filepoller. Until such a file exists, the entry is not * processed by the file poller. * @param overwrite if set to true it will overwrite existing files. * @throws Exception */ public FileWriterCallback(String dirName, String tmpDirName, String lockExtention, boolean overwrite, boolean keepDumpFiles) throws Exception { this.dirName = dirName; if (dirName == null) throw new Exception ("The directory where to store the files is null, can not continue"); this.lockExtention = lockExtention; this.directory = new File(this.dirName); if (this.directory == null) throw new Exception("The created directory '" + dirName + "' resulted in a null File object"); if (!this.directory.exists()) { if (this.directory.mkdir()) throw new Exception("The directory '" + dirName + "' could not be created"); } if (!this.directory.canWrite()) throw new Exception("Can not write to the directory '" + dirName + "'"); if (!this.directory.isDirectory()) throw new Exception("'" + dirName + "' is not a directory, can not use it to store files"); this.tmpDirName = tmpDirName; this.tmpDirectory = new File(this.tmpDirName); if (this.tmpDirectory == null) throw new Exception("The created temporary directory '" + tmpDirName + "' resulted in a null File object"); if (!this.tmpDirectory.exists()) { if (!this.tmpDirectory.mkdir()) throw new Exception("The temporary directory '" + tmpDirName + "' could not be created"); } if (!this.tmpDirectory.canWrite()) throw new Exception("Can not write to the temporary directory '" + tmpDirName + "'"); if (!this.tmpDirectory.isDirectory()) throw new Exception("The temporary '" + tmpDirName + "' is not a directory, can not use it to store files"); this.overwrite = overwrite; this.keepDumpFiles = keepDumpFiles; } private static void storeChunk(File tmpDir, String fileName, long chunkNumber, char sep, boolean overwrite, InputStream is) throws Exception { fileName = fileName + sep + chunkNumber; File file = new File(tmpDir, fileName); if (file == null) throw new Exception("the file for '" + fileName + "' was null"); if (file.exists()) { if (file.isDirectory()) throw new Exception("'" + fileName + "' exists already and is a directory, can not continue"); if (overwrite) { log.warning("file '" + fileName + "' exists already. Will overwrite it."); } else { log.warning("file '" + fileName + "' exists already. Will not overwrite it."); return; } } try { log.info("storing file '" + fileName + "' on directory '" + tmpDir.getName() + "' and chunk number '" + chunkNumber + "'"); FileOutputStream fos = new FileOutputStream(file); int ret = 0; byte[] buf = new byte[BUF_SIZE]; while ( (ret=is.read(buf)) > -1) { fos.write(buf, 0, ret); } // fos.write(content); fos.close(); } catch (IOException ex) { throw new Exception("update: an exception occured when storing the file '" + fileName + "'", ex); } } /** * Returns the number used as the postfix of this file. If there is no such number, -1 is * returned. If the content is null or empty, an exception is thrown. If there is a postfix * but it is not a number, a -1 is returned. * * @param filename the filename to check. * @param prefix the prefix of the filename (the part before the number) * @return a long specifying the postfix number of the file. * @throws Exception if the content is null or empty. */ private long extractNumberPostfixFromFile(String filename, String prefix) throws Exception { if (filename == null || filename.trim().length() < 1) throw new Exception("The filename is empty"); int prefixLength = prefix.length(); if (filename.startsWith(prefix)) { String postfix = filename.substring(prefixLength).trim(); if (postfix.length() < 1) return -1L; try { return Long.parseLong(postfix); } catch (Throwable ex) { return -1L; } } return -1L; } /** * The filename prefix for which to retrieve all chunks. * @param fileName * @param sep * @return * @throws Exception */ private File[] getChunkFilenames(String fileName, char sep) throws Exception { // scan for all chunks: String prefix = fileName + sep; String expression = prefix + '*'; File[] files = this.tmpDirectory.listFiles(new FilenameFilter(expression, false)); if (files.length > 0) { TreeMap map = new TreeMap(); for (int i=0; i < files.length; i++) { long postfix = extractNumberPostfixFromFile(files[i].getName(), prefix); if (postfix > -1L) { if (files[i].exists() && files[i].canRead() && files[i].isFile()) map.put(new Long(postfix), files[i]); } } File[] ret = new File[map.size()]; Iterator iter = map.keySet().iterator(); int i = 0; while (iter.hasNext()) { ret[i] = (File)map.get(iter.next()); i++; } return ret; } return new File[0]; } /** * Puts all chunks stored in separate files together in one single one. * * @param fileName The name of the complete (destination) file. * @param expectedChunks The number of expected chunks. If the number of chunks found is * bigger, the exceeding ones are ignored and a warning is written to the logs. If the number * is too low it checks if the file already exists. If it exists a warning is written and * the method returns, otherwise an exception is thrown. Keeps also track of locking the file. * @param lastContent the content of the last chunk of the message (can be null). * @param isCompleteMsg is true if the content of the message is contained in one single * chunk (i.e. if the message is not chunked). * @throws Exception If an error occurs when writing / reading the files. This method tries
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -