📄 shell.py
字号:
# # ***** BEGIN LICENSE BLOCK *****# Source last modified: $Id: shell.py,v 1.21 2004/07/07 22:00:04 hubbe Exp $# # Portions Copyright (c) 1995-2004 RealNetworks, Inc. All Rights Reserved.# # The contents of this file, and the files included with this file,# are subject to the current version of the RealNetworks Public# Source License (the "RPSL") available at# http://www.helixcommunity.org/content/rpsl unless you have licensed# the file under the current version of the RealNetworks Community# Source License (the "RCSL") available at# http://www.helixcommunity.org/content/rcsl, in which case the RCSL# will apply. You may also obtain the license terms directly from# RealNetworks. You may not use this file except in compliance with# the RPSL or, if you have a valid RCSL with RealNetworks applicable# to this file, the RCSL. Please see the applicable RPSL or RCSL for# the rights, obligations and limitations governing use of the# contents of the file.# # Alternatively, the contents of this file may be used under the# terms of the GNU General Public License Version 2 or later (the# "GPL") in which case the provisions of the GPL are applicable# instead of those above. If you wish to allow use of your version of# this file only under the terms of the GPL, and not to allow others# to use your version of this file under the terms of either the RPSL# or RCSL, indicate your decision by deleting the provisions above# and replace them with the notice and other provisions required by# the GPL. If you do not delete the provisions above, a recipient may# use your version of this file under the terms of any one of the# RPSL, the RCSL or the GPL.# # This file is part of the Helix DNA Technology. RealNetworks is the# developer of the Original Code and owns the copyrights in the# portions it created.# # This file, and the files included with this file, is distributed# and made available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY# KIND, EITHER EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS# ALL SUCH WARRANTIES, INCLUDING WITHOUT LIMITATION, ANY WARRANTIES# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, QUIET# ENJOYMENT OR NON-INFRINGEMENT.# # Technology Compatibility Kit Test Suite(s) Location:# http://www.helixcommunity.org/content/tck# # Contributor(s):# # ***** END LICENSE BLOCK *****# """Python implementation of some common shell and file system utilitiesmissing from the os and os.path module."""import osimport sysimport stringimport getoptimport statimport reimport posixpathimport globimport typesimport time## unix-specifictry: import signal import select if sys.version[:3] < '2.2': import fcntl, FCNTL else: ## Python 2.2+ import fcntl FCNTL = fcntl # backwards compatibilityexcept: pass## macintosh-specific globalstry: import macfs import MacOSexcept: pass## win32 extensionstry: import win32api import win32security import win32pipe import win32process import win32file import win32event import pywintypesexcept: win32api = None## exceptionerror = "shell.py error"## maximum chunk to read at a time while copying filesMAX_READ = 8192## shell utility implementationclass ShellUtilities: def _open(self, path, mode): try: return open(path, mode) except IOError, e: raise error, 'open("%s","%s") failed with IOError="%s" in "%s"' % ( path, mode, str(e), os.getcwd()) def _popen(self, command, mode, dir=None): old_dir=os.getcwd(); if dir: os.chdir(dir) try: ret=os.popen(command, mode) except os.error, e: raise error, 'os.popen("%s") failed os.error="%s" in "%s"' % ( command, str(e), os.getcwd()) os.chdir(old_dir) return ret def rm(self, rm_path): try: files=glob.glob(rm_path) except: if os.path.exists(rm_path): files=[rm_path] else: files=[] for path in files: self._rm_single(path) def cp(self, source_path, destination_path): for path in glob.glob(source_path): if os.path.isfile(path): self._cp_single(path, destination_path) elif os.path.isdir(path): self._cp_dir(path, destination_path) def cpdir(self, source_path, destination_path): for path in glob.glob(source_path): if not os.path.isdir(path): raise error, 'cpdir() "%s" not a directory' % (path) bdir = os.path.basename(source_path) dpath = os.path.join(destination_path, bdir) self._cp_dir(path, dpath) def find(self, path): file_list, directory_list = self._find(path) return file_list, directory_list def mkdir(self, dir): if not dir or os.path.isdir(dir): return self.mkdir(os.path.dirname(dir)) try: os.mkdir(dir) except OSError: if not os.path.isdir(dir): raise def run(self, command, line_cb, timeout_seconds, dir = None): pipe = self._popen(command, "r", dir) output = "" lastline = "" while 1: try: tmp = pipe.read(4096) except: break if not tmp: break output = output + tmp if line_cb: lastline = lastline + tmp lines = string.split(lastline, "\n") lastline = lines[-1] for line in lines[:-1]: line_cb(line+"\n") if len(lastline): line_cb(lastline) status = pipe.close() return status, output ## Please do not use!!! def _make_path_relative(self, path): if not path: return while path[0] == os.sep: path = path[1:] return path def _rm_single(self, path): ## first take a wild stab at it try: os.remove(path) except os.error: pass else: return ## remove a directory if os.path.isdir(path): try: name_list = os.listdir(path) except os.error: name_list = [] for name in name_list: fullname = os.path.join(path, name) self._rm_single(fullname) ## remove the directory os.rmdir(path) return ## Try again try: try: os.chmod(path, 0666) except: pass os.remove(path) except os.error: pass else: return raise error, 'Failed to delete file/dir = %s' % path def _cp_dir(self, source_path, destination_path): ## does the directory exist if not os.path.isdir(source_path): raise error, 'cp_dir() cannot find source directory="%s"' ( source_path) self.mkdir(destination_path) src = [ [] ] for relparts in src: dir = apply(os.path.join, [ source_path ] + relparts) dest_dir = apply(os.path.join, [ destination_path ] + relparts) for file in os.listdir(dir): full_file = os.path.join(dir, file) full_dest = os.path.join(dest_dir, file) if os.path.isdir(full_file) and not os.path.islink(full_file): src.append( relparts + [ file ] ) # This makes it recursive self.mkdir( full_dest ) else: self._cp_single( full_file, full_dest ) def _cp_single(self, source_path, destination_path): ## does file exist? if not os.path.isfile(source_path): raise error, 'cannot find source="%s"' % (source_path) ## if the destination of the copy is a path, then we ## need to modify the destionation_path to have the ## source filename if os.path.isdir(destination_path): destination_path = os.path.join(destination_path, os.path.basename(source_path)) ## preform the copy self._platform_cp(source_path, destination_path) def _platform_cp(self, source_path, destination_path): ## copy file data source = self._open(source_path, "rb") destination = self._open(destination_path, "wb") self._raw_copy(source, destination) source.close() destination.close() ## stat the file; get file mode, atime, and mtime st = os.stat(source_path) if os.name == 'posix': os.chmod(destination_path, st[stat.ST_MODE]) os.utime(destination_path, (st[stat.ST_ATIME], st[stat.ST_MTIME])) def _raw_copy(self, source, destination): data = source.read(MAX_READ) while data: destination.write(data) data = source.read(MAX_READ) def _find(self, directory): directory_list = [] file_list = [] ## list directory and separate the files from directories for path in os.listdir(directory): path = os.path.join(directory, path) if os.path.isdir(path) and not os.path.islink(path): directory_list.append(path) else: file_list.append(path) ## recurse into directories for dir in directory_list: fl, dl = self._find(dir) file_list = file_list + fl directory_list = directory_list + dl return file_list, directory_listclass UNIXShellUtilities(ShellUtilities): thread_safe=1 class _pipe: "Wrapper for a unix fd which can wait() on a child process at close time." def __init__(self, fd, child_pid): self.fd = fd self.child_pid = child_pid def kill(self): try: os.kill(self.child_pid, 9) except os.error, e: pass def eof(self): pid, status = os.waitpid(self.child_pid, os.WNOHANG) if pid: os.close(self.fd) self.fd = -1 return status return None def close(self): if self.fd >= 0: os.close(self.fd) self.fd = -1 return os.waitpid(self.child_pid, 0)[1] return None def read(self, howmuch): return os.read(self.fd, howmuch)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -