📄 tarfile.py
字号:
# Portions Copyright (c) 2005 Nokia Corporation
#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
#-------------------------------------------------------------------
# tarfile.py
#
# Module for reading and writing .tar and tar.gz files.
#
# Needs at least Python version 2.2.
#
# Please consult the html documentation in this distribution
# for further details on how to use tarfile.
#
#-------------------------------------------------------------------
# Copyright (C) 2002 Lars Gustäbel <lars@gustaebel.de>
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# Module metadata (CVS keyword strings are kept verbatim).
__version__ = "$Revision: 1.2 $"
# $Source: /isource/cvsroot/amaretto-python/python-port-s60/symbian_python/Lib/tarfile.py,v $
version = "0.6.7"
# "\xe4" is a-umlaut in ISO-8859-1 (the encoding declared at the top of this
# file).  It is spelled as an escape so the value no longer depends on how
# the source file happens to be decoded.
__author__ = "Lars Gust\xe4bel (lars@gustaebel.de)"
__date__ = "$Date: 2004/04/22 12:09:21 $"
__cvsid__ = "$Id: tarfile.py,v 1.2 2004/04/22 12:09:21 kpetrlai Exp $"
__credits__ = "Gustavo Niemeyer, Niels Gust\xe4bel, Richard Townsend"
#---------
# Imports
#---------
import sys
import os
import shutil
import stat
import errno
import time
import struct
# grp and pwd are optional: if the import fails, both names are bound to
# None so later code can test for them.
try:
import grp, pwd
except ImportError:
grp = pwd = None
# We won't need this anymore in Python 2.3
#
# We import the _tarfile extension, that contains
# some useful functions to handle devices and symlinks.
# We inject them into os module, as if we were under 2.3.
#
# _tarfile is treated as unavailable (None) when it cannot be imported or
# when its mknod attribute is None.
try:
import _tarfile
if _tarfile.mknod is None:
_tarfile = None
except ImportError:
_tarfile = None
# Each helper is only injected when the os module does not already
# provide an attribute of that name.
if _tarfile and not hasattr(os, "mknod"):
os.mknod = _tarfile.mknod
if _tarfile and not hasattr(os, "major"):
os.major = _tarfile.major
if _tarfile and not hasattr(os, "minor"):
os.minor = _tarfile.minor
if _tarfile and not hasattr(os, "makedev"):
os.makedev = _tarfile.makedev
if _tarfile and not hasattr(os, "lchown"):
os.lchown = _tarfile.lchown
# XXX remove for release (2.3)
# Define True/False as 1/0 on interpreters where the names do not exist
# (looking them up raises NameError there).
try:
True
False
except NameError:
True = 1
False = 0
# Names exported by "from tarfile import *".
__all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError"]
#---------------------------------------------------------
# tar constants
#---------------------------------------------------------
NUL = "\0" # the null character
BLOCKSIZE = 512 # length of processing blocks
RECORDSIZE = BLOCKSIZE * 20 # length of records (20 blocks = 10240 bytes)
MAGIC = "ustar" # magic tar string
VERSION = "00" # version number
LENGTH_NAME = 100 # maximum length of a filename
LENGTH_LINK = 100 # maximum length of a linkname
LENGTH_PREFIX = 155 # maximum length of the prefix field
MAXSIZE_MEMBER = 077777777777L # maximum size of a file (11 octal digits, i.e. 8 GiB - 1)
# One-character type codes of archive members.
REGTYPE = "0" # regular file
AREGTYPE = "\0" # regular file (alternative code: a NUL character)
LNKTYPE = "1" # link (inside tarfile)
SYMTYPE = "2" # symbolic link
CHRTYPE = "3" # character special device
BLKTYPE = "4" # block special device
DIRTYPE = "5" # directory
FIFOTYPE = "6" # fifo special device
CONTTYPE = "7" # contiguous file
GNUTYPE_LONGNAME = "L" # GNU tar extension for longnames
GNUTYPE_LONGLINK = "K" # GNU tar extension for longlink
GNUTYPE_SPARSE = "S" # GNU tar extension for sparse file
#---------------------------------------------------------
# tarfile constants
#---------------------------------------------------------
# Both tuples are built from the one-character type codes defined above
# and are meant for membership tests.
SUPPORTED_TYPES = (REGTYPE, AREGTYPE, LNKTYPE, # file types that tarfile
SYMTYPE, DIRTYPE, FIFOTYPE, # can cope with.
CONTTYPE, CHRTYPE, BLKTYPE,
GNUTYPE_LONGNAME, GNUTYPE_LONGLINK,
GNUTYPE_SPARSE)
REGULAR_TYPES = (REGTYPE, AREGTYPE, # file types that somehow
CONTTYPE, GNUTYPE_SPARSE) # represent regular files
#---------------------------------------------------------
# Bits used in the mode field, values in octal.
#---------------------------------------------------------
# File type masks.  Some masks contain others (S_IFLNK includes the bits of
# S_IFREG and S_IFCHR, S_IFBLK those of S_IFDIR and S_IFCHR), so code that
# tests them one after another must try the wider mask first.
S_IFLNK = 0120000 # symbolic link
S_IFREG = 0100000 # regular file
S_IFBLK = 0060000 # block device
S_IFDIR = 0040000 # directory
S_IFCHR = 0020000 # character device
S_IFIFO = 0010000 # fifo
# Special bits.
TSUID = 04000 # set UID on execution
TSGID = 02000 # set GID on execution
TSVTX = 01000 # reserved (rendered as "t"/"T" by filemode_table)
# Permission bits for owner, group and other.
TUREAD = 0400 # read by owner
TUWRITE = 0200 # write by owner
TUEXEC = 0100 # execute/search by owner
TGREAD = 0040 # read by group
TGWRITE = 0020 # write by group
TGEXEC = 0010 # execute/search by group
TOREAD = 0004 # read by other
TOWRITE = 0002 # write by other
TOEXEC = 0001 # execute/search by other
#---------------------------------------------------------
# Some useful functions
#---------------------------------------------------------
def nts(s):
    """Convert a null-terminated string buffer to a python string.

    Everything from the first NUL character onwards is dropped; a buffer
    that contains no NUL at all is returned unchanged.
    """
    end = s.find(NUL)
    if end == -1:
        return s
    return s[:end]
def calc_chksum(buf):
    """Calculate the checksum for a member's header.

    The result is the sum of the character codes of buf, except that the
    eight characters at offsets 148..155 (the chksum field itself) are
    counted as if they were blanks.
    """
    # 8 blanks standing in for the chksum field: 8 * ord(" ") == 256
    total = 256
    for ch in buf[:148] + buf[156:]:
        total = total + ord(ch)
    return total
def copyfileobj(src, dst, length=None):
    """Copy length bytes from fileobj src to fileobj dst.

    If length is None, copy the entire content (delegated to
    shutil.copyfileobj).  A length of 0 copies nothing.  Otherwise the
    data is moved in pieces of at most 16 KiB and IOError is raised as
    soon as src delivers less than was asked for.
    """
    if length is None:
        shutil.copyfileobj(src, dst)
        return
    if length == 0:
        return

    BUFSIZE = 16 * 1024

    def transfer(size):
        # Move exactly `size` bytes or fail.
        chunk = src.read(size)
        if len(chunk) < size:
            raise IOError("end of file reached")
        dst.write(chunk)

    full_chunks, tail = divmod(length, BUFSIZE)
    done = 0
    while done < full_chunks:
        transfer(BUFSIZE)
        done += 1
    if tail != 0:
        transfer(tail)
    return
# Lookup table for filemode().  Every row describes one character of the
# result as a flat sequence of (mask, char, mask, char, ...) alternatives.
# The first alternative whose mask is completely contained in the mode
# wins, so a mask that includes another one has to be listed before it:
# S_IFLNK before S_IFREG/S_IFCHR, S_IFBLK before S_IFDIR/S_IFCHR, and the
# combined exec+special masks ("s", "t") before the single bits.
filemode_table = (
    (S_IFLNK, "l",
     S_IFREG, "-",
     S_IFBLK, "b",
     S_IFDIR, "d",
     S_IFCHR, "c",
     S_IFIFO, "p"),
    (TUREAD, "r"),
    (TUWRITE, "w"),
    (TUEXEC|TSUID, "s", TSUID, "S", TUEXEC, "x"),
    (TGREAD, "r"),
    (TGWRITE, "w"),
    (TGEXEC|TSGID, "s", TSGID, "S", TGEXEC, "x"),
    (TOREAD, "r"),
    (TOWRITE, "w"),
    (TOEXEC|TSVTX, "t", TSVTX, "T", TOEXEC, "x"))


def filemode(mode):
    """Convert a file's mode to a string of the form
       -rwxrwxrwx.
       Used by TarFile.list()

    mode is the integer mode value.  The result always has one character
    per row of filemode_table (ten with the table above); positions for
    which no mask matches are filled with "-".
    """
    chars = []
    for row in filemode_table:
        for i in range(0, len(row), 2):
            mask = row[i]
            if mode & mask == mask:
                chars.append(row[i + 1])
                break
        else:
            # none of the alternatives of this row applies
            chars.append("-")
    return "".join(chars)
# normpath() always yields "/" as the path separator, whatever os.sep is
# on the running platform.
if os.sep == "/":
    normpath = os.path.normpath
else:
    def normpath(path):
        """Normalize path and convert os.sep separators to "/"."""
        return os.path.normpath(path).replace(os.sep, "/")
class TarError(Exception):
    """Base exception."""
    pass


class ExtractError(TarError):
    """General exception for extract errors."""
    pass


class ReadError(TarError):
    """Exception for unreadable tar archives."""
    pass


class CompressionError(TarError):
    """Exception for unavailable compression methods."""
    pass


class StreamError(TarError):
    """Exception for unsupported operations on stream-like TarFiles."""
    pass
#---------------------------
# internal stream interface
#---------------------------
class _LowLevelFile:
    """Low-level file object. Supports reading and writing.
       It is used instead of a regular file object for streaming
       access.

    The file is accessed through an os-level descriptor (self.fd).
    """

    def __init__(self, name, mode):
        """Open the file called name.

        mode is "r" (read only) or "w" (write only; the file is created
        if necessary and truncated).  Any other mode raises KeyError.
        """
        flags = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            # only defined on platforms that distinguish text from binary
            flags |= os.O_BINARY
        # Without an explicit permission argument os.open() creates new
        # files with mode 0777, i.e. executable.  Request rw-rw-rw-
        # instead; the umask is still applied on top of it.
        perms = (stat.S_IRUSR | stat.S_IWUSR |
                 stat.S_IRGRP | stat.S_IWGRP |
                 stat.S_IROTH | stat.S_IWOTH)
        self.fd = os.open(name, flags, perms)

    def close(self):
        """Close the underlying file descriptor."""
        os.close(self.fd)

    def read(self, size):
        """Read and return up to size bytes from the descriptor."""
        return os.read(self.fd, size)

    def write(self, s):
        """Write the string s to the descriptor."""
        os.write(self.fd, s)
class _Stream:
def __init__(self, name, mode, comptype, fileobj, bufsize):
    """Construct a _Stream object.

    name     -- file name; opened with _LowLevelFile when fileobj is
                None, and used for the gzip header in write mode.
    mode     -- "r" sets up decompression, any other value compression.
    comptype -- "tar" (no compression), "gz", "bz2", or "*" to let a
                _StreamProxy detect the compression of fileobj.
    fileobj  -- object to read from / write to, or None.
    bufsize  -- block size used for buffered reads and writes.

    Raises CompressionError if the module needed for comptype cannot be
    imported.  If setting up the (de)compressor fails for any reason, a
    file that was opened here is closed again before the exception is
    passed on.
    """
    self._extfileobj = True
    if fileobj is None:
        fileobj = _LowLevelFile(name, mode)
        self._extfileobj = False

    if comptype == '*':
        # Enable transparent compression detection for the
        # stream interface
        fileobj = _StreamProxy(fileobj)
        comptype = fileobj.getcomptype()

    self.name = name or ""
    self.mode = mode
    self.comptype = comptype
    self.fileobj = fileobj
    self.bufsize = bufsize
    self.buf = ""
    # a plain int is enough here, it is promoted automatically once the
    # position outgrows the native integer range
    self.pos = 0
    self.closed = False

    try:
        if comptype == "gz":
            try:
                import zlib
            except ImportError:
                raise CompressionError("zlib module is not available")
            self.zlib = zlib
            self.crc = zlib.crc32("")
            if mode == "r":
                self._init_read_gz()
            else:
                self._init_write_gz()
        elif comptype == "bz2":
            try:
                import bz2
            except ImportError:
                raise CompressionError("bz2 module is not available")
            if mode == "r":
                self.dbuf = ""
                self.cmp = bz2.BZ2Decompressor()
            else:
                self.cmp = bz2.BZ2Compressor()
    except:
        # Do not leave a file we opened ourselves dangling, and mark the
        # stream closed so that __del__ will not try to flush a stream
        # that was never fully set up.
        if not self._extfileobj:
            self.fileobj.close()
        self.closed = True
        raise
def __del__(self):
    """Close the stream on destruction if that has not happened yet.

    Objects whose construction stopped before the `closed` attribute was
    set are left alone.
    """
    if getattr(self, "closed", True):
        return
    self.close()
def _init_write_gz(self):
    """Initialize for writing with gzip compression.

    Creates the compressor (self.cmp) and queues the gzip header: ten
    fixed bytes containing the current time, followed by self.name
    (without a trailing ".gz") and a terminating NUL.
    """
    zlib = self.zlib
    # negative window bits: the compressor adds no header of its own
    self.cmp = zlib.compressobj(9, zlib.DEFLATED,
                                -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL,
                                0)
    mtime = struct.pack("<L", long(time.time()))
    header = "\037\213\010\010" + mtime + "\002\377"
    self.__write(header)
    if self.name.endswith(".gz"):
        self.name = self.name[:-3]
    self.__write(self.name + NUL)
def write(self, s):
    """Write string s to the stream.

    Updates the running CRC (gzip only) and the stream position, then
    hands the data - compressed unless comptype is "tar" - to the
    buffered writer.
    """
    if self.comptype == "gz":
        self.crc = self.zlib.crc32(s, self.crc)
    self.pos += len(s)
    if self.comptype == "tar":
        self.__write(s)
        return
    self.__write(self.cmp.compress(s))
def __write(self, s):
    """Write string s to the stream if a whole new block
       is ready to be written.

    s is appended to the internal buffer; as long as the buffer holds
    more than bufsize characters, its first bufsize characters are
    passed on to the file object.
    """
    self.buf = self.buf + s
    size = self.bufsize
    while len(self.buf) > size:
        head, rest = self.buf[:size], self.buf[size:]
        self.fileobj.write(head)
        self.buf = rest
def close(self):
    """Close the _Stream object. No operation should be
       done on it afterwards.

    In write mode the compressor (if any) is flushed and everything that
    is still buffered is written out, followed by CRC and length for
    gzip streams.  A file object opened by the stream itself is closed
    even when that final write fails.  Calling close() again is a no-op.
    """
    if self.closed:
        return

    # set first, so a failing flush is not retried by __del__
    self.closed = True
    try:
        # The compressor may still hold all of the data (a bz2
        # compressor can return nothing at all from compress() for small
        # inputs), so it must be flushed whether or not something is
        # already sitting in self.buf.
        if self.mode == "w" and self.comptype != "tar":
            self.buf += self.cmp.flush()

        if self.mode == "w" and self.buf:
            self.fileobj.write(self.buf)
            self.buf = ""
            if self.comptype == "gz":
                # NOTE(review): "<L" holds 32 bits only, so a stream
                # position of 4 GiB or more will not fit here - confirm
                # whether self.pos needs to be masked.
                self.fileobj.write(struct.pack("<l", self.crc))
                self.fileobj.write(struct.pack("<L", self.pos))
    finally:
        if not self._extfileobj:
            self.fileobj.close()
def _init_read_gz(self):
    """Initialize for reading a gzip compressed fileobj.

    Creates the decompressor (self.cmp), resets the decompressed-data
    buffer (self.dbuf) and consumes the gzip header, so that the next
    raw read delivers compressed data.

    Raises ReadError if the magic bytes are missing and CompressionError
    if the compression method is not deflate.
    """
    self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
    self.dbuf = ""

    # taken from gzip.GzipFile with some alterations
    if self.__read(2) != "\037\213":
        raise ReadError("not a gzip file")
    if self.__read(1) != "\010":
        raise CompressionError("unsupported compression method")

    flag = ord(self.__read(1))
    # six more fixed header bytes that are of no interest here
    self.__read(6)

    if flag & 4:
        # Extra field: two length bytes (low byte first) plus payload.
        # It is part of the uncompressed header, so it has to be skipped
        # with the raw reader; the public read() would push these bytes
        # through the decompressor and advance self.pos.
        xlen = ord(self.__read(1)) + 256 * ord(self.__read(1))
        self.__read(xlen)
    for bit in (8, 16):
        # two optional NUL-terminated strings, one per flag bit
        if flag & bit:
            while True:
                s = self.__read(1)
                if not s or s == NUL:
                    break
    if flag & 2:
        # two trailing header bytes present when this bit is set
        self.__read(2)
def tell(self):
    """Return the stream's file pointer position.

    This is the value of self.pos, which read() and write() advance by
    the amount of data they handle.
    """
    return self.pos
def seek(self, pos=0):
    """Set the stream's file pointer to pos. Negative seeking
       is forbidden.

    The stream can only move forward: the data between the current
    position and pos is read in bufsize pieces and thrown away.  A pos
    before the current position raises StreamError.  Returns the
    resulting position.
    """
    distance = pos - self.pos
    if distance < 0:
        raise StreamError("seeking backwards is not allowed")

    full_reads, rest = divmod(distance, self.bufsize)
    done = 0
    while done < full_reads:
        self.read(self.bufsize)
        done += 1
    self.read(rest)
    return self.pos
def read(self, size=None):
    """Return the next size number of bytes from the stream.
       If size is not defined, return all bytes of the stream
       up to EOF.

    The stream position is advanced by the length of the data returned.
    """
    if size is not None:
        data = self._read(size)
    else:
        parts = []
        chunk = self._read(self.bufsize)
        while chunk:
            parts.append(chunk)
            chunk = self._read(self.bufsize)
        data = "".join(parts)
    self.pos += len(data)
    return data
def _read(self, size):
    """Return size bytes from the stream.

    Uncompressed streams are read directly.  Otherwise raw blocks of
    bufsize are fetched and decompressed until at least size bytes are
    available or the input is exhausted; whatever exceeds size is kept
    in self.dbuf for the next call.
    """
    if self.comptype == "tar":
        return self.__read(size)

    pieces = [self.dbuf]
    available = len(self.dbuf)
    while available < size:
        raw = self.__read(self.bufsize)
        if not raw:
            break
        plain = self.cmp.decompress(raw)
        pieces.append(plain)
        available += len(plain)
    data = "".join(pieces)
    self.dbuf = data[size:]
    return data[:size]
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -