Browse Source

Merge branch 'test_fixup'

master
John-Mark Gurney 2 years ago
parent
commit
e5aab54cfd
2 changed files with 146 additions and 69 deletions
  1. +9
    -9
      libarchive/__init__.py
  2. +137
    -60
      tests.py

+ 9
- 9
libarchive/__init__.py View File

@@ -24,6 +24,7 @@
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import pathlib
import stat
import sys
import time
@@ -438,21 +439,20 @@ class Archive(object):
self.encoding = encoding
self.blocksize = blocksize
self.password = password
try:
if isinstance(f, pathlib.PurePath):
f = str(f)
except:
pass
if isinstance(f, str):
self.filename = f
f = open(f, mode)
# Only close it if we opened it...
self._defer_close = True
self._doclose = True
elif hasattr(f, 'fileno'):
self.filename = getattr(f, 'name', None)
# Leave the fd alone, caller should manage it...
self._defer_close = False
self._doclose = False
else:
raise Exception('Provided file is not path or open file.')
self._defer_close = False
self.f = f
self.mode = mode
# Guess the format/filter from file name (if not provided)
@@ -547,7 +547,9 @@ class Archive(object):
if _defer:
# This call came from our open stream.
self._stream = None
return
if not self._defer_close:
# We are not yet ready to close.
return
if self._stream is not None:
# We have a stream open! don't close, but remember we were asked to.
self._defer_close = True
@@ -565,7 +567,7 @@ class Archive(object):
if hasattr(self.f, "fileno"):
os.fsync(self.f.fileno())
# and then close it, if we opened it...
if getattr(self.f, 'close', None):
if self._doclose and getattr(self.f, 'close', None):
self.f.close()

@property
@@ -667,8 +669,6 @@ class SeekableArchive(Archive):
self._stream = None
# Convert file to open file. We need this to reopen the archive.
mode = kwargs.setdefault('mode', 'r')
if isinstance(f, str):
f = open(f, mode)
super(SeekableArchive, self).__init__(f, **kwargs)
self.entries = []
self.eof = False


+ 137
- 60
tests.py View File

@@ -27,16 +27,15 @@
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os, unittest, tempfile, random, string, sys
import zipfile
import hashlib
import io
import pathlib
import shutil
import zipfile

from libarchive import Archive, is_archive_name, is_archive
from libarchive.zip import is_zipfile, ZipFile, ZipEntry

TMPDIR = tempfile.mkdtemp(suffix='.python-libarchive')
ZIPFILE = 'test.zip'
ZIPPATH = os.path.join(TMPDIR, ZIPFILE)

FILENAMES = [
'test1.txt',
'foo',
@@ -44,19 +43,31 @@ FILENAMES = [
#'álért.txt',
]

class MakeTempMixIn:
def setUp(self):
self.TMPDIR = tempfile.mkdtemp(suffix='.python-libarchive')
self.ZIPFILE = 'test.zip'
self.ZIPPATH = os.path.join(self.TMPDIR, self.ZIPFILE)

def make_temp_files():
if not os.path.exists(ZIPPATH):
for name in FILENAMES:
with open(os.path.join(TMPDIR, name), 'w') as f:
f.write(''.join(random.sample(string.ascii_letters, 10)))
def tearDown(self):
shutil.rmtree(self.TMPDIR)

self.TMPDIR = None
self.ZIPFILE = None
self.ZIPPATH = None

def make_temp_archive():
make_temp_files()
with zipfile.ZipFile(ZIPPATH, mode="w") as z:
for name in FILENAMES:
z.write(os.path.join(TMPDIR, name), arcname=name)
def make_temp_files(self):
if not os.path.exists(self.ZIPPATH):
for name in FILENAMES:
with open(os.path.join(self.TMPDIR, name), 'w') as f:
f.write(''.join(random.sample(string.ascii_letters, 10)))


def make_temp_archive(self):
self.make_temp_files()
with zipfile.ZipFile(self.ZIPPATH, mode="w") as z:
for name in FILENAMES:
z.write(os.path.join(self.TMPDIR, name), arcname=name)


class TestIsArchiveName(unittest.TestCase):
@@ -72,14 +83,18 @@ class TestIsArchiveName(unittest.TestCase):
self.assertEqual(is_archive_name('foo.rpm'), 'cpio')


class TestIsArchiveZip(unittest.TestCase):
class TestIsArchiveZip(unittest.TestCase, MakeTempMixIn):
def setUp(self):
make_temp_archive()
MakeTempMixIn.setUp(self)
self.make_temp_archive()

def tearDown(self):
MakeTempMixIn.tearDown(self)

def test_zip(self):
self.assertEqual(is_archive(ZIPPATH), True)
self.assertEqual(is_archive(ZIPPATH, formats=('zip',)), True)
self.assertEqual(is_archive(ZIPPATH, formats=('tar',)), False)
self.assertEqual(is_archive(self.ZIPPATH), True)
self.assertEqual(is_archive(self.ZIPPATH, formats=('zip',)), True)
self.assertEqual(is_archive(self.ZIPPATH, formats=('tar',)), False)


class TestIsArchiveTar(unittest.TestCase):
@@ -89,17 +104,19 @@ class TestIsArchiveTar(unittest.TestCase):

# TODO: incorporate tests from:
# http://hg.python.org/cpython/file/a6e1d926cd98/Lib/test/test_zipfile.py
class TestZipRead(unittest.TestCase):
class TestZipRead(unittest.TestCase, MakeTempMixIn):
def setUp(self):
make_temp_archive()
self.f = open(ZIPPATH, mode='r')
MakeTempMixIn.setUp(self)
self.make_temp_archive()
self.f = open(self.ZIPPATH, mode='r')

def tearDown(self):
self.f.close()
MakeTempMixIn.tearDown(self)

def test_iszipfile(self):
self.assertEqual(is_zipfile('/dev/null'), False)
self.assertEqual(is_zipfile(ZIPPATH), True)
self.assertEqual(is_zipfile(self.ZIPPATH), True)

def test_iterate(self):
z = ZipFile(self.f, 'r')
@@ -158,18 +175,20 @@ class TestZipRead(unittest.TestCase):
pass


class TestZipWrite(unittest.TestCase):
class TestZipWrite(unittest.TestCase, MakeTempMixIn):
def setUp(self):
make_temp_files()
self.f = open(ZIPPATH, mode='w')
MakeTempMixIn.setUp(self)
self.make_temp_files()
self.f = open(self.ZIPPATH, mode='w')

def tearDown(self):
self.f.close()
MakeTempMixIn.tearDown(self)

def test_writepath(self):
z = ZipFile(self.f, 'w')
for fname in FILENAMES:
with open(os.path.join(TMPDIR, fname), 'r') as f:
with open(os.path.join(self.TMPDIR, fname), 'r') as f:
z.writepath(f)
z.close()

@@ -182,7 +201,7 @@ class TestZipWrite(unittest.TestCase):
z.close()
self.f.close()

f = open(ZIPPATH, mode='r')
f = open(self.ZIPPATH, mode='r')
z = ZipFile(f, 'r')

entries = z.infolist()
@@ -195,7 +214,7 @@ class TestZipWrite(unittest.TestCase):
def test_writestream(self):
z = ZipFile(self.f, 'w')
for fname in FILENAMES:
full_path = os.path.join(TMPDIR, fname)
full_path = os.path.join(self.TMPDIR, fname)
i = open(full_path)
o = z.writestream(fname)
while True:
@@ -210,7 +229,7 @@ class TestZipWrite(unittest.TestCase):
def test_writestream_unbuffered(self):
z = ZipFile(self.f, 'w')
for fname in FILENAMES:
full_path = os.path.join(TMPDIR, fname)
full_path = os.path.join(self.TMPDIR, fname)
i = open(full_path)
o = z.writestream(fname, os.path.getsize(full_path))
while True:
@@ -257,72 +276,75 @@ ITEM_NAME='test.txt'

ZIP1_PWD='pwd'
ZIP2_PWD='12345'
def create_file_from_content():
with open(ZIPPATH, mode='wb') as f:
f.write(base64.b64decode(ZIP_CONTENT))

class TestProtectedReading(unittest.TestCase, MakeTempMixIn):
def create_file_from_content(self):
with open(self.ZIPPATH, mode='wb') as f:
f.write(base64.b64decode(ZIP_CONTENT))

def create_protected_zip():
z = ZipFile(ZIPPATH, mode='w', password=ZIP2_PWD)
z.writestr(ITEM_NAME, ITEM_CONTENT)
z.close()


class TestProtectedReading(unittest.TestCase):
def setUp(self):
create_file_from_content()
MakeTempMixIn.setUp(self)
self.create_file_from_content()

def tearDown(self):
os.remove(ZIPPATH)
MakeTempMixIn.tearDown(self)

def test_read_with_password(self):
z = ZipFile(ZIPPATH, 'r', password=ZIP1_PWD)
z = ZipFile(self.ZIPPATH, 'r', password=ZIP1_PWD)
self.assertEqual(z.read(ITEM_NAME), bytes(ITEM_CONTENT, 'utf-8'))
z.close()

def test_read_without_password(self):
z = ZipFile(ZIPPATH, 'r')
z = ZipFile(self.ZIPPATH, 'r')
self.assertRaises(RuntimeError, z.read, ITEM_NAME)
z.close()

def test_read_with_wrong_password(self):
z = ZipFile(ZIPPATH, 'r', password='wrong')
z = ZipFile(self.ZIPPATH, 'r', password='wrong')
self.assertRaises(RuntimeError, z.read, ITEM_NAME)
z.close()

class TestProtectedWriting(unittest.TestCase):
class TestProtectedWriting(unittest.TestCase, MakeTempMixIn):
def create_protected_zip(self):
z = ZipFile(self.ZIPPATH, mode='w', password=ZIP2_PWD)
z.writestr(ITEM_NAME, ITEM_CONTENT)
z.close()

def setUp(self):
create_protected_zip()
MakeTempMixIn.setUp(self)
self.create_protected_zip()

def tearDown(self):
os.remove(ZIPPATH)
MakeTempMixIn.tearDown(self)

def test_read_with_password(self):
z = ZipFile(ZIPPATH, 'r', password=ZIP2_PWD)
z = ZipFile(self.ZIPPATH, 'r', password=ZIP2_PWD)
self.assertEqual(z.read(ITEM_NAME), bytes(ITEM_CONTENT, 'utf-8'))
z.close()

def test_read_without_password(self):
z = ZipFile(ZIPPATH, 'r')
z = ZipFile(self.ZIPPATH, 'r')
self.assertRaises(RuntimeError, z.read, ITEM_NAME)
z.close()

def test_read_with_wrong_password(self):
z = ZipFile(ZIPPATH, 'r', password='wrong')
z = ZipFile(self.ZIPPATH, 'r', password='wrong')
self.assertRaises(RuntimeError, z.read, ITEM_NAME)
z.close()

def test_read_with_password_list(self):
z = ZipFile(ZIPPATH, 'r', password=[ZIP1_PWD, ZIP2_PWD])
z = ZipFile(self.ZIPPATH, 'r', password=[ZIP1_PWD, ZIP2_PWD])
self.assertEqual(z.read(ITEM_NAME), bytes(ITEM_CONTENT, 'utf-8'))
z.close()



class TestHighLevelAPI(unittest.TestCase):
class TestHighLevelAPI(unittest.TestCase, MakeTempMixIn):
def setUp(self):
make_temp_archive()
MakeTempMixIn.setUp(self)
self.make_temp_archive()

def tearDown(self):
MakeTempMixIn.tearDown(self)

def _test_listing_content(self, f):
"""Test helper capturing file paths while iterating the archive."""
@@ -335,19 +357,74 @@ class TestHighLevelAPI(unittest.TestCase):

def test_open_by_name(self):
"""Test an archive opened directly by name."""
self._test_listing_content(ZIPPATH)
self._test_listing_content(self.ZIPPATH)

def test_open_by_named_fobj(self):
"""Test an archive using a file-like object opened by name."""
with open(ZIPPATH, 'rb') as f:
with open(self.ZIPPATH, 'rb') as f:
self._test_listing_content(f)

def test_open_by_unnamed_fobj(self):
"""Test an archive using file-like object opened by fileno()."""
with open(ZIPPATH, 'rb') as zf:
with open(self.ZIPPATH, 'rb') as zf:
with io.FileIO(zf.fileno(), mode='r', closefd=False) as f:
self._test_listing_content(f)

_defaulthash = 'sha512'

def _readfp(fp):
while True:
r = fp.read(64*1024)
# libarchive returns None on EOF
if r == b'' or r is None:
return

yield r

def _hashfp(fp):
hash = getattr(hashlib, _defaulthash)()
for r in _readfp(fp):
hash.update(r)

return '%s:%s' % (_defaulthash, hash.hexdigest())


class TestArchive(unittest.TestCase):
def setUp(self):
self.fixtures = pathlib.Path(__file__).parent / 'fixtures'

def test_closed(self):
fname = self.fixtures / 'testfile.tar.gz'

with Archive(fname) as arch:
origfp = arch.f

hashes = []

for i in arch:
if not i.isfile():
continue

with arch.readstream(i.size) as fp:
hashes.append(_hashfp(fp))

self.assertTrue(fp.closed)
self.assertIsNone(arch._stream)

self.assertEqual(hashes, [ 'sha512:90f8342520f0ac57fb5a779f5d331c2fa87aa40f8799940257f9ba619940951e67143a8d746535ed0284924b2b7bc1478f095198800ba96d01847d7b56ca465c', 'sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f' ])

self.assertTrue(arch.f.closed)

def test_noclose(self):
fname = self.fixtures / 'testfile.tar.gz'

with open(fname) as fp:
with Archive(fp) as arch:
pass

self.assertFalse(fp.closed)

self.assertTrue(fp.closed)

if __name__ == '__main__':
unittest.main()

Loading…
Cancel
Save