常用模块:
-
模块介绍
-
time &datetime模块
-
random
-
os
-
sys
-
shutil
-
json & picle
-
shelve
-
xml处理
-
yaml处理
-
configparser
-
hashlib
-
subprocess
-
logging模块
模块分为三种:
-
自定义模块
-
内置标准模块(又称标准库)
-
开源模块
#_*_coding:utf-8_*_ __author__ = ‘Alex Li‘ import time # print(time.clock()) #返回处理器时间,3.3开始已废弃 , 改成了time.process_time()测量处理器运算时间,不包括sleep时间,不稳定,mac上测不出来 # print(time.altzone) #返回与utc时间的时间差,以秒计算\\ # print(time.asctime()) #返回时间格式"Fri Aug 19 11:14:16 2016", # print(time.localtime()) #返回本地时间 的struct time对象格式 # print(time.gmtime(time.time()-800000)) #返回utc时间的struc时间对象格式 # print(time.asctime(time.localtime())) #返回时间格式"Fri Aug 19 11:14:16 2016", #print(time.ctime()) #返回Fri Aug 19 12:38:29 2016 格式, 同上 # 日期字符串 转成 时间戳 # string_2_struct = time.strptime("2016/05/22","%Y/%m/%d") #将 日期字符串 转成 struct时间对象格式 # print(string_2_struct) # #
# struct_2_stamp = time.mktime(string_2_struct) #将struct时间对象转成时间戳 # print(struct_2_stamp) #将时间戳转为字符串格式 # print(time.gmtime(time.time()-86640)) #将utc时间戳转换成struct_time格式 # print(time.strftime("%Y-%m-%d %H:%M:%S",time.gmtime()) ) #将utc struct_time格式转成指定的字符串格式 #时间加减 import datetime # print(datetime.datetime.now()) #返回 2016-08-19 12:47:03.941925 #print(datetime.date.fromtimestamp(time.time()) ) # 时间戳直接转成日期格式 2016-08-19 # print(datetime.datetime.now() ) # print(datetime.datetime.now() + datetime.timedelta(3)) #当前时间+3天 # print(datetime.datetime.now() + datetime.timedelta(-3)) #当前时间-3天 # print(datetime.datetime.now() + datetime.timedelta(hours=3)) #当前时间+3小时 # print(datetime.datetime.now() + datetime.timedelta(minutes=30)) #当前时间+30分 # # c_time = datetime.datetime.now() # print(c_time.replace(minute=3,hour=2)) #时间替换
Directive | Meaning | Notes |
---|---|---|
%a |
Locale’s abbreviated weekday name. | |
%A |
Locale’s full weekday name. | |
%b |
Locale’s abbreviated month name. | |
%B |
Locale’s full month name. | |
%c |
Locale’s appropriate date and time representation. | |
%d |
Day of the month as a decimal number [01,31]. | |
%H |
Hour (24-hour clock) as a decimal number [00,23]. | |
%I |
Hour (12-hour clock) as a decimal number [01,12]. | |
%j |
Day of the year as a decimal number [001,366]. | |
%m |
Month as a decimal number [01,12]. | |
%M |
Minute as a decimal number [00,59]. | |
%p |
Locale’s equivalent of either AM or PM. | (1) |
%S |
Second as a decimal number [00,61]. | (2) |
%U |
Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Sunday are considered to be in week 0. | (3) |
%w |
Weekday as a decimal number [0(Sunday),6]. | |
%W |
Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Monday are considered to be in week 0. | (3) |
%x |
Locale’s appropriate date representation. | |
%X |
Locale’s appropriate time representation. | |
%y |
Year without century as a decimal number [00,99]. | |
%Y |
Year with century as a decimal number. | |
%z |
Time zone offset indicating a positive or negative time difference from UTC/GMT of the form +HHMM or -HHMM, where H represents decimal hour digits and M represents decimal minute digits [-23:59, +23:59]. | |
%Z |
Time zone name (no characters if no time zone exists). | |
%% |
A literal ‘%‘ character. |
时间格式转换方式
random模块
随机数
import random print random.random() print random.randint(1,2) print random.randrange(1,10)
生成随机验证码
import random checkcode = ‘‘ for i in range(4): current = random.randrange(0,4) if current != i: temp = chr(random.randint(65,90)) else: temp = random.randint(0,9) checkcode += str(temp) print checkcode
OS模块
import os os.getcwd() 获取当前工作目录,即当前python脚本工作的目录路径 os.chdir("dirname") 改变当前脚本工作目录;相当于shell下cd os.curdir 返回当前目录: (‘.‘) os.pardir 获取当前目录的父目录字符串名:(‘..‘) os.makedirs(‘dirname1/dirname2‘) 可生成多层递归目录 os.removedirs(‘dirname1‘) 若目录为空,则删除,并递归到上一级目录,如若也为空,则删除,依此类推 os.mkdir(‘dirname‘) 生成单级目录;相当于shell中mkdir dirname os.rmdir(‘dirname‘) 删除单级空目录,若目录不为空则无法删除,报错;相当于shell中rmdir dirname os.listdir(‘dirname‘) 列出指定目录下的所有文件和子目录,包括隐藏文件,并以列表方式打印 os.remove() 删除一个文件 os.rename("oldname","newname") 重命名文件/目录 os.stat(‘path/filename‘) 获取文件/目录信息 os.sep 输出操作系统特定的路径分隔符,win下为"\\\\",Linux下为"/" os.linesep 输出当前平台使用的行终止符,win下为"\\t\\n",Linux下为"\\n" os.pathsep 输出用于分割文件路径的字符串 os.name 输出字符串指示当前使用平台。win->‘nt‘; Linux->‘posix‘ os.system("bash command") 运行shell命令,直接显示 os.environ 获取系统环境变量 os.path.abspath(path) 返回path规范化的绝对路径 os.path.split(path) 将path分割成目录和文件名二元组返回 os.path.dirname(path) 返回path的目录。其实就是os.path.split(path)的第一个元素 os.path.basename(path) 返回path最后的文件名。如何path以/或\\结尾,那么就会返回空值。即os.path.split(path)的第二个元素 os.path.exists(path) 如果path存在,返回True;如果path不存在,返回False os.path.isabs(path) 如果path是绝对路径,返回True os.path.isfile(path) 如果path是一个存在的文件,返回True。否则返回False os.path.isdir(path) 如果path是一个存在的目录,则返回True。否则返回False os.path.join(path1[, path2[, ...]]) 将多个路径组合后返回,第一个绝对路径之前的参数将被忽略 os.path.getatime(path) 返回path所指向的文件或者目录的最后存取时间 os.path.getmtime(path) 返回path所指向的文件或者目录的最后修改时间
sys模块
sys.argv 命令行参数List,第一个元素是程序本身路径 sys.exit(n) 退出程序,正常退出时exit(0) sys.version 获取Python解释程序的版本信息 sys.maxint 最大的Int值 sys.path 返回模块的搜索路径,初始化时使用PYTHONPATH环境变量的值 sys.platform 返回操作系统平台名称 sys.stdout.write(‘please:‘) val = sys.stdin.readline()[:-1]
shutil模块
高级的 文件、文件夹、压缩包 处理模块
shutil.copyfileobj(fsrc, fdst[, length])
将文件内容拷贝到另一个文件中,可以部分内容
#先来看看其源代码。 def copyfileobj(fsrc, fdst, length=16*1024): """copy data from file-like object fsrc to file-like object fdst""" while 1: buf = fsrc.read(length) if not buf: break fdst.write(buf) #注意! 在其中fsrc,fdst都是文件对象,都需要打开后才能进行复制操作
测试情况
shutil.copyfile(src,dst) #copy文件内容
def copyfile(src, dst, *, follow_symlinks=True): """Copy data from src to dst. If follow_symlinks is not set and src is a symbolic link, a new symlink will be created instead of copying the file it points to. """ if _samefile(src, dst): raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) for fn in [src, dst]: try: st = os.stat(fn) except OSError: # File most likely does not exist pass else: # XXX What about other special files? (sockets, devices...) if stat.S_ISFIFO(st.st_mode): raise SpecialFileError("`%s` is a named pipe" % fn) if not follow_symlinks and os.path.islink(src): os.symlink(os.readlink(src), dst) else: with open(src, ‘rb‘) as fsrc: with open(dst, ‘wb‘) as fdst: copyfileobj(fsrc, fdst) return dst
测试代码
shutil.copymode(src,dst) #仅copy权限,不更改文件内容,组和用户
def copymode(src, dst, *, follow_symlinks=True): """Copy mode bits from src to dst. If follow_symlinks is not set, symlinks aren‘t followed if and only if both `src` and `dst` are symlinks. If `lchmod` isn‘t available (e.g. Linux) this method does nothing. """ if not follow_symlinks and os.path.islink(src) and os.path.islink(dst): if hasattr(os, ‘lchmod‘): stat_func, chmod_func = os.lstat, os.lchmod else: return elif hasattr(os, ‘chmod‘): stat_func, chmod_func = os.stat, os.chmod else: return st = stat_func(src) chmod_func(dst, stat.S_IMODE(st.st_mode)) 查看源代码
测试代码源:https://www.cnblogs.com/MnCu8261/p/5494807.html
#先看两个文件的权限 [[email protected] python_test]# ls -l total 4 -rw-r--r--. 1 root root 79 May 14 05:17 test1 -rwxr-xr-x. 1 root root 0 May 14 19:10 test2 #运行命令 >>> import shutil >>> shutil.copymode(‘test1‘,‘test2‘) #查看结果 [[email protected] python_test]# ls -l total 4 -rw-r--r--. 1 root root 79 May 14 05:17 test1 -rw-r--r--. 1 root root 0 May 14 19:10 test2 #当我们将目标文件换为一个不存在的文件时报错 >>> shutil.copymode(‘test1‘,‘test3‘) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/python/lib/python3.4/shutil.py", line 132, in copymode chmod_func(dst, stat.S_IMODE(st.st_mode)) FileNotFoundError: [Errno 2] No such file or directory: ‘test233‘ 实例
shutil.copystat(src,dst) #复制所有的状态信息,包括权限,组,用户,时间等
def copystat(src, dst, *, follow_symlinks=True): """Copy all stat info (mode bits, atime, mtime, flags) from src to dst. If the optional flag `follow_symlinks` is not set, symlinks aren‘t followed if and only if both `src` and `dst` are symlinks. """ def _nop(*args, ns=None, follow_symlinks=None): pass # follow symlinks (aka don‘t not follow symlinks) follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst)) if follow: # use the real function if it exists def lookup(name): return getattr(os, name, _nop) else: # use the real function only if it exists # *and* it supports follow_symlinks def lookup(name): fn = getattr(os, name, _nop) if fn in os.supports_follow_symlinks: return fn return _nop st = lookup("stat")(src, follow_symlinks=follow) mode = stat.S_IMODE(st.st_mode) lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), follow_symlinks=follow) try: lookup("chmod")(dst, mode, follow_symlinks=follow) except NotImplementedError: # if we got a NotImplementedError, it‘s because # * follow_symlinks=False, # * lchown() is unavailable, and # * either # * fchownat() is unavailable or # * fchownat() doesn‘t implement AT_SYMLINK_NOFOLLOW. # (it returned ENOSUP.) # therefore we‘re out of options--we simply cannot chown the # symlink. give up, suppress the error. # (which is what shutil always did in this circumstance.) pass if hasattr(st, ‘st_flags‘): try: lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) except OSError as why: for err in ‘EOPNOTSUPP‘, ‘ENOTSUP‘: if hasattr(errno, err) and why.errno == getattr(errno, err): break else: raise _copyxattr(src, dst, follow_symlinks=follow) 查看源代码
shutil.copy(src,dst) #复制文件的内容以及权限,先copyfile后copymode
def copy(src, dst, *, follow_symlinks=True): """Copy data and mode bits ("cp src dst"). Return the file‘s destination. The destination may be a directory. If follow_symlinks is false, symlinks won‘t be followed. This resembles GNU‘s "cp -P src dst". If source and destination are the same file, a SameFileError will be raised. """ if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) copyfile(src, dst, follow_symlinks=follow_symlinks) copymode(src, dst, follow_symlinks=follow_symlinks) return dst 查看源代码
shutil.copy2(src,dst) #复制文件的内容以及文件的所有状态信息。先copyfile后copystat
def copy2(src, dst, *, follow_symlinks=True): """Copy data and all stat info ("cp -p src dst"). Return the file‘s destination." The destination may be a directory. If follow_symlinks is false, symlinks won‘t be followed. This resembles GNU‘s "cp -P src dst". """ if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) copyfile(src, dst, follow_symlinks=follow_symlinks) copystat(src, dst, follow_symlinks=follow_symlinks) return dst 查看源代码
shutil.copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,ignore_dangling_symlinks=False) #递归的复制文件内容及状态信息
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, ignore_dangling_symlinks=False): """Recursively copy a directory tree. The destination directory must not already exist. If exception(s) occur, an Error is raised with a list of reasons. If the optional symlinks flag is true, symbolic links in the source tree result in symbolic links in the destination tree; if it is false, the contents of the files pointed to by symbolic links are copied. If the file pointed by the symlink doesn‘t exist, an exception will be added in the list of errors raised in an Error exception at the end of the copy process. You can set the optional ignore_dangling_symlinks flag to true if you want to silence this exception. Notice that this has no effect on platforms that don‘t support os.symlink. The optional ignore argument is a callable. If given, it is called with the `src` parameter, which is the directory being visited by copytree(), and `names` which is the list of `src` contents, as returned by os.listdir(): callable(src, names) -> ignored_names Since copytree() is called recursively, the callable will be called once for each directory that is copied. It returns a list of names relative to the `src` directory that should not be copied. The optional copy_function argument is a callable that will be used to copy each file. It will be called with the source path and the destination path as arguments. By default, copy2() is used, but any function that supports the same signature (like copy()) can be used. """ names = os.listdir(src) if ignore is not None: ignored_names = ignore(src, names) else: ignored_names = set() os.makedirs(dst) errors = [] for name in names: if name in ignored_names: continue srcname = os.path.join(src, name) dstname = os.path.join(dst, name) try: if os.path.islink(srcname): linkto = os.readlink(srcname) if symlinks: # We can‘t just leave it to `copy_function` because legacy # code with a custom `copy_function` may rely on copytree # doing the right thing. os.symlink(linkto, dstname) copystat(srcname, dstname, follow_symlinks=not symlinks) else: # ignore dangling symlink if the flag is on if not os.path.exists(linkto) and ignore_dangling_symlinks: continue # otherwise let the copy occurs. copy2 will raise an error if os.path.isdir(srcname): copytree(srcname, dstname, symlinks, ignore, copy_function) else: copy_function(srcname, dstname) elif os.path.isdir(srcname): copytree(srcname, dstname, symlinks, ignore, copy_function) else: # Will raise a SpecialFileError for unsupported file types copy_function(srcname, dstname) # catch the Error from the recursive copytree so that we can # continue with other files except Error as err: errors.extend(err.args[0]) except OSError as why: errors.append((srcname, dstname, str(why))) try: copystat(src, dst) except OSError as why: # Copying file access times may fail on Windows if getattr(why, ‘winerror‘, None) is None: errors.append((src, dst, str(why))) if errors: raise Error(errors) return dst # version vulnerable to race conditions 查看源代码
测试代码源:https://www.cnblogs.com/MnCu8261/p/5494807.html
[ro[email protected] python_test]# tree copytree_test/ copytree_test/ └── test ├── test1 ├── test2 └── hahaha [[email protected] test]# ls -l total 0 -rw-r--r--. 1 python python 0 May 14 19:36 hahaha -rw-r--r--. 1 python python 0 May 14 19:36 test1 -rw-r--r--. 1 root root 0 May 14 19:36 test2 >>> shutil.copytree(‘copytree_test‘,‘copytree_copy‘) ‘copytree_copy‘ [[email protected] python_test]# ls -l total 12 drwxr-xr-x. 3 root root 4096 May 14 19:36 copytree_copy drwxr-xr-x. 3 root root 4096 May 14 19:36 copytree_test -rw-r--r--. 1 python python 79 May 14 05:17 test1 -rw-r--r--. 1 root root 0 May 14 19:10 test2 [[email protected] python_test]# tree copytree_copy/ copytree_copy/ └── test ├── hahaha ├── test1 └── test2 实例
shutil.rmtree(path, ignore_errors=False, onerror=None) #递归地删除文件
def rmtree(path, ignore_errors=False, onerror=None): """Recursively delete a directory tree. If ignore_errors is set, errors are ignored; otherwise, if onerror is set, it is called to handle the error with arguments (func, path, exc_info) where func is platform and implementation dependent; path is the argument to that function that caused it to fail; and exc_info is a tuple returned by sys.exc_info(). If ignore_errors is false and onerror is None, an exception is raised. """ if ignore_errors: def onerror(*args): pass elif onerror is None: def onerror(*args): raise if _use_fd_functions: # While the unsafe rmtree works fine on bytes, the fd based does not. if isinstance(path, bytes): path = os.fsdecode(path) # Note: To guard against symlink races, we use the standard # lstat()/open()/fstat() trick. try: orig_st = os.lstat(path) except Exception: onerror(os.lstat, path, sys.exc_info()) return try: fd = os.open(path, os.O_RDONLY) except Exception: onerror(os.lstat, path, sys.exc_info()) return try: if os.path.samestat(orig_st, os.fstat(fd)): _rmtree_safe_fd(fd, path, onerror) try: os.rmdir(path) except OSError: onerror(os.rmdir, path, sys.exc_info()) else: try: # symlinks to directories are forbidden, see bug #1669 raise OSError("Cannot call rmtree on a symbolic link") except OSError: onerror(os.path.islink, path, sys.exc_info()) finally: os.close(fd) else: return _rmtree_unsafe(path, onerror) # Allow introspection of whether or not the hardening against symlink # attacks is supported on the current platform rmtree.avoids_symlink_attacks = _use_fd_functions
测试效果
shutil.move(src, dst) #递归的移动文件
def move(src, dst): """Recursively move a file or directory to another location. This is similar to the Unix "mv" command. Return the file or directory‘s destination. If the destination is a directory or a symlink to a directory, the source is moved inside the directory. The destination path must not already exist. If the destination already exists but is not a directory, it may be overwritten depending on os.rename() semantics. If the destination is on our current filesystem, then rename() is used. Otherwise, src is copied to the destination and then removed. Symlinks are recreated under the new name if os.rename() fails because of cross filesystem renames. A lot more could be done here... A look at a mv.c shows a lot of the issues this implementation glosses over. """ real_dst = dst if os.path.isdir(dst): if _samefile(src, dst): # We might be on a case insensitive filesystem, # perform the rename anyway. os.rename(src, dst) return real_dst = os.path.join(dst, _basename(src)) if os.path.exists(real_dst): raise Error("Destination path ‘%s‘ already exists" % real_dst) try: os.rename(src, real_dst) except OSError: if os.path.islink(src): linkto = os.readlink(src) os.symlink(linkto, real_dst) os.unlink(src) elif os.path.isdir(src): if _destinsrc(src, dst): raise Error("Cannot move a directory ‘%s‘ into itself ‘%s‘." % (src, dst)) copytree(src, real_dst, symlinks=True) rmtree(src) else: copy2(src, real_dst) os.unlink(src) return real_dst 查看源代码
make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,dry_run=0, owner=None, group=None, logger=None) #压缩打包
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, dry_run=0, owner=None, group=None, logger=None): """Create an archive file (eg. zip or tar). ‘base_name‘ is the name of the file to create, minus any format-specific extension; ‘format‘ is the archive format: one of "zip", "tar", "bztar" or "gztar". ‘root_dir‘ is a directory that will be the root directory of the archive; ie. we typically chdir into ‘root_dir‘ before creating the archive. ‘base_dir‘ is the directory where we start archiving from; ie. ‘base_dir‘ will be the common prefix of all files and directories in the archive. ‘root_dir‘ and ‘base_dir‘ both default to the current directory. Returns the name of the archive file. ‘owner‘ and ‘group‘ are used when creating a tar archive. By default, uses the current owner and group. """ save_cwd = os.getcwd() if root_dir is not None: if logger is not None: logger.debug("changing into ‘%s‘", root_dir) base_name = os.path.abspath(base_name) if not dry_run: os.chdir(root_dir) if base_dir is None: base_dir = os.curdir kwargs = {‘dry_run‘: dry_run, ‘logger‘: logger} try: format_info = _ARCHIVE_FORMATS[format] except KeyError: raise ValueError("unknown archive format ‘%s‘" % format) func = format_info[0] for arg, val in format_info[1]: kwargs[arg] = val if format != ‘zip‘: kwargs[‘owner‘] = owner kwargs[‘group‘] = group try: filename = func(base_name, base_dir, **kwargs) finally: if root_dir is not None: if logger is not None: logger.debug("changing back to ‘%s‘", save_cwd) os.chdir(save_cwd) return filename 查看源代码
base_name: 压缩打包后的文件名或者路径名
format: 压缩或者打包格式 "zip", "tar", "bztar"or "gztar"
root_dir : 将哪个目录或者文件打包(也就是源文件)
shutil 对压缩包的处理是调用 ZipFile 和 TarFile 两个模块来进行的,详细:http://www.cnblogs.com/wupeiqi/articles/4963027.html
import zipfile # 压缩 z = zipfile.ZipFile(‘laxi.zip‘, ‘w‘) z.write(‘a.log‘) z.write(‘data.data‘) z.close() # 解压 z = zipfile.ZipFile(‘laxi.zip‘, ‘r‘) z.extractall() z.close() zipfile 压缩解压
import tarfile # 压缩 tar = tarfile.open(‘your.tar‘,‘w‘) tar.add(‘/Users/wupeiqi/PycharmProjects/bbs2.zip‘, arcname=‘bbs2.zip‘) tar.add(‘/Users/wupeiqi/PycharmProjects/cmdb.zip‘, arcname=‘cmdb.zip‘) tar.close() # 解压 tar = tarfile.open(‘your.tar‘,‘r‘) tar.extractall() # 可设置解压地址 tar.close() tarfile 压缩解压
class ZipFile(object): """ Class with methods to open, read, write, close, list zip files. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=False) file: Either the path to the file, or a file-like object. If it is a path, the file will be opened and closed by ZipFile. mode: The mode can be either read "r", write "w" or append "a". compression: ZIP_STORED (no compression) or ZIP_DEFLATED (requires zlib). allowZip64: if True ZipFile will create files with ZIP64 extensions when needed, otherwise it will raise an exception when this would be necessary. """ fp = None # Set here since __del__ checks it def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False): """Open the ZIP file with mode read "r", write "w" or append "a".""" if mode not in ("r", "w", "a"): raise RuntimeError(‘ZipFile() requires mode "r", "w", or "a"‘) if compression == ZIP_STORED: pass elif compression == ZIP_DEFLATED: if not zlib: raise RuntimeError, "Compression requires the (missing) zlib module" else: raise RuntimeError, "That compression method is not supported" self._allowZip64 = allowZip64 self._didModify = False self.debug = 0 # Level of printing: 0 through 3 self.NameToInfo = {} # Find file info given name self.filelist = [] # List of ZipInfo instances for archive self.compression = compression # Method of compression self.mode = key = mode.replace(‘b‘, ‘‘)[0] self.pwd = None self._comment = ‘‘ # Check if we were passed a file-like object if isinstance(file, basestring): self._filePassed = 0 self.filename = file modeDict = {‘r‘ : ‘rb‘, ‘w‘: ‘wb‘, ‘a‘ : ‘r+b‘} try: self.fp = open(file, modeDict[mode]) except IOError: if mode == ‘a‘: mode = key = ‘w‘ self.fp = open(file, modeDict[mode]) else: raise else: self._filePassed = 1 self.fp = file self.filename = getattr(file, ‘name‘, None) try: if key == ‘r‘: self._RealGetContents() elif key == ‘w‘: # set the modified flag so central directory gets written # even if no files are added to the archive self._didModify = True elif key == ‘a‘: try: # See if file is a zip file self._RealGetContents() # seek to start of directory and overwrite self.fp.seek(self.start_dir, 0) except BadZipfile: # file is not a zip file, just append self.fp.seek(0, 2) # set the modified flag so central directory gets written # even if no files are added to the archive self._didModify = True else: raise RuntimeError(‘Mode must be "r", "w" or "a"‘) except: fp = self.fp self.fp = None if not self._filePassed: fp.close() raise def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def _RealGetContents(self): """Read in the table of contents for the ZIP file.""" fp = self.fp try: endrec = _EndRecData(fp) except IOError: raise BadZipfile("File is not a zip file") if not endrec: raise BadZipfile, "File is not a zip file" if self.debug > 1: print endrec size_cd = endrec[_ECD_SIZE] # bytes in central directory offset_cd = endrec[_ECD_OFFSET] # offset of central directory self._comment = endrec[_ECD_COMMENT] # archive comment # "concat" is zero, unless zip was concatenated to another file concat = endrec[_ECD_LOCATION] - size_cd - offset_cd if endrec[_ECD_SIGNATURE] == stringEndArchive64: # If Zip64 extension structures are present, account for them concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2: inferred = concat + offset_cd print "given, inferred, offset", offset_cd, inferred, concat # self.start_dir: Position of start of central directory self.start_dir = offset_cd + concat fp.seek(self.start_dir, 0) data = fp.read(size_cd) fp = cStringIO.StringIO(data) total = 0 while total < size_cd: centdir = fp.read(sizeCentralDir) if len(centdir) != sizeCentralDir: raise BadZipfile("Truncated central directory") centdir = struct.unpack(structCentralDir, centdir) if centdir[_CD_SIGNATURE] != stringCentralDir: raise BadZipfile("Bad magic number for central directory") if self.debug > 2: print centdir filename = fp.read(centdir[_CD_FILENAME_LENGTH]) # Create ZipInfo instance to store file information x = ZipInfo(filename) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] (x.create_version, x.create_system, x.extract_version, x.reserved, x.flag_bits, x.compress_type, t, d, x.CRC, x.compress_size, x.file_size) = centdir[1:12] x.volume, x.internal_attr, x.external_attr = centdir[15:18] # Convert date/time code to (year, month, day, hour, min, sec) x._raw_time = t x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F, t>>11, (t>>5)&0x3F, (t&0x1F) * 2 ) x._decodeExtra() x.header_offset = x.header_offset + concat x.filename = x._decodeFilename() self.filelist.append(x) self.NameToInfo[x.filename] = x # update total bytes read from central directory total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] + centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH]) if self.debug > 2: print "total", total def namelist(self): """Return a list of file names in the archive.""" l = [] for data in self.filelist: l.append(data.filename) return l def infolist(self): """Return a list of class ZipInfo instances for files in the archive.""" return self.filelist def printdir(self): """Print a table of contents for the zip file.""" print "%-46s %19s %12s" % ("File Name", "Modified ", "Size") for zinfo in self.filelist: date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6] print "%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size) def testzip(self): """Read all the files and check the CRC.""" chunk_size = 2 ** 20 for zinfo in self.filelist: try: # Read by chunks, to avoid an OverflowError or a # MemoryError with very large embedded files. with self.open(zinfo.filename, "r") as f: while f.read(chunk_size): # Check CRC-32 pass except BadZipfile: return zinfo.filename def getinfo(self, name): """Return the instance of ZipInfo given ‘name‘.""" info = self.NameToInfo.get(name) if info is None: raise KeyError( ‘There is no item named %r in the archive‘ % name) return info def setpassword(self, pwd): """Set default password for encrypted files.""" self.pwd = pwd @property def comment(self): """The comment text associated with the ZIP file.""" return self._comment @comment.setter def comment(self, comment): # check for valid comment length if len(comment) > ZIP_MAX_COMMENT: import warnings warnings.warn(‘Archive comment is too long; truncating to %d bytes‘ % ZIP_MAX_COMMENT, stacklevel=2) comment = comment[:ZIP_MAX_COMMENT] self._comment = comment self._didModify = True def read(self, name, pwd=None): """Return file bytes (as a string) for name.""" return self.open(name, "r", pwd).read() def open(self, name, mode="r", pwd=None): """Return file-like object for ‘name‘.""" if mode not in ("r", "U", "rU"): raise RuntimeError, ‘open() requires mode "r", "U", or "rU"‘ if not self.fp: raise RuntimeError, "Attempt to read ZIP archive that was already closed" # Only open a new file for instances where we were not # given a file object in the constructor if self._filePassed: zef_file = self.fp should_close = False else: zef_file = open(self.filename, ‘rb‘) should_close = True try: # Make sure we have an info object if isinstance(name, ZipInfo): # ‘name‘ is already an info object zinfo = name else: # Get info object for name zinfo = self.getinfo(name) zef_file.seek(zinfo.header_offset, 0) # Skip the file header: fheader = zef_file.read(sizeFileHeader) if len(fheader) != sizeFileHeader: raise BadZipfile("Truncated file header") fheader = struct.unpack(structFileHeader, fheader) if fheader[_FH_SIGNATURE] != stringFileHeader: raise BadZipfile("Bad magic number for file header") fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if fname != zinfo.orig_filename: raise BadZipfile, ‘File name in directory "%s" and header "%s" differ.‘ % ( zinfo.orig_filename, fname) # check for encrypted flag & handle password is_encrypted = zinfo.flag_bits & 0x1 zd = None if is_encrypted: if not pwd: pwd = self.pwd if not pwd: raise RuntimeError, "File %s is encrypted, " "password required for extraction" % name zd = _ZipDecrypter(pwd) # The first 12 bytes in the cypher stream is an encryption header # used to strengthen the algorithm. The first 11 bytes are # completely random, while the 12th contains the MSB of the CRC, # or the MSB of the file time depending on the header type # and is used to check the correctness of the password. bytes = zef_file.read(12) h = map(zd, bytes[0:12]) if zinfo.flag_bits & 0x8: # compare against the file type from extended local headers check_byte = (zinfo._raw_time >> 8) & 0xff else: # compare against the CRC otherwise check_byte = (zinfo.CRC >> 24) & 0xff if ord(h[11]) != check_byte: raise RuntimeError("Bad password for file", name) return ZipExtFile(zef_file, mode, zinfo, zd, close_fileobj=should_close) except: if should_close: zef_file.close() raise def extract(self, member, path=None, pwd=None): """Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. `member‘ may be a filename or a ZipInfo object. You can specify a different directory using `path‘. """ if not isinstance(member, ZipInfo): member = self.getinfo(member) if path is None: path = os.getcwd() return self._extract_member(member, path, pwd) def extractall(self, path=None, members=None, pwd=None): """Extract all members from the archive to the current working directory. `path‘ specifies a different directory to extract to. `members‘ is optional and must be a subset of the list returned by namelist(). """ if members is None: members = self.namelist() for zipinfo in members: self.extract(zipinfo, path, pwd) def _extract_member(self, member, targetpath, pwd): """Extract the ZipInfo object ‘member‘ to a physical file on the path targetpath. """ # build the destination pathname, replacing # forward slashes to platform specific separators. arcname = member.filename.replace(‘/‘, os.path.sep) if os.path.altsep: arcname = arcname.replace(os.path.altsep, os.path.sep) # interpret absolute pathname as relative, remove drive letter or # UNC path, redundant separators, "." and ".." components. arcname = os.path.splitdrive(arcname)[1] arcname = os.path.sep.join(x for x in arcname.split(os.path.sep) if x not in (‘‘, os.path.curdir, os.path.pardir)) if os.path.sep == ‘\\\\‘: # filter illegal characters on Windows illegal = ‘:<>|"?*‘ if isinstance(arcname, unicode): table = {ord(c): ord(‘_‘) for c in illegal} else: table = string.maketrans(illegal, ‘_‘ * len(illegal)) arcname = arcname.translate(table) # remove trailing dots arcname = (x.rstrip(‘.‘) for x in arcname.split(os.path.sep)) arcname = os.path.sep.join(x for x in arcname if x) targetpath = os.path.join(targetpath, arcname) targetpath = os.path.normpath(targetpath) # Create all upper directories if necessary. upperdirs = os.path.dirname(targetpath) if upperdirs and not os.path.exists(upperdirs): os.makedirs(upperdirs) if member.filename[-1] == ‘/‘: if not os.path.isdir(targetpath): os.mkdir(targetpath) return targetpath with self.open(member, pwd=pwd) as source, file(targetpath, "wb") as target: shutil.copyfileobj(source, target) return targetpath def _writecheck(self, zinfo): """Check for errors before writing a file to the archive.""" if zinfo.filename in self.NameToInfo: import warnings warnings.warn(‘Duplicate name: %r‘ % zinfo.filename, stacklevel=3) if self.mode not in ("w", "a"): raise RuntimeError, ‘write() requires mode "w" or "a"‘ if not self.fp: raise RuntimeError, "Attempt to write ZIP archive that was already closed" if zinfo.compress_type == ZIP_DEFLATED and not zlib: raise RuntimeError, "Compression requires the (missing) zlib module" if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED): raise RuntimeError, "That compression method is not supported" if not self._allowZip64: requires_zip64 = None if len(self.filelist) >= ZIP_FILECOUNT_LIMIT: requires_zip64 = "Files count" elif zinfo.file_size > ZIP64_LIMIT: requires_zip64 = "Filesize" elif zinfo.header_offset > ZIP64_LIMIT: requires_zip64 = "Zipfile size" if requires_zip64: raise LargeZipFile(requires_zip64 + " would require ZIP64 extensions") def write(self, filename, arcname=None, compress_type=None): """Put the bytes from filename into the archive under the name arcname.""" if not self.fp: raise RuntimeError( "Attempt to write to ZIP archive that was already closed") st = os.stat(filename) isdir = stat.S_ISDIR(st.st_mode) mtime = time.localtime(st.st_mtime) date_time = mtime[0:6] # Create ZipInfo instance to store file information if arcname is None: arcname = filename arcname = os.path.normpath(os.path.splitdrive(arcname)[1]) while arcname[0] in (os.sep, os.altsep): arcname = arcname[1:] if isdir: arcname += ‘/‘ zinfo = ZipInfo(arcname, date_time) zinfo.external_attr = (st[0] & 0xFFFF) << 16L # Unix attributes if compress_type is None: zinfo.compress_type = self.compression else: zinfo.compress_type = compress_type zinfo.file_size = st.st_size zinfo.flag_bits = 0x00 zinfo.header_offset = self.fp.tell() # Start of header bytes self._writecheck(zinfo) self._didModify = True if isdir: zinfo.file_size = 0 zinfo.compress_size = 0 zinfo.CRC = 0 zinfo.external_attr |= 0x10 # MS-DOS directory flag self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo self.fp.write(zinfo.FileHeader(False)) return with open(filename, "rb") as fp: # Must overwrite CRC and sizes with correct data later zinfo.CRC = CRC = 0 zinfo.compress_size = compress_size = 0 # Compressed size can be larger than uncompressed size zip64 = self._allowZip64 and zinfo.file_size * 1.05 > ZIP64_LIMIT self.fp.write(zinfo.FileHeader(zip64)) if zinfo.compress_type == ZIP_DEFLATED: cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15) else: cmpr = None file_size = 0 while 1: buf = fp.read(1024 * 8) if not buf: break file_size = file_size + len(buf) CRC = crc32(buf, CRC) & 0xffffffff if cmpr: buf = cmpr.compress(buf) compress_size = compress_size + len(buf) self.fp.write(buf) if cmpr: buf = cmpr.flush() compress_size = compress_size + len(buf) self.fp.write(buf) zinfo.compress_size = compress_size else: zinfo.compress_size = file_size zinfo.CRC = CRC zinfo.file_size = file_size if not zip64 and self._allowZip64: if file_size > ZIP64_LIMIT: raise RuntimeError(‘File size has increased during compressing‘) if compress_size > ZIP64_LIMIT: raise RuntimeError(‘Compressed size larger than uncompressed size‘) # Seek backwards and write file header (which will now include # correct CRC and file sizes) position = self.fp.tell() # Preserve current position in file self.fp.seek(zinfo.header_offset, 0) self.fp.write(zinfo.FileHeader(zip64)) self.fp.seek(position, 0) self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo def writestr(self, zinfo_or_arcname, bytes, compress_type=None): """Write a file into the archive. The contents is the string ‘bytes‘. ‘zinfo_or_arcname‘ is either a ZipInfo instance or the name of the file in the archive.""" if not isinstance(zinfo_or_arcname, ZipInfo): zinfo = ZipInfo(filename=zinfo_or_arcname, date_time=time.localtime(time.time())[:6]) zinfo.compress_type = self.compression if zinfo.filename[-1] == ‘/‘: zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x zinfo.external_attr |= 0x10 # MS-DOS directory flag else: zinfo.external_attr = 0o600 << 16 # ?rw------- else: zinfo = zinfo_or_arcname if not self.fp: raise RuntimeError( "Attempt to write to ZIP archive that was already closed") if compress_type is not None: zinfo.compress_type = compress_type zinfo.file_size = len(bytes) # Uncompressed size zinfo.header_offset = self.fp.tell() # Start of header bytes self._writecheck(zinfo) self._didModify = True zinfo.CRC = crc32(bytes) & 0xffffffff # CRC-32 checksum if zinfo.compress_type == ZIP_DEFLATED: co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15) bytes = co.compress(bytes) + co.flush() zinfo.compress_size = len(bytes) # Compressed size else: zinfo.compress_size = zinfo.file_size zip64 = zinfo.file_size > ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT if zip64 and not self._allowZip64: raise LargeZipFile("Filesize would require ZIP64 extensions") self.fp.write(zinfo.FileHeader(zip64)) self.fp.write(bytes) if zinfo.flag_bits & 0x08: # Write CRC and file sizes after the file data fmt = ‘<LQQ‘ if zip64 else ‘<LLL‘ self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size)) self.fp.flush() self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo def __del__(self): """Call the "close()" method in case the user forgot.""" self.close() def close(self): """Close the file, and for mode "w" and "a" write the ending records.""" if self.fp is None: return try: if self.mode in ("w", "a") and self._didModify: # write ending records pos1 = self.fp.tell() for zinfo in self.filelist: # write central directory dt = zinfo.date_time dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2] dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2) extra = [] if zinfo.file_size > ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT: extra.append(zinfo.file_size) extra.append(zinfo.compress_size) file_size = 0xffffffff compress_size = 0xffffffff else: file_size = zinfo.file_size compress_size = zinfo.compress_size if zinfo.header_offset > ZIP64_LIMIT: extra.append(zinfo.header_offset) header_offset = 0xffffffffL else: header_offset = zinfo.header_offset extra_data = zinfo.extra if extra: # Append a ZIP64 field to the extra‘s extra_data = struct.pack( ‘<HH‘ + ‘Q‘*len(extra), 1, 8*len(extra), *extra) + extra_data extract_version = max(45, zinfo.extract_version) create_version = max(45, zinfo.create_version) else: extract_version = zinfo.extract_version create_version = zinfo.create_version try: filename, flag_bits = zinfo._encodeFilenameFlags() centdir = struct.pack(structCentralDir, stringCentralDir, create_version, zinfo.create_system, extract_version, zinfo.reserved, flag_bits, zinfo.compress_type, dostime, dosdate, zinfo.CRC, compress_size, file_size, len(filename), len(extra_data), len(zinfo.comment), 0, zinfo.internal_attr, zinfo.external_attr, header_offset) except DeprecationWarning: print >>sys.stderr, (structCentralDir, stringCentralDir, create_version, zinfo.create_system, extract_version, zinfo.reserved, zinfo.flag_bits, zinfo.compress_type, dostime, dosdate, zinfo.CRC, compress_size, file_size, len(zinfo.filename), len(extra_data), len(zinfo.comment), 0, zinfo.internal_attr, zinfo.external_attr, header_offset) raise self.fp.write(centdir) self.fp.write(filename) self.fp.write(extra_data) self.fp.write(zinfo.comment) pos2 = self.fp.tell() # Write end-of-zip-archive record centDirCount = len(self.filelist) centDirSize = pos2 - pos1 centDirOffset = pos1 requires_zip64 = None if centDirCount > ZIP_FILECOUNT_LIMIT: requires_zip64 = "Files count" elif centDirOffset > ZIP64_LIMIT: requires_zip64 = "Central directory offset" elif centDirSize > ZIP64_LIMIT: requires_zip64 = "Central directory size" if requires_zip64: # Need to write the ZIP64 end-of-archive records if not self._allowZip64: raise LargeZipFile(requires_zip64 + " would require ZIP64 extensions") zip64endrec = struct.pack( structEndArchive64, stringEndArchive64, 44, 45, 45, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset) self.fp.write(zip64endrec) zip64locrec = struct.pack( structEndArchive64Locator, stringEndArchive64Locator, 0, pos2, 1) self.fp.write(zip64locrec) centDirCount = min(centDirCount, 0xFFFF) centDirSize = min(centDirSize, 0xFFFFFFFF) centDirOffset = min(centDirOffset, 0xFFFFFFFF) endrec = struct.pack(structEndArchive, stringEndArchive, 0, 0, centDirCount, centDirCount, centDirSize, centDirOffset, len(self._comment)) self.fp.write(endrec) self.fp.write(self._comment) self.fp.flush() finally: fp = self.fp self.fp = None if not self._filePassed: fp.close() ZipFile
class TarFile(object): """The TarFile Class provides an interface to tar archives. """ debug = 0 # May be set from 0 (no msgs) to 3 (all msgs) dereference = False # If true, add content of linked file to the # tar file, else the link. ignore_zeros = False # If true, skips empty or invalid blocks and # continues processing. errorlevel = 1 # If 0, fatal errors only appear in debug # messages (if debug >= 0). If > 0, errors # are passed to the caller as exceptions. format = DEFAULT_FORMAT # The format to use when creating an archive. encoding = ENCODING # Encoding for 8-bit character strings. errors = None # Error handler for unicode conversion. tarinfo = TarInfo # The default TarInfo class to use. fileobject = ExFileObject # The default ExFileObject class to use. def __init__(self, name=None, mode="r", fileobj=None, format=None, tarinfo=None, dereference=None, ignore_zeros=None, encoding=None, errors=None, pax_headers=None, debug=None, errorlevel=None): """Open an (uncompressed) tar archive `name‘. `mode‘ is either ‘r‘ to read from an existing archive, ‘a‘ to append data to an existing file or ‘w‘ to create a new file overwriting an existing one. `mode‘ defaults to ‘r‘. If `fileobj‘ is given, it is used for reading or writing data. If it can be determined, `mode‘ is overridden by `fileobj‘s mode. `fileobj‘ is not closed, when TarFile is closed. """ modes = {"r": "rb", "a": "r+b", "w": "wb"} if mode not in modes: raise ValueError("mode must be ‘r‘, ‘a‘ or ‘w‘") self.mode = mode self._mode = modes[mode] if not fileobj: if self.mode == "a" and not os.path.exists(name): # Create nonexistent files in append mode. self.mode = "w" self._mode = "wb" fileobj = bltn_open(name, self._mode) self._extfileobj = False else: if name is None and hasattr(fileobj, "name"): name = fileobj.name if hasattr(fileobj, "mode"): self._mode = fileobj.mode self._extfileobj = True self.name = os.path.abspath(name) if name else None self.fileobj = fileobj # Init attributes. if format is not None: self.format = format if tarinfo is not None: self.tarinfo = tarinfo if dereference is not None: self.dereference = dereference if ignore_zeros is not None: self.ignore_zeros = ignore_zeros if encoding is not None: self.encoding = encoding if errors is not None: self.errors = errors elif mode == "r": self.errors = "utf-8" else: self.errors = "strict" if pax_headers is not None and self.format == PAX_FORMAT: self.pax_headers = pax_headers else: self.pax_headers = {} if debug is not None: self.debug = debug if errorlevel is not None: self.errorlevel = errorlevel # Init datastructures. self.closed = False self.members = [] # list of members as TarInfo objects self._loaded = False # flag if all members have been read self.offset = self.fileobj.tell() # current position in the archive file self.inodes = {} # dictionary caching the inodes of # archive members already added try: if self.mode == "r": self.firstmember = None self.firstmember = self.next() if self.mode == "a": # Move to the end of the archive, # before the first empty block. while True: self.fileobj.seek(self.offset) try: tarinfo = self.tarinfo.fromtarfile(self) self.members.append(tarinfo) except EOFHeaderError: self.fileobj.seek(self.offset) break except HeaderError, e: raise ReadError(str(e)) if self.mode in "aw": self._loaded = True if self.pax_headers: buf = self.tarinfo.create_pax_global_header(self.pax_headers.copy()) self.fileobj.write(buf) self.offset += len(buf) except: if not self._extfileobj: self.fileobj.close() self.closed = True raise def _getposix(self): return self.format == USTAR_FORMAT def _setposix(self, value): import warnings warnings.warn("use the format attribute instead", DeprecationWarning, 2) if value: self.format = USTAR_FORMAT else: self.format = GNU_FORMAT posix = property(_getposix, _setposix) #-------------------------------------------------------------------------- # Below are the classmethods which act as alternate constructors to the # TarFile class. The open() method is the only one that is needed for # public use; it is the "super"-constructor and is able to select an # adequate "sub"-constructor for a particular compression using the mapping # from OPEN_METH. # # This concept allows one to subclass TarFile without losing the comfort of # the super-constructor. A sub-constructor is registered and made available # by adding it to the mapping in OPEN_METH. @classmethod def open(cls, name=None, mode="r", fileobj=None, bufsize=RECORDSIZE, **kwargs): """Open a tar archive for reading, writing or appending. Return an appropriate TarFile class. mode: ‘r‘ or ‘r:*‘ open for reading with transparent compression ‘r:‘ open for reading exclusively uncompressed ‘r:gz‘ open for reading with gzip compression ‘r:bz2‘ open for reading with bzip2 compression ‘a‘ or ‘a:‘ open for appending, creating the file if necessary ‘w‘ or ‘w:‘ open for writing without compression ‘w:gz‘ open for writing with gzip compression ‘w:bz2‘ open for writing with bzip2 compression ‘r|*‘ open a stream of tar blocks with transparent compression ‘r|‘ open an uncompressed stream of tar blocks for reading ‘r|gz‘ open a gzip compressed stream of tar blocks ‘r|bz2‘ open a bzip2 compressed stream of tar blocks ‘w|‘ open an uncompressed stream for writing ‘w|gz‘ open a gzip compressed stream for writing ‘w|bz2‘ open a bzip2 compressed stream for writing """ if not name and not fileobj: raise ValueError("nothing to open") if mode in ("r", "r:*"): # Find out which *open() is appropriate for opening the file. for comptype in cls.OPEN_METH: func = getattr(cls, cls.OPEN_METH[comptype]) if fileobj is not None: saved_pos = fileobj.tell() try: return func(name, "r", fileobj, **kwargs) except (ReadError, CompressionError), e: if fileobj is not None: fileobj.seek(saved_pos) continue raise ReadError("file could not be opened successfully") elif ":" in mode: filemode, comptype = mode.split(":", 1) filemode = filemode or "r" comptype = comptype or "tar" # Select the *open() function according to # given compression. if comptype in cls.OPEN_METH: func = getattr(cls, cls.OPEN_METH[comptype]) else: raise CompressionError("unknown compression type %r" % comptype) return func(name, filemode, fileobj, **kwargs) elif "|" in mode: filemode, comptype = mode.split("|", 1) filemode = filemode or "r" comptype = comptype or "tar" if filemode not in ("r", "w"): raise ValueError("mode must be ‘r‘ or ‘w‘") stream = _Stream(name, filemode, comptype, fileobj, bufsize) try: t = cls(name, filemode, stream, **kwargs) except: stream.close() raise t._extfileobj = False return t elif mode in ("a", "w"): return cls.taropen(name, mode, fileobj, **kwargs) raise ValueError("undiscernible mode") @classmethod def taropen(cls, name, mode="r", fileobj=None, **kwargs): """Open uncompressed tar archive name for reading or writing. """ if mode not in ("r", "a", "w"): raise ValueError("mode must be ‘r‘, ‘a‘ or ‘w‘") return cls(name, mode, fileobj, **kwargs) @classmethod def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs): """Open gzip compressed tar archive name for reading or writing. Appending is not allowed. """ if mode not in ("r", "w"): raise ValueError("mode must be ‘r‘ or ‘w‘") try: import gzip gzip.GzipFile except (ImportError, AttributeError): raise CompressionError("gzip module is not available") try: fileobj = gzip.GzipFile(name, mode, compresslevel, fileobj) except OSError: if fileobj is not None and mode == ‘r‘: raise ReadError("not a gzip file") raise try: t = cls.taropen(name, mode, fileobj, **kwargs) except IOError: fileobj.close() if mode == ‘r‘: raise ReadError("not a gzip file") raise except: fileobj.close() raise t._extfileobj = False return t @classmethod def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs): """Open bzip2 compressed tar archive name for reading or writing. Appending is not allowed. """ if mode not in ("r", "w"): raise ValueError("mode must be ‘r‘ or ‘w‘.") try: import bz2 except ImportError: raise CompressionError("bz2 module is not available") if fileobj is not None: fileobj = _BZ2Proxy(fileobj, mode) else: fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel) try: t = cls.taropen(name, mode, fileobj, **kwargs) except (IOError, EOFError): fileobj.close() if mode == ‘r‘: raise ReadError("not a bzip2 file") raise except: fileobj.close() raise t._extfileobj = False return t # All *open() methods are registered here. OPEN_METH = { "tar": "taropen", # uncompressed tar "gz": "gzopen", # gzip compressed tar "bz2": "bz2open" # bzip2 compressed tar } #-------------------------------------------------------------------------- # The public methods which TarFile provides: def close(self): """Close the TarFile. In write-mode, two finishing zero blocks are appended to the archive. """ if self.closed: return if self.mode in "aw": self.fileobj.write(NUL * (BLOCKSIZE * 2)) self.offset += (BLOCKSIZE * 2) # fill up the end with zero-blocks # (like option -b20 for tar does) blocks, remainder = divmod(self.offset, RECORDSIZE) if remainder > 0: self.fileobj.write(NUL * (RECORDSIZE - remainder)) if not self._extfileobj: self.fileobj.close() self.closed = True def getmember(self, name): """Return a TarInfo object for member `name‘. If `name‘ can not be found in the archive, KeyError is raised. If a member occurs more than once in the archive, its last occurrence is assumed to be the most up-to-date version. """ tarinfo = self._getmember(name) if tarinfo is None: raise KeyError("filename %r not found" % name) return tarinfo def getmembers(self): """Return the members of the archive as a list of TarInfo objects. The list has the same order as the members in the archive. """ self._check() if not self._loaded: # if we want to obtain a list of self._load() # all members, we first have to # scan the whole archive. return self.members def getnames(self): """Return the members of the archive as a list of their names. It has the same order as the list returned by getmembers(). """ return [tarinfo.name for tarinfo in self.getmembers()] def gettarinfo(self, name=None, arcname=None, fileobj=None): """Create a TarInfo object for either the file `name‘ or the file object `fileobj‘ (using os.fstat on its file descriptor). You can modify some of the TarInfo‘s attributes before you add it using addfile(). If given, `arcname‘ specifies an alternative name for the file in the archive. """ self._check("aw") # When fileobj is given, replace name by # fileobj‘s real name. if fileobj is not None: name = fileobj.name # Building the name of the member in the archive. # Backward slashes are converted to forward slashes, # Absolute paths are turned to relative paths. if arcname is None: arcname = name drv, arcname = os.path.splitdrive(arcname) arcname = arcname.replace(os.sep, "/") arcname = arcname.lstrip("/") # Now, fill the TarInfo object with # information specific for the file. tarinfo = self.tarinfo() tarinfo.tarfile = self # Use os.stat or os.lstat, depending on platform # and if symlinks shall be resolved. if fileobj is None: if hasattr(os, "lstat") and not self.dereference: statres = os.lstat(name) else: statres = os.stat(name) else: statres = os.fstat(fileobj.fileno()) linkname = "" stmd = statres.st_mode if stat.S_ISREG(stmd): inode = (statres.st_ino, statres.st_dev) if not self.dereference and statres.st_nlink > 1 and inode in self.inodes and arcname != self.inodes[inode]: # Is it a hardlink to an already # archived file? type = LNKTYPE linkname = self.inodes[inode] else: # The inode is added only if its valid. # For win32 it is always 0. type = REGTYPE if inode[0]: self.inodes[inode] = arcname elif stat.S_ISDIR(stmd): type = DIRTYPE elif stat.S_ISFIFO(stmd): type = FIFOTYPE elif stat.S_ISLNK(stmd): type = SYMTYPE linkname = os.readlink(name) elif stat.S_ISCHR(stmd): type = CHRTYPE elif stat.S_ISBLK(stmd): type = BLKTYPE else: return None # Fill the TarInfo object with all # information we can get. tarinfo.name = arcname tarinfo.mode = stmd tarinfo.uid = statres.st_uid tarinfo.gid = statres.st_gid if type == REGTYPE: tarinfo.size = statres.st_size else: tarinfo.size = 0L tarinfo.mtime = statres.st_mtime tarinfo.type = type tarinfo.linkname = linkname if pwd: try: tarinfo.uname = pwd.getpwuid(tarinfo.uid)[0] except KeyError: pass if grp: try: tarinfo.gname = grp.getgrgid(tarinfo.gid)[0] except KeyError: pass if type in (CHRTYPE, BLKTYPE): if hasattr(os, "major") and hasattr(os, "minor"): tarinfo.devmajor = os.major(statres.st_rdev) tarinfo.devminor = os.minor(statres.st_rdev) return tarinfo def list(self, verbose=True): """Print a table of contents to sys.stdout. If `verbose‘ is False, only the names of the members are printed. If it is True, an `ls -l‘-like output is produced. """ self._check() for tarinfo in self: if verbose: print filemode(tarinfo.mode), print "%s/%s" % (tarinfo.uname or tarinfo.uid, tarinfo.gname or tarinfo.gid), if tarinfo.ischr() or tarinfo.isblk(): print "%10s" % ("%d,%d" % (tarinfo.devmajor, tarinfo.devminor)), else: print "%10d" % tarinfo.size, print "%d-%02d-%02d %02d:%02d:%02d" % time.localtime(tarinfo.mtime)[:6], print tarinfo.name + ("/" if tarinfo.isdir() else ""), if verbose: if tarinfo.issym(): print "->", tarinfo.linkname, if tarinfo.islnk(): print "link to", tarinfo.linkname, print def add(self, name, arcname=None, recursive=True, exclude=None, filter=None): """Add the file `name‘ to the archive. `name‘ may be any type of file (directory, fifo, symbolic link, etc.). If given, `arcname‘ specifies an alternative name for the file in the archive. Directories are added recursively by default. This can be avoided by setting `recursive‘ to False. `exclude‘ is a function that should return True for each filename to be excluded. `filter‘ is a function that expects a TarInfo object argument and returns the changed TarInfo object, if it returns None the TarInfo object will be excluded from the archive. """ self._check("aw") if arcname is None: arcname = name # Exclude pathnames. if exclude is not None: import warnings warnings.warn("use the filter argument instead", DeprecationWarning, 2) if exclude(name): self._dbg(2, "tarfile: Excluded %r" % name) return # Skip if somebody tries to archive the archive... if self.name is not None and os.path.abspath(name) == self.name: self._dbg(2, "tarfile: Skipped %r" % name) return self._dbg(1, name) # Create a TarInfo object from the file. tarinfo = self.gettarinfo(name, arcname) if tarinfo is None: self._dbg(1, "tarfile: Unsupported type %r" % name) return # Change or exclude the TarInfo object. if filter is not None: tarinfo = filter(tarinfo) if tarinfo is None: self._dbg(2, "tarfile: Excluded %r" % name) return # Append the tar header and data to the archive. if tarinfo.isreg(): with bltn_open(name, "rb") as f: self.addfile(tarinfo, f) elif tarinfo.isdir(): self.addfile(tarinfo) if recursive: for f in os.listdir(name): self.add(os.path.join(name, f), os.path.join(arcname, f), recursive, exclude, filter) else: self.addfile(tarinfo) def addfile(self, tarinfo, fileobj=None): """Add the TarInfo object `tarinfo‘ to the archive. If `fileobj‘ is given, tarinfo.size bytes are read from it and added to the archive. You can create TarInfo objects using gettarinfo(). On Windows platforms, `fileobj‘ should always be opened with mode ‘rb‘ to avoid irritation about the file size. """ self._check("aw") tarinfo = copy.copy(tarinfo) buf = tarinfo.tobuf(self.format, self.encoding, self.errors) self.fileobj.write(buf) self.offset += len(buf) # If there‘s data to follow, append it. if fileobj is not None: copyfileobj(fileobj, self.fileobj, tarinfo.size) blocks, remainder = divmod(tarinfo.size, BLOCKSIZE) if remainder > 0: self.fileobj.write(NUL * (BLOCKSIZE - remainder)) blocks += 1 self.offset += blocks * BLOCKSIZE self.members.append(tarinfo) def extractall(self, path=".", members=None): """Extract all members from the archive to the current working directory and set owner, modification time and permissions on directories afterwards. `path‘ specifies a different directory to extract to. `members‘ is optional and must be a subset of the list returned by getmembers(). """ directories = [] if members is None: members = self for tarinfo in members: if tarinfo.isdir(): # Extract directories with a safe mode. directories.append(tarinfo) tarinfo = copy.copy(tarinfo) tarinfo.mode = 0700 self.extract(tarinfo, path) # Reverse sort directories. directories.sort(key=operator.attrgetter(‘name‘)) directories.reverse() # Set correct owner, mtime and filemode on directories. for tarinfo in directories: dirpath = os.path.join(path, tarinfo.name) try: self.chown(tarinfo, dirpath) self.utime(tarinfo, dirpath) self.chmod(tarinfo, dirpath) except ExtractError, e: if self.errorlevel > 1: raise else: self._dbg(1, "tarfile: %s" % e) def extract(self, member, path=""): """Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. `member‘ may be a filename or a TarInfo object. You can specify a different directory using `path‘. """ self._check("r") if isinstance(member, basestring): tarinfo = self.getmember(member) else: tarinfo = member # Prepare the link target for makelink(). if tarinfo.islnk(): tarinfo._link_target = os.path.join(path, tarinfo.linkname) try: self._extract_member(tarinfo, os.path.join(path, tarinfo.name)) except EnvironmentError, e: if self.errorlevel > 0: raise else: if e.filename is None: self._dbg(1, "tarfile: %s" % e.strerror) else: self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename)) except ExtractError, e: if self.errorlevel > 1: raise else: self._dbg(1, "tarfile: %s" % e) def extractfile(self, member): """Extract a member from the archive as a file object. `member‘ may be a filename or a TarInfo object. If `member‘ is a regular file, a file-like object is returned. If `member‘ is a link, a file-like object is constructed from the link‘s target. If `member‘ is none of the above, None is returned. The file-like object is read-only and provides the following methods: read(), readline(), readlines(), seek() and tell() """ self._check("r") if isinstance(member, basestring): tarinfo = self.getmember(member) else: tarinfo = member if tarinfo.isreg(): return self.fileobject(self, tarinfo) elif tarinfo.type not in SUPPORTED_TYPES: # If a member‘s type is unknown, it is treated as a # regular file. return self.fileobject(self, tarinfo) elif tarinfo.islnk() or tarinfo.issym(): if isinstance(self.fileobj, _Stream): # A small but ugly workaround for the case that someone tries # to extract a (sym)link as a file-object from a non-seekable # stream of tar blocks. raise StreamError("cannot extract (sym)link as file object") else: # A (sym)link‘s file object is its target‘s file object. return self.extractfile(self._find_link_target(tarinfo)) else: # If there‘s no data associated with the member (directory, chrdev, # blkdev, etc.), return None instead of a file object. return None def _extract_member(self, tarinfo, targetpath): """Extract the TarInfo object tarinfo to a physical file called targetpath. """ # Fetch the TarInfo object for the given name # and build the destination pathname, replacing # forward slashes to platform specific separators. targetpath = targetpath.rstrip("/") targetpath = targetpath.replace("/", os.sep) # Create all upper directories. upperdirs = os.path.dirname(targetpath) if upperdirs and not os.path.exists(upperdirs): # Create directories that are not part of the archive with # default permissions. os.makedirs(upperdirs) if tarinfo.islnk() or tarinfo.issym(): self._dbg(1, "%s -> %s" % (tarinfo.name, tarinfo.linkname)) else: self._dbg(1, tarinfo.name) if tarinfo.isreg(): self.makefile(tarinfo, targetpath) elif tarinfo.isdir(): self.makedir(tarinfo, targetpath) elif tarinfo.isfifo(): self.makefifo(tarinfo, targetpath) elif tarinfo.ischr() or tarinfo.isblk(): self.makedev(tarinfo, targetpath) elif tarinfo.islnk() or tarinfo.issym(): self.makelink(tarinfo, targetpath) elif tarinfo.type not in SUPPORTED_TYPES: self.makeunknown(tarinfo, targetpath) else: self.makefile(tarinfo, targetpath) self.chown(tarinfo, targetpath) if not tarinfo.issym(): self.chmod(tarinfo, targetpath) self.utime(tarinfo, targetpath) #-------------------------------------------------------------------------- # Below are the different file methods. They are called via # _extract_member() when extract() is called. They can be replaced in a # subclass to implement other functionality. def makedir(self, tarinfo, targetpath): """Make a directory called targetpath. """ try: # Use a safe mode for the directory, the real mode is set # later in _extract_member(). os.mkdir(targetpath, 0700) except EnvironmentError, e: if e.errno != errno.EEXIST: raise def makefile(self, tarinfo, targetpath): """Make a file called targetpath. """ source = self.extractfile(tarinfo) try: with bltn_open(targetpath, "wb") as target: copyfileobj(source, target) finally: source.close() def makeunknown(self, tarinfo, targetpath): """Make a file from a TarInfo object with an unknown type at targetpath. """ self.makefile(tarinfo, targetpath) self._dbg(1, "tarfile: Unknown file type %r, " "extracted as regular file." % tarinfo.type) def makefifo(self, tarinfo, targetpath): """Make a fifo called targetpath. """ if hasattr(os, "mkfifo"): os.mkfifo(targetpath) else: raise ExtractError("fifo not supported by system") def makedev(self, tarinfo, targetpath): """Make a character or block device called targetpath. """ if not hasattr(os, "mknod") or not hasattr(os, "makedev"): raise ExtractError("special devices not supported by system") mode = tarinfo.mode if tarinfo.isblk(): mode |= stat.S_IFBLK else: mode |= stat.S_IFCHR os.mknod(targetpath, mode, os.makedev(tarinfo.devmajor, tarinfo.devminor)) def makelink(self, tarinfo, targetpath): """Make a (symbolic) link called targetpath. If it cannot be created (platform limitation), we try to make a copy of the referenced file instead of a link. """ if hasattr(os, "symlink") and hasattr(os, "link"): # For systems that support symbolic and hard links. if tarinfo.issym(): if os.path.lexists(targetpath): os.unlink(targetpath) os.symlink(tarinfo.linkname, targetpath) else: # See extract(). if os.path.exists(tarinfo._link_target): if os.path.lexists(targetpath): os.unlink(targetpath) os.link(tarinfo._link_target, targetpath) else: self._extract_member(self._find_link_target(tarinfo), targetpath) else: try: self._extract_member(self._find_link_target(tarinfo), targetpath) except KeyError: raise ExtractError("unable to resolve link inside archive") def chown(self, tarinfo, targetpath): """Set owner of targetpath according to tarinfo. """ if pwd and hasattr(os, "geteuid") and os.geteuid() == 0: # We have to be root to do so. try: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: g = tarinfo.gid try: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: u = tarinfo.uid try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) else: if sys.platform != "os2emx": os.chown(targetpath, u, g) except EnvironmentError, e: raise ExtractError("could not change owner") def chmod(self, tarinfo, targetpath): """Set file permissions of targetpath according to tarinfo. """ if hasattr(os, ‘chmod‘): try: os.chmod(targetpath, tarinfo.mode) except EnvironmentError, e: raise ExtractError("could not change mode") def utime(self, tarinfo, targetpath): """Set modification time of targetpath according to tarinfo. """ if not hasattr(os, ‘utime‘): return try: os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime)) except EnvironmentError, e: raise ExtractError("could not change modification time") #-------------------------------------------------------------------------- def next(self): """Return the next member of the archive as a TarInfo object, when TarFile is opened for reading. Return None if there is no more available. """ self._check("ra") if self.firstmember is not None: m = self.firstmember self.firstmember = None return m # Read the next block. self.fileobj.seek(self.offset) tarinfo = None while True: try: tarinfo = self.tarinfo.fromtarfile(self) except EOFHeaderError, e: if self.ignore_zeros: self._dbg(2, "0x%X: %s" % (self.offset, e)) self.offset += BLOCKSIZE continue except InvalidHeaderError, e: if self.ignore_zeros: self._dbg(2, "0x%X: %s" % (self.offset, e)) self.offset += BLOCKSIZE continue elif self.offset == 0: raise ReadError(str(e)) except EmptyHeaderError: if self.offset == 0: raise ReadError("empty file") except TruncatedHeaderError, e: if self.offset == 0: raise ReadError(str(e)) except SubsequentHeaderError, e: raise ReadError(str(e)) break if tarinfo is not None: self.members.append(tarinfo) else: self._loaded = True return tarinfo #-------------------------------------------------------------------------- # Little helper methods: def _getmember(self, name, tarinfo=None, normalize=False): """Find an archive member by name from bottom to top. If tarinfo is given, it is used as the starting point. """ # Ensure that all members have been loaded. members = self.getmembers() # Limit the member search list up to tarinfo. if tarinfo is not None: members = members[:members.index(tarinfo)] if normalize: name = os.path.normpath(name) for member in reversed(members): if normalize: member_name = os.path.normpath(member.name) else: member_name = member.name if name == member_name: return member def _load(self): """Read through the entire archive file and look for readable members. """ while True: tarinfo = self.next() if tarinfo is None: break self._loaded = True def _check(self, mode=None): """Check if TarFile is still open, and if the operation‘s mode corresponds to TarFile‘s mode. """ if self.closed: raise IOError("%s is closed" % self.__class__.__name__) if mode is not None and self.mode not in mode: raise IOError("bad operation for mode %r" % self.mode) def _find_link_target(self, tarinfo): """Find the target member of a symlink or hardlink member in the archive. """ if tarinfo.issym(): # Always search the entire archive. linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname))) limit = None else: # Search the archive before the link, because a hard link is # just a reference to an already archived file. linkname = tarinfo.linkname limit = tarinfo member = self._getmember(linkname, tarinfo=limit, normalize=True) if member is None: raise KeyError("linkname %r not found" % linkname) return member def __iter__(self): """Provide an iterator object. """ if self._loaded: return iter(self.members) else: return TarIter(self) def _dbg(self, level, msg): """Write debugging output to sys.stderr. """ if level <= self.debug: print >> sys.stderr, msg def __enter__(self): self._check() return self def __exit__(self, type, value, traceback): if type is None: self.close() else: # An exception occurred. We must not call close() because # it would try to write end-of-archive blocks and padding. if not self._extfileobj: self.fileobj.close() self.closed = True # class TarFile TarFile