From aa594de14a2f34309b38aae03ea25e80d172d89c Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Mon, 12 Nov 2007 08:51:18 -0800 Subject: [PATCH] make seeking possible on mpegts files... This does some magic to simply replace the pat w/ only the program we want, but keeps all the data there... it wastes a bit of bandwidth, but makes the file seekable, which is good for the media player.. We need to extend this so that we generate the PAT's first, and then start reading/writing the file so we don't pass a PAT unchanged.. [git-p4: depot-paths = "//depot/": change = 1096] --- mpegtsmod.py | 155 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 125 insertions(+), 30 deletions(-) diff --git a/mpegtsmod.py b/mpegtsmod.py index 0863051..b3fffb7 100644 --- a/mpegtsmod.py +++ b/mpegtsmod.py @@ -8,8 +8,11 @@ __version__ = '$Change$' tsselpypath = 'mpegts/tssel.py' default_audio_lang = 'eng' +import array +import itertools import os import sets +import struct import sys mpegtspath = 'mpegts' @@ -23,8 +26,8 @@ from FSStorage import FSObject, registerklassfun from twisted.python import log, threadable from twisted.spread import pb -from twisted.internet import process, protocol, reactor -from twisted.web import resource, server +from twisted.internet import abstract, process, protocol, reactor +from twisted.web import error, http, resource, server class _LimitedFile(file): def __init__(self, *args, **kwargs): @@ -100,39 +103,68 @@ class MPEGTSTransfer(pb.Viewable): threadable.synchronize(MPEGTSTransfer) -class DynamTSTransfer(protocol.ProcessProtocol): +class DynamTSTransfer(pb.Viewable): def __init__(self, path, pmt, *pids): self.path = path + #log.msg("DynamTSTransfer: pmt: %s, pids: %s" % (pmt, pids)) self.pmt = pmt self.pids = pids + self.didpmt = False - def outReceived(self, data): - self.request.write(data) + def resumeProducing(self): + if not self.request: + return - def outConnectionLost(self): - if self.request: + repcnt = 0 + data = self.fp.read(min(abstract.FileDescriptor.bufferSize, + self.size - self.written) // 188 * 188) + dataarray = array.array('B', data) + for i in xrange(0, len(data), 188): + if data[i] != 'G': + print 'bad sync' + continue + frst = dataarray[i + 1] + pid = (frst & 0x1f) << 8 | dataarray[i + 2] + if not frst & 0x40: + continue + elif not self.didpmt and pid == self.pmt: + startpmt = 4 + if ((dataarray[i + 3] >> 4) & 0x3) == 0x3: + # Adaptation + startpmt += dataarray[i + startpmt] + 1 + startpmt += dataarray[i + startpmt] + 1 + assert data[i + startpmt] =='\x02', (startpmt, + data[i:i + 20]) + self.pats = itertools.cycle(tssel.genpats( + self.pmt, struct.unpack('>H', data[i + + startpmt + 3:i + startpmt + 5])[0])) + self.didpmt = True + + if pid == 0 and self.didpmt: + assert data[i + 4] =='\x00' and \ + data[i + 5] == '\x00', 'error: %s' % `data[i:i + 10]` + repcnt += 1 + pn = self.pats.next() + data = data[:i] + pn + data[i + + 188:] + + if repcnt > 1: + print 'repcnt:', repcnt, 'len(data):', len(data) + + if data: + self.written += len(data) + self.request.write(data) + if self.request and self.fp.tell() == self.size: self.request.unregisterProducer() self.request.finish() self.request = None - def errReceived(self, data): + def pauseProducing(self): pass - #log.msg(data) def stopProducing(self): - if self.request: - self.request.unregisterProducer() - self.request.finish() - - if self.proc: - self.proc.loseConnection() - self.proc.signalProcess('INT') - + self.fp.close() self.request = None - self.proc = None - - pauseProducing = lambda x: x.proc.pauseProducing() - resumeProducing = lambda x: x.proc.resumeProducing() def render(self, request): path = self.path @@ -140,15 +172,79 @@ class DynamTSTransfer(protocol.ProcessProtocol): pids = self.pids self.request = request + fsize = size = os.path.getsize(path) + + request.setHeader('accept-ranges','bytes') + request.setHeader('content-type', 'video/mpeg') - if request.method == 'HEAD': + + try: + self.fp = open(path) + except IOError, e: + import errno + if e[0] == errno.EACCESS: + return error.ForbiddenResource().render(request) + else: + raise + + if request.setLastModified(os.path.getmtime(path)) is http.CACHED: return '' - args = [ 'tssel.py', path, str(pmt), ] + map(str, pids) - self.proc = process.Process(reactor, tsselpypath, args, - None, None, self) - self.proc.closeStdin() - request.registerProducer(self, 1) + trans = True + # Commented out because it's totally broken. --jknight 11/29/04 + # XXX - fixed? jmg 2/17/06 + range = request.getHeader('range') + + tsize = size + if range is not None: + # This is a request for partial data... + bytesrange = range.split('=') + assert bytesrange[0] == 'bytes', \ + "Syntactically invalid http range header!" + start, end = bytesrange[1].split('-', 1) + if start: + start = int(start) + self.fp.seek(start) + if end and int(end) < size: + end = int(end) + else: + end = size - 1 + else: + lastbytes = int(end) + if size < lastbytes: + lastbytes = size + start = size - lastbytes + self.fp.seek(start) + fsize = lastbytes + end = size - 1 + start = start // 188 * 188 + self.fp.seek(start) + size = (end + 1) // 188 * 188 + fsize = end - int(start) + 1 + # start is the byte offset to begin, and end is the + # byte offset to end.. fsize is size to send, tsize + # is the real size of the file, and size is the byte + # position to stop sending. + + if fsize <= 0: + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE + ) + fsize = tsize + trans = False + else: + request.setResponseCode(http.PARTIAL_CONTENT) + request.setHeader('content-range',"bytes %s-%s/%s " % ( + str(start), str(end), str(tsize))) + + request.setHeader('content-length', str(fsize)) + + if request.method == 'HEAD' or trans is False: + request.method = 'HEAD' + return '' + + self.size = tsize + self.written = 0 + request.registerProducer(self, 0) return server.NOT_DONE_YET @@ -163,9 +259,6 @@ class MPEGTSResource(resource.Resource): def render(self, request): request.setHeader('content-type', 'video/mpeg') - if request.method == 'HEAD': - return '' - # return data return DynamTSTransfer(*self.args).render(request) @@ -207,6 +300,7 @@ class MultiMPEGTS(FSObject, StorageFolder): doupdate = False origchildren, toindex = _gennameindexes(self.tvct['channels']) + #log.msg('MultiMPEGTS doUpdate: tvct: %s' % self.tvct) children = sets.Set(origchildren) for i in self.pathObjmap.keys(): if i not in children: @@ -243,6 +337,7 @@ def detectmpegts(path, fobj): # fast enough. return MPEGTS, { 'path': path, 'tvct': tvct['channels'][0] } elif len(tvct['channels']) > 1: + #log.msg('MultiMPEGTS: path: %s' % path) return MultiMPEGTS, { 'path': path } return None, None