import os from libarchive import is_archive, Entry, SeekableArchive from tarfile import DEFAULT_FORMAT, USTAR_FORMAT, GNU_FORMAT, PAX_FORMAT, \ ENCODING from tarfile import REGTYPE, SYMTYPE, DIRTYPE, FIFOTYPE, CHRTYPE, BLKTYPE FORMAT_CONVERSION = { USTAR_FORMAT: 'tar', GNU_FORMAT: 'gnu', PAX_FORMAT: 'pax', } def is_tarfile(filename): return is_archive(filename, formats=('tar', 'gnu', 'pax')) def open(**kwargs): return TarFile(**kwargs) class TarInfo(Entry): def __init__(self, name): super(TarInfo, self).__init__(pathname=name) fromtarfile = Entry.from_archive def get_name(self): return self.pathname def set_name(self, value): self.pathname = value name = property(get_name, set_name) @property def get_type(self): for attr, type in ( ('isdir', DIRTYPE), ('isfile', REGTYPE), ('issym', SYMTYPE), ('isfifo', FIFOTYPE), ('ischr', CHRTYPE), ('isblk', BLKTYPE), ): if getattr(self, attr)(): return type def _get_missing(self): raise NotImplemented() def _set_missing(self, value): raise NotImplemented() pax_headers = property(_get_missing, _set_missing) class TarFile(SeekableArchive): getmember = SeekableArchive.getentry list = SeekableArchive.printlist extract = SeekableArchive.readpath extractfile = SeekableArchive.readstream def __init__(self, name=None, mode='r', fileobj=None, format=DEFAULT_FORMAT, tarinfo=TarInfo, encoding=ENCODING): if name: f = name elif fileobj: f = fileobj try: format = FORMAT_CONVERSION.get(format) except KeyError: raise Exception('Invalid tar format: %s' % format) super(TarFile, self).__init__(f, mode=mode, format=format, entry_class=tarinfo, encoding=encoding) def getmembers(self): return list(self) def getnames(self): return list(self.iterpaths) def next(self): raise NotImplementedError pass # TODO: how to do this? def extract(self, member, path=None): if path is None: path = os.getcwd() if isinstance(member, basestring): f = os.path.join(path, member) else: f = os.path.join(path, member.pathname) return self.readpath(member, f) def add(self, name, arcname, recursive=True, exclude=None, filter=None): pass # TODO: implement this. def addfile(self, tarinfo, fileobj): return self.writepath(fileobj, tarinfo) def gettarinfo(self, name=None, arcname=None, fileobj=None): if name: f = name elif fileobj: f = fileobj entry = self.entry_class.from_file(f) if arcname: entry.pathname = arcname return entry def _get_missing(self): raise NotImplemented() def _set_missing(self, value): raise NotImplemented() pax_headers = property(_get_missing, _set_missing)