⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tarfile.py

📁 python s60 1.4.5版本的源代码
💻 PY
📖 第 1 页 / 共 4 页
字号:
        if hasattr(os, "mkfifo"):
            os.mkfifo(targetpath)
        else:
            raise ExtractError, "fifo not supported by system"

    def makedev(self, tarinfo, targetpath):
        """Create a character or block device node called targetpath.

        tarinfo supplies the permission bits (mode), the device kind
        (isblk()) and the major/minor device numbers.

        Raises ExtractError if the platform lacks os.mknod() or
        os.makedev().
        """
        if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
            raise ExtractError("special devices not supported by system")

        mode = tarinfo.mode
        if tarinfo.isblk():
            mode |= stat.S_IFBLK
        else:
            mode |= stat.S_IFCHR

        # The guard above guarantees that os.makedev() exists, so the former
        # four-argument os.mknod() fallback was unreachable and is removed.
        os.mknod(targetpath, mode,
                 os.makedev(tarinfo.devmajor, tarinfo.devminor))

    def makelink(self, tarinfo, targetpath):
        """Create a symbolic or hard link called targetpath.

        When the platform offers no os.symlink()/os.link() (signalled by
        AttributeError), fall back to extracting the archive member the link
        refers to, and if that fails to copying the referenced file from the
        file system.  Raises IOError if none of these succeed.
        """
        linkpath = tarinfo.linkname
        try:
            if tarinfo.issym():
                os.symlink(linkpath, targetpath)
            else:
                os.link(linkpath, targetpath)
        except AttributeError:
            if tarinfo.issym():
                # Symlink targets are relative to the member's directory.
                base = os.path.dirname(tarinfo.name)
                linkpath = normpath(os.path.join(base, linkpath))

            try:
                member = self.getmember(linkpath)
                self._extract_member(member, targetpath)
            except (EnvironmentError, KeyError):
                # Not in the archive (or not extractable): try the
                # file system instead.
                source = os.path.normpath(linkpath)
                try:
                    shutil.copy2(source, targetpath)
                except EnvironmentError:
                    raise IOError("link could not be created")

    def chown(self, tarinfo, targetpath):
        """Set the owner of targetpath according to tarinfo.

        Does nothing unless the pwd module is available and the effective
        user id is 0.  Group and user are resolved by name first, then by
        numeric id, and finally fall back to the current process's ids.
        Raises ExtractError if the ownership change fails.
        """
        if not (pwd and hasattr(os, "geteuid") and os.geteuid() == 0):
            # We have to be root to change ownership.
            return

        try:
            gid = grp.getgrnam(tarinfo.gname)[2]
        except KeyError:
            try:
                gid = grp.getgrgid(tarinfo.gid)[2]
            except KeyError:
                gid = os.getgid()

        try:
            uid = pwd.getpwnam(tarinfo.uname)[2]
        except KeyError:
            try:
                uid = pwd.getpwuid(tarinfo.uid)[2]
            except KeyError:
                uid = os.getuid()

        try:
            # Prefer lchown() for symlinks so the link itself is changed.
            if tarinfo.issym() and hasattr(os, "lchown"):
                os.lchown(targetpath, uid, gid)
            else:
                os.chown(targetpath, uid, gid)
        except EnvironmentError:
            raise ExtractError("could not change owner")

    def chmod(self, tarinfo, targetpath):
        """Apply tarinfo.mode to targetpath.

        Raises ExtractError if os.chmod() fails.
        """
        permissions = tarinfo.mode
        try:
            os.chmod(targetpath, permissions)
        except EnvironmentError:
            raise ExtractError("could not change mode")

    def utime(self, tarinfo, targetpath):
        """Set both access and modification time of targetpath to
        tarinfo.mtime.

        Directories are skipped on win32.  Raises ExtractError if
        os.utime() fails.
        """
        # According to msdn.microsoft.com, it is an error (EACCES)
        # to use utime() on directories.
        skip = sys.platform == "win32" and tarinfo.isdir()
        if not skip:
            try:
                os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime))
            except EnvironmentError:
                raise ExtractError("could not change modification time")

    #--------------------------------------------------------------------------

    def next(self):
        """Return the next member of the archive as a TarInfo object.

        Returns None when no further header can be read.  The archive must
        be open in mode "r" or "a" (enforced through self._check()).

        Raises ReadError if the very first block of the archive is not a
        valid header and self.ignore_zeros is false.
        """
        self._check("ra")
        if self.firstmember is not None:
            # A member that was read ahead earlier is handed out first.
            m = self.firstmember
            self.firstmember = None
            return m

        # Read the next block.
        self.fileobj.seek(self.chunks[-1])
        while True:
            buf = self.fileobj.read(BLOCKSIZE)
            if not buf:
                return None
            try:
                tarinfo = TarInfo.frombuf(buf)
            except ValueError:
                if self.ignore_zeros:
                    # Skip over the bad block and keep looking for a header.
                    if buf.count(NUL) == BLOCKSIZE:
                        adj = "empty"
                    else:
                        adj = "invalid"
                    self._dbg(2, "0x%X: %s block" % (self.offset, adj))
                    self.offset += BLOCKSIZE
                    continue
                else:
                    # Block is empty or unreadable.
                    if self.chunks[-1] == 0:
                        # If the first block is invalid. That does not
                        # look like a tar archive we can handle.
                        raise ReadError("empty, unreadable or compressed file")
                    return None
            break

        # We shouldn't rely on this checksum, because some tar programs
        # calculate it differently and it is merely validating the
        # header block. We could just as well skip this part, which would
        # have a slight effect on performance...
        if tarinfo.chksum != calc_chksum(buf):
            self._dbg(1, "tarfile: Bad Checksum %r" % tarinfo.name)

        # Set the TarInfo object's offset to the current position of the
        # TarFile and set self.offset to the position where the data blocks
        # should begin.
        tarinfo.offset = self.offset
        self.offset += BLOCKSIZE

        # Check if the TarInfo object has a typeflag for which a callback
        # method is registered in the TYPE_METH. If so, then call it.
        if tarinfo.type in self.TYPE_METH:
            tarinfo = self.TYPE_METH[tarinfo.type](self, tarinfo)
        else:
            tarinfo.offset_data = self.offset
            if tarinfo.isreg() or tarinfo.type not in SUPPORTED_TYPES:
                # Skip the following data blocks.
                self.offset += self._block(tarinfo.size)

        # Some old tar programs don't know DIRTYPE and store directories as
        # regular files whose name ends in a slash.  The test must look at
        # the LAST character; the former name[:-1] slice compared everything
        # but the last character and so never matched such names.
        if tarinfo.isreg() and tarinfo.name.endswith("/"):
            tarinfo.type = DIRTYPE

        self.members.append(tarinfo)
        self.membernames.append(tarinfo.name)
        self.chunks.append(self.offset)
        return tarinfo

    #--------------------------------------------------------------------------
    # Below are some methods which are called for special typeflags in the
    # next() method, e.g. for unwrapping GNU longname/longlink blocks. They
    # are registered in TYPE_METH below. You can register your own methods
    # with this mapping.
    # A registered method is called with a TarInfo object as only argument.
    #
    # During its execution the method MUST perform the following tasks:
    # 1. set tarinfo.offset_data to the position where the data blocks begin,
    #    if there is data to follow.
    # 2. set self.offset to the position where the next member's header will
    #    begin.
    # 3. return a valid TarInfo object.

    def proc_gnulong(self, tarinfo):
        """Handle a GNU longname/longlink pseudo-member.

        Reads the data blocks that follow tarinfo (they carry the long
        name or link name), then reads the header block after them and
        returns a TarInfo built from that header with the long value
        substituted for its name or linkname.  self.offset is advanced past
        everything consumed.
        """
        pieces = []
        remaining = tarinfo.size
        while remaining > 0:
            pieces.append(self.fileobj.read(BLOCKSIZE))
            self.offset += BLOCKSIZE
            remaining -= BLOCKSIZE
        payload = "".join(pieces)

        name = None
        linkname = None
        if tarinfo.type == GNUTYPE_LONGNAME:
            name = nts(payload)
        if tarinfo.type == GNUTYPE_LONGLINK:
            linkname = nts(payload)

        # The header of the member the long name belongs to comes next.
        header = self.fileobj.read(BLOCKSIZE)
        member = TarInfo.frombuf(header)
        member.offset = self.offset
        self.offset += BLOCKSIZE
        member.offset_data = self.offset
        member.name = name or member.name
        member.linkname = linkname or member.linkname

        if member.isreg() or member.type not in SUPPORTED_TYPES:
            # Skip the following data blocks.
            self.offset += self._block(member.size)

        return member

    def proc_sparse(self, tarinfo):
        """Build the section map for a member with a sparse typeflag.

        Parses the (offset, numbytes) pairs stored in the member's header
        and in any extension blocks that follow it, records them as _data
        and _hole sections in a _ringbuffer stored on tarinfo.sparse, and
        replaces tarinfo.size with the original size read from the header.
        Returns the updated tarinfo; self.offset is advanced past the
        extension blocks and the member's stored data.
        """
        buf = tarinfo.tobuf()
        sp = _ringbuffer()
        pos = 386           # byte position of the first sparse struct
        lastpos = 0L        # end of the last section seen (logical position)
        realpos = 0L        # running total of data bytes seen so far
        # There are 4 possible sparse structs in the
        # first header.
        for i in xrange(4):
            try:
                # Each struct is two 12-byte octal fields.
                offset = int(buf[pos:pos + 12], 8)
                numbytes = int(buf[pos + 12:pos + 24], 8)
            except ValueError:
                # An unparsable field ends the list of structs.
                break
            if offset > lastpos:
                # Gap between the previous section and this one.
                sp.append(_hole(lastpos, offset - lastpos))
            sp.append(_data(offset, numbytes, realpos))
            realpos += numbytes
            lastpos = offset + numbytes
            pos += 24

        isextended = ord(buf[482])
        origsize = int(buf[483:495], 8)

        # If the isextended flag is given,
        # there are extra headers to process.
        while isextended == 1:
            buf = self.fileobj.read(BLOCKSIZE)
            self.offset += BLOCKSIZE
            pos = 0
            # An extension block holds up to 21 structs.
            for i in xrange(21):
                try:
                    offset = int(buf[pos:pos + 12], 8)
                    numbytes = int(buf[pos + 12:pos + 24], 8)
                except ValueError:
                    break
                if offset > lastpos:
                    sp.append(_hole(lastpos, offset - lastpos))
                sp.append(_data(offset, numbytes, realpos))
                realpos += numbytes
                lastpos = offset + numbytes
                pos += 24
            # Byte 504 of an extension block says whether another follows.
            isextended = ord(buf[504])

        if lastpos < origsize:
            # Trailing hole up to the original file size.
            sp.append(_hole(lastpos, origsize - lastpos))

        tarinfo.sparse = sp

        tarinfo.offset_data = self.offset
        # Skip the stored data using the size from the header, then expose
        # the original size to callers.
        self.offset += self._block(tarinfo.size)
        tarinfo.size = origsize
        return tarinfo

    # The type mapping for the next() method. The keys are single character
    # strings, the typeflag. The values are methods which are called when
    # next() encounters such a typeflag.
    # Values are the plain functions defined above (not bound methods);
    # next() calls them as self.TYPE_METH[typeflag](self, tarinfo).
    TYPE_METH = {
        GNUTYPE_LONGNAME: proc_gnulong,
        GNUTYPE_LONGLINK: proc_gnulong,
        GNUTYPE_SPARSE:   proc_sparse
    }

    #--------------------------------------------------------------------------
    # Little helper methods:

    def _block(self, count):
        """Round count up to the nearest multiple of BLOCKSIZE.

        A count that already is a multiple of BLOCKSIZE (including 0) is
        returned unchanged.
        """
        remainder = count % BLOCKSIZE
        if not remainder:
            return count
        return count - remainder + BLOCKSIZE

    def _getmember(self, name, tarinfo=None):
        
        if tarinfo is None:
            end = len(self.members)
        else:
            end = self.members.index(tarinfo)

        for i in xrange(end - 1, -1, -1):
            if name == self.membernames[i]:
                return self.members[i]

    def _load(self):
        
        while True:
            tarinfo = self.next()
            if tarinfo is None:
                break
        self._loaded = True

    def _check(self, mode=None):
        
        if self.closed:
            raise IOError, "%s is closed" % self.__class__.__name__
        if mode is not None and self._mode not in mode:
            raise IOError, "bad operation for mode %r" % self._mode

    def __iter__(self):
        
        if self._loaded:
            return iter(self.members)
        else:
            return TarIter(self)

    def _create_gnulong(self, name, type):
        """Write a GNU longname/longlink pseudo-member to the archive.

        The member consists of a header named "././@LongLink" whose size
        field is len(name) and whose typeflag is type, followed by name
        padded with NULs to a whole number of blocks.  self.offset is
        advanced by everything written.
        """
        tarinfo = TarInfo()
        tarinfo.name = "././@LongLink"
        tarinfo.type = type
        tarinfo.mode = 0
        tarinfo.size = len(name)

        # Write the extended header and account for it.  Previously only
        # the name blocks were added to self.offset, leaving it one header
        # block short (next() counts BLOCKSIZE per header).
        self.fileobj.write(tarinfo.tobuf())
        self.offset += BLOCKSIZE

        # Write the name blocks, NUL-padded to a full block.
        self.fileobj.write(name)
        blocks, remainder = divmod(tarinfo.size, BLOCKSIZE)
        if remainder > 0:
            self.fileobj.write(NUL * (BLOCKSIZE - remainder))
            blocks += 1
        self.offset += blocks * BLOCKSIZE

    def _dbg(self, level, msg):
        """Write msg to sys.stderr if level does not exceed self.debug."""
        if level > self.debug:
            return
        print >> sys.stderr, msg
# class TarFile

class TarIter:
    """Iterator over the members of a TarFile.

    Every step delegates to the TarFile's next() method.  When that
    returns a false value the TarFile is flagged as fully loaded and
    iteration stops.
    """

    def __init__(self, tarfile):
        """Remember the TarFile to iterate over."""
        self.tarfile = tarfile

    def __iter__(self):
        """Return the iterator object itself."""
        return self

    def next(self):
        """Return the next member or raise StopIteration at the end."""
        member = self.tarfile.next()
        if member:
            return member
        self.tarfile._loaded = True
        raise StopIteration

# Helper classes for sparse file support
class _section:
    
    def __init__(self, offset, size):
        self.offset = offset
        self.size = size
    def __contains__(self, offset):
        return self.offset <= offset < self.offset + self.size

class _data(_section):
    """A section that carries data; realpos is stored alongside the
    section's offset and size.
    """

    def __init__(self, offset, size, realpos):
        self.realpos = realpos
        _section.__init__(self, offset, size)

class _hole(_section):
    """A section that marks a hole; it adds nothing to _section."""

class _ringbuffer(list):
    
    def __init__(self):
        self.idx = 0
    def find(self, offset):
        idx = self.idx
        while True:
            item = self[idx]
            if offset in item:
                break
            idx += 1
            if idx == len(self):
                idx = 0
            if idx == self.idx:
                # End of File
                return None
        self.idx = idx
        return item

#---------------------------------------------
# zipfile compatible TarFile class
#---------------------------------------------
# Compression constants accepted by TarFileCompat; the values equal the
# corresponding zipfile constants.
TAR_PLAIN = 0           # zipfile.ZIP_STORED
TAR_GZIPPED = 8         # zipfile.ZIP_DEFLATED
class TarFileCompat:
    """TarFile wrapper exposing a zipfile-compatible interface."""

    def __init__(self, file, mode="r", compression=TAR_PLAIN):
        """Open file as a plain (TAR_PLAIN) or gzipped (TAR_GZIPPED) tar.

        In read mode every member additionally receives the attributes
        filename, file_size and date_time.  Raises ValueError for any
        other compression value.
        """
        if compression == TAR_PLAIN:
            opener = TarFile.taropen
        elif compression == TAR_GZIPPED:
            opener = TarFile.gzopen
        else:
            raise ValueError("unknown compression constant")
        self.tarfile = opener(file, mode)

        if mode[0:1] == "r":
            for member in self.tarfile.getmembers():
                member.filename = member.name
                member.file_size = member.size
                member.date_time = time.gmtime(member.mtime)[:6]

    def namelist(self):
        """Return the names of all members reported by infolist()."""
        return [member.name for member in self.infolist()]

    def infolist(self):
        """Return the members whose type is in REGULAR_TYPES."""
        return [member for member in self.tarfile.getmembers()
                if member.type in REGULAR_TYPES]

    def printdir(self):
        """Print a listing of the archive via TarFile.list()."""
        self.tarfile.list()

    def testzip(self):
        """Do nothing and return None."""
        return None

    def getinfo(self, name):
        """Return the member called name."""
        return self.tarfile.getmember(name)

    def read(self, name):
        """Return the contents of the member called name."""
        member = self.tarfile.getmember(name)
        return self.tarfile.extractfile(member).read()

    def write(self, filename, arcname=None, compress_type=None):
        """Add filename to the archive; compress_type is ignored."""
        self.tarfile.add(filename, arcname)

    def writestr(self, zinfo, bytes):
        """Add the string bytes to the archive, described by zinfo.

        zinfo's filename, file_size and date_time are copied to the name,
        size and mtime attributes before it is passed to addfile().
        """
        import StringIO
        import calendar
        zinfo.name = zinfo.filename
        zinfo.size = zinfo.file_size
        zinfo.mtime = calendar.timegm(zinfo.date_time)
        payload = StringIO.StringIO(bytes)
        self.tarfile.addfile(zinfo, payload)

    def close(self):
        """Close the underlying TarFile."""
        self.tarfile.close()
#class TarFileCompat

#--------------------
# exported functions
#--------------------
def is_tarfile(name):
    """Return True if name can be opened (and closed again) with this
    module's open() without a TarError, otherwise False.
    """
    try:
        open(name).close()
    except TarError:
        return False
    return True

# Module-level alias for TarFile.open.  From here on the name open in this
# module refers to it rather than to the builtin; is_tarfile() relies on that.
open = TarFile.open

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -