|
- #!/usr/bin/env python
- # Copyright 2006 John-Mark Gurney <gurney_j@resnet.uoregon.edu>
- '''MPEG-TS Handling'''
-
- __version__ = '$Change$'
- # $Id$
-
- tsselpypath = '/Users/jgurney/p4/bktrau/info/tssel.py'
- default_audio_lang = 'eng'
-
- import os
- import sets
-
- import sys
- mpegtspath = '/Users/jgurney/p4/bktrau/info'
- if mpegtspath not in sys.path:
- sys.path.append(mpegtspath)
- import mpegts
- import tssel
-
- from DIDLLite import StorageFolder, VideoItem, Resource
- from FSStorage import FSObject, registerklassfun
-
- from twisted.python import log, threadable
- from twisted.spread import pb
- from twisted.internet import process, protocol, reactor
- from twisted.web import resource, server
-
class _LimitedFile(file):
    '''File object whose read() never returns data past a byte limit.

    Takes the same arguments as file, plus a required keyword
    argument, size, giving the absolute offset at which read() stops.
    Only read() is limited; no other method is overridden.
    '''

    def __init__(self, *args, **kwargs):
        # size is ours, not file's, so strip it before delegating.
        self.__size = kwargs.pop('size')
        file.__init__(self, *args, **kwargs)

    def remain(self):
        '''Return how many bytes lie between the current position and
        the limit, or 0 if the position is already past the limit.'''
        return max(0, self.__size - self.tell())

    def read(self, size=-1):
        '''Read up to size bytes, capped at the limit.  A negative
        size reads everything up to the limit.'''
        count = self.remain()
        if size >= 0:
            count = min(size, count)

        return file.read(self, count)
-
def _gennameindexes(chan):
    '''Build display names for a sequence of channel entries.

    Each entry is indexed by 'name', 'major' and 'minor' and is named
    "<name> <major>.<minor>".

    Returns a tuple (names, index): names is the list of display
    names in input order, index maps each display name to its entry.
    If two entries share a display name, the later one wins in index.
    '''
    pairs = [('%s %s.%s' % (c['name'], c['major'], c['minor']), c)
        for c in chan]
    names = [label for label, _ in pairs]

    return names, dict(pairs)
-
class MPEGTSTransfer(pb.Viewable):
    '''Producer that writes the items of an iterable to a request.

    The instance registers itself on the request as a non-streaming
    producer (registerProducer(self, 0)); each resumeProducing() call
    writes at most one item.  When the iterable is exhausted the
    producer is unregistered and the request finished.
    '''

    def __init__(self, iterable, request):
        self.iter = iter(iterable)
        self.request = request
        request.registerProducer(self, 0)

    def resumeProducing(self):
        '''Write the next non-empty item to the request, if any.'''
        if self.request:
            try:
                chunk = self.iter.next()
                if chunk:
                    # NOTE: write() will spin the reactor, calling
                    # .doWrite and then .resumeProducing again, so
                    # this method must cope with re-entrant calls.
                    self.request.write(chunk)
            except StopIteration:
                self._finishRequest()

    def _finishRequest(self):
        # End of data: detach from the request and complete it, unless
        # that has already happened.
        if not self.request:
            return
        self.request.unregisterProducer()
        self.request.finish()
        self.request = None

    def pauseProducing(self):
        '''Nothing to do; data is only produced on resumeProducing().'''
        pass

    def stopProducing(self):
        '''Forget the request so that nothing more is written to it.'''
        self.request = None

    # Remotely relay producer interface.

    def view_resumeProducing(self, issuer):
        self.resumeProducing()

    def view_pauseProducing(self, issuer):
        self.pauseProducing()

    def view_stopProducing(self, issuer):
        self.stopProducing()

    # Method names handed to threadable.synchronize() below.
    synchronized = ['resumeProducing', 'stopProducing']

threadable.synchronize(MPEGTSTransfer)
-
class DynamTSTransfer(protocol.ProcessProtocol):
    '''Relay the standard output of a tssel.py child process to a request.

    render() spawns the executable named by tsselpypath with the
    arguments 'tssel.py', path, pmt and each of pids (all converted
    to strings), registers this object on the request as a streaming
    producer, and forwards every chunk the child writes to stdout.
    The request is finished when the child's stdout closes.

    path, pmt and pids are stored as given and only used to build the
    child's command line.
    '''

    def __init__(self, path, pmt, *pids):
        self.path = path
        self.pmt = pmt
        self.pids = pids

        # Both are set by render().  They are initialised here so the
        # callbacks below can test them safely when render() has not
        # run, or returned early for a HEAD request.
        self.request = None
        self.proc = None

    def outReceived(self, data):
        '''Forward a chunk of child output to the request.'''
        # The request is dropped once the transfer has been stopped or
        # finished; discard anything that arrives after that.
        if self.request:
            self.request.write(data)

    def outConnectionLost(self):
        '''Child closed stdout: complete the request if still active.'''
        if self.request:
            self.request.unregisterProducer()
            self.request.finish()
            self.request = None

    def errReceived(self, data):
        '''Discard whatever the child writes to stderr.'''
        pass

    def stopProducing(self):
        '''Finish the request and terminate the child process.

        Safe to call more than once, and before render().
        '''
        # Detach both references first so a re-entrant call, or a late
        # callback from the child, finds nothing left to act on.
        request, self.request = self.request, None
        proc, self.proc = self.proc, None

        try:
            if request:
                request.unregisterProducer()
                request.finish()
        finally:
            # Always take the child down, even if tearing down the
            # request raised; otherwise the process would be leaked.
            if proc:
                proc.loseConnection()
                proc.signalProcess('INT')

    def pauseProducing(self):
        '''Pause reading from the child process, if one is running.'''
        if self.proc:
            self.proc.pauseProducing()

    def resumeProducing(self):
        '''Resume reading from the child process, if one is running.'''
        if self.proc:
            self.proc.resumeProducing()

    def render(self, request):
        '''Start streaming to request.

        Returns '' for a HEAD request (no process is spawned),
        otherwise server.NOT_DONE_YET.
        '''
        self.request = request

        request.setHeader('content-type', 'video/mpeg')
        if request.method == 'HEAD':
            return ''

        # A list comprehension rather than map(), so the concatenation
        # does not depend on map() returning a list.
        args = ['tssel.py', self.path, str(self.pmt)] + \
            [str(pid) for pid in self.pids]
        self.proc = process.Process(reactor, tsselpypath, args,
            None, None, self)
        # Nothing is ever written to the child.
        self.proc.closeStdin()
        request.registerProducer(self, 1)

        return server.NOT_DONE_YET
-
class MPEGTSResource(resource.Resource):
    '''Leaf resource that answers each request with a DynamTSTransfer.

    The positional arguments given to the constructor are stored and
    passed unchanged to DynamTSTransfer for every non-HEAD request.
    '''

    isLeaf = True

    def __init__(self, *args):
        resource.Resource.__init__(self)
        self.args = args

    def render(self, request):
        '''Set the content type; stream the data unless this is a HEAD.'''
        request.setHeader('content-type', 'video/mpeg')

        if request.method != 'HEAD':
            # A fresh transfer object per request does the streaming.
            transfer = DynamTSTransfer(*self.args)
            return transfer.render(request)

        return ''
-
class MPEGTS(FSObject, VideoItem):
    '''Video item for one channel of an MPEG-TS file.

    Requires two keyword arguments, which are removed before the
    remaining arguments are handed to VideoItem:

    path -- file system path of the transport stream
    tvct -- channel entry; its 'PMTpid' and 'PMT' items are used
    '''

    def __init__(self, *args, **kwargs):
        path = kwargs.pop('path')
        self.path = path
        tvct = kwargs.pop('tvct')
        self.tvct = tvct

        pmtpid = tvct['PMTpid']
        # getaudiovideopids() yields lists of pids; sum(..., [])
        # concatenates them into one flat list.
        pids = sum(mpegts.getaudiovideopids(tvct['PMT']), [])
        kwargs['content'] = MPEGTSResource(path, pmtpid, *pids)

        VideoItem.__init__(self, *args, **kwargs)
        FSObject.__init__(self, path)

        # self.cd and self.id are not set here; presumably the base
        # class initialisers above provide them.
        self.url = '%s/%s' % (self.cd.urlbase, self.id)
        self.res = Resource(self.url, 'http-get:*:video/mpeg:*')

    def doUpdate(self):
        '''Nothing to refresh for a single channel.'''
        pass
-
class MultiMPEGTS(FSObject, StorageFolder):
    '''Folder holding one MPEGTS item per channel of an MPEG-TS file.

    Requires the keyword argument path (file system path of the
    transport stream); it is removed before the remaining arguments
    are handed to StorageFolder.
    '''

    def __init__(self, *args, **kwargs):
        path = kwargs.pop('path')

        StorageFolder.__init__(self, *args, **kwargs)
        FSObject.__init__(self, path)

        # mapping from channel display name to objectID
        self.pathObjmap = {}

    def doUpdate(self):
        '''Re-read the channel table and sync the child items with it.

        Items whose channel has disappeared are deleted, new channels
        get an MPEGTS item (channels with prog_num 0 are logged and
        skipped), and StorageFolder.doUpdate() runs only if something
        was added or removed.
        '''
        # Only the first 2MB of the file is made available to the
        # table parser.
        stream = mpegts.TSPStream(_LimitedFile(self.FSpath,
            size=2 * 1024 * 1024))
        self.tvct = mpegts.GetTVCT(stream)

        names, channels = _gennameindexes(self.tvct['channels'])
        changed = False

        # Iterate over a snapshot, since entries are removed on the way.
        for name in list(self.pathObjmap):
            if name in channels:
                continue

            # channel is gone: delete its item
            self.cd.delItem(self.pathObjmap[name])
            del self.pathObjmap[name]
            changed = True

        for name in names:
            if name in self.pathObjmap:
                continue

            # new channel
            chan = channels[name]
            if chan['prog_num'] == 0:
                log.msg('bogus:', chan)
                continue

            self.pathObjmap[name] = self.cd.addItem(self.id, MPEGTS,
                name, path=self.FSpath, tvct=chan)
            changed = True

        if changed:
            StorageFolder.doUpdate(self)
-
def detectmpegts(path, fobj):
    '''Choose the class used to represent the MPEG-TS file at path.

    The channel table is read from the first 2MB of the file.  fobj
    is accepted but not used.

    Returns a (class, kwargs) tuple:
    one channel        -- MPEGTS, with that channel entry as tvct
    several channels   -- MultiMPEGTS
    no channels        -- (None, None)
    '''
    stream = mpegts.TSPStream(_LimitedFile(path, size=2 * 1024 * 1024))
    channels = mpegts.GetTVCT(stream)['channels']
    count = len(channels)

    if count > 1:
        return MultiMPEGTS, {'path': path}

    if count == 1:
        # Original note kept from a disabled "return None, None" here:
        # "We might reenable this once we have pid filtering working
        # fast enough."
        return MPEGTS, {'path': path, 'tvct': channels[0]}

    return None, None

registerklassfun(detectmpegts)
|