⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lftp_client.py

📁 CoralFTP是一款用Python语言编写的工作在GTK2环境下的FTP客户端软件
💻 PY
📖 第 1 页 / 共 2 页
字号:
#!/usr/bin/env python# -*- coding: utf-8 -*-# Copyright (C) 1994  Ling Li## This program is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 2 of the License, or# (at your option) any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU Library General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program; if not, write to the Free Software# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.import stringfrom threading import Thread, Lock, Eventfrom Queue import Queueimport pexpectimport osimport threadfrom logging import debug, info, error, warningfrom transfer_view import *from log_view import LogViewUpdaterfrom configuration import config_key, config_valueimport ftp_enginefrom ftp_engine import *from ftp_exception import *from utils import *class LftpClient:    LFTP_PROGRAM = 'lftp'    PROMPT_STRING = "__coralftp__"    def __init__(self, ftp_engine):        self.__ftp_engine = ftp_engine        self.__config = self.__ftp_engine.config        # new debug info listener thread        self.__fifo_path = os.tmpnam()        self.__debug_listening = DebugListeningThread(self,                                                      ftp_engine.log_updater)    def __del__(self):        if self.__proc:            self.__proc.close(0)    def run(self):        self.__debug_listening.start()        logging.debug('lftp monitor thread %d' % thread.get_ident())        # start lftp program        self.__proc = pexpect.spawn("lftp")        if self.__proc.isalive():            logging.info("lftp started.")        else:            logging.error("failed to start lftp.")        # initial setup        self.__proc.sendline("set prompt " + LftpClient.PROMPT_STRING)        self.__proc.expect("\r" + LftpClient.PROMPT_STRING)        #print self.__proc.before        LftpCommand(self, "debug -o %s 5" % self.__fifo_path).execute()        LftpVersionCommand(self).execute()        # apply more configuration        # anonymous password        value = config_value(self.__config, 'general', 'email_address')        if value != '':            LftpCommand(self, 'set ftp:anon-pass "%s"' % value).execute()        # connect timeout        value = config_value(self.__config, 'general', 'connect_timeout')        if value > 0:            LftpCommand(self, 'set net:timeout %d' % value).execute()        # connect try delay        value = config_value(self.__config, 'general', 'connect_retry_delay')        if value > 0:            LftpCommand(self, 'set net:reconnect-interval-base %d' % value).execute()            LftpCommand(self, 'set net:reconnect-interval-max %d' % (value+1)).execute()            LftpCommand(self, 'set net:reconnect-interval-multiplier 1.0').execute()    def interrupt(self):        self.__proc.kill(2)    def quit(self):        LftpQuitCommand(self).execute()        def new_command(self, cmd_type, *args):        if cmd_type == ftp_engine.CMD_QUIT:            return LftpQuitCommand(self, *args)        elif cmd_type == ftp_engine.CMD_OPEN:            return LftpOpenCommand(self, *args)        elif cmd_type == ftp_engine.CMD_CD:            return LftpCdCommand(self, *args)        elif cmd_type == ftp_engine.CMD_LIST:            return LftpListCommand(self, *args)        elif cmd_type == ftp_engine.CMD_LISTD:            return LftpListdCommand(self, *args)        elif cmd_type == ftp_engine.CMD_PWD:            return LftpPwdCommand(self, *args)        elif cmd_type == ftp_engine.CMD_PUT:            return LftpPutCommand(self, *args)        elif cmd_type == ftp_engine.CMD_GET:            return LftpGetCommand(self, *args)        elif cmd_type == ftp_engine.CMD_MKDIR:            return LftpMkdirCommand(self, *args)        elif cmd_type == ftp_engine.CMD_DELETE:            return LftpDeleteCommand(self, *args)        elif cmd_type == ftp_engine.CMD_MOVE:            return LftpMoveCommand(self, *args)        else:            raise ValueError    def execute_command(self, command):        return command.execute()            def __getattr__(self, name):        if name == 'ftp_thread':            return self.__ftp_thread        elif name == 'proc':            return self.__proc        elif name == 'debug_fifo_path':            return self.__fifo_path        elif name == 'ftp_engine':            return self.__ftp_engine        else:            raise AttributeErrorclass DebugListeningThread(Thread):    def __init__(self, lftp, log_updater):        Thread.__init__(self)        self.__lftp = lftp        self.__path = lftp.debug_fifo_path        self.__log_updater = log_updater        os.mkfifo(self.__path)        return    def run(self):        logging.debug("Logging thread started")        re_quit = re.compile('.*---- Closing idle connection')        self.__fd = open(self.__path)        line = None        while line != '':# and not self.__lftp.ftp_engine.quit_event.isSet():            line = self.__fd.readline()            if re_quit.match(line):                gtk.idle_add(self.__lftp.ftp_engine.emit,                             'idle-connection-closed')            idle_add(self.__log_updater,                     unicode(line,                             self.__lftp.ftp_engine.site_info.remote_charset))        self.__fd.close()        os.unlink(self.__path)        logging.debug('fifo reading thread quit')class LftpCommand:    MSG_INTERRUPTED = 'Interrupted'    def __init__(self, lftp, command):        self.lftp = lftp        self.command = command    def execute(self):        self.proc = self.lftp.proc        if isinstance(self.command, unicode):            self.proc.sendline(self.command.encode(                self.lftp.ftp_engine.site_info.remote_charset))        else:            self.proc.sendline(self.command)        logging.debug('send command %s' % self.proc.readline()[:-1])        return self.parse_result()    def parse_result(self):        index = self.proc.expect(            [LftpClient.PROMPT_STRING,             self.MSG_INTERRUPTED             ])        if index == 0:            result = string.rstrip(self.proc.before)            logging.debug("return: " + result)            return result        elif index == 1:            raise FTPInteruptedException()class LftpVersionCommand(LftpCommand):    def __init__(self, lftp):        LftpCommand.__init__(self, lftp, "version")class LftpOpenCommand(LftpCommand):    UNKNOWN_HOST = 'open: (.*): Unknown host'    def __init__(self, lftp, site_info):        url = "ftp://%s:%d" % (site_info.server_name, site_info.server_port)        username = site_info.username        self.__password = site_info.password        if username == 'None' and (self.__password == '' or self.__password == None):            self.__password = 'unknown@coralstudio.org'        remote_path = site_info.remote_path        if remote_path == None or remote_path == '':            remote_path = '-'        if username != None:            cmd = 'open -e "cd %s" -u %s %s' % (remote_path, username, url)        else:            cmd = 'open -e "cd %s" %s' % (remote_path, url)        LftpCommand.__init__(self, lftp, cmd)    def parse_result(self):        index = self.proc.expect([            LftpClient.PROMPT_STRING,            LftpOpenCommand.UNKNOWN_HOST,            "Password"])        if index == 0:            return        elif index == 1:            s = self.proc.match.string            self.proc.expect(LftpClient.PROMPT_STRING)            raise FTPUnknownHostException(s)        elif index == 2:            self.proc.sendline(self.__password)            self.proc.readline()            return self.parse_result()class LftpCdCommand(LftpCommand):    def __init__(self, lftp, path):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -