📄 filemessagereceiver.java
字号:
{ // we can ignore since we did manage to acquire a lock, but just in case logger.error("File being read disappeared!", e); return; } msgAdapter.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, sourceFileOriginalName); if (!fc.isStreaming()) { moveAndDelete(sourceFile, destinationFile, sourceFileOriginalName, msgAdapter); } else { // If we are streaming no need to move/delete now, that will be done when // stream is closed this.routeMessage(new DefaultMuleMessage(msgAdapter), endpoint.isSynchronous()); } } private void moveAndDelete(final File sourceFile, File destinationFile, String sourceFileOriginalName, MessageAdapter msgAdapter) { boolean fileWasMoved = false; try { // If we are moving the file to a read directory, move it there now and // hand over a reference to the // File in its moved location if (destinationFile != null) { // move sourceFile to new destination fileWasMoved = FileUtils.moveFile(sourceFile, destinationFile); // move didn't work - bail out (will attempt rollback) if (!fileWasMoved) { throw new DefaultMuleException(FileMessages.failedToMoveFile(sourceFile.getAbsolutePath(), destinationFile.getAbsolutePath())); } // create new MessageAdapter for destinationFile msgAdapter = connector.getMessageAdapter(destinationFile); msgAdapter.setProperty(FileConnector.PROPERTY_FILENAME, destinationFile.getName()); msgAdapter.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, sourceFileOriginalName); } // finally deliver the file message this.routeMessage(new DefaultMuleMessage(msgAdapter), endpoint.isSynchronous()); // at this point msgAdapter either points to the old sourceFile // or the new destinationFile. if (((FileConnector) connector).isAutoDelete()) { // no moveTo directory if (destinationFile == null) { // delete source if (!sourceFile.delete()) { throw new DefaultMuleException(FileMessages.failedToDeleteFile(sourceFile.getAbsolutePath())); } } else { // nothing to do here since moveFile() should have deleted // the source file for us } } } catch (Exception e) { boolean fileWasRolledBack = false; // only attempt rollback if file move was successful if (fileWasMoved) { fileWasRolledBack = rollbackFileMove(destinationFile, sourceFile.getAbsolutePath()); } // wrap exception & handle it Exception ex = new RoutingException(FileMessages.exceptionWhileProcessing(sourceFile.getName(), (fileWasRolledBack ? "successful" : "unsuccessful")), new DefaultMuleMessage(msgAdapter), endpoint, e); this.handleException(ex); } } /** * Try to acquire a lock on a file and release it immediately. Usually used as a * quick check to see if another process is still holding onto the file, e.g. a * large file (more than 100MB) is still being written to. * * @param sourceFile file to check * @return <code>true</code> if the file can be locked */ protected boolean attemptFileLock(final File sourceFile) { // check if the file can be processed, be sure that it's not still being // written // if the file can't be locked don't process it yet, since creating // a new FileInputStream() will throw an exception FileLock lock = null; FileChannel channel = null; boolean fileCanBeLocked = false; try { channel = new RandomAccessFile(sourceFile, "rw").getChannel(); // Try acquiring the lock without blocking. This method returns // null or throws an exception if the file is already locked. lock = channel.tryLock(); } catch (FileNotFoundException fnfe) { logger.warn("Unable to open " + sourceFile.getAbsolutePath(), fnfe); } catch (IOException e) { // Unable to create a lock. This exception should only be thrown when // the file is already locked. No sense in repeating the message over // and over. } finally { if (lock != null) { // if lock is null the file is locked by another process fileCanBeLocked = true; try { // Release the lock lock.release(); } catch (IOException e) { // ignore } } if (channel != null) { try { // Close the file channel.close(); } catch (IOException e) { // ignore } } } return fileCanBeLocked; } /** * Get a list of files to be processed. * * @return an array of files to be processed. * @throws org.mule.api.MuleException which will wrap any other exceptions or * errors. */ File[] listFiles() throws MuleException { try { File[] todoFiles = null; if (fileFilter != null) { todoFiles = readDirectory.listFiles(fileFilter); } else { todoFiles = readDirectory.listFiles(filenameFilter); } // logger.trace("Reading directory " + readDirectory.getAbsolutePath() + // " -> " + TODOFiles.length + " file(s)"); return (todoFiles == null ? NO_FILES : todoFiles); } catch (Exception e) { throw new DefaultMuleException(FileMessages.errorWhileListingFiles(), e); } } /** Exception tolerant roll back method */ protected boolean rollbackFileMove(File sourceFile, String destinationFilePath) { boolean result = false; try { result = FileUtils.moveFile(sourceFile, FileUtils.newFile(destinationFilePath)); } catch (Throwable t) { logger.debug("rollback of file move failed: " + t.getMessage()); } return result; } protected Comparator getComparator() throws Exception { Object o = getEndpoint().getProperty(COMPARATOR_CLASS_NAME_PROPERTY); Object reverseProperty = this.getEndpoint().getProperty(COMPARATOR_REVERSE_ORDER_PROPERTY); boolean reverse = false; if (o != null) { if (reverseProperty != null) { reverse = Boolean.valueOf((String) reverseProperty).booleanValue(); } Class clazz = Class.forName(o.toString()); o = clazz.newInstance(); return reverse ? new ReverseComparator((Comparator) o) : (Comparator) o; } return null; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -