You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

116 lines
3.0 KiB

  1. import os
  2. from libarchive import is_archive, Entry, SeekableArchive
  3. from tarfile import DEFAULT_FORMAT, USTAR_FORMAT, GNU_FORMAT, PAX_FORMAT, ENCODING
  4. from tarfile import REGTYPE, SYMTYPE, DIRTYPE, FIFOTYPE, CHRTYPE, BLKTYPE
  5. FORMAT_CONVERSION = {
  6. USTAR_FORMAT: 'tar',
  7. GNU_FORMAT: 'gnu',
  8. PAX_FORMAT: 'pax',
  9. }
  10. def is_tarfile(filename):
  11. return is_archive(filename, formats=('tar', 'gnu', 'pax'))
  12. def open(**kwargs):
  13. return TarFile(**kwargs)
  14. class TarInfo(Entry):
  15. def __init__(self, name):
  16. super(TarInfo, self).__init__(pathname=name)
  17. fromtarfile = Entry.from_archive
  18. def get_name(self):
  19. return self.pathname
  20. def set_name(self, value):
  21. self.pathname = value
  22. name = property(get_name, set_name)
  23. @property
  24. def get_type(self):
  25. for attr, type in (
  26. ('isdir', DIRTYPE),
  27. ('isfile', REGTYPE),
  28. ('issym', SYMTYPE),
  29. ('isfifo', FIFOTYPE),
  30. ('ischr', CHRTYPE),
  31. ('isblk', BLKTYPE),
  32. ):
  33. if getattr(self, attr)():
  34. return type
  35. def _get_missing(self):
  36. raise NotImplemented()
  37. def _set_missing(self, value):
  38. raise NotImplemented()
  39. pax_headers = property(_get_missing, _set_missing)
  40. class TarFile(SeekableArchive):
  41. getmember = SeekableArchive.getentry
  42. list = SeekableArchive.printlist
  43. extract = SeekableArchive.readpath
  44. extractfile = SeekableArchive.readstream
  45. def __init__(self, name=None, mode='r', fileobj=None, format=DEFAULT_FORMAT, tarinfo=TarInfo, encoding=ENCODING):
  46. if name:
  47. f = name
  48. elif fileobj:
  49. f = fileobj
  50. try:
  51. format = FORMAT_CONVERSION.get(format)
  52. except KeyError:
  53. raise Exception('Invalid tar format: %s' % format)
  54. super(TarFile, self).__init__(f, mode=mode, format=format, entry_class=tarinfo, encoding=encoding)
  55. def getmembers(self):
  56. return list(self)
  57. def getnames(self):
  58. return list(self.iterpaths)
  59. def __next__(self):
  60. raise NotImplementedError
  61. pass # TODO: how to do this?
  62. def extract(self, member, path=None):
  63. if path is None:
  64. path = os.getcwd()
  65. if isinstance(member, str):
  66. f = os.path.join(path, member)
  67. else:
  68. f = os.path.join(path, member.pathname)
  69. return self.readpath(member, f)
  70. def add(self, name, arcname, recursive=True, exclude=None, filter=None):
  71. pass # TODO: implement this.
  72. def addfile(self, tarinfo, fileobj):
  73. return self.writepath(fileobj, tarinfo)
  74. def gettarinfo(self, name=None, arcname=None, fileobj=None):
  75. if name:
  76. f = name
  77. elif fileobj:
  78. f = fileobj
  79. entry = self.entry_class.from_file(f)
  80. if arcname:
  81. entry.pathname = arcname
  82. return entry
  83. def _get_missing(self):
  84. raise NotImplemented()
  85. def _set_missing(self, value):
  86. raise NotImplemented()
  87. pax_headers = property(_get_missing, _set_missing)