📄 zipfile.py
字号:
def infolist(self): """Return a list of class ZipInfo instances for files in the archive.""" return self.filelist def printdir(self): """Print a table of contents for the zip file.""" print "%-46s %19s %12s" % ("File Name", "Modified ", "Size") for zinfo in self.filelist: date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time print "%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size) def testzip(self): """Read all the files and check the CRC.""" for zinfo in self.filelist: try: self.read(zinfo.filename) # Check CRC-32 except: return zinfo.filename def getinfo(self, name): """Return the instance of ZipInfo given 'name'.""" return self.NameToInfo[name] def read(self, name): """Return file bytes (as a string) for name.""" if self.mode not in ("r", "a"): raise RuntimeError, 'read() requires mode "r" or "a"' if not self.fp: raise RuntimeError, \ "Attempt to read ZIP archive that was already closed" zinfo = self.getinfo(name) filepos = self.fp.tell() self.fp.seek(zinfo.file_offset, 0) bytes = self.fp.read(zinfo.compress_size) self.fp.seek(filepos, 0) if zinfo.compress_type == ZIP_STORED: pass elif zinfo.compress_type == ZIP_DEFLATED: if not zlib: raise RuntimeError, \ "De-compression requires the (missing) zlib module" # zlib compress/decompress code by Jeremy Hylton of CNRI dc = zlib.decompressobj(-15) bytes = dc.decompress(bytes) # need to feed in unused pad byte so that zlib won't choke ex = dc.decompress('Z') + dc.flush() if ex: bytes = bytes + ex else: raise BadZipfile, \ "Unsupported compression method %d for file %s" % \ (zinfo.compress_type, name) crc = binascii.crc32(bytes) if crc != zinfo.CRC: raise BadZipfile, "Bad CRC-32 for file %s" % name return bytes def _writecheck(self, zinfo): """Check for errors before writing a file to the archive.""" if self.NameToInfo.has_key(zinfo.filename): if self.debug: # Warning for duplicate names print "Duplicate name:", zinfo.filename if self.mode not in ("w", "a"): raise RuntimeError, 'write() requires mode "w" or "a"' if not self.fp: raise RuntimeError, \ "Attempt to write ZIP archive that was already closed" if zinfo.compress_type == ZIP_DEFLATED and not zlib: raise RuntimeError, \ "Compression requires the (missing) zlib module" if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED): raise RuntimeError, \ "That compression method is not supported" def write(self, filename, arcname=None, compress_type=None): """Put the bytes from filename into the archive under the name arcname.""" st = os.stat(filename) mtime = time.localtime(st[8]) date_time = mtime[0:6] # Create ZipInfo instance to store file information if arcname is None: zinfo = ZipInfo(filename, date_time) else: zinfo = ZipInfo(arcname, date_time) zinfo.external_attr = st[0] << 16 # Unix attributes if compress_type is None: zinfo.compress_type = self.compression else: zinfo.compress_type = compress_type self._writecheck(zinfo) fp = open(filename, "rb") zinfo.flag_bits = 0x00 zinfo.header_offset = self.fp.tell() # Start of header bytes # Must overwrite CRC and sizes with correct data later zinfo.CRC = CRC = 0 zinfo.compress_size = compress_size = 0 zinfo.file_size = file_size = 0 self.fp.write(zinfo.FileHeader()) zinfo.file_offset = self.fp.tell() # Start of file bytes if zinfo.compress_type == ZIP_DEFLATED: cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15) else: cmpr = None while 1: buf = fp.read(1024 * 8) if not buf: break file_size = file_size + len(buf) CRC = binascii.crc32(buf, CRC) if cmpr: buf = cmpr.compress(buf) compress_size = compress_size + len(buf) self.fp.write(buf) fp.close() if cmpr: buf = cmpr.flush() compress_size = compress_size + len(buf) self.fp.write(buf) zinfo.compress_size = compress_size else: zinfo.compress_size = file_size zinfo.CRC = CRC zinfo.file_size = file_size # Seek backwards and write CRC and file sizes position = self.fp.tell() # Preserve current position in file self.fp.seek(zinfo.header_offset + 14, 0) self.fp.write(struct.pack("<lll", zinfo.CRC, zinfo.compress_size, zinfo.file_size)) self.fp.seek(position, 0) self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo def writestr(self, zinfo, bytes): """Write a file into the archive. The contents is the string 'bytes'.""" self._writecheck(zinfo) zinfo.file_size = len(bytes) # Uncompressed size zinfo.CRC = binascii.crc32(bytes) # CRC-32 checksum if zinfo.compress_type == ZIP_DEFLATED: co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15) bytes = co.compress(bytes) + co.flush() zinfo.compress_size = len(bytes) # Compressed size else: zinfo.compress_size = zinfo.file_size zinfo.header_offset = self.fp.tell() # Start of header bytes self.fp.write(zinfo.FileHeader()) zinfo.file_offset = self.fp.tell() # Start of file bytes self.fp.write(bytes) if zinfo.flag_bits & 0x08: # Write CRC and file sizes after the file data self.fp.write(struct.pack("<lll", zinfo.CRC, zinfo.compress_size, zinfo.file_size)) self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo def __del__(self): """Call the "close()" method in case the user forgot.""" self.close() def close(self): """Close the file, and for mode "w" and "a" write the ending records.""" if self.fp is None: return if self.mode in ("w", "a"): # write ending records count = 0 pos1 = self.fp.tell() for zinfo in self.filelist: # write central directory count = count + 1 dt = zinfo.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) centdir = struct.pack(structCentralDir, stringCentralDir, zinfo.create_version, zinfo.create_system, zinfo.extract_version, zinfo.reserved, zinfo.flag_bits, zinfo.compress_type, dostime, dosdate, zinfo.CRC, zinfo.compress_size, zinfo.file_size, len(zinfo.filename), len(zinfo.extra), len(zinfo.comment), 0, zinfo.internal_attr, zinfo.external_attr, zinfo.header_offset) self.fp.write(centdir) self.fp.write(zinfo.filename) self.fp.write(zinfo.extra) self.fp.write(zinfo.comment) pos2 = self.fp.tell() # Write end-of-zip-archive record endrec = struct.pack(structEndArchive, stringEndArchive, 0, 0, count, count, pos2 - pos1, pos1, 0) self.fp.write(endrec) self.fp.flush() if not self._filePassed: self.fp.close() self.fp = Noneclass PyZipFile(ZipFile): """Class to create ZIP archives with Python library files and packages.""" def writepy(self, pathname, basename = ""): """Add all files from "pathname" to the ZIP archive. If pathname is a package directory, search the directory and all package subdirectories recursively for all *.py and enter the modules into the archive. If pathname is a plain directory, listdir *.py and enter all modules. Else, pathname must be a Python *.py file and the module will be put into the archive. Added modules are always module.pyo or module.pyc. This method will compile the module.py into module.pyc if necessary. """ dir, name = os.path.split(pathname) if os.path.isdir(pathname): initname = os.path.join(pathname, "__init__.py") if os.path.isfile(initname): # This is a package directory, add it if basename: basename = "%s/%s" % (basename, name) else: basename = name if self.debug: print "Adding package in", pathname, "as", basename fname, arcname = self._get_codename(initname[0:-3], basename) if self.debug: print "Adding", arcname self.write(fname, arcname) dirlist = os.listdir(pathname) dirlist.remove("__init__.py") # Add all *.py files and package subdirectories for filename in dirlist: path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if os.path.isdir(path): if os.path.isfile(os.path.join(path, "__init__.py")): # This is a package directory, add it self.writepy(path, basename) # Recursive call elif ext == ".py": fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print "Adding", arcname self.write(fname, arcname) else: # This is NOT a package directory, add its files at top level if self.debug: print "Adding files from directory", pathname for filename in os.listdir(pathname): path = os.path.join(pathname, filename) root, ext = os.path.splitext(filename) if ext == ".py": fname, arcname = self._get_codename(path[0:-3], basename) if self.debug: print "Adding", arcname self.write(fname, arcname) else: if pathname[-3:] != ".py": raise RuntimeError, \ 'Files added with writepy() must end with ".py"' fname, arcname = self._get_codename(pathname[0:-3], basename) if self.debug: print "Adding file", arcname self.write(fname, arcname) def _get_codename(self, pathname, basename): """Return (filename, archivename) for the path. Given a module name path, return the correct file path and archive name, compiling if necessary. For example, given /python/lib/string, return (/python/lib/string.pyc, string). """ file_py = pathname + ".py" file_pyc = pathname + ".pyc" file_pyo = pathname + ".pyo" if os.path.isfile(file_pyo) and \ os.stat(file_pyo)[8] >= os.stat(file_py)[8]: fname = file_pyo # Use .pyo file elif not os.path.isfile(file_pyc) or \ os.stat(file_pyc)[8] < os.stat(file_py)[8]: import py_compile if self.debug: print "Compiling", file_py py_compile.compile(file_py, file_pyc) fname = file_pyc else: fname = file_pyc archivename = os.path.split(fname)[1] if basename: archivename = "%s/%s" % (basename, archivename) return (fname, archivename)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -