@@ -30,10 +30,9 @@ import time | |||
import warnings | |||
from libarchive import _libarchive | |||
try: | |||
from io import StringIO | |||
except ImportError: | |||
from io import StringIO | |||
from io import StringIO | |||
PY3 = sys.version_info[0] == 3 | |||
# Suggested block size for libarchive. Libarchive may adjust it. | |||
BLOCK_SIZE = 10240 | |||
@@ -165,7 +164,7 @@ def is_archive(f, formats=(None, ), filters=(None, )): | |||
filter = get_func(filter, FILTERS, 0) | |||
if filter is None: | |||
return False | |||
list(filter(a)) | |||
filter(a) | |||
try: | |||
try: | |||
call_and_check(_libarchive.archive_read_open_fd, a, a, f.fileno(), BLOCK_SIZE) | |||
@@ -175,6 +174,7 @@ def is_archive(f, formats=(None, ), filters=(None, )): | |||
finally: | |||
_libarchive.archive_read_close(a) | |||
_libarchive.archive_read_free(a) | |||
f.close() | |||
class EntryReadStream(object): | |||
@@ -271,7 +271,7 @@ class EntryWriteStream(object): | |||
if self.buffer: | |||
self.buffer.write(data) | |||
else: | |||
_libarchive.archive_write_data_from_str(self.archive._a, data) | |||
_libarchive.archive_write_data_from_str(self.archive._a, data.encode('utf-8')) | |||
self.bytes += len(data) | |||
def close(self): | |||
@@ -280,7 +280,7 @@ class EntryWriteStream(object): | |||
if self.buffer: | |||
self.entry.size = self.buffer.tell() | |||
self.entry.to_archive(self.archive) | |||
_libarchive.archive_write_data_from_str(self.archive._a, self.buffer.getvalue()) | |||
_libarchive.archive_write_data_from_str(self.archive._a, self.buffer.getvalue().encode('utf-8')) | |||
_libarchive.archive_write_finish_entry(self.archive._a) | |||
# Call archive.close() with _defer True to let it know we have been | |||
@@ -312,8 +312,13 @@ class Entry(object): | |||
call_and_check(_libarchive.archive_read_next_header2, archive._a, archive._a, e) | |||
mode = _libarchive.archive_entry_filetype(e) | |||
mode |= _libarchive.archive_entry_perm(e) | |||
entry = cls( | |||
if PY3: | |||
pathname=_libarchive.archive_entry_pathname(e) | |||
else: | |||
pathname=_libarchive.archive_entry_pathname(e).decode(encoding), | |||
entry = cls( | |||
pathname=pathname, | |||
size=_libarchive.archive_entry_size(e), | |||
mtime=_libarchive.archive_entry_mtime(e), | |||
mode=mode, | |||
@@ -353,7 +358,10 @@ class Entry(object): | |||
'''Creates an archive header and writes it to the given archive.''' | |||
e = _libarchive.archive_entry_new() | |||
try: | |||
_libarchive.archive_entry_set_pathname(e, self.pathname.encode(self.encoding)) | |||
if PY3: | |||
_libarchive.archive_entry_set_pathname(e, self.pathname) | |||
else: | |||
_libarchive.archive_entry_set_pathname(e, self.pathname.encode(self.encoding)) | |||
_libarchive.archive_entry_set_filetype(e, stat.S_IFMT(self.mode)) | |||
_libarchive.archive_entry_set_perm(e, stat.S_IMODE(self.mode)) | |||
_libarchive.archive_entry_set_size(e, self.size) | |||
@@ -539,9 +547,12 @@ class Archive(object): | |||
if data: | |||
member.size = len(data) | |||
member.to_archive(self) | |||
if data: | |||
_libarchive.archive_write_data_from_str(self._a, data) | |||
if PY3: | |||
result = _libarchive.archive_write_data_from_str(self._a, data.encode('utf8')) | |||
else: | |||
result = _libarchive.archive_write_data_from_str(self._a, data) | |||
_libarchive.archive_write_finish_entry(self._a) | |||
def writepath(self, f, pathname=None, folder=False): | |||
@@ -614,7 +625,11 @@ class SeekableArchive(Archive): | |||
def getentry(self, pathname): | |||
'''Take a name or entry object and returns an entry object.''' | |||
for entry in self: | |||
if entry.pathname == pathname: | |||
if PY3: | |||
entry_pathname = entry.pathname | |||
if not PY3: | |||
entry_pathname = entry.pathname[0] | |||
if entry_pathname == pathname: | |||
return entry | |||
raise KeyError(pathname) | |||
@@ -739,7 +739,7 @@ SWIG_UnpackDataName(const char *c, void *ptr, size_t sz, const char *name) { | |||
#define PyString_Size(str) PyBytes_Size(str) | |||
#define PyString_InternFromString(key) PyUnicode_InternFromString(key) | |||
#define Py_TPFLAGS_HAVE_CLASS Py_TPFLAGS_BASETYPE | |||
#define PyString_AS_STRING(x) PyUnicode_AS_UNICODE(x) | |||
#define PyString_AS_STRING(x) PyBytes_AsString(x) | |||
#define _PyLong_FromSsize_t(x) PyLong_FromSsize_t(x) | |||
#endif | |||
@@ -3274,7 +3274,11 @@ SWIG_FromCharPtrAndSize(const char* carray, size_t size) | |||
return pchar_descriptor ? | |||
SWIG_InternalNewPointerObj((char *)(carray), pchar_descriptor, 0) : SWIG_Py_Void(); | |||
} else { | |||
return PyUnicode_FromStringAndSize(carray, (int)(size)); | |||
#if PY_VERSION_HEX >= 0x03000000 | |||
return PyUnicode_FromStringAndSize(carray, (int)(size)); | |||
#else | |||
return PyString_FromStringAndSize(carray, (int)(size)); | |||
#endif | |||
} | |||
} else { | |||
return SWIG_Py_Void(); | |||
@@ -3338,10 +3342,17 @@ SWIG_AsVal_unsigned_SS_short (PyObject * obj, unsigned short *val) | |||
PyObject *archive_read_data_into_str(struct archive *archive, int len) { | |||
PyObject *str = NULL; | |||
if (!(str = PyUnicode_FromStringAndSize(NULL, len))) { | |||
#if PY_VERSION_HEX >= 0x03000000 | |||
if (!(str = PyBytes_FromStringAndSize(NULL, len))) { | |||
PyErr_SetString(PyExc_MemoryError, "could not allocate string."); | |||
return NULL; | |||
} | |||
#else | |||
if (!(str = PyString_FromStringAndSize(NULL, len))) { | |||
PyErr_SetString(PyExc_MemoryError, "could not allocate string."); | |||
return NULL; | |||
} | |||
#endif | |||
if (len != archive_read_data(archive, PyString_AS_STRING(str), len)) { | |||
PyErr_SetString(PyExc_RuntimeError, "could not read requested data."); | |||
return NULL; | |||
@@ -26,11 +26,13 @@ | |||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |||
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |||
import os, unittest, tempfile, random, string, subprocess | |||
import os, unittest, tempfile, random, string, subprocess, sys | |||
from libarchive import is_archive_name, is_archive | |||
from libarchive.zip import is_zipfile, ZipFile, ZipEntry | |||
PY3 = sys.version_info[0] == 3 | |||
TMPDIR = tempfile.mkdtemp() | |||
ZIPCMD = '/usr/bin/zip' | |||
ZIPFILE = 'test.zip' | |||
@@ -47,7 +49,8 @@ FILENAMES = [ | |||
def make_temp_files(): | |||
if not os.path.exists(ZIPPATH): | |||
for name in FILENAMES: | |||
open(os.path.join(TMPDIR, name), 'w').write(''.join(random.sample(string.printable, 10))) | |||
with open(os.path.join(TMPDIR, name), 'w') as f: | |||
f.write(''.join(random.sample(string.ascii_letters, 10))) | |||
def make_temp_archive(): | |||
@@ -93,14 +96,17 @@ class TestIsArchiveTar(unittest.TestCase): | |||
class TestZipRead(unittest.TestCase): | |||
def setUp(self): | |||
make_temp_archive() | |||
self.f = open(ZIPPATH, mode='r') | |||
def tearDown(self): | |||
self.f.close() | |||
def test_iszipfile(self): | |||
self.assertEqual(is_zipfile('/dev/null'), False) | |||
self.assertEqual(is_zipfile(ZIPPATH), True) | |||
def test_iterate(self): | |||
f = open(ZIPPATH, mode='r') | |||
z = ZipFile(f, 'r') | |||
z = ZipFile(self.f, 'r') | |||
count = 0 | |||
for e in z: | |||
count += 1 | |||
@@ -108,8 +114,7 @@ class TestZipRead(unittest.TestCase): | |||
def test_deferred_close_by_archive(self): | |||
""" Test archive deferred close without a stream. """ | |||
f = open(ZIPPATH, mode='r') | |||
z = ZipFile(f, 'r') | |||
z = ZipFile(self.f, 'r') | |||
self.assertIsNotNone(z._a) | |||
self.assertIsNone(z._stream) | |||
z.close() | |||
@@ -117,8 +122,7 @@ class TestZipRead(unittest.TestCase): | |||
def test_deferred_close_by_stream(self): | |||
""" Ensure archive closes self if stream is closed first. """ | |||
f = open(ZIPPATH, mode='r') | |||
z = ZipFile(f, 'r') | |||
z = ZipFile(self.f, 'r') | |||
stream = z.readstream(FILENAMES[0]) | |||
stream.close() | |||
# Make sure archive stays open after stream is closed. | |||
@@ -131,8 +135,7 @@ class TestZipRead(unittest.TestCase): | |||
def test_close_stream_first(self): | |||
""" Ensure that archive stays open after being closed if a stream is | |||
open. Further, ensure closing the stream closes the archive. """ | |||
f = open(ZIPPATH, mode='r') | |||
z = ZipFile(f, 'r') | |||
z = ZipFile(self.f, 'r') | |||
stream = z.readstream(FILENAMES[0]) | |||
z.close() | |||
try: | |||
@@ -146,11 +149,13 @@ class TestZipRead(unittest.TestCase): | |||
self.assertIsNone(z._stream) | |||
def test_filenames(self): | |||
f = open(ZIPPATH, mode='r') | |||
z = ZipFile(f, 'r') | |||
z = ZipFile(self.f, 'r') | |||
names = [] | |||
for e in z: | |||
names.append(e.filename) | |||
if PY3: | |||
names.append(e.filename) | |||
else: | |||
names.append(e.filename[0]) | |||
self.assertEqual(names, FILENAMES, 'File names differ in archive.') | |||
#~ def test_non_ascii(self): | |||
@@ -163,23 +168,25 @@ class TestZipRead(unittest.TestCase): | |||
class TestZipWrite(unittest.TestCase): | |||
def setUp(self): | |||
make_temp_files() | |||
self.f = open(ZIPPATH, mode='w') | |||
def tearDown(self): | |||
self.f.close() | |||
def test_writepath(self): | |||
f = open(ZIPPATH, mode='w') | |||
z = ZipFile(f, 'w') | |||
z = ZipFile(self.f, 'w') | |||
for fname in FILENAMES: | |||
z.writepath(file(os.path.join(TMPDIR, fname), 'r')) | |||
with open(os.path.join(TMPDIR, fname), 'r') as f: | |||
z.writepath(f) | |||
z.close() | |||
def test_writepath_directory(self): | |||
""" Test writing a directory. """ | |||
f = open(ZIPPATH, mode='w') | |||
z = ZipFile(f, 'w') | |||
z = ZipFile(self.f, 'w') | |||
z.writepath(None, pathname='/testdir', folder=True) | |||
z.writepath(None, pathname='/testdir/testinside', folder=True) | |||
z.close() | |||
f.close() | |||
self.f.close() | |||
f = open(ZIPPATH, mode='r') | |||
z = ZipFile(f, 'r') | |||
@@ -192,8 +199,7 @@ class TestZipWrite(unittest.TestCase): | |||
f.close() | |||
def test_writestream(self): | |||
f = open(ZIPPATH, mode='w') | |||
z = ZipFile(f, 'w') | |||
z = ZipFile(self.f, 'w') | |||
for fname in FILENAMES: | |||
full_path = os.path.join(TMPDIR, fname) | |||
i = open(full_path) | |||
@@ -202,14 +208,16 @@ class TestZipWrite(unittest.TestCase): | |||
data = i.read(1) | |||
if not data: | |||
break | |||
o.write(data) | |||
if PY3: | |||
o.write(data) | |||
else: | |||
o.write(unicode(data)) | |||
o.close() | |||
i.close() | |||
z.close() | |||
def test_writestream_unbuffered(self): | |||
f = open(ZIPPATH, mode='w') | |||
z = ZipFile(f, 'w') | |||
z = ZipFile(self.f, 'w') | |||
for fname in FILENAMES: | |||
full_path = os.path.join(TMPDIR, fname) | |||
i = open(full_path) | |||
@@ -218,20 +226,25 @@ class TestZipWrite(unittest.TestCase): | |||
data = i.read(1) | |||
if not data: | |||
break | |||
o.write(data) | |||
if PY3: | |||
o.write(data) | |||
else: | |||
o.write(unicode(data)) | |||
o.close() | |||
i.close() | |||
z.close() | |||
def test_deferred_close_by_archive(self): | |||
""" Test archive deferred close without a stream. """ | |||
f = open(ZIPPATH, mode='w') | |||
z = ZipFile(f, 'w') | |||
z = ZipFile(self.f, 'w') | |||
o = z.writestream(FILENAMES[0]) | |||
z.close() | |||
self.assertIsNotNone(z._a) | |||
self.assertIsNotNone(z._stream) | |||
o.write('testdata') | |||
if PY3: | |||
o.write('testdata') | |||
else: | |||
o.write(unicode('testdata')) | |||
o.close() | |||
self.assertIsNone(z._a) | |||
self.assertIsNone(z._stream) | |||