diff --git a/mpegtsmod.py b/mpegtsmod.py index 0863051..b3fffb7 100644 --- a/mpegtsmod.py +++ b/mpegtsmod.py @@ -8,8 +8,11 @@ __version__ = '$Change$' tsselpypath = 'mpegts/tssel.py' default_audio_lang = 'eng' +import array +import itertools import os import sets +import struct import sys mpegtspath = 'mpegts' @@ -23,8 +26,8 @@ from FSStorage import FSObject, registerklassfun from twisted.python import log, threadable from twisted.spread import pb -from twisted.internet import process, protocol, reactor -from twisted.web import resource, server +from twisted.internet import abstract, process, protocol, reactor +from twisted.web import error, http, resource, server class _LimitedFile(file): def __init__(self, *args, **kwargs): @@ -100,39 +103,68 @@ class MPEGTSTransfer(pb.Viewable): threadable.synchronize(MPEGTSTransfer) -class DynamTSTransfer(protocol.ProcessProtocol): +class DynamTSTransfer(pb.Viewable): def __init__(self, path, pmt, *pids): self.path = path + #log.msg("DynamTSTransfer: pmt: %s, pids: %s" % (pmt, pids)) self.pmt = pmt self.pids = pids + self.didpmt = False - def outReceived(self, data): - self.request.write(data) + def resumeProducing(self): + if not self.request: + return - def outConnectionLost(self): - if self.request: + repcnt = 0 + data = self.fp.read(min(abstract.FileDescriptor.bufferSize, + self.size - self.written) // 188 * 188) + dataarray = array.array('B', data) + for i in xrange(0, len(data), 188): + if data[i] != 'G': + print 'bad sync' + continue + frst = dataarray[i + 1] + pid = (frst & 0x1f) << 8 | dataarray[i + 2] + if not frst & 0x40: + continue + elif not self.didpmt and pid == self.pmt: + startpmt = 4 + if ((dataarray[i + 3] >> 4) & 0x3) == 0x3: + # Adaptation + startpmt += dataarray[i + startpmt] + 1 + startpmt += dataarray[i + startpmt] + 1 + assert data[i + startpmt] =='\x02', (startpmt, + data[i:i + 20]) + self.pats = itertools.cycle(tssel.genpats( + self.pmt, struct.unpack('>H', data[i + + startpmt + 3:i + startpmt + 5])[0])) + self.didpmt = True + + if pid == 0 and self.didpmt: + assert data[i + 4] =='\x00' and \ + data[i + 5] == '\x00', 'error: %s' % `data[i:i + 10]` + repcnt += 1 + pn = self.pats.next() + data = data[:i] + pn + data[i + + 188:] + + if repcnt > 1: + print 'repcnt:', repcnt, 'len(data):', len(data) + + if data: + self.written += len(data) + self.request.write(data) + if self.request and self.fp.tell() == self.size: self.request.unregisterProducer() self.request.finish() self.request = None - def errReceived(self, data): + def pauseProducing(self): pass - #log.msg(data) def stopProducing(self): - if self.request: - self.request.unregisterProducer() - self.request.finish() - - if self.proc: - self.proc.loseConnection() - self.proc.signalProcess('INT') - + self.fp.close() self.request = None - self.proc = None - - pauseProducing = lambda x: x.proc.pauseProducing() - resumeProducing = lambda x: x.proc.resumeProducing() def render(self, request): path = self.path @@ -140,15 +172,79 @@ class DynamTSTransfer(protocol.ProcessProtocol): pids = self.pids self.request = request + fsize = size = os.path.getsize(path) + + request.setHeader('accept-ranges','bytes') + request.setHeader('content-type', 'video/mpeg') - if request.method == 'HEAD': + + try: + self.fp = open(path) + except IOError, e: + import errno + if e[0] == errno.EACCESS: + return error.ForbiddenResource().render(request) + else: + raise + + if request.setLastModified(os.path.getmtime(path)) is http.CACHED: return '' - args = [ 'tssel.py', path, str(pmt), ] + map(str, pids) - self.proc = process.Process(reactor, tsselpypath, args, - None, None, self) - self.proc.closeStdin() - request.registerProducer(self, 1) + trans = True + # Commented out because it's totally broken. --jknight 11/29/04 + # XXX - fixed? jmg 2/17/06 + range = request.getHeader('range') + + tsize = size + if range is not None: + # This is a request for partial data... + bytesrange = range.split('=') + assert bytesrange[0] == 'bytes', \ + "Syntactically invalid http range header!" + start, end = bytesrange[1].split('-', 1) + if start: + start = int(start) + self.fp.seek(start) + if end and int(end) < size: + end = int(end) + else: + end = size - 1 + else: + lastbytes = int(end) + if size < lastbytes: + lastbytes = size + start = size - lastbytes + self.fp.seek(start) + fsize = lastbytes + end = size - 1 + start = start // 188 * 188 + self.fp.seek(start) + size = (end + 1) // 188 * 188 + fsize = end - int(start) + 1 + # start is the byte offset to begin, and end is the + # byte offset to end.. fsize is size to send, tsize + # is the real size of the file, and size is the byte + # position to stop sending. + + if fsize <= 0: + request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE + ) + fsize = tsize + trans = False + else: + request.setResponseCode(http.PARTIAL_CONTENT) + request.setHeader('content-range',"bytes %s-%s/%s " % ( + str(start), str(end), str(tsize))) + + request.setHeader('content-length', str(fsize)) + + if request.method == 'HEAD' or trans is False: + request.method = 'HEAD' + return '' + + self.size = tsize + self.written = 0 + request.registerProducer(self, 0) return server.NOT_DONE_YET @@ -163,9 +259,6 @@ class MPEGTSResource(resource.Resource): def render(self, request): request.setHeader('content-type', 'video/mpeg') - if request.method == 'HEAD': - return '' - # return data return DynamTSTransfer(*self.args).render(request) @@ -207,6 +300,7 @@ class MultiMPEGTS(FSObject, StorageFolder): doupdate = False origchildren, toindex = _gennameindexes(self.tvct['channels']) + #log.msg('MultiMPEGTS doUpdate: tvct: %s' % self.tvct) children = sets.Set(origchildren) for i in self.pathObjmap.keys(): if i not in children: @@ -243,6 +337,7 @@ def detectmpegts(path, fobj): # fast enough. return MPEGTS, { 'path': path, 'tvct': tvct['channels'][0] } elif len(tvct['channels']) > 1: + #log.msg('MultiMPEGTS: path: %s' % path) return MultiMPEGTS, { 'path': path } return None, None