|
- import array
- import os.path
- import string
- import UserDict
-
- from ctypes import *
-
- __all__ = [ 'FLACDec' ]
-
- # Find out if we need to endian swap the buffer
- def _islittleendian():
- '''Run a check to see if the box is little endian or not.'''
-
- t = array.array('H', '\x00\x01')
- if t[0] == 1:
- return False
- else:
- return True
-
- is_little_endian = _islittleendian()
-
- inter = {}
-
- try:
- dname = os.path.dirname(__file__)
- if not dname:
- dname = '.'
- path = os.path.join(dname, '_interleave.so')
- interleavelib = CDLL(path)
- for i in (16, ):
- f = getattr(interleavelib, 'interleave%d' % i)
- f.restype = None
- # bigendian, nchan, chanmap, chansamps, data, out
- outtype = locals()['c_int%d' % i]
- f.argtypes = [ c_int, POINTER(c_int), c_int,
- POINTER(POINTER(c_int32)), POINTER(outtype), ]
- inter[i] = outtype, f
- except OSError:
- import warnings
- warnings.warn('Using slow interleaver, compile the _interleave.so module to improve performance.')
-
- inter[16] = None, lambda nchan, chanmap, blksize, inbuf, ignore: \
- array.array('h', [ inbuf[chanmap[x % nchan]][x / nchan] for x in
- xrange(blksize * nchan)])
-
- flaclib = CDLL('libFLAC.so')
-
- # Defines
-
- FLAC__METADATA_TYPE_STREAMINFO = 0
- FLAC__METADATA_TYPE_PADDING = 1
- FLAC__METADATA_TYPE_APPLICATION = 2
- FLAC__METADATA_TYPE_SEEKTABLE = 3
- FLAC__METADATA_TYPE_VORBIS_COMMENT = 4
- FLAC__METADATA_TYPE_CUESHEET = 5
- FLAC__METADATA_TYPE_PICTURE = 6
- FLAC__METADATA_TYPE_UNDEFINED = 7
-
- FLAC__MAX_CHANNELS = 8
- FLAC__MAX_LPC_ORDER = 32
- FLAC__MAX_FIXED_ORDER = 4
-
- FLAC__byte = c_uint8
- FLAC__byte_p = POINTER(FLAC__byte)
-
- # Data
- FLAC__StreamDecoderStateString = (c_char_p * 10).in_dll(flaclib,
- 'FLAC__StreamDecoderStateString')
- FLAC__StreamDecoderInitStatusString = (c_char_p * 6).in_dll(flaclib,
- 'FLAC__StreamDecoderInitStatusString')
- FLAC__StreamDecoderErrorStatusString = (c_char_p * 4).in_dll(flaclib,
- 'FLAC__StreamDecoderErrorStatusString')
- FLAC__StreamMetadata_Picture_TypeString = (c_char_p * 21).in_dll(flaclib,
- 'FLAC__StreamMetadata_Picture_TypeString')
-
- # Enums
- FLAC__STREAM_DECODER_SEARCH_FOR_METADATA = 0
- FLAC__STREAM_DECODER_READ_METADATA = 1
- FLAC__STREAM_DECODER_SEARCH_FOR_FRAME_SYNC = 2
- FLAC__STREAM_DECODER_READ_FRAME = 3
- FLAC__STREAM_DECODER_END_OF_STREAM = 4
- FLAC__STREAM_DECODER_OGG_ERROR = 5
- FLAC__STREAM_DECODER_SEEK_ERROR = 6
- FLAC__STREAM_DECODER_ABORTED = 7
- FLAC__STREAM_DECODER_MEMORY_ALLOCATION_ERROR = 8
- FLAC__STREAM_DECODER_UNINITIALIZED = 9
-
- FLAC__STREAM_DECODER_INIT_STATUS_OK = 0
- FLAC__FRAME_NUMBER_TYPE_FRAME_NUMBER = 0
- FLAC__FRAME_NUMBER_TYPE_SAMPLE_NUMBER = 1
- FLAC__SUBFRAME_TYPE_CONSTANT = 0
- FLAC__SUBFRAME_TYPE_VERBATIM = 1
- FLAC__SUBFRAME_TYPE_FIXED = 2
- FLAC__SUBFRAME_TYPE_LPC = 3
-
- FLAC__STREAM_METADATA_PICTURE_TYPE_OTHER = 0 # < Other */
- FLAC__STREAM_METADATA_PICTURE_TYPE_FILE_ICON_STANDARD = 1 # < 32x32 pixels 'file icon' (PNG only) */
- FLAC__STREAM_METADATA_PICTURE_TYPE_FILE_ICON = 2 # < Other file icon */
- FLAC__STREAM_METADATA_PICTURE_TYPE_FRONT_COVER = 3 # < Cover (front) */
- FLAC__STREAM_METADATA_PICTURE_TYPE_BACK_COVER = 4 # < Cover (back) */
- FLAC__STREAM_METADATA_PICTURE_TYPE_LEAFLET_PAGE = 5 # < Leaflet page */
- FLAC__STREAM_METADATA_PICTURE_TYPE_MEDIA = 6 # < Media (e.g. label side of CD) */
- FLAC__STREAM_METADATA_PICTURE_TYPE_LEAD_ARTIST = 7 # < Lead artist/lead performer/soloist */
- FLAC__STREAM_METADATA_PICTURE_TYPE_ARTIST = 8 # < Artist/performer */
- FLAC__STREAM_METADATA_PICTURE_TYPE_CONDUCTOR = 9 # < Conductor */
- FLAC__STREAM_METADATA_PICTURE_TYPE_BAND = 10 # < Band/Orchestra */
- FLAC__STREAM_METADATA_PICTURE_TYPE_COMPOSER = 11 # < Composer */
- FLAC__STREAM_METADATA_PICTURE_TYPE_LYRICIST = 12 # < Lyricist/text writer */
- FLAC__STREAM_METADATA_PICTURE_TYPE_RECORDING_LOCATION = 13 # < Recording Location */
- FLAC__STREAM_METADATA_PICTURE_TYPE_DURING_RECORDING = 14 # < During recording */
- FLAC__STREAM_METADATA_PICTURE_TYPE_DURING_PERFORMANCE = 15 # < During performance */
- FLAC__STREAM_METADATA_PICTURE_TYPE_VIDEO_SCREEN_CAPTURE = 16 # < Movie/video screen capture */
- FLAC__STREAM_METADATA_PICTURE_TYPE_FISH = 17 # < A bright coloured fish */
- FLAC__STREAM_METADATA_PICTURE_TYPE_ILLUSTRATION = 18 # < Illustration */
- FLAC__STREAM_METADATA_PICTURE_TYPE_BAND_LOGOTYPE = 19 # < Band/artist logotype */
- FLAC__STREAM_METADATA_PICTURE_TYPE_PUBLISHER_LOGOTYPE = 20 # < Publisher/Studio logotype */
-
- pictnamemap = dict(enumerate(('other', 'icon', 'icon', 'cover', 'backcover',
- 'leaflet', 'media', 'lead', 'artist', 'conductor', 'band', 'composer',
- 'lyricist', 'location', 'recording', 'performance', 'video', 'fish',
- 'illustration', 'bandlogo', 'publogo')))
-
- OrigStructure = Structure
- class Structure(Structure):
- def getarray(self, base):
- base_array = getattr(self, base)
- return [ base_array[x] for x in xrange(getattr(self,
- 'num_' + base)) ]
-
- def __getattr__(self, k):
- if k[-6:] == '_array':
- return self.getarray(k[:-6])
-
- raise AttributeError(k)
-
- def asobj(self):
- r = {}
- #print 'asobj:', `self`
- if hasattr(self, '_material_'):
- flds = self._material_
- else:
- flds = self._fields_
-
- for i in flds:
- attr = i[0]
- obj = getattr(self, attr)
- if attr[-6:] == '_array':
- #print 'asarraycnt'
- v = [ x.asobj() for x in
- self.getarray(attr[:-6]) ]
- elif isinstance(obj, Structure):
- #print 'asstruct'
- v = obj.asobj()
- elif isinstance(obj, Array) and obj._type_ != c_char:
- #print 'asarray'
- v = obj[:]
- else:
- #print 'asobj'
- v = obj
- r[attr] = v
-
- return r
-
- def __repr__(self, fields=None):
- cls = self.__class__
- if fields is None:
- if hasattr(self, '_material_'):
- fields = self._material_
- else:
- fields = self._fields_
- return '<%s.%s: %s>' % (cls.__module__, cls.__name__,
- ', '.join([ '%s: %s' % (x[0], `getattr(self, x[0])`)
- for x in fields ]))
-
- class FLAC__StreamMetadata_StreamInfo(Structure):
- _fields_ = [ ('min_blocksize', c_uint),
- ('max_blocksize', c_uint),
- ('min_framesize', c_uint),
- ('max_framesize', c_uint),
- ('sample_rate', c_uint),
- ('channels', c_uint),
- ('bits_per_sample', c_uint),
- ('total_samples', c_uint64),
- ('md5sum', c_ubyte * 16),
- ]
-
- class FLAC__StreamMetadata_VorbisComment_Entry(Structure):
- _fields_ = [ ('length', c_uint32),
- ('entry', POINTER(c_char)), # string bounded by length
- ]
-
- def asstring(self):
- return self.entry[:self.length].decode('utf-8')
-
- def __repr__(self):
- return '<FLAC__StreamMetadata_VorbisComment_Entry: %s>' % \
- `self.asstring()`
-
- def makestrrange(a, b):
- '''Make a string w/ characters from a through b (inclusive).'''
-
- return ''.join(chr(x) for x in xrange(a, b + 1))
-
- class VorbisComments(UserDict.DictMixin):
- def __init__(self, vc=()):
- d = self._dict = {}
-
- for i in vc:
- k, v = i.split('=', 1)
- try:
- self[k].append(v)
- except KeyError:
- d[self.makevalidkey(k)] = [ v ]
-
- transtable = string.maketrans(makestrrange(0x41, 0x5a),
- makestrrange(0x61, 0x7a))
- delchars = makestrrange(0, 0x1f) + '=' + makestrrange(0x7e, 0x7f)
- @staticmethod
- def makevalidkey(k):
- k = str(k)
- origlen = len(k)
- k = k.translate(VorbisComments.transtable,
- VorbisComments.delchars)
- if len(k) != origlen:
- raise ValueError('Invalid key')
-
- return k
-
- def __getitem__(self, k):
- return self._dict[self.makevalidkey(k)]
-
- def __setitem__(self, k, v):
- # XXX - allow? check v?
- return self._dict.__setitem__(self.makevalidkey(k), v)
-
- def __delitem__(self, k):
- return self._dict.__delitem__(self.makevalidkey(k))
-
- def keys(self):
- return self._dict.keys()
-
- class FLAC__StreamMetadata_VorbisComment(Structure):
- _fields_ = [ ('vendor_string',
- FLAC__StreamMetadata_VorbisComment_Entry),
- ('num_comments', c_uint32),
- ('comments', POINTER(FLAC__StreamMetadata_VorbisComment_Entry)),
- ]
-
- _material_ = (('vendor_string', ), ('comments_array', ))
- def asobj(self):
- return self.vendor_string.asstring(), \
- VorbisComments(x.asstring() for x in
- self.getarray('comments'))
-
- class FLAC__StreamMetadata_CueSheet_Index(Structure):
- _fields_ = [ ('offset', c_uint64),
- ('number', c_ubyte),
- ]
-
- class FLAC__StreamMetadata_CueSheet_Track(Structure):
- _fields_ = [ ('offset', c_uint64),
- ('number', c_ubyte),
- ('isrc', c_char * 13), # string + NUL
- ('type', c_ubyte, 1),
- ('pre_emphasis', c_ubyte, 1),
- ('num_indices', c_ubyte),
- ('indices', POINTER(FLAC__StreamMetadata_CueSheet_Index)),
- ]
- _material_ = (('offset', ), ('number', ), ('isrc', ), ('type', ),
- ('pre_emphasis', ), ('indices_array', ))
-
- class FLAC__StreamMetadata_CueSheet(Structure):
- _fields_ = [ ('media_catalog_number', c_char * 129), # string + nul
- ('lead_in', c_uint64),
- ('is_cd', c_int),
- ('num_tracks', c_uint),
- ('tracks', POINTER(FLAC__StreamMetadata_CueSheet_Track)),
- ]
- _material_ = (('media_catalog_number', ), ('lead_in', ), ('is_cd', ),
- ('tracks_array', ))
-
- class FLAC__StreamMetadata_Picture(Structure):
- _fields_ = [ ('type', c_int),
- ('mime_type', c_char_p),
-
- # description is listed as FLAC__byte_p, but
- # documented as NUL terminated UTF-8 string
- ('description', c_char_p),
- ('width', c_uint32),
- ('height', c_uint32),
- ('depth', c_uint32),
- ('colors', c_uint32),
- ('data_length', c_uint32),
- ('data', FLAC__byte_p),
- ]
-
- _material_ = (('type', ), ('mime_type', ), ('width', ), ('height', ), ('depth', ), ('colors', ))
-
- def asobj(self):
- return (self.type, self.mime_type,
- self.description.decode('utf-8'), self.width, self.height,
- self.depth, self.colors,
- ''.join(chr(x) for x in self.data[:self.data_length]))
-
- class FLAC__StreamMetadataData(Union):
- _fields_ = [ ('stream_info', FLAC__StreamMetadata_StreamInfo),
- ('vorbis_comment', FLAC__StreamMetadata_VorbisComment),
- ('cue_sheet', FLAC__StreamMetadata_CueSheet),
- ('picture', FLAC__StreamMetadata_Picture),
- ]
-
- class FLAC__StreamMetadata(Structure):
- _fields_ = [ ('type', c_int),
- ('is_last', c_int),
- ('length', c_uint),
- ('data', FLAC__StreamMetadataData),
- ]
-
- class FLAC__EntropyCodingMethod_PartitionedRiceContents(Structure):
- pass
-
- class FLAC__EntropyCodingMethod_PartitionedRice(Structure):
- _fields_ = [ ('order', c_uint),
- ('contents',
- POINTER(FLAC__EntropyCodingMethod_PartitionedRiceContents)),
- ]
-
- class FLAC__EntropyCodingMethod(Structure):
- _fields_ = [ ('type', c_int),
- ('partitioned_rice', FLAC__EntropyCodingMethod_PartitionedRice),
- ]
-
- class FLAC__Subframe_Constant(Structure):
- _fields_ = [ ('value', c_int32), ]
-
- class FLAC__Subframe_Verbatim(Structure):
- _fields_ = [ ('data', POINTER(c_int32)), ]
-
- class FLAC__Subframe_Fixed(Structure):
- _fields_ = [ ('entropy_coding_method', FLAC__EntropyCodingMethod),
- ('order', c_uint),
- ('warmup', c_int32 * FLAC__MAX_FIXED_ORDER),
- ('residual', POINTER(c_int32)),
- ]
-
- class FLAC__Subframe_LPC(Structure):
- _fields_ = [ ('entropy_coding_method', FLAC__EntropyCodingMethod),
- ('order', c_uint),
- ('qlp_coeff_precision', c_uint),
- ('quantization_level', c_int),
- ('qlp_coeff', c_int32 * FLAC__MAX_LPC_ORDER),
- ('warmup', c_int32 * FLAC__MAX_LPC_ORDER),
- ('residual', POINTER(c_int32)),
- ]
-
- class FLAC__SubframeUnion(Union):
- _fields_ = [ ('constant', FLAC__Subframe_Constant),
- ('fixed', FLAC__Subframe_Fixed),
- ('lpc', FLAC__Subframe_LPC),
- ('verbatim', FLAC__Subframe_Verbatim),
- ]
-
- class FLAC__Subframe(Structure):
- _fields_ = [ ('type', c_int),
- ('data', FLAC__SubframeUnion),
- ('wasted_bits', c_uint),
- ]
-
- class number_union(Union):
- _fields_ = [ ('frame_number', c_uint32),
- ('sample_number', c_uint64),
- ]
-
- class FLAC__FrameHeader(Structure):
- _fields_ = [ ('blocksize', c_uint),
- ('sample_rate', c_uint),
- ('channels', c_uint),
- ('channel_assignment', c_int),
- ('bits_per_sample', c_uint),
- ('number_type', c_int),
- ('number', number_union),
- ('crc', c_uint8),
- ]
-
- class FLAC__FrameFooter(Structure):
- _fields_ = [ ('crc', c_uint16), ]
-
- class FLAC__Frame(Structure):
- _fields_ = [ ('header', FLAC__FrameHeader),
- ('subframes', FLAC__Subframe * FLAC__MAX_CHANNELS),
- ('footer', FLAC__FrameFooter),
- ]
-
- class FLAC__StreamDecoder(Structure):
- pass
-
- # Types
-
- FLAC__StreamMetadata_p = POINTER(FLAC__StreamMetadata)
- FLAC__Frame_p = POINTER(FLAC__Frame)
- FLAC__StreamDecoder_p = POINTER(FLAC__StreamDecoder)
-
- # Function typedefs
- FLAC__StreamDecoderReadCallback = CFUNCTYPE(c_int, FLAC__StreamDecoder_p,
- POINTER(c_ubyte), POINTER(c_size_t), c_void_p)
- FLAC__StreamDecoderSeekCallback = CFUNCTYPE(c_int, FLAC__StreamDecoder_p,
- c_uint64, c_void_p)
- FLAC__StreamDecoderTellCallback = CFUNCTYPE(c_int, FLAC__StreamDecoder_p,
- POINTER(c_uint64), c_void_p)
- FLAC__StreamDecoderLengthCallback = CFUNCTYPE(c_int, FLAC__StreamDecoder_p,
- POINTER(c_uint64), c_void_p)
- FLAC__StreamDecoderEofCallback = CFUNCTYPE(c_int, FLAC__StreamDecoder_p,
- c_void_p)
- FLAC__StreamDecoderWriteCallback = CFUNCTYPE(c_int, FLAC__StreamDecoder_p,
- FLAC__Frame_p, POINTER(POINTER(c_int32)), c_void_p)
- FLAC__StreamDecoderMetadataCallback = CFUNCTYPE(None, FLAC__StreamDecoder_p,
- FLAC__StreamMetadata_p, c_void_p)
- FLAC__StreamDecoderErrorCallback = CFUNCTYPE(None, FLAC__StreamDecoder_p,
- c_int, c_void_p)
-
- funs = {
- 'FLAC__stream_decoder_new': (FLAC__StreamDecoder_p, []),
- 'FLAC__stream_decoder_delete': (None, [FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_set_md5_checking': (c_int,
- [ FLAC__StreamDecoder_p, c_int, ]),
- 'FLAC__stream_decoder_set_metadata_respond': (c_int,
- [ FLAC__StreamDecoder_p, c_int, ]),
- 'FLAC__stream_decoder_set_metadata_respond_all': (c_int,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_state': (c_int,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_total_samples': (c_uint64,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_channels': (c_uint,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_channel_assignment': (c_int,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_bits_per_sample': (c_uint,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_sample_rate': (c_uint,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_get_blocksize': (c_uint,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_init_stream': (c_int, [ FLAC__StreamDecoder_p,
- FLAC__StreamDecoderReadCallback, FLAC__StreamDecoderSeekCallback,
- FLAC__StreamDecoderTellCallback, FLAC__StreamDecoderLengthCallback,
- FLAC__StreamDecoderEofCallback, FLAC__StreamDecoderWriteCallback,
- FLAC__StreamDecoderMetadataCallback,
- FLAC__StreamDecoderErrorCallback, ]),
- 'FLAC__stream_decoder_init_file': (c_int, [ FLAC__StreamDecoder_p,
- c_char_p, FLAC__StreamDecoderWriteCallback,
- FLAC__StreamDecoderMetadataCallback,
- FLAC__StreamDecoderErrorCallback, c_void_p, ]),
- 'FLAC__stream_decoder_finish': (c_int, [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_flush': (c_int, [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_reset': (c_int, [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_process_single': (c_int,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_process_until_end_of_metadata': (c_int,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_process_until_end_of_stream': (c_int,
- [ FLAC__StreamDecoder_p, ]),
- 'FLAC__stream_decoder_seek_absolute': (c_int,
- [ FLAC__StreamDecoder_p, c_uint64, ]),
- }
-
- for i in funs:
- f = getattr(flaclib, i)
- f.restype, f.argtypes = funs[i]
-
- def clientdatawrapper(fun):
- def newfun(*args):
- #print 'nf:', `args`
- return fun(*args[1:-1])
-
- return newfun
-
- def getindex(ar, idx):
- for i in ar:
- if idx == i['number']:
- return i
-
- raise ValueError('index %d not present: %s' % (idx, `ar`))
-
- def _checksum(n):
- ret = 0
-
- while n > 0:
- n, m = divmod(n, 10)
- ret += m
-
- return ret
-
- def _cuetotrackinfo(cuesheet):
- '''XXX - do not use. This will not work because the cuesheet does
- not include data tracks!'''
- if not cuesheet['is_cd']:
- raise ValueError('cuesheet isn\'t for a cd')
-
- tracks = []
- ta = cuesheet['tracks_array']
- tot = 0
- cksum = 0
- for i in xrange(1, len(ta)):
- offstart = ta[i - 1]['offset']
- offend = ta[i]['offset']
- secs = offend / 44100 - offstart / 44100
- #print ta[i - 1]['number'], secs, offstart / (2352 / 4), (offend - offstart) / (2352 / 4)
- tot += secs
- cksum += _checksum(offstart / 44100 + 2)
-
- #print tot
- tracks.append(divmod(tot, 60))
-
- #print `tracks`
- return (long(cksum) % 0xff) << 24 | tot << 8 | (len(ta) - 1), 0
-
- class FLACDec(object):
- '''Class to support flac decoding.'''
-
- channels = property(lambda x: x._channels)
- samplerate = property(lambda x: x._samplerate)
- bitspersample = property(lambda x: x._bitspersample)
- totalsamples = property(lambda x: x._totalsamples)
- bytespersample = property(lambda x: x._bytespersample)
- tags = property(lambda x: x._tags)
- vorbis = property(lambda x: x._vorbis)
- cuesheet = property(lambda x: x._cuesheet)
- pictures = property(lambda x: x._pictures)
-
- def __len__(self):
- return self._total_samps
-
- def __init__(self, file):
- '''Pass in the file name. We currently do not support file objects.'''
-
- self.flacdec = None
- self._lasterror = None
-
- # We need to keep references to the callback functions
- # around so they won't be garbage collected.
- self.write_wrap = FLAC__StreamDecoderWriteCallback(
- clientdatawrapper(self._cb_write))
- self.metadata_wrap = FLAC__StreamDecoderMetadataCallback(
- clientdatawrapper(self._cb_metadata))
- self.error_wrap = FLAC__StreamDecoderErrorCallback(
- clientdatawrapper(self._cb_error))
-
- self.flacdec = flaclib.FLAC__stream_decoder_new()
- if self.flacdec == 0:
- raise RuntimeError('allocating decoded')
-
- flaclib.FLAC__stream_decoder_set_md5_checking(self.flacdec,
- True)
- for i in (FLAC__METADATA_TYPE_VORBIS_COMMENT,
- FLAC__METADATA_TYPE_CUESHEET, FLAC__METADATA_TYPE_PICTURE):
- flaclib.FLAC__stream_decoder_set_metadata_respond(
- self.flacdec, i)
-
- status = flaclib.FLAC__stream_decoder_init_file(self.flacdec,
- file, self.write_wrap, self.metadata_wrap,
- self.error_wrap, None)
- if status != FLAC__STREAM_DECODER_INIT_STATUS_OK:
- raise ValueError(
- FLAC__StreamDecoderInitStatusString[status])
-
- self._tags = {}
- self._vorbis = None
- self._cuesheet = None
- self._pictures = {}
-
- flaclib.FLAC__stream_decoder_process_until_end_of_metadata(
- self.flacdec)
- if self._lasterror is not None:
- raise ValueError(
- FLAC__StreamDecoderErrorStatusString[
- self._lasterror])
- self.curcnt = 0
- self.cursamp = 0
- if self.vorbis is None:
- self.vorbis = '', VorbisComments()
-
- def close(self):
- '''Finish decoding and close all the objects.'''
-
- if self.flacdec is None:
- return
-
- #print 'close called'
- if not flaclib.FLAC__stream_decoder_finish(self.flacdec):
- md5invalid = True
- else:
- md5invalid = False
-
- flaclib.FLAC__stream_decoder_delete(self.flacdec)
- self.flacdec = None
- self.write_wrap = None
- self.metadata_wrap = None
- self.error_wrap = None
- if md5invalid:
- pass
- #raise ValueError('invalid md5')
-
- def _cb_write(self, frame_p, buffer_pp):
- frame = frame_p[0]
- #print 'write:', `frame`
- #for i in xrange(frame.header.channels):
- # print '%d:' % i, `frame.subframes[i]`
-
- nchan = frame.header.channels
- #print 'sample number:', frame.header.number.sample_number
- self.cursamp = frame.header.number.sample_number
- self.curcnt = frame.header.blocksize
- outtype, fun = inter[frame.header.bits_per_sample]
- if outtype is None:
- outbuf = None
- else:
- outbuf = (outtype * (nchan * frame.header.blocksize))()
- # nchan, chanmap, chansamps, data, out
- assert nchan == 2
- r = fun(nchan, (c_int * 2)(0, 1), frame.header.blocksize,
- buffer_pp, outbuf)
- if outtype is None:
- self.curdata = r
- else:
- self.curdata = array.array('h', outbuf)
-
- if is_little_endian:
- self.curdata.byteswap()
-
- return 0
-
- def _cb_metadata(self, metadata_p):
- md = metadata_p[0]
- #print 'metadata:', `md`
- if md.type == FLAC__METADATA_TYPE_STREAMINFO:
- si = md.data.stream_info
- self._channels = si.channels
- self._samplerate = si.sample_rate
- self._bitspersample = si.bits_per_sample
- self._totalsamples = si.total_samples
- self._bytespersample = si.channels * \
- si.bits_per_sample / 8
- #print `si`
- elif md.type == FLAC__METADATA_TYPE_VORBIS_COMMENT:
- self._vorbis = md.data.vorbis_comment.asobj()
- self._tags = self.vorbis[1]
- #print 'vc:', `md.data.vorbis_comment`
- #print 'v:', `self.vorbis`
- elif md.type == FLAC__METADATA_TYPE_CUESHEET:
- self._cuesheet = md.data.cue_sheet.asobj()
- #print 'cs:', `md.data.cue_sheet`
- #print 'c:', `self.cuesheet`
- elif md.type == FLAC__METADATA_TYPE_PICTURE:
- pict = md.data.picture
- #print 'p:', `md.data.picture`
- #print 'po:', `pict.asobj()`
- self._pictures.setdefault(pictnamemap[pict.type],
- []).append(pict.asobj())
- #print 'pd:', `self._pictures`
- else:
- print 'unknown metatype:', md.type
-
- def _cb_error(self, errstatus):
- #print 'error:', `errstatus`
- self._lasterror = errstatus
-
- def getstate(self):
- state = flaclib.FLAC__stream_decoder_get_state(self.flacdec)
- return state
-
- state = property(getstate)
-
- def goto(self, pos):
- '''Go to sample possition.'''
-
- if self.flacdec is None:
- raise ValueError('closed')
-
- pos = min(self.totalsamples - 1, pos)
- if flaclib.FLAC__stream_decoder_seek_absolute(self.flacdec,
- pos):
- return
-
- # the slow way
-
- state = self.state
- if state == FLAC__STREAM_DECODER_SEEK_ERROR:
- print 'WARNING: possibly invalid file!'
-
- if self.cursamp > pos or \
- state == FLAC__STREAM_DECODER_SEEK_ERROR:
- if not flaclib.FLAC__stream_decoder_reset(self.flacdec):
- raise RuntimeError('unable to seek to beginin')
- flaclib.FLAC__stream_decoder_process_until_end_of_metadata(self.flacdec)
- flaclib.FLAC__stream_decoder_process_single(
- self.flacdec)
-
- read = pos - self.cursamp
- while read:
- tread = min(read, 512*1024)
- self.read(tread)
- read -= tread
-
- def read(self, nsamp=16*1024, oneblk=False):
- '''If oneblk is True, we will only process data once.'''
-
- if self.flacdec is None:
- raise ValueError('closed')
-
- r = []
- nsamp = min(nsamp, self.totalsamples - self.cursamp)
- nchan = self.channels
- while nsamp:
- if self.curcnt == 0:
- flaclib.FLAC__stream_decoder_process_single(
- self.flacdec)
- continue
-
- cnt = min(nsamp, self.curcnt)
- sampcnt = cnt * nchan
- r.append(self.curdata[:sampcnt].tostring())
-
- self.cursamp += cnt
- self.curcnt -= cnt
- self.curdata = self.curdata[sampcnt:]
- nsamp -= cnt
-
- if oneblk:
- break
-
- return ''.join(r)
-
- def doprofile(d):
- def readtoend():
- while d.read():
- pass
-
- import hotshot.stats
-
- prof = hotshot.Profile('decread.prof')
- prof.runcall(readtoend)
- prof.close()
- stats = hotshot.stats.load('decread.prof')
- stats.strip_dirs()
- stats.sort_stats('time', 'calls')
- stats.print_stats(20)
-
- if __name__ == '__main__':
- import sys
-
- if len(sys.argv) == 1:
- print 'Usage: %s <file>' % sys.argv[0]
- sys.exit(1)
-
- d = FLACDec(sys.argv[1])
- print 'total samples:', d.totalsamples
- print 'channels:', d.channels
- print 'rate:', d.samplerate
- print 'bps:', d.bitspersample
- print 'pict types:', d.pictures.keys()
- print `d.read(10)`
- print 'going'
- d.goto(d.totalsamples - 1000*1000)
- print 'here'
- print `d.read(10)`
- doprofile(d)
|