This imports magic.py from file-magic and merges magic_wrap.py into it. This also updates detect_from_filename to first try detection with MAGIC_COMPRESS and, if that returns a decompression error, to fall back to normal (non-compressed) mode. This is necessary because [some?] zip files can be decompressed by gzip, but doing so throws an error.
main
@@ -0,0 +1,32 @@ | |||
[ | |||
{ | |||
"title": "gen ident", | |||
"cmd": [ "genident", "name=A Test User" ], | |||
"exit": 0 | |||
}, | |||
{ | |||
"special": "setup file", | |||
"file": "testfile.zip" | |||
}, | |||
{ | |||
"title": "import zip container", | |||
"cmd": [ "container", "testfile.zip" ] | |||
}, | |||
{ | |||
"special": "verify store object cnt", | |||
"comment": "should have one container and one file", | |||
"count": 2 | |||
}, | |||
{ | |||
"title": "verify correct files imported", | |||
"cmd": [ "dump" ], | |||
"stdout_check": [ | |||
{ "type": "identity" }, | |||
{ "files": [ "testfiles/newfile.txt", "testfiles/test.txt" ], | |||
"hashes": [ "sha512:90f8342520f0ac57fb5a779f5d331c2fa87aa40f8799940257f9ba619940951e67143a8d746535ed0284924b2b7bc1478f095198800ba96d01847d7b56ca465c", "sha512:7d5768d47b6bc27dc4fa7e9732cfa2de506ca262a2749cb108923e5dddffde842bbfee6cb8d692fb43aca0f12946c521cce2633887914ca1f96898478d10ad3f" ], | |||
"type": "container", | |||
"uri": "hash://sha512/ee141c9288d3c0d240addd9d688970481ff5107bf4b437782f79afcecbc6de207d6e0900d341125a6961918608b7e9f2c53f194600d3a8326e81b182eabb9e51" }, | |||
{ "type": "file", "hashes": [ "sha512:ee141c9288d3c0d240addd9d688970481ff5107bf4b437782f79afcecbc6de207d6e0900d341125a6961918608b7e9f2c53f194600d3a8326e81b182eabb9e51" ] } | |||
] | |||
} | |||
] |
@@ -33,7 +33,7 @@ from unittest import mock | |||
from .hostid import hostuuid | |||
from .tags import TagCache | |||
from . import orm | |||
from .magic_wrap import detect_from_filename | |||
from .magic import detect_from_filename | |||
from .btv import _TestCases as bttestcase, validate_file | |||
@@ -1493,6 +1493,7 @@ def handle_archive(fname, persona, objstr): | |||
_container_mapping = { | |||
'application/x-bittorrent': handle_bittorrent, | |||
'application/x-tar': handle_archive, | |||
'application/zip': handle_archive, | |||
} | |||
@init_datastructs | |||
@@ -2493,6 +2494,9 @@ class _TestCases(unittest.TestCase): | |||
elif special == 'delete files': | |||
for i in cmd['files']: | |||
os.unlink(i) | |||
elif special == 'setup file': | |||
shutil.copy(self.fixtures / | |||
cmd['file'], self.tempdir) | |||
elif special == 'setup tar file': | |||
shutil.copy(self.fixtures / | |||
'testfile.tar.gz', self.tempdir) | |||
@@ -34,13 +34,23 @@ Python bindings for libmagic | |||
''' | |||
import ctypes | |||
import pathlib | |||
import threading | |||
import unittest | |||
from collections import namedtuple | |||
from ctypes import * | |||
from ctypes.util import find_library | |||
from .utils import _debprint | |||
__all__ = [ | |||
'detect_from_filename', | |||
'detect_from_fobj', | |||
'detect_from_content', | |||
] | |||
def _init(): | |||
""" | |||
@@ -88,8 +98,8 @@ MAGIC_PARAM_ELF_NOTES_MAX = PARAM_ELF_NOTES_MAX = 4 | |||
MAGIC_PARAM_REGEX_MAX = PARAM_REGEX_MAX = 5 | |||
MAGIC_PARAM_BYTES_MAX = PARAM_BYTES_MAX = 6 | |||
FileMagic = namedtuple('FileMagic', ('mime_type', 'encoding', 'name')) | |||
FileMagic = namedtuple('FileMagic', ('mime_type', 'encoding', | |||
'name', 'compressed_type'), defaults=[ '' ]) | |||
class magic_set(Structure): | |||
pass | |||
@@ -152,6 +162,13 @@ _setparam = _libraries['magic'].magic_setparam | |||
_setparam.restype = c_int | |||
_setparam.argtypes = [magic_t, c_int, c_void_p] | |||
_mgp = _libraries['magic'].magic_getpath | |||
_mgp.restype = c_char_p | |||
_mgp.argtypes = [ c_char_p, c_int ] | |||
_mlb = _libraries['magic'].magic_load_buffers | |||
_mlb.restype = c_int | |||
_mlb.argtypes = [ magic_t, POINTER(c_void_p), POINTER(c_size_t), c_size_t ] | |||
class Magic(object): | |||
def __init__(self, ms): | |||
@@ -235,7 +252,10 @@ class Magic(object): | |||
Returns 0 on success and -1 on failure. | |||
""" | |||
return _load(self._magic_t, Magic.__tobytes(filename)) | |||
files = _mgp(None, 0).decode('utf-8') + '.mgc' + ':' + str(pathlib.Path(__file__).parent / 'magic') | |||
return _load(self._magic_t, files.encode('utf-8')) | |||
def compile(self, dbs): | |||
""" | |||
@@ -313,30 +333,40 @@ class error(Exception): | |||
class MagicDetect(object):
    '''Bundle of loaded libmagic handles, one per detection mode.

    On success the instance exposes four attributes: ``mime_magic``,
    ``none_magic``, ``mimecomp_magic`` and ``nonecomp_magic`` (the last
    two are opened with ``MAGIC_COMPRESS`` or'd into the flags).

    Raises ``error`` if any handle cannot be opened or loaded; handles
    that were already opened are closed before raising.
    '''

    def __init__(self):
        # Set first so that __del__ is safe even when construction
        # fails part way through.
        self._loaded = []

        opened = []
        failed = False
        # NOTE: ``open`` is called with libmagic flags and its result is
        # checked against None and ``.load()``-ed, so this is presumably
        # the module's own ``open(flags)``, not the builtin.
        for attr, flags in (
            ('mime_magic', MAGIC_MIME),
            ('none_magic', MAGIC_NONE),
            ('mimecomp_magic', MAGIC_MIME|MAGIC_COMPRESS),
            ('nonecomp_magic', MAGIC_NONE|MAGIC_COMPRESS),
        ):
            handle = open(flags)
            if handle is None:
                failed = True
                break

            if handle.load() == -1:
                handle.close()
                failed = True
                break

            setattr(self, attr, handle)
            opened.append(attr)

        if failed:
            # Roll back whatever was opened.  The failure is tracked
            # explicitly: keying the raise off the rollback list would
            # miss a failure on the very first handle (empty list).
            for attr in opened:
                getattr(self, attr).close()
                setattr(self, attr, None)
            raise error

        self._loaded = opened

    def __del__(self):
        # getattr guards against an instance whose __init__ never ran.
        for attr in getattr(self, '_loaded', ()):
            getattr(self, attr).close()
            setattr(self, attr, None)
        # Make a second finalisation a no-op instead of a double close.
        self._loaded = []
threadlocal = threading.local() | |||
@@ -349,13 +379,21 @@ def _detect_make(): | |||
def _create_filemagic(mime_detected, type_detected):
    '''Build a `FileMagic` from a MIME detection string and a description.

    mime_detected has the form ``'<mime type>; <encoding info>'``,
    optionally followed by ``' compressed-encoding=<type>; ...'``, e.g.
    ``'application/x-tar; charset=binary compressed-encoding=application/gzip; charset=binary'``.
    type_detected is stored unchanged as ``name``.

    ``compressed_type`` is only passed to `FileMagic` when the
    compressed-encoding marker is present; otherwise the namedtuple's
    default applies.

    Raises ValueError(mime_detected) when the ``'; '`` separator between
    MIME type and encoding is missing.
    '''
    mime_type, sep, remainder = mime_detected.partition('; ')
    if not sep:
        raise ValueError(mime_detected)

    kwargs = {}
    # partition() instead of an unpacking split(): a repeated marker or
    # a compressed type without a trailing '; ...' part must neither
    # leak marker text into the encoding nor raise a bare unpack error.
    mime_encoding, marker, compressed = remainder.partition(
        ' compressed-encoding=')
    if marker:
        kwargs['compressed_type'] = compressed.partition('; ')[0]

    return FileMagic(name=type_detected, mime_type=mime_type,
        encoding=mime_encoding.replace('charset=', ''), **kwargs)
def detect_from_filename(filename): | |||
'''Detect mime type, encoding and file type from a filename | |||
@@ -363,9 +401,15 @@ def detect_from_filename(filename): | |||
Returns a `FileMagic` namedtuple. | |||
''' | |||
x = _detect_make() | |||
return _create_filemagic(x.mime_magic.file(filename), | |||
x.none_magic.file(filename)) | |||
t = x.mimecomp_magic.file(filename) | |||
# if there's a decomp error, don't look at decomp | |||
if t.startswith('application/x-decompression-error'): | |||
return _create_filemagic(x.mime_magic.file(filename), | |||
x.none_magic.file(filename)) | |||
return _create_filemagic(t, x.nonecomp_magic.file(filename)) | |||
def detect_from_fobj(fobj): | |||
'''Detect mime type, encoding and file type from file-like object | |||
@@ -388,3 +432,12 @@ def detect_from_content(byte_content): | |||
x = _detect_make() | |||
return _create_filemagic(x.mime_magic.buffer(byte_content), | |||
x.none_magic.buffer(byte_content)) | |||
class _TestMagic(unittest.TestCase):
    '''Tests for parsing of libmagic MIME output.'''

    def test_create_filemagic(self):
        # A gzip-compressed tar: every FileMagic field should be filled.
        mime = ('application/x-tar; charset=binary '
            'compressed-encoding=application/gzip; charset=binary')
        result = _create_filemagic(mime, 'foobar')

        expected = {
            'mime_type': 'application/x-tar',
            'encoding': 'binary',
            'compressed_type': 'application/gzip',
            'name': 'foobar',
        }
        for field, value in expected.items():
            self.assertEqual(getattr(result, field), value)
@@ -1,155 +0,0 @@ | |||
import functools | |||
import importlib | |||
import magic | |||
import os | |||
import pathlib | |||
import shutil | |||
import tempfile | |||
import unittest | |||
from .utils import _debprint | |||
__doc__ = ''' | |||
This is a number of hacks to the Python magic module so that it works | |||
better. These bugs should be fixed in the module, but I don't want to | |||
deal w/ forking and getting the fixed upstreamed. | |||
''' | |||
magic.FileMagic = magic.namedtuple('FileMagic', ('mime_type', 'encoding', | |||
'name', 'compressed_type'), defaults=[ '' ]) | |||
from magic import * | |||
__all__ = [ | |||
'detect_from_filename', | |||
'detect_from_content', | |||
] | |||
_mgc_data = {} | |||
_lcl_mgc_data = None | |||
# Wrapper magic.open so that we look past compression | |||
_real_magic_open = magic.open | |||
@functools.wraps(magic.open)
def open(flags):
    # Always add MAGIC_COMPRESS so detection looks past compression.
    # (No docstring here: functools.wraps copies the wrapped one over.)
    compress_flags = flags | magic.MAGIC_COMPRESS
    return _real_magic_open(compress_flags)
magic.open = open | |||
def _create_filemagic(mime_detected, type_detected):
    '''Build a `FileMagic` from a MIME detection string and a description.

    mime_detected has the form ``'<mime type>; <encoding info>'``,
    optionally followed by ``' compressed-encoding=<type>; ...'``, e.g.
    ``'application/x-tar; charset=binary compressed-encoding=application/gzip; charset=binary'``.
    type_detected is stored unchanged as ``name``.

    ``compressed_type`` is only passed to `FileMagic` when the
    compressed-encoding marker is present; otherwise the namedtuple's
    default applies.

    Raises ValueError(mime_detected) when the ``'; '`` separator between
    MIME type and encoding is missing.
    '''
    mime_type, sep, remainder = mime_detected.partition('; ')
    if not sep:
        raise ValueError(mime_detected)

    kwargs = {}
    # partition() instead of an unpacking split(): a repeated marker or
    # a compressed type without a trailing '; ...' part must neither
    # leak marker text into the encoding nor raise a bare unpack error.
    mime_encoding, marker, compressed = remainder.partition(
        ' compressed-encoding=')
    if marker:
        kwargs['compressed_type'] = compressed.partition('; ')[0]

    return FileMagic(name=type_detected, mime_type=mime_type,
        encoding=mime_encoding.replace('charset=', ''), **kwargs)
magic._create_filemagic = _create_filemagic | |||
def _get_mgc_data(fname):
    '''Return the bytes of fname, reading the file only on first use.

    Results are kept in the module-level `_mgc_data` dict, keyed by
    fname exactly as passed in.
    '''
    if fname not in _mgc_data:
        _mgc_data[fname] = pathlib.Path(fname).read_bytes()

    return _mgc_data[fname]
def _compile_file(inp, out, tempdir):
    '''Compile the magic source file inp and move the result to out.

    inp and out are `pathlib.Path` objects.  The process working
    directory is switched to tempdir for the duration of the compile
    and always restored afterwards.
    '''
    oldcwd = pathlib.Path.cwd()
    try:
        # Presumably libmagic writes the compiled database into the
        # current directory; the rename below expects it next to inp
        # with a '.mgc' suffix, so tempdir should be inp's directory.
        os.chdir(tempdir)

        mag = magic.open(magic.MAGIC_NONE)
        try:
            mag.compile(str(inp))
        finally:
            # Release the libmagic handle; it was leaked before.
            mag.close()

        inp.with_suffix('.mgc').rename(out)
    finally:
        os.chdir(oldcwd)
def _compile_lcl():
    '''Compile the package's bundled ``magic`` source and return the bytes.

    The source is read from the ``medashare`` package resources, copied
    to a scratch directory, compiled there via `_compile_file`, and the
    compiled database is returned as bytes.  The scratch directory is
    removed on exit, whether or not compilation succeeded.
    '''
    # ``import importlib`` alone does not guarantee that the
    # ``resources`` submodule is loaded, so import it explicitly.
    import importlib.resources

    magicfile = importlib.resources.files('medashare') / 'magic'

    # The context manager only cleans up a directory that was actually
    # created; a try/finally around mkdtemp() would hit an unbound name
    # in its cleanup if mkdtemp() itself failed.
    with tempfile.TemporaryDirectory() as scratch:
        d = pathlib.Path(scratch).resolve()

        # write out data
        inpfile = d / 'magic'
        inpfile.write_bytes(magicfile.read_bytes())

        # where it'll go
        outfile = d / 'someotherfile'

        _compile_file(inpfile, outfile, tempdir=d)

        return outfile.read_bytes()
def _get_mgc_res():
    '''Return the compiled local magic database as bytes.

    The result is computed once and kept in the module-level
    `_lcl_mgc_data`.  A prebuilt ``magic.mgc`` package resource is
    preferred; if it does not exist the bundled source is compiled via
    `_compile_lcl`.
    '''
    global _lcl_mgc_data

    # ``import importlib`` alone does not guarantee that the
    # ``resources`` submodule is loaded, so import it explicitly.
    import importlib.resources

    if _lcl_mgc_data is None:
        try:
            mgcfile = importlib.resources.files('medashare') / 'magic.mgc'
            _lcl_mgc_data = mgcfile.read_bytes()
        except FileNotFoundError:
            _lcl_mgc_data = _compile_lcl()

        # NOTE(review): the nesting of this debug print could not be
        # recovered from the reviewed text; it is assumed to run once,
        # when the data is first loaded -- confirm.
        _debprint(repr(_lcl_mgc_data))

    return _lcl_mgc_data
# patch magic to load custom magic file | |||
_mgp = magic._libraries['magic'].magic_getpath | |||
_mgp.restype = magic.c_char_p | |||
_mgp.argtypes = [ magic.c_char_p, magic.c_int ] | |||
_mlb = magic._libraries['magic'].magic_load_buffers | |||
_mlb.restype = magic.c_int | |||
_mlb.argtypes = [ magic.magic_t, magic.POINTER(magic.c_void_p), magic.POINTER(magic.c_size_t), magic.c_size_t ] | |||
def _new_magic_load(self, fname=None):
    '''Replacement for ``magic.Magic.load`` that adds the local magic file.

    Loads libmagic's default database (the path reported by
    ``magic_getpath`` with ``.mgc`` appended) together with the
    ``magic`` file that sits next to this module.

    fname is accepted for signature compatibility but is ignored.
    Returns the result of ``magic._load``.
    '''
    default_db = _mgp(None, 0).decode('utf-8') + '.mgc'
    local_db = str(pathlib.Path(__file__).parent / 'magic')

    # NOTE: an in-memory variant using magic_load_buffers (`_mlb`) used
    # to follow the return below; it was marked as not working and was
    # unreachable, so only the path-based load is kept.
    files = default_db + ':' + local_db
    return magic._load(self._magic_t, files.encode('utf-8'))
magic.Magic.load = _new_magic_load | |||
class _TestMagic(unittest.TestCase):
    '''Tests for parsing of libmagic MIME output.'''

    def test_create_filemagic(self):
        # A gzip-compressed tar: every FileMagic field should be filled.
        mime = ('application/x-tar; charset=binary '
            'compressed-encoding=application/gzip; charset=binary')
        result = _create_filemagic(mime, 'foobar')

        expected = {
            'mime_type': 'application/x-tar',
            'encoding': 'binary',
            'compressed_type': 'application/gzip',
            'name': 'foobar',
        }
        for field, value in expected.items():
            self.assertEqual(getattr(result, field), value)
@@ -6,4 +6,4 @@ from .cli import _TestMigrations | |||
from .tags import _TestTagCache | |||
from .mtree import Test | |||
from .server import _TestCases, _TestPostConfig | |||
from .magic_wrap import _TestMagic | |||
from .magic import _TestMagic |
@@ -64,7 +64,6 @@ setup( | |||
'orm', | |||
'pasn1 @ git+https://www.funkthat.com/gitea/jmg/pasn1.git@c6c64510b42292557ace2b77272eb32cb647399d#egg=pasn1', | |||
'python-libarchive @ git+https://www.funkthat.com/gitea/jmg/python-libarchive.git#egg=python-libarchive', | |||
'file-magic @ git+https://github.com/file/file.git#egg=file-magic&subdirectory=python', | |||
'pydantic[dotenv]', | |||
], | |||
include_package_data=True, | |||