📄 fileconnector.java
字号:
/* * $Id: FileConnector.java 12423 2008-07-29 19:38:10Z tcarlson $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.file;import org.mule.api.MessagingException;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.config.MuleProperties;import org.mule.api.endpoint.ImmutableEndpoint;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.service.Service;import org.mule.api.transport.DispatchException;import org.mule.api.transport.MessageAdapter;import org.mule.api.transport.MessageReceiver;import org.mule.config.i18n.CoreMessages;import org.mule.transformer.simple.ByteArrayToSerializable;import org.mule.transformer.simple.SerializableToByteArray;import org.mule.transport.AbstractConnector;import org.mule.transport.file.filters.FilenameWildcardFilter;import org.mule.util.FileUtils;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStream;import java.util.Map;import java.util.Properties;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * <code>FileConnector</code> is used for setting up listeners on a directory and * for writing files to a directory. The connector provides support for defining * file output patterns and filters for receiving files. 
*/public class FileConnector extends AbstractConnector{ public static final String FILE = "file"; private static Log logger = LogFactory.getLog(FileConnector.class); // These are properties that can be overridden on the Receiver by the endpoint declaration // inbound only public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency"; public static final String PROPERTY_FILE_AGE = "fileAge"; public static final String PROPERTY_MOVE_TO_PATTERN = "moveToPattern"; public static final String PROPERTY_MOVE_TO_DIRECTORY = "moveToDirectory"; public static final String PROPERTY_READ_FROM_DIRECTORY = "readFromDirectoryName"; // outbound only public static final String PROPERTY_OUTPUT_PATTERN = "outputPattern"; // apparently unused (once strange override code deleted)// public static final String PROPERTY_DELETE_ON_READ = "autoDelete";// public static final String PROPERTY_SERVICE_OVERRIDE = "serviceOverrides"; // message properties public static final String PROPERTY_FILENAME = "filename"; public static final String PROPERTY_ORIGINAL_FILENAME = "originalFilename"; public static final String PROPERTY_DIRECTORY = "directory"; public static final String PROPERTY_WRITE_TO_DIRECTORY = "writeToDirectoryName"; public static final String PROPERTY_FILE_SIZE = "fileSize"; public static final long DEFAULT_POLLING_FREQUENCY = 1000; /** * Time in milliseconds to poll. 
On each poll the poll() method is called */ private long pollingFrequency = 0; private String moveToPattern = null; private String writeToDirectoryName = null; private String moveToDirectoryName = null; private String readFromDirectoryName = null; private String outputPattern = null; private boolean outputAppend = false; private boolean autoDelete = true; private boolean checkFileAge = false; private long fileAge = 0; private FileOutputStream outputStream = null; private boolean serialiseObjects = false; private boolean streaming = true; public FilenameParser filenameParser; /* * (non-Javadoc) * * @see org.mule.transport.AbstractConnector#doInitialise() */ public FileConnector() { super(); filenameParser = new SimpleFilenameParser(); } // @Override public void setMaxDispatchersActive(int value) { if (isOutputAppend() && value != 1) { logger.warn("MULE-1773: cannot configure maxDispatchersActive when using outputAppend. New value not set"); } else { super.setMaxDispatchersActive(value); } } // @Override protected Object getReceiverKey(Service service, InboundEndpoint endpoint) { if (endpoint.getFilter() != null && endpoint.getFilter() instanceof FilenameWildcardFilter) { return endpoint.getEndpointURI().getAddress() + "/" + ((FilenameWildcardFilter) endpoint.getFilter()).getPattern(); } return endpoint.getEndpointURI().getAddress(); } /** * Registers a listener for a particular directory The following properties can * be overriden in the endpoint declaration * <ul> * <li>moveToDirectory</li> * <li>filterPatterns</li> * <li>filterClass</li> * <li>pollingFrequency</li> * </ul> */ public MessageReceiver createReceiver(Service service, InboundEndpoint endpoint) throws Exception { String readDir = endpoint.getEndpointURI().getAddress(); if (null != getReadFromDirectory()) { readDir = getReadFromDirectory(); } long polling = this.pollingFrequency; String moveTo = moveToDirectoryName; String moveToPattern = getMoveToPattern(); Map props = endpoint.getProperties(); if 
(props != null) { // Override properties on the endpoint for the specific endpoint String read = (String) props.get(PROPERTY_READ_FROM_DIRECTORY); if (read != null) { readDir = read; } String move = (String) props.get(PROPERTY_MOVE_TO_DIRECTORY); if (move != null) { moveTo = move; } String tempMoveToPattern = (String) props.get(PROPERTY_MOVE_TO_PATTERN); if (tempMoveToPattern != null) { if (logger.isDebugEnabled()) { logger.debug("set moveTo Pattern to: " + tempMoveToPattern); } moveToPattern = tempMoveToPattern; } String tempPolling = (String) props.get(PROPERTY_POLLING_FREQUENCY); if (tempPolling != null) { polling = Long.parseLong(tempPolling); } if (polling <= 0) { polling = DEFAULT_POLLING_FREQUENCY; } if (logger.isDebugEnabled()) { logger.debug("set polling frequency to: " + polling); } String tempFileAge = (String) props.get(PROPERTY_FILE_AGE); if (tempFileAge != null) { try { setFileAge(Long.parseLong(tempFileAge)); } catch (Exception ex1) { logger.error("Failed to set fileAge", ex1); } } // this is surreal! what on earth was it useful for? 
// Map srvOverride = (Map) props.get(PROPERTY_SERVICE_OVERRIDE); // if (srvOverride != null) // { // if (serviceOverrides == null) // { // serviceOverrides = new Properties(); // } // serviceOverrides.setProperty(MuleProperties.CONNECTOR_INBOUND_TRANSFORMER, // NoActionTransformer.class.getName()); // serviceOverrides.setProperty(MuleProperties.CONNECTOR_OUTBOUND_TRANSFORMER, // NoActionTransformer.class.getName()); // } } try { return serviceDescriptor.createMessageReceiver(this, service, endpoint, new Object[]{readDir, moveTo, moveToPattern, new Long(polling)}); } catch (Exception e) { throw new InitialisationException( CoreMessages.failedToCreateObjectWith("Message Receiver", serviceDescriptor), e, this); } } public String getProtocol() { return FILE; } public FilenameParser getFilenameParser() { return filenameParser; } public void setFilenameParser(FilenameParser filenameParser) { this.filenameParser = filenameParser; } protected void doDispose() { try { doStop(); } catch (MuleException e) { logger.error(e.getMessage(), e); } } protected void doInitialise() throws InitialisationException { // MULE-1773: limit the number of dispatchers per endpoint to 1 until // there is a proper (Distributed)LockManager in place (MULE-2402). // We also override the setter to prevent "wrong" configuration for now. if (isOutputAppend()) { super.setMaxDispatchersActive(1); } } protected void doConnect() throws Exception { // template method, nothing to do } protected void doDisconnect() throws Exception { // template method, nothing to do } protected void doStart() throws MuleException { // template method, nothing to do } protected void doStop() throws MuleException { if (outputStream != null) { try { outputStream.close();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -