⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 shell.py

📁 linux下的一款播放器
💻 PY
📖 第 1 页 / 共 2 页
字号:
# # ***** BEGIN LICENSE BLOCK *****# Source last modified: $Id: shell.py,v 1.21 2004/07/07 22:00:04 hubbe Exp $# # Portions Copyright (c) 1995-2004 RealNetworks, Inc. All Rights Reserved.# # The contents of this file, and the files included with this file,# are subject to the current version of the RealNetworks Public# Source License (the "RPSL") available at# http://www.helixcommunity.org/content/rpsl unless you have licensed# the file under the current version of the RealNetworks Community# Source License (the "RCSL") available at# http://www.helixcommunity.org/content/rcsl, in which case the RCSL# will apply. You may also obtain the license terms directly from# RealNetworks.  You may not use this file except in compliance with# the RPSL or, if you have a valid RCSL with RealNetworks applicable# to this file, the RCSL.  Please see the applicable RPSL or RCSL for# the rights, obligations and limitations governing use of the# contents of the file.# # Alternatively, the contents of this file may be used under the# terms of the GNU General Public License Version 2 or later (the# "GPL") in which case the provisions of the GPL are applicable# instead of those above. If you wish to allow use of your version of# this file only under the terms of the GPL, and not to allow others# to use your version of this file under the terms of either the RPSL# or RCSL, indicate your decision by deleting the provisions above# and replace them with the notice and other provisions required by# the GPL. If you do not delete the provisions above, a recipient may# use your version of this file under the terms of any one of the# RPSL, the RCSL or the GPL.# # This file is part of the Helix DNA Technology. RealNetworks is the# developer of the Original Code and owns the copyrights in the# portions it created.# # This file, and the files included with this file, is distributed# and made available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY# KIND, EITHER EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS# ALL SUCH WARRANTIES, INCLUDING WITHOUT LIMITATION, ANY WARRANTIES# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, QUIET# ENJOYMENT OR NON-INFRINGEMENT.# # Technology Compatibility Kit Test Suite(s) Location:#    http://www.helixcommunity.org/content/tck# # Contributor(s):# # ***** END LICENSE BLOCK *****# """Python implementation of some common shell and file system utilitiesmissing from the os and os.path module."""import osimport sysimport stringimport getoptimport statimport reimport posixpathimport globimport typesimport time## unix-specifictry:    import signal    import select    if sys.version[:3] < '2.2':        import fcntl, FCNTL    else:            ## Python 2.2+        import fcntl        FCNTL = fcntl # backwards compatibilityexcept:    pass## macintosh-specific globalstry:    import macfs    import MacOSexcept:    pass## win32 extensionstry:    import win32api    import win32security    import win32pipe    import win32process    import win32file    import win32event    import pywintypesexcept:    win32api = None## exceptionerror = "shell.py error"## maximum chunk to read at a time while copying filesMAX_READ = 8192## shell utility implementationclass ShellUtilities:    def _open(self, path, mode):        try:            return open(path, mode)        except IOError, e:            raise error, 'open("%s","%s") failed with IOError="%s" in "%s"' % (                path, mode, str(e), os.getcwd())    def _popen(self, command, mode, dir=None):        old_dir=os.getcwd();        if dir:            os.chdir(dir)        try:            ret=os.popen(command, mode)        except os.error, e:            raise error, 'os.popen("%s") failed os.error="%s" in "%s"' % (                command, str(e), os.getcwd())        os.chdir(old_dir)        return ret    def rm(self, rm_path):        try:            files=glob.glob(rm_path)        except:            if os.path.exists(rm_path):                files=[rm_path]            else:                files=[]        for path in files:            self._rm_single(path)    def cp(self, source_path, destination_path):        for path in glob.glob(source_path):            if os.path.isfile(path):                self._cp_single(path, destination_path)            elif os.path.isdir(path):                self._cp_dir(path, destination_path)    def cpdir(self, source_path, destination_path):        for path in glob.glob(source_path):            if not os.path.isdir(path):                raise error, 'cpdir() "%s" not a directory' % (path)            bdir = os.path.basename(source_path)            dpath = os.path.join(destination_path, bdir)            self._cp_dir(path, dpath)    def find(self, path):        file_list, directory_list = self._find(path)        return file_list, directory_list    def mkdir(self, dir):        if not dir or os.path.isdir(dir):            return        self.mkdir(os.path.dirname(dir))        try:            os.mkdir(dir)        except OSError:            if not os.path.isdir(dir):                raise    def run(self, command, line_cb, timeout_seconds, dir = None):        pipe = self._popen(command, "r", dir)        output = ""        lastline = ""        while 1:            try:                tmp = pipe.read(4096)            except:                break            if not tmp:                break            output = output + tmp            if line_cb:                lastline = lastline + tmp                lines = string.split(lastline, "\n")                lastline = lines[-1]                for line in lines[:-1]:                    line_cb(line+"\n")        if len(lastline):            line_cb(lastline)        status = pipe.close()        return status, output    ## Please do not use!!!    def _make_path_relative(self, path):        if not path:            return        while path[0] == os.sep:            path = path[1:]        return path    def _rm_single(self, path):        ## first take a wild stab at it        try:            os.remove(path)        except os.error:            pass        else:            return        ## remove a directory        if os.path.isdir(path):            try:                name_list = os.listdir(path)            except os.error:                name_list = []            for name in name_list:                fullname = os.path.join(path, name)                self._rm_single(fullname)            ## remove the directory            os.rmdir(path)            return        ## Try again        try:            try:                os.chmod(path, 0666)            except:                pass            os.remove(path)        except os.error:            pass        else:            return        raise error, 'Failed to delete file/dir = %s' % path    def _cp_dir(self, source_path, destination_path):        ## does the directory exist        if not os.path.isdir(source_path):            raise error, 'cp_dir() cannot find source directory="%s"' (                source_path)        self.mkdir(destination_path)        src = [ [] ]        for relparts in src:            dir = apply(os.path.join, [ source_path ] + relparts)            dest_dir = apply(os.path.join, [ destination_path ] + relparts)            for file in os.listdir(dir):                full_file = os.path.join(dir, file)                full_dest = os.path.join(dest_dir, file)                if os.path.isdir(full_file) and not os.path.islink(full_file):                    src.append( relparts + [ file ] )  # This makes it recursive                    self.mkdir( full_dest )                else:                    self._cp_single( full_file, full_dest )    def _cp_single(self, source_path, destination_path):        ## does file exist?        if not os.path.isfile(source_path):            raise error, 'cannot find source="%s"' % (source_path)        ## if the destination of the copy is a path, then we        ## need to modify the destionation_path to have the        ## source filename        if os.path.isdir(destination_path):            destination_path = os.path.join(destination_path,                                            os.path.basename(source_path))        ## preform the copy        self._platform_cp(source_path, destination_path)    def _platform_cp(self, source_path, destination_path):        ## copy file data        source = self._open(source_path, "rb")        destination = self._open(destination_path, "wb")        self._raw_copy(source, destination)        source.close()        destination.close()        ## stat the file; get file mode, atime, and mtime        st = os.stat(source_path)        if os.name == 'posix':            os.chmod(destination_path, st[stat.ST_MODE])        os.utime(destination_path, (st[stat.ST_ATIME], st[stat.ST_MTIME]))    def _raw_copy(self, source, destination):        data = source.read(MAX_READ)        while data:            destination.write(data)            data = source.read(MAX_READ)    def _find(self, directory):        directory_list = []        file_list = []        ## list directory and separate the files from directories        for path in os.listdir(directory):            path = os.path.join(directory, path)            if os.path.isdir(path) and not os.path.islink(path):                directory_list.append(path)            else:                file_list.append(path)        ## recurse into directories        for dir in directory_list:            fl, dl = self._find(dir)            file_list = file_list + fl            directory_list = directory_list + dl        return file_list, directory_listclass UNIXShellUtilities(ShellUtilities):    thread_safe=1    class _pipe:        "Wrapper for a unix fd which can wait() on a child process at close time."        def __init__(self, fd, child_pid):            self.fd = fd            self.child_pid = child_pid        def kill(self):            try:                os.kill(self.child_pid, 9)            except os.error, e:                pass        def eof(self):            pid, status = os.waitpid(self.child_pid, os.WNOHANG)            if pid:                os.close(self.fd)                self.fd = -1                return status            return None        def close(self):            if self.fd >= 0:                os.close(self.fd)                self.fd = -1                return os.waitpid(self.child_pid, 0)[1]            return None        def read(self, howmuch):            return os.read(self.fd, howmuch)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -