📄 filemessagereceiver.java
字号:
/* * $Id: FileMessageReceiver.java 12637 2008-09-12 17:34:03Z aperepel $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.file;import org.mule.DefaultMuleMessage;import org.mule.api.DefaultMuleException;import org.mule.api.MuleException;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.CreateException;import org.mule.api.routing.RoutingException;import org.mule.api.service.Service;import org.mule.api.transport.Connector;import org.mule.api.transport.MessageAdapter;import org.mule.transport.AbstractPollingMessageReceiver;import org.mule.transport.ConnectException;import org.mule.transport.DefaultMessageAdapter;import org.mule.transport.file.i18n.FileMessages;import org.mule.util.FileUtils;import java.io.File;import java.io.FileFilter;import java.io.FileNotFoundException;import java.io.FilenameFilter;import java.io.IOException;import java.io.RandomAccessFile;import java.nio.channels.FileChannel;import java.nio.channels.FileLock;import java.util.Comparator;import edu.emory.mathcs.backport.java.util.Arrays;import org.apache.commons.collections.comparators.ReverseComparator;/** * <code>FileMessageReceiver</code> is a polling listener that reads files from a * directory. */public class FileMessageReceiver extends AbstractPollingMessageReceiver{ public static final String COMPARATOR_CLASS_NAME_PROPERTY = "comparator"; public static final String COMPARATOR_REVERSE_ORDER_PROPERTY = "reverseOrder"; private static final File[] NO_FILES = new File[0]; private String readDir = null; private String moveDir = null; private File readDirectory = null; private File moveDirectory = null; private String moveToPattern = null; private FilenameFilter filenameFilter = null; private FileFilter fileFilter = null; public FileMessageReceiver(Connector connector, Service service, InboundEndpoint endpoint, String readDir, String moveDir, String moveToPattern, long frequency) throws CreateException { super(connector, service, endpoint); this.setFrequency(frequency); this.readDir = readDir; this.moveDir = moveDir; this.moveToPattern = moveToPattern; if (endpoint.getFilter() instanceof FilenameFilter) { filenameFilter = (FilenameFilter) endpoint.getFilter(); } else if (endpoint.getFilter() instanceof FileFilter) { fileFilter = (FileFilter) endpoint.getFilter(); } else if (endpoint.getFilter() != null) { throw new CreateException(FileMessages.invalidFileFilter(endpoint.getEndpointURI()), this); } } protected void doConnect() throws Exception { if (readDir != null) { readDirectory = FileUtils.openDirectory(readDir); if (!(readDirectory.canRead())) { throw new ConnectException(FileMessages.fileDoesNotExist(readDirectory.getAbsolutePath()), this); } else { logger.debug("Listening on endpointUri: " + readDirectory.getAbsolutePath()); } } if (moveDir != null) { moveDirectory = FileUtils.openDirectory((moveDir)); if (!(moveDirectory.canRead()) || !moveDirectory.canWrite()) { throw new ConnectException(FileMessages.moveToDirectoryNotWritable(), this); } } } protected void doDisconnect() throws Exception { // template method } protected void doDispose() { // nothing to do } public void poll() { try { File[] files = this.listFiles(); if (logger.isDebugEnabled()) { logger.debug("Files: " + files); } Comparator comparator = getComparator(); if (comparator != null) { Arrays.sort(files, comparator); } for (int i = 0; i < files.length; i++) { // don't process directories if (files[i].isFile()) { this.processFile(files[i]); } } } catch (Exception e) { this.handleException(e); } } public synchronized void processFile(final File sourceFile) throws MuleException { //TODO RM*: This can be put in a Filter. Also we can add an AndFileFilter/OrFileFilter to allow users to //combine file filters (since we can only pass a single filter to File.listFiles, we would need to wrap //the current And/Or filters to extend {@link FilenameFilter} boolean checkFileAge = ((FileConnector) connector).getCheckFileAge(); if (checkFileAge) { long fileAge = ((FileConnector) connector).getFileAge(); long lastMod = sourceFile.lastModified(); long now = System.currentTimeMillis(); long thisFileAge = now - lastMod; if (thisFileAge < fileAge) { if (logger.isDebugEnabled()) { logger.debug("The file has not aged enough yet, will return nothing for: " + sourceFile); } return; } } // don't process a file that is locked by another process (probably still being written) if (!attemptFileLock(sourceFile)) { return; } FileConnector fc = ((FileConnector) connector); String sourceFileOriginalName = sourceFile.getName(); // Perform some quick checks to make sure file can be processed if (!(sourceFile.canRead() && sourceFile.exists() && sourceFile.isFile())) { throw new DefaultMuleException(FileMessages.fileDoesNotExist(sourceFileOriginalName)); } // This isn't nice but is needed as MessageAdaptor is required to resolve // destination file name, and StreamingReceiverFileInputStream is // required to create MessageAdaptor DefaultMessageAdapter fileParserMsgAdaptor = new DefaultMessageAdapter(null); fileParserMsgAdaptor.setProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME, sourceFileOriginalName); // set up destination file File destinationFile = null; if (moveDir != null) { String destinationFileName = sourceFileOriginalName; if (moveToPattern != null) { destinationFileName = ((FileConnector) connector).getFilenameParser().getFilename(fileParserMsgAdaptor, moveToPattern); } // don't use new File() directly, see MULE-1112 destinationFile = FileUtils.newFile(moveDir, destinationFileName); } MessageAdapter msgAdapter = null; try { if (fc.isStreaming()) { msgAdapter = connector.getMessageAdapter(new ReceiverFileInputStream(sourceFile, fc.isAutoDelete(), destinationFile)); } else { msgAdapter = connector.getMessageAdapter(sourceFile); } } catch (FileNotFoundException e)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -