@@ -5,12 +5,17 @@ from .BitField import BitField
from fractions import Fraction
from fractions import Fraction
from io import BytesIO
from io import BytesIO
import bisect
import enum
import enum
import hashlib
import io
import io
import itertools
import itertools
import json
import os
import pathlib
import pathlib
import string
import string
import struct
import struct
import unittest
# Various specifications:
# Various specifications:
# CRW: https://web.archive.org/web/20081230095207/http://xyrion.org/ciff/CIFFspecV1R04.pdf
# CRW: https://web.archive.org/web/20081230095207/http://xyrion.org/ciff/CIFFspecV1R04.pdf
@@ -53,6 +58,7 @@ class TagName:
cls.__init__(r, name, tag)
cls.__init__(r, name, tag)
cls._cache[tag] = r
cls._cache[tag] = r
cls._cache[name] = r
cls._names.add(name)
cls._names.add(name)
return r
return r
@@ -139,8 +145,9 @@ _tags = {
'CAMERA_IDENT': (1, 16), # Camera Owner
'CAMERA_IDENT': (1, 16), # Camera Owner
'CAMERA_MAKEMODEL': (1, 10), # Camera Make and Model
'CAMERA_MAKEMODEL': (1, 10), # Camera Make and Model
'CAMERA_SHUTTERCOUNT': (3, 4),
'CAMERA_SHUTTERCOUNT': (3, 4),
'CAMERA_SERIALNO': (3, 11),
'CAMERA_MODELID': (3, 0x34), # Camera model id
'CAMERA_BODYID': (3, 0x0b),
'CAMERA_SERIALNO': (3, 0x17),
'CAMERA_MODELID': (3, 0x34), # Camera model id
'CAMERA_DECODERTABLE': (3, 0x35),
'CAMERA_DECODERTABLE': (3, 0x35),
'MANUFACTURE_COUNTRY': (1, 13), # Camera Counter
'MANUFACTURE_COUNTRY': (1, 13), # Camera Counter
@@ -697,6 +704,18 @@ def getstructs(fh, aoff, len, endian, fmt):
def printcode(code, data):
    """Print a record code and its data on one line.

    The code is rendered as a zero-padded, four digit hexadecimal number
    prefixed with ``0x``; data is rendered with ``%s``.
    """
    print("0x%04x: %s" % (code, data))
def defascii(code, fh, aoff, len, endian, cnt=1):
    """Read NUL separated ASCII string(s) from fh.

    Seeks fh to aoff, reads len bytes and splits them on NUL bytes.
    code and endian are part of the common handler signature and are
    not used here.

    Returns a single str when cnt is 1, otherwise a tuple holding at
    most cnt strings.  Bytes following the cnt-th NUL are discarded.
    """
    fh.seek(aoff)
    raw = fh.read(len)

    # maxsplit=cnt leaves any remainder in one trailing element, which
    # the slice below drops
    pieces = raw.split(b'\x00', cnt)
    decoded = tuple(piece.decode('ascii') for piece in pieces[:cnt])

    return decoded[0] if cnt == 1 else decoded
def defdata(code, fh, aoff, len, endian):
def defdata(code, fh, aoff, len, endian):
fh.seek(aoff)
fh.seek(aoff)
data = fh.read(len)
data = fh.read(len)
@@ -780,23 +799,19 @@ def doexposeinfo(*args):
# Handler table used by parse_ciff.  Each entry is
#   (data type, id code or None, description, handler or None)
# An entry matches a record when the data type is equal and the id code
# is either None (any code of that type) or equal to the record's code.
# The first matching entry wins, so specific codes must be listed before
# the per-type catch-all entries.  A handler of None means the record is
# recognized, but no value is stored for it.
datahandles = [
    #(4, 7, "Thumbnail Big", None),
    #(4, 8, "Thumbnail Small", None),
    (1, 0x16, "File CRW", None),
    (1, 0x17, "File THM", None),
    # spec says TIFF like, but there isn't double NUL at end
    (1, 0x0a, "Make and Model", lambda *args: defascii(*args, 2)),
    (2, 0x29, "Focal length (mm), sensor x, y in mils", dofocal),
    (2, 0x2a, 'ISO, Aperture, Shutter, WBI', doexposeinfo),
    (2, 0x2d, 'Lots of settings', docamerasettings),
    (0, None, "def byte alignment", defbyte),
    (1, None, "def character string", defascii),
    (2, None, "def half words", defhalf),
    (3, None, "def words", defword),
    (4, None, "def arbitrary structure", defdata),
]
THMB_BIG = 0x2007
THMB_SML = 0x2008
FILE_CRW = 0x0816
FILE_THM = 0x0817
#0805 desc str
#0805 desc str
#0815 fmt:desc str
#0815 fmt:desc str
#080a make NUL model NUL
#080a make NUL model NUL
@@ -844,6 +859,9 @@ class heapcontainer(list):
def find(self, k):
def find(self, k):
r = self.searchheap(lambda k, v, matk=k: k == matk)
r = self.searchheap(lambda k, v, matk=k: k == matk)
if not len(r):
raise ValueError('tag %s not found' % repr(k))
if len(r) != 1:
if len(r) != 1:
raise RuntimeError('more than one found: %s' % repr(k))
raise RuntimeError('more than one found: %s' % repr(k))
@@ -979,7 +997,7 @@ def tiff_ifd(fh, endian, off):
yield (None, nextifd, None)
yield (None, nextifd, None)
def parse_ciff(fh, offset, length, endian):
def parse_ciff(fh, offset, length, endian, ignoretags ):
ret = heapcontainer()
ret = heapcontainer()
#print offset, length
#print offset, length
fh.seek(offset + length - struct.calcsize("I"))
fh.seek(offset + length - struct.calcsize("I"))
@@ -1017,16 +1035,24 @@ def parse_ciff(fh, offset, length, endian):
#print (dataType, code), aoff, len
#print (dataType, code), aoff, len
if dataType in heapRECs:
if dataType in heapRECs:
#print 'recursing in parse_ciff, type: %d, code: 0x%04x' % (dataType, code)
#print 'recursing in parse_ciff, type: %d, code: 0x%04x' % (dataType, code)
ret.append((dtc, parse_ciff(fh, aoff, olen, endian)))
ret.append((dtc, parse_ciff(fh, aoff, olen, endian, ignoretags )))
#print 'back'
#print 'back'
continue
continue
if dtc in ignoretags:
#print('ignoring:', repr(dtc))
continue
for itype, iidcode, string, fun in datahandles:
for itype, iidcode, string, fun in datahandles:
if itype == dataType and iidcode == None or iidcode == code:
#print "found:", string
if itype == dataType and ( iidcode == None or iidcode == code) :
#print('found:', string, repr(dtc))
if fun:
if fun:
ret.append((dtc, fun(getIDCode(type), fh, aoff, len, endian)))
ret.append((dtc, fun(getIDCode(type), fh, aoff, len, endian)))
#else:
# print('ukn dtc:', repr(dtc))
break
break
else:
raise NotImplementedError('unhandled: %s' % repr(dtc))
return ret
return ret
@@ -1045,7 +1071,9 @@ def getendian(val):
return endian
return endian
def idcrw(fh):
def idcrw(fh, ignoretags=()):
ignoretags = set(TagName.find(x) for x in ignoretags)
fh.seek(0)
fh.seek(0)
isjpeg = False
isjpeg = False
try:
try:
@@ -1056,7 +1084,7 @@ def idcrw(fh):
data = fh.read(2)
data = fh.read(2)
if data != b'\xff\xd8':
if data != b'\xff\xd8':
raise x
raise ValueError('not a TIFF style (CRW/CR2) or JPEG file')
# Find Exif marker
# Find Exif marker
pos = 2
pos = 2
@@ -1114,7 +1142,7 @@ def idcrw(fh):
if version != 0x00010002:
if version != 0x00010002:
raise ValueError('incorrect version: %08x' % version)
raise ValueError('incorrect version: %08x' % version)
fh.seek(0, io.SEEK_END)
fh.seek(0, io.SEEK_END)
return parse_ciff(fh, hlen, fh.tell() - hlen, endian)
return parse_ciff(fh, hlen, fh.tell() - hlen, endian, ignoretags )
else:
else:
raise ValueError('unknown value: %d' % hlen)
raise ValueError('unknown value: %d' % hlen)
@@ -1155,12 +1183,421 @@ if __name__ == '__main__':
else:
else:
pprint.pprint(ci)
pprint.pprint(ci)
import unittest
class SparseCover:
    '''Provides a range coverage function.

    Ranges are given as (pos, cnt) and describe the half open interval
    [pos, pos + cnt).  Added ranges are kept as a sorted list of
    (pos, cnt) tuples in which overlapping or touching ranges have been
    merged into a single entry.
    '''

    def __init__(self, l=()):
        '''Create a cover from an iterable of (pos, cnt) pairs.

        The pairs may be in any order and may overlap.
        '''

        self._sc = []

        for i in l:
            self.add(*i)

    def add(self, pos, cnt):
        '''Add [pos, pos + cnt) as present.

        Every existing entry that overlaps or touches the new range is
        folded into a single entry, including the case where the new
        range bridges several existing entries.
        '''

        sc = self._sc
        start = pos
        end = pos + cnt

        # index of the first entry that starts after pos
        lo = bisect.bisect_right(sc, (pos, float('inf')))

        # Entries are disjoint and sorted, so only the entry directly
        # before lo can reach forward to pos.
        if lo > 0:
            prevpos, prevcnt = sc[lo - 1]
            if prevpos + prevcnt >= start:
                lo -= 1
                start = prevpos
                end = max(end, prevpos + prevcnt)

        # swallow every entry that begins inside, or right at the end
        # of, the (growing) merged range
        hi = lo
        while hi < len(sc) and sc[hi][0] <= end:
            end = max(end, sc[hi][0] + sc[hi][1])
            hi += 1

        sc[lo:hi] = [ (start, end - start) ]

    def _check(self, pos1, cnt1, pos2, cnt2):
        '''Return True if range 2 lies completely within range 1.'''

        return pos2 >= pos1 and pos2 + cnt2 <= pos1 + cnt1

    def tolist(self):
        '''Return a copy of the list.  Can be used to recreate or
        serialize.

        That is:
        SparseCover(sc.tolist()).tolist() == sc.tolist()
        '''

        return self._sc[:]

    def covered(self, pos, cnt):
        '''Check to see if [pos, pos + cnt) is completely covered.'''

        sc = self._sc

        # Only the last entry starting at or before pos can contain the
        # range; with no such entry (including an empty cover) it is
        # not covered.
        scidx = bisect.bisect_right(sc, (pos, float('inf')))

        return scidx > 0 and self._check(*sc[scidx - 1], pos, cnt)
class _TestSparseCover(unittest.TestCase):
    '''Tests for SparseCover construction, merging and coverage queries.'''

    def test_sc_wronginput(self):
        # (constructor input, expected normalized list)
        cases = [
            # wrong order
            ([(10, 1), (1, 5)], [ (1, 5), (10, 1) ]),
            # over lapping, no extension
            ([(10, 10), (11, 5)], [ (10, 10) ]),
            # extending
            ([(10, 10), (11, 10)], [ (10, 11) ]),
            # prefix overlap
            ([(10, 10), (5, 10)], [ (5, 15) ]),
            # prefix exact
            ([(10, 10), (5, 5)], [ (5, 15) ]),
        ]

        for ranges, expected in cases:
            self.assertEqual(SparseCover(ranges).tolist(), expected)

    def test_sc(self):
        def expect(cover, missing, present):
            # all missing ranges are checked before the present ones
            for rng in missing:
                self.assertFalse(cover.covered(*rng))
            for rng in present:
                self.assertTrue(cover.covered(*rng))

        sc = SparseCover()

        expect(sc, [(5, 10)], [])

        sc.add(10, 50)

        expect(sc, [(5, 10), (50, 50)], [(10, 50), (20, 10)])

        sc.add(100, 50)

        expect(sc,
            [(5, 10), (50, 50), (50, 100), (120, 100), (150, 1)],
            [(20, 10), (100, 50), (100, 1), (149, 1)])

        self.assertEqual(sc.tolist(), [ (10, 50), (100, 50) ])

        # round trip through tolist, then extend the last entry
        sc = SparseCover(sc.tolist())

        sc.add(140, 20)

        self.assertEqual(sc.tolist(), [ (10, 50), (100, 60) ])

        sc.add(1, 5)

        expect(sc, [(50, 100)], [(3, 1)])

        self.assertEqual(sc.tolist(), [ (1, 5), (10, 50), (100, 60) ])
class _FileEmulator:
    '''Read-only, file like object backed by a sparse copy of a file.

    fname is the sparse (base) file, origfile the complete original and
    emulmetafile a JSON file holding the keys:
        hash - hex SHA-512 digest of the original file
        parts - list of (pos, cnt) ranges present in the base file
        size - size of the original file in bytes

    Reads of ranges already recorded in parts are served from the base
    file.  Other reads are only possible when the original file exists:
    the data is read from the original, written into the base file at
    the same offset and recorded.  The metadata is written out by
    close() if anything was added.
    '''

    # captured at class creation so the real open is used even while
    # the module level open is patched
    __real_open = open
    __hash_factory = hashlib.sha512

    def __init__(self, fname, origfile, emulmetafile):
        '''Open the emulated file.

        Raises RuntimeError if neither the metadata nor the original
        file exists, or if the metadata hash does not match the
        original file.
        '''

        # Everything close() looks at is initialized first, so that
        # close() (and therefore __del__) is safe on a partially
        # constructed object.
        self._closed = False
        self._updateable = False
        self._updated = False
        self._basefp = None
        self._origfp = None
        self._emulmetafile = None
        self._emuldata = None
        self._pos = 0
        self._sc = None

        try:
            self._open(fname, origfile, emulmetafile)
        except BaseException:
            # release any handles already opened, and never write
            # metadata for an object that failed to construct
            self._updated = False
            self.close()
            raise

    def _open(self, fname, origfile, emulmetafile):
        # Open the underlying files and load, or create, the metadata.
        orighash = None
        try:
            self._origfp = self.__real_open(origfile, 'rb')
        except FileNotFoundError:
            pass
        else:
            orighash = self._hexdgstfp(self._origfp)

        try:
            with self.__real_open(emulmetafile) as fp:
                emuldata = json.load(fp)
        except FileNotFoundError:
            if self._origfp is None:
                raise RuntimeError('emulated metadata not present and original file not present: %s' % repr(str(origfile)))

            # orig file exists time to emulate it.
            self._origfp.seek(0, os.SEEK_END)
            emuldata = {
                'hash': orighash,
                'parts': [],
                'size': self._origfp.tell(),
            }

            # need to make sure this gets written out
            self._updated = True
        else:
            if orighash is not None and emuldata['hash'] != orighash:
                raise RuntimeError('hash of emulated date (%s) does not match original file (%s)' % (repr(str(emulmetafile)), repr(str(origfile))))

        if self._origfp is not None:
            try:
                self._basefp = self.__real_open(fname, 'r+b')
            except FileNotFoundError:
                # if this fails, race lost, try again
                self._basefp = self.__real_open(fname, 'x+b')
        else:
            self._basefp = self.__real_open(fname, 'rb')

        self._emulmetafile = emulmetafile
        self._emuldata = emuldata
        self._sc = SparseCover(emuldata['parts'])

        # new data can only be pulled in when the original is present
        self._updateable = self._origfp is not None

    @classmethod
    def _hexdgstfp(cls, fp):
        '''Return the hex digest of the complete contents of fp.'''

        fp.seek(0)
        dgst = cls.__hash_factory()

        for chunk in iter(lambda: fp.read(64*1024), b''):
            dgst.update(chunk)

        return dgst.hexdigest()

    def __enter__(self):
        return self

    @property
    def closed(self):
        return self._closed

    def close(self):
        '''Close the underlying files and, if data was added, write
        the metadata file.  Calling it more than once is harmless.'''

        if self._closed:
            return

        if self._origfp is not None:
            self._origfp.close()
            self._origfp = None

        if self._basefp is not None:
            self._basefp.close()
            self._basefp = None

        self._closed = True

        if self._updateable and self._updated:
            self._emuldata['parts'] = self._sc.tolist()
            with self.__real_open(self._emulmetafile, 'w') as fp:
                json.dump(self._emuldata, fp)
                print(file=fp)

        self._updated = False
        self._updateable = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

        return False

    def __del__(self):
        self.close()

    def tell(self):
        '''Return the current position.'''

        return self._pos

    def seek(self, pos, whence=os.SEEK_SET):
        '''Set the position and return it.

        SEEK_END is relative to the size recorded in the metadata, not
        to the size of the sparse base file.

        Raises ValueError for an unsupported whence.
        '''

        if whence == os.SEEK_SET:
            self._pos = pos
        elif whence == os.SEEK_CUR:
            self._pos += pos
        elif whence == os.SEEK_END:
            self._pos = self._emuldata['size'] + pos
        else:
            raise ValueError('invalid whence: %s' % repr(str(whence)))

        return self._pos

    def read(self, cnt):
        '''Read up to cnt bytes from the current position.

        Raises ValueError if the file is closed, and RuntimeError if
        the range is not present in the base file and there is no
        original file to fetch it from.
        '''

        if self.closed:
            raise ValueError('read of closed file')

        if self._sc.covered(self._pos, cnt):
            self._basefp.seek(self._pos)
            data = self._basefp.read(cnt)
        else:
            if not self._updateable:
                raise RuntimeError('dota not present and not updateable')

            # read in the data
            self._origfp.seek(self._pos)
            data = self._origfp.read(cnt)

            # write it out
            self._basefp.seek(self._pos)
            self._basefp.write(data)
            self._basefp.flush()

            # The requested range is recorded, not the returned length,
            # so that replaying the same request at end of file is
            # still considered covered.
            self._sc.add(self._pos, cnt)
            self._updated = True

        # advance by what was actually returned; a read at end of file
        # may be short
        self._pos += len(data)

        return data
class FileMockTest:
    '''
    Mixin w/ unittest.TestCase to mock open, and store subsets
    of files.

    The original base files are taken from the attribute fmt_origpath.
    The sparse file, and its metadata will be stored in fmt_basepath.

    You can view that fmt_origpath is backing fmt_basepath in that any
    missing files/data from fmt_basepath will be obtained from
    fmt_origpath.

    Only files under fmt_basepath that are missing, or that have a
    metadata file (the file name with '.fmt' appended) next to them,
    are emulated, and only when opened with mode 'rb'.  Every other
    open is passed to the real open.

    Usage:
    class MyTestCase(FileMockTest, TestCase):
        fmt_basepath = pathlib.Path(xxx)
        fmt_origpath = pathlib.Path(yyy)

        def setUp(self):
            super().setUp()
            ...

    Required Attributes:
    fmt_basepath - underlying path
    fmt_origpath - path to original files

    Note: if one of the following methods are used, super MUST be
    called: setUp, tearDown.

    For example, if the class has a setUp function:
    class Example(FileMockTest, TestCase):
        def setUp(self):
            <... custom setup code ...>
            super().setUp()
    '''

    # captured at class creation, i.e. before open gets patched
    __real_open = open
    __ext = '.fmt'

    def setUp(self):
        '''Resolve the configured paths and patch this module's open.

        Raises RuntimeError if a required attribute is missing.
        '''

        # "import unittest" alone does not load the mock submodule
        import unittest.mock

        super().setUp()

        for attr in ('fmt_origpath', 'fmt_basepath'):
            try:
                resolved = getattr(self, attr).resolve()
            except AttributeError:
                raise RuntimeError('attribute %s not defined on class %s' % (attr, self.__class__.__name__))
            setattr(self, attr, resolved)

        self.__openmockpatch = unittest.mock.patch(__name__ + '.open')
        self.__openmock = self.__openmockpatch.start()
        self.__openmock.side_effect = self.__genopen

    def __genopen(self, fname, *args, **kwargs):
        '''Replacement for open, see the class description for which
        files get emulated.

        Raises ValueError if a file that has to be emulated is opened
        with a mode other than 'rb'.
        '''

        fname = pathlib.Path(fname)

        # Resolve before comparing against the (resolved) base path,
        # otherwise a relative name below fmt_basepath is never
        # recognized.
        path = fname.resolve()
        basepath = self.fmt_basepath.resolve()
        emulmetafile = pathlib.Path(str(path) + self.__ext)

        # 1) not in fmt_basepath
        # 2) exists in basepath, but not emulated
        if not path.is_relative_to(basepath) or \
            (path.exists() and not emulmetafile.exists()):
            return self.__real_open(fname, *args, **kwargs)

        # either base file is missing, or base file is emulated
        mode = args[0] if args else kwargs.get('mode', 'r')
        if mode != 'rb':
            raise ValueError('can only emulate a file for reading binary')

        # need to try to pull from original file, which lives at the
        # same relative location below fmt_origpath
        origfile = self.fmt_origpath / path.relative_to(basepath)

        return _FileEmulator(path, origfile, emulmetafile)

    def tearDown(self):
        '''Remove the open patch installed by setUp.'''

        try:
            self.__openmockpatch.stop()
        except (AttributeError, RuntimeError):
            # setUp failed before the patch was created or started;
            # there is nothing to undo
            pass

        super().tearDown()
class _TestFileMock(unittest.TestCase):
    '''Self test for the FileMockTest mixin.'''

    # NOTE(review): the "x" prefix keeps unittest from collecting this
    # test.  The inner TC sets neither fmt_origpath nor fmt_basepath,
    # which FileMockTest.setUp requires, so this presumably is
    # unfinished -- confirm before enabling.
    def xtest_foo(self):
        class TC(FileMockTest, unittest.TestCase):
            def test_openmocked(self):
                # FileMockTest stores the real open in a double
                # underscore attribute, so it has to be addressed by
                # its mangled name from outside of that class.
                self.assertIsNot(open, FileMockTest._FileMockTest__real_open)

            def test_basicopen(self):
                with open('foo', 'r') as fp:
                    fp.read(10)

        loader = unittest.TestLoader()

        # debug() runs the tests without collecting the result, so a
        # failure propagates as an exception
        try:
            loader.loadTestsFromTestCase(TC).debug()
        except Exception:
            import traceback
            traceback.print_exc()
            raise
class _TestCRW(FileMockTest, unittest.TestCase):
fmt_basepath = pathlib.Path('fixtures')
fmt_origpath = fmt_basepath / 'original'
class _TestCRW(unittest.TestCase):
def setUp(self):
def setUp(self):
self.fixtures = pathlib.Path('fixtures').resolve()
self.fixtures = pathlib.Path('fixtures').resolve()
super().setUp()
def test_tagname(self):
def test_tagname(self):
a = TagName('foo', (1, 2))
a = TagName('foo', (1, 2))
@@ -1278,11 +1715,15 @@ class _TestCRW(unittest.TestCase):
def test_crw(self):
def test_crw(self):
with open(self.fixtures / 'RAW_CANON_G2.CRW', 'rb') as fp:
with open(self.fixtures / 'RAW_CANON_G2.CRW', 'rb') as fp:
ci = idcrw(fp)
ci = idcrw(fp, [ 'CRW_CCDDATA', 'CRW_THMBBIG' ] )
self.assertEqual(ci['CRW_INFO']['INFO_EXPOSEINFO']['CAMERA_MODELID'], (17825792, 2222501223))
self.assertEqual(ci['CRW_INFO']['INFO_EXPOSEINFO']['CAMERA_MODELID'], (17825792, 2222501223))
self.assertEqual(ci.find('CAMERA_MODELID'), (17825792, 2222501223))
self.assertEqual(ci.find('CAMERA_MODELID'), (17825792, 2222501223))
self.assertEqual(ci.find('CAMERA_MAKEMODEL'), ('Canon', 'Canon PowerShot G2'))
self.assertEqual(ci.find('UNKN_ORIGFNAME'), 'CRW_0011.CRW')
self.assertEqual(ci.find('INFO_TARGETTYPE'), (0, 19680, 39964, 10801))
#print(repr(ci))
#print(repr(list(ci.keys())))
#print(repr(list(ci.keys())))
#print(repr(list(ci['CRW_INFO'].keys())))
#print(repr(list(ci['CRW_INFO'].keys())))
#print('unkn:', sorted(TagName._unkn))
#print('unkn:', sorted(TagName._unkn))