📄 ftpconnector.java
字号:
/* * $Id: FtpConnector.java 12426 2008-07-29 20:19:58Z tcarlson $ * -------------------------------------------------------------------------------------- * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com * * The software in this package is published under the terms of the CPAL v1.0 * license, a copy of which has been included with this distribution in the * LICENSE.txt file. */package org.mule.transport.ftp;import org.mule.api.MuleException;import org.mule.api.MuleMessage;import org.mule.api.MuleRuntimeException;import org.mule.api.endpoint.EndpointURI;import org.mule.api.endpoint.ImmutableEndpoint;import org.mule.api.endpoint.InboundEndpoint;import org.mule.api.lifecycle.InitialisationException;import org.mule.api.service.Service;import org.mule.api.transport.ConnectorException;import org.mule.api.transport.DispatchException;import org.mule.api.transport.MessageReceiver;import org.mule.config.i18n.CoreMessages;import org.mule.config.i18n.MessageFactory;import org.mule.model.streaming.CallbackOutputStream;import org.mule.transport.AbstractConnector;import org.mule.transport.file.FilenameParser;import org.mule.transport.file.SimpleFilenameParser;import org.mule.util.ClassUtils;import java.io.IOException;import java.io.OutputStream;import java.text.MessageFormat;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import org.apache.commons.net.ftp.FTPClient;import org.apache.commons.net.ftp.FTPFile;import org.apache.commons.pool.ObjectPool;import org.apache.commons.pool.impl.GenericObjectPool;public class FtpConnector extends AbstractConnector{ public static final String FTP = "ftp"; // endpoint properties public static final String PROPERTY_POLLING_FREQUENCY = "pollingFrequency"; // inbound only public static final int DEFAULT_POLLING_FREQUENCY = 1000; public static final String PROPERTY_OUTPUT_PATTERN = "outputPattern"; // outbound only public static final String PROPERTY_PASSIVE_MODE = "passive"; public static final String PROPERTY_BINARY_TRANSFER = "binary"; // message properties public static final String PROPERTY_FILENAME = "filename"; /** * TODO it makes sense to have a type-safe adapter for FTP specifically, but without * Java 5's covariant return types the benefits are diminished. Keeping it simple for now. */ public static final String DEFAULT_FTP_CONNECTION_FACTORY_CLASS = "org.mule.transport.ftp.FtpConnectionFactory"; /** * Time in milliseconds to poll. On each poll the poll() method is called */ private long pollingFrequency; private String outputPattern; private FilenameParser filenameParser = new SimpleFilenameParser(); private boolean passive = true; private boolean binary = true; /** * Whether to test FTP connection on each take from pool. */ private boolean validateConnections = true; private Map pools = new HashMap(); private String connectionFactoryClass = DEFAULT_FTP_CONNECTION_FACTORY_CLASS; public String getProtocol() { return FTP; } public MessageReceiver createReceiver(Service service, InboundEndpoint endpoint) throws Exception { List args = getReceiverArguments(endpoint.getProperties()); return serviceDescriptor.createMessageReceiver(this, service, endpoint, args.toArray()); } protected List getReceiverArguments(Map endpointProperties) { List args = new ArrayList(); long polling = getPollingFrequency(); if (endpointProperties != null) { // Override properties on the endpoint for the specific endpoint String tempPolling = (String) endpointProperties.get(PROPERTY_POLLING_FREQUENCY); if (tempPolling != null) { polling = Long.parseLong(tempPolling); } } if (polling <= 0) { polling = DEFAULT_POLLING_FREQUENCY; } logger.debug("set polling frequency to " + polling); args.add(new Long(polling)); return args; } /** * @return Returns the pollingFrequency. */ public long getPollingFrequency() { return pollingFrequency; } /** * @param pollingFrequency The pollingFrequency to set. */ public void setPollingFrequency(long pollingFrequency) { this.pollingFrequency = pollingFrequency; } /** * Getter for property 'connectionFactoryClass'. * * @return Value for property 'connectionFactoryClass'. */ public String getConnectionFactoryClass() { return connectionFactoryClass; } /** * Setter for property 'connectionFactoryClass'. Should be an instance of * {@link FtpConnectionFactory}. * * @param connectionFactoryClass Value to set for property 'connectionFactoryClass'. */ public void setConnectionFactoryClass(final String connectionFactoryClass) { this.connectionFactoryClass = connectionFactoryClass; } public FTPClient getFtp(EndpointURI uri) throws Exception { if (logger.isDebugEnabled()) { logger.debug(">>> retrieving client for " + uri); } return (FTPClient) getFtpPool(uri).borrowObject(); } public void releaseFtp(EndpointURI uri, FTPClient client) throws Exception { if (logger.isDebugEnabled()) { logger.debug("<<< releasing client for " + uri); } if (dispatcherFactory.isCreateDispatcherPerRequest()) { destroyFtp(uri, client); } else { getFtpPool(uri).returnObject(client); } } public void destroyFtp(EndpointURI uri, FTPClient client) throws Exception { if (logger.isDebugEnabled()) { logger.debug("<<< destroying client for " + uri); } try { getFtpPool(uri).invalidateObject(client); } catch (Exception e) { // no way to test if pool is closed except try to access it logger.debug(e.getMessage()); } } protected synchronized ObjectPool getFtpPool(EndpointURI uri) { if (logger.isDebugEnabled()) { logger.debug("=== get pool for " + uri); } String key = uri.getUser() + ":" + uri.getPassword() + "@" + uri.getHost() + ":" + uri.getPort(); ObjectPool pool = (ObjectPool) pools.get(key); if (pool == null) { try { FtpConnectionFactory connectionFactory = (FtpConnectionFactory) ClassUtils.instanciateClass(getConnectionFactoryClass(), new Object[] {uri}, getClass()); pool = new GenericObjectPool(connectionFactory); ((GenericObjectPool) pool).setTestOnBorrow(this.validateConnections); pools.put(key, pool); } catch (Exception ex) { throw new MuleRuntimeException( MessageFactory.createStaticMessage("Hmm, couldn't instanciate FTP connection factory."), ex); } } return pool; } protected void doInitialise() throws InitialisationException { try { Class objectFactoryClass = ClassUtils.loadClass(this.connectionFactoryClass, getClass()); if (!FtpConnectionFactory.class.isAssignableFrom(objectFactoryClass)) { throw new InitialisationException(MessageFactory.createStaticMessage( "FTP connectionFactoryClass is not an instance of org.mule.transport.ftp.FtpConnectionFactory"), this); } } catch (ClassNotFoundException e) { throw new InitialisationException(e, this); } } protected void doDispose() { // template method } protected void doConnect() throws Exception { // template method } protected void doDisconnect() throws Exception { // template method } protected void doStart() throws MuleException { // template method } protected void doStop() throws MuleException { if (logger.isDebugEnabled()) { logger.debug("!!! stopping all pools"); } try { for (Iterator it = pools.values().iterator(); it.hasNext();) { ObjectPool pool = (ObjectPool)it.next(); pool.close(); } } catch (Exception e) { throw new ConnectorException(CoreMessages.failedToStop("FTP Connector"), this, e); } } /** * @return Returns the outputPattern. */ public String getOutputPattern() { return outputPattern; } /** * @param outputPattern The outputPattern to set. */ public void setOutputPattern(String outputPattern) { this.outputPattern = outputPattern; } /** * @return Returns the filenameParser. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -