Browse Source

Use open instead of file

test_fixup
Jessica Hair 5 years ago
parent
commit
bfe92166e5
2 changed files with 19 additions and 19 deletions
  1. +5
    -5
      libarchive/__init__.py
  2. +14
    -14
      tests.py

+ 5
- 5
libarchive/__init__.py View File

@@ -154,7 +154,7 @@ def is_archive(f, formats=(None, ), filters=(None, )):
This function will return True if the file can be opened as an archive using the given This function will return True if the file can be opened as an archive using the given
format(s)/filter(s).''' format(s)/filter(s).'''
if isinstance(f, str): if isinstance(f, str):
f = file(f, 'r')
f = open(f, 'r')
a = _libarchive.archive_read_new() a = _libarchive.archive_read_new()
for format in formats: for format in formats:
format = get_func(format, FORMATS, 0) format = get_func(format, FORMATS, 0)
@@ -392,7 +392,7 @@ class Archive(object):
self.blocksize = blocksize self.blocksize = blocksize
if isinstance(f, str): if isinstance(f, str):
self.filename = f self.filename = f
f = file(f, mode)
f = open(f, mode)
# Only close it if we opened it... # Only close it if we opened it...
self._defer_close = True self._defer_close = True
elif hasattr(f, 'fileno'): elif hasattr(f, 'fileno'):
@@ -524,7 +524,7 @@ class Archive(object):
basedir = os.path.basename(f) basedir = os.path.basename(f)
if not os.path.exists(basedir): if not os.path.exists(basedir):
os.makedirs(basedir) os.makedirs(basedir)
f = file(f, 'w')
f = open(f, 'w')
return _libarchive.archive_read_data_into_fd(self._a, f.fileno()) return _libarchive.archive_read_data_into_fd(self._a, f.fileno())


def readstream(self, size): def readstream(self, size):
@@ -550,7 +550,7 @@ class Archive(object):
member = self.entry_class.from_file(f, encoding=self.encoding) member = self.entry_class.from_file(f, encoding=self.encoding)
if isinstance(f, str): if isinstance(f, str):
if os.path.isfile(f): if os.path.isfile(f):
f = file(f, 'r')
f = open(f, 'r')
if pathname: if pathname:
member.pathname = pathname member.pathname = pathname
if folder and not member.isdir(): if folder and not member.isdir():
@@ -588,7 +588,7 @@ class SeekableArchive(Archive):
# Convert file to open file. We need this to reopen the archive. # Convert file to open file. We need this to reopen the archive.
mode = kwargs.setdefault('mode', 'r') mode = kwargs.setdefault('mode', 'r')
if isinstance(f, str): if isinstance(f, str):
f = file(f, mode)
f = open(f, mode)
super(SeekableArchive, self).__init__(f, **kwargs) super(SeekableArchive, self).__init__(f, **kwargs)
self.entries = [] self.entries = []
self.eof = False self.eof = False


+ 14
- 14
tests.py View File

@@ -47,7 +47,7 @@ FILENAMES = [
def make_temp_files(): def make_temp_files():
if not os.path.exists(ZIPPATH): if not os.path.exists(ZIPPATH):
for name in FILENAMES: for name in FILENAMES:
file(os.path.join(TMPDIR, name), 'w').write(''.join(random.sample(string.printable, 10)))
open(os.path.join(TMPDIR, name), 'w').write(''.join(random.sample(string.printable, 10)))




def make_temp_archive(): def make_temp_archive():
@@ -99,7 +99,7 @@ class TestZipRead(unittest.TestCase):
self.assertEqual(is_zipfile(ZIPPATH), True) self.assertEqual(is_zipfile(ZIPPATH), True)


def test_iterate(self): def test_iterate(self):
f = file(ZIPPATH, mode='r')
f = open(ZIPPATH, mode='r')
z = ZipFile(f, 'r') z = ZipFile(f, 'r')
count = 0 count = 0
for e in z: for e in z:
@@ -108,7 +108,7 @@ class TestZipRead(unittest.TestCase):


def test_deferred_close_by_archive(self): def test_deferred_close_by_archive(self):
""" Test archive deferred close without a stream. """ """ Test archive deferred close without a stream. """
f = file(ZIPPATH, mode='r')
f = open(ZIPPATH, mode='r')
z = ZipFile(f, 'r') z = ZipFile(f, 'r')
self.assertIsNotNone(z._a) self.assertIsNotNone(z._a)
self.assertIsNone(z._stream) self.assertIsNone(z._stream)
@@ -117,7 +117,7 @@ class TestZipRead(unittest.TestCase):


def test_deferred_close_by_stream(self): def test_deferred_close_by_stream(self):
""" Ensure archive closes self if stream is closed first. """ """ Ensure archive closes self if stream is closed first. """
f = file(ZIPPATH, mode='r')
f = open(ZIPPATH, mode='r')
z = ZipFile(f, 'r') z = ZipFile(f, 'r')
stream = z.readstream(FILENAMES[0]) stream = z.readstream(FILENAMES[0])
stream.close() stream.close()
@@ -131,7 +131,7 @@ class TestZipRead(unittest.TestCase):
def test_close_stream_first(self): def test_close_stream_first(self):
""" Ensure that archive stays open after being closed if a stream is """ Ensure that archive stays open after being closed if a stream is
open. Further, ensure closing the stream closes the archive. """ open. Further, ensure closing the stream closes the archive. """
f = file(ZIPPATH, mode='r')
f = open(ZIPPATH, mode='r')
z = ZipFile(f, 'r') z = ZipFile(f, 'r')
stream = z.readstream(FILENAMES[0]) stream = z.readstream(FILENAMES[0])
z.close() z.close()
@@ -146,7 +146,7 @@ class TestZipRead(unittest.TestCase):
self.assertIsNone(z._stream) self.assertIsNone(z._stream)


def test_filenames(self): def test_filenames(self):
f = file(ZIPPATH, mode='r')
f = open(ZIPPATH, mode='r')
z = ZipFile(f, 'r') z = ZipFile(f, 'r')
names = [] names = []
for e in z: for e in z:
@@ -165,7 +165,7 @@ class TestZipWrite(unittest.TestCase):
make_temp_files() make_temp_files()


def test_writepath(self): def test_writepath(self):
f = file(ZIPPATH, mode='w')
f = open(ZIPPATH, mode='w')
z = ZipFile(f, 'w') z = ZipFile(f, 'w')
for fname in FILENAMES: for fname in FILENAMES:
z.writepath(file(os.path.join(TMPDIR, fname), 'r')) z.writepath(file(os.path.join(TMPDIR, fname), 'r'))
@@ -174,14 +174,14 @@ class TestZipWrite(unittest.TestCase):
def test_writepath_directory(self): def test_writepath_directory(self):
""" Test writing a directory. """ """ Test writing a directory. """


f = file(ZIPPATH, mode='w')
f = open(ZIPPATH, mode='w')
z = ZipFile(f, 'w') z = ZipFile(f, 'w')
z.writepath(None, pathname='/testdir', folder=True) z.writepath(None, pathname='/testdir', folder=True)
z.writepath(None, pathname='/testdir/testinside', folder=True) z.writepath(None, pathname='/testdir/testinside', folder=True)
z.close() z.close()
f.close() f.close()


f = file(ZIPPATH, mode='r')
f = open(ZIPPATH, mode='r')
z = ZipFile(f, 'r') z = ZipFile(f, 'r')


entries = z.infolist() entries = z.infolist()
@@ -192,11 +192,11 @@ class TestZipWrite(unittest.TestCase):
f.close() f.close()


def test_writestream(self): def test_writestream(self):
f = file(ZIPPATH, mode='w')
f = open(ZIPPATH, mode='w')
z = ZipFile(f, 'w') z = ZipFile(f, 'w')
for fname in FILENAMES: for fname in FILENAMES:
full_path = os.path.join(TMPDIR, fname) full_path = os.path.join(TMPDIR, fname)
i = file(full_path)
i = open(full_path)
o = z.writestream(fname) o = z.writestream(fname)
while True: while True:
data = i.read(1) data = i.read(1)
@@ -208,11 +208,11 @@ class TestZipWrite(unittest.TestCase):
z.close() z.close()


def test_writestream_unbuffered(self): def test_writestream_unbuffered(self):
f = file(ZIPPATH, mode='w')
f = open(ZIPPATH, mode='w')
z = ZipFile(f, 'w') z = ZipFile(f, 'w')
for fname in FILENAMES: for fname in FILENAMES:
full_path = os.path.join(TMPDIR, fname) full_path = os.path.join(TMPDIR, fname)
i = file(full_path)
i = open(full_path)
o = z.writestream(fname, os.path.getsize(full_path)) o = z.writestream(fname, os.path.getsize(full_path))
while True: while True:
data = i.read(1) data = i.read(1)
@@ -225,7 +225,7 @@ class TestZipWrite(unittest.TestCase):


def test_deferred_close_by_archive(self): def test_deferred_close_by_archive(self):
""" Test archive deferred close without a stream. """ """ Test archive deferred close without a stream. """
f = file(ZIPPATH, mode='w')
f = open(ZIPPATH, mode='w')
z = ZipFile(f, 'w') z = ZipFile(f, 'w')
o = z.writestream(FILENAMES[0]) o = z.writestream(FILENAMES[0])
z.close() z.close()


Loading…
Cancel
Save