#!/usr/bin/env python # Copyright 2006-2007 John-Mark Gurney '''MPEG-TS Handling''' __version__ = '$Change$' # $Id$ tsselpypath = 'mpegts/tssel.py' default_audio_lang = 'eng' import array import itertools import os import sets import struct import sys mpegtspath = 'mpegts' if mpegtspath not in sys.path: sys.path.append(mpegtspath) import mpegts import tssel from DIDLLite import StorageFolder, VideoItem, Resource from FSStorage import FSObject, registerklassfun from twisted.python import log, threadable from twisted.spread import pb from twisted.internet import abstract, process, protocol, reactor from twisted.web import error, http, resource, server class _LimitedFile(file): def __init__(self, *args, **kwargs): self.__size = kwargs['size'] del kwargs['size'] file.__init__(self, *args, **kwargs) def remain(self): pos = self.tell() if pos > self.__size: return 0 return self.__size - pos def read(self, size=-1): if size < 0: return file.read(self, self.remain()) return file.read(self, min(size, self.remain())) def _gennameindexes(chan): ret = [] d = {} for i in chan: t = '%s %s.%s' % (i['name'], i['major'], i['minor']) ret.append(t) d[t] = i return ret, d class MPEGTSTransfer(pb.Viewable): def __init__(self, iterable, request): self.iter = iter(iterable) self.request = request request.registerProducer(self, 0) def resumeProducing(self): if not self.request: return # get data and write to request. try: data = self.iter.next() if data: # this .write will spin the reactor, calling # .doWrite and then .resumeProducing again, so # be prepared for a re-entrant call self.request.write(data) except StopIteration: if self.request: self.request.unregisterProducer() self.request.finish() self.request = None def pauseProducing(self): pass def stopProducing(self): # close zipfile self.request = None # Remotely relay producer interface. def view_resumeProducing(self, issuer): self.resumeProducing() def view_pauseProducing(self, issuer): self.pauseProducing() def view_stopProducing(self, issuer): self.stopProducing() synchronized = ['resumeProducing', 'stopProducing'] threadable.synchronize(MPEGTSTransfer) class DynamTSTransfer(pb.Viewable): def __init__(self, path, pmt, *pids): self.path = path #log.msg("DynamTSTransfer: pmt: %s, pids: %s" % (pmt, pids)) self.pmt = pmt self.pids = pids self.didpat = False def resumeProducing(self): if not self.request: return repcnt = 0 data = self.fp.read(min(abstract.FileDescriptor.bufferSize, self.size - self.written) // 188 * 188) dataarray = array.array('B', data) for i in xrange(0, len(data), 188): if data[i] != 'G': print 'bad sync' continue frst = dataarray[i + 1] pid = (frst & 0x1f) << 8 | dataarray[i + 2] if not frst & 0x40: continue elif not self.didpat and pid == 0: startpmt = i + 4 if ((dataarray[i + 3] >> 4) & 0x3) == 0x3: # Adaptation startpmt += dataarray[startpmt] + 1 startpmt += dataarray[startpmt] + 1 assert data[startpmt] =='\x00', (startpmt, data[i:startpmt + 4]) arraysize = ((dataarray[startpmt + 1] & 0xf) << 8) | dataarray[startpmt + 2] startpmt += 3 arraysize -= 4 # CRC # Remaining fields before array startpmt += 5 arraysize -= 5 for startpmt in xrange(startpmt, min(i + 188 - 3, startpmt + arraysize), 4): prognum, ppid = struct.unpack('>2H', data[startpmt:startpmt + 4]) ppid = ppid & 0x1fff if ppid == self.pmt: break else: raise KeyError, 'unable to find pmt(%d) in pkt: %s' % (pmt, `data[i:i + 188]`) self.pats = itertools.cycle(tssel.genpats( self.pmt, prognum)) self.didpat = True if pid == 0 and self.didpat: assert data[i + 4] =='\x00' and \ data[i + 5] == '\x00', 'error: %s' % `data[i:i + 10]` repcnt += 1 pn = self.pats.next() data = data[:i] + pn + data[i + 188:] if repcnt > 1: print 'repcnt:', repcnt, 'len(data):', len(data) if data: self.written += len(data) self.request.write(data) if self.request and self.fp.tell() == self.size: self.request.unregisterProducer() self.request.finish() self.request = None def pauseProducing(self): pass def stopProducing(self): self.fp.close() self.request = None def render(self, request): path = self.path pmt = self.pmt pids = self.pids self.request = request fsize = size = os.path.getsize(path) request.setHeader('accept-ranges','bytes') request.setHeader('content-type', 'video/mpeg') try: self.fp = open(path) except IOError, e: import errno if e[0] == errno.EACCESS: return error.ForbiddenResource().render(request) else: raise if request.setLastModified(os.path.getmtime(path)) is http.CACHED: return '' trans = True # Commented out because it's totally broken. --jknight 11/29/04 # XXX - fixed? jmg 2/17/06 range = request.getHeader('range') tsize = size if range is not None: # This is a request for partial data... bytesrange = range.split('=') assert bytesrange[0] == 'bytes', \ "Syntactically invalid http range header!" start, end = bytesrange[1].split('-', 1) if start: start = int(start) self.fp.seek(start) if end and int(end) < size: end = int(end) else: end = size - 1 else: lastbytes = int(end) if size < lastbytes: lastbytes = size start = size - lastbytes self.fp.seek(start) fsize = lastbytes end = size - 1 start = start // 188 * 188 self.fp.seek(start) size = (end + 1) // 188 * 188 fsize = end - int(start) + 1 # start is the byte offset to begin, and end is the # byte offset to end.. fsize is size to send, tsize # is the real size of the file, and size is the byte # position to stop sending. if fsize <= 0: request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE ) fsize = tsize trans = False else: request.setResponseCode(http.PARTIAL_CONTENT) request.setHeader('content-range',"bytes %s-%s/%s " % ( str(start), str(end), str(tsize))) request.setHeader('content-length', str(fsize)) if request.method == 'HEAD' or trans is False: request.method = 'HEAD' return '' self.size = tsize self.written = 0 request.registerProducer(self, 0) return server.NOT_DONE_YET class MPEGTSResource(resource.Resource): isLeaf = True def __init__(self, *args): resource.Resource.__init__(self) self.args = args def render(self, request): request.setHeader('content-type', 'video/mpeg') # return data return DynamTSTransfer(*self.args).render(request) class MPEGTS(FSObject, VideoItem): def __init__(self, *args, **kwargs): self.path = path = kwargs['path'] del kwargs['path'] self.tvct = tvct = kwargs['tvct'] del kwargs['tvct'] #log.msg('tvct w/ keys:', tvct, tvct.keys()) kwargs['content'] = MPEGTSResource(path, tvct['PMTpid'], *sum(mpegts.getaudiovideopids(tvct['PMT']), [])) VideoItem.__init__(self, *args, **kwargs) FSObject.__init__(self, path) self.url = '%s/%s' % (self.cd.urlbase, self.id) self.res = Resource(self.url, 'http-get:*:video/mpeg:*') def doUpdate(self): pass class MultiMPEGTS(FSObject, StorageFolder): def __init__(self, *args, **kwargs): path = kwargs['path'] del kwargs['path'] StorageFolder.__init__(self, *args, **kwargs) FSObject.__init__(self, path) # mapping from path to objectID self.pathObjmap = {} def doUpdate(self): f = mpegts.TSPStream(_LimitedFile(self.FSpath, size= 2*1024*1024)) self.tvct = mpegts.GetTVCT(f) doupdate = False origchildren, toindex = _gennameindexes(self.tvct['channels']) #log.msg('MultiMPEGTS doUpdate: tvct: %s' % self.tvct) children = sets.Set(origchildren) for i in self.pathObjmap.keys(): if i not in children: doupdate = True # delete self.cd.delItem(self.pathObjmap[i]) del self.pathObjmap[i] for i in origchildren: if i in self.pathObjmap: continue # new object if toindex[i]['prog_num'] == 0: log.msg('bogus:', toindex[i]) continue #log.msg('real tvct:', toindex[i], toindex.keys(), # self.tvct) self.pathObjmap[i] = self.cd.addItem(self.id, MPEGTS, i, path = self.FSpath, tvct = toindex[i]) doupdate = True if doupdate: StorageFolder.doUpdate(self) def detectmpegts(path, fobj): f = mpegts.TSPStream(_LimitedFile(path, size= 2*1024*1024)) tvct = mpegts.GetTVCT(f) if len(tvct['channels']) == 1: #return None, None # We might reenable this once we have pid filtering working # fast enough. return MPEGTS, { 'path': path, 'tvct': tvct['channels'][0] } elif len(tvct['channels']) > 1: #log.msg('MultiMPEGTS: path: %s' % path) return MultiMPEGTS, { 'path': path } return None, None registerklassfun(detectmpegts)