@@ -8,8 +8,11 @@ __version__ = '$Change$'
tsselpypath = 'mpegts/tssel.py'
default_audio_lang = 'eng'
import array
import itertools
import os
import sets
import struct
import sys
mpegtspath = 'mpegts'
@@ -23,8 +26,8 @@ from FSStorage import FSObject, registerklassfun
from twisted.python import log, threadable
from twisted.spread import pb
from twisted.internet import process, protocol, reactor
from twisted.web import resource, server
from twisted.internet import abstract, process, protocol, reactor
from twisted.web import error, http, resource, server
class _LimitedFile(file):
def __init__(self, *args, **kwargs):
@@ -100,39 +103,68 @@ class MPEGTSTransfer(pb.Viewable):
threadable.synchronize(MPEGTSTransfer)
class DynamTSTransfer(protocol.ProcessProtocol ):
class DynamTSTransfer(pb.Viewable ):
def __init__(self, path, pmt, *pids):
self.path = path
#log.msg("DynamTSTransfer: pmt: %s, pids: %s" % (pmt, pids))
self.pmt = pmt
self.pids = pids
self.didpmt = False
def outReceived(self, data):
self.request.write(data)
def resumeProducing(self):
if not self.request:
return
def outConnectionLost(self):
if self.request:
repcnt = 0
data = self.fp.read(min(abstract.FileDescriptor.bufferSize,
self.size - self.written) // 188 * 188)
dataarray = array.array('B', data)
for i in xrange(0, len(data), 188):
if data[i] != 'G':
print 'bad sync'
continue
frst = dataarray[i + 1]
pid = (frst & 0x1f) << 8 | dataarray[i + 2]
if not frst & 0x40:
continue
elif not self.didpmt and pid == self.pmt:
startpmt = 4
if ((dataarray[i + 3] >> 4) & 0x3) == 0x3:
# Adaptation
startpmt += dataarray[i + startpmt] + 1
startpmt += dataarray[i + startpmt] + 1
assert data[i + startpmt] =='\x02', (startpmt,
data[i:i + 20])
self.pats = itertools.cycle(tssel.genpats(
self.pmt, struct.unpack('>H', data[i +
startpmt + 3:i + startpmt + 5])[0]))
self.didpmt = True
if pid == 0 and self.didpmt:
assert data[i + 4] =='\x00' and \
data[i + 5] == '\x00', 'error: %s' % `data[i:i + 10]`
repcnt += 1
pn = self.pats.next()
data = data[:i] + pn + data[i +
188:]
if repcnt > 1:
print 'repcnt:', repcnt, 'len(data):', len(data)
if data:
self.written += len(data)
self.request.write(data)
if self.request and self.fp.tell() == self.size:
self.request.unregisterProducer()
self.request.finish()
self.request = None
def errReceived(self, data):
def pauseProducing(self ):
pass
#log.msg(data)
def stopProducing(self):
if self.request:
self.request.unregisterProducer()
self.request.finish()
if self.proc:
self.proc.loseConnection()
self.proc.signalProcess('INT')
self.fp.close()
self.request = None
self.proc = None
pauseProducing = lambda x: x.proc.pauseProducing()
resumeProducing = lambda x: x.proc.resumeProducing()
def render(self, request):
path = self.path
@@ -140,15 +172,79 @@ class DynamTSTransfer(protocol.ProcessProtocol):
pids = self.pids
self.request = request
fsize = size = os.path.getsize(path)
request.setHeader('accept-ranges','bytes')
request.setHeader('content-type', 'video/mpeg')
if request.method == 'HEAD':
try:
self.fp = open(path)
except IOError, e:
import errno
if e[0] == errno.EACCESS:
return error.ForbiddenResource().render(request)
else:
raise
if request.setLastModified(os.path.getmtime(path)) is http.CACHED:
return ''
args = [ 'tssel.py', path, str(pmt), ] + map(str, pids)
self.proc = process.Process(reactor, tsselpypath, args,
None, None, self)
self.proc.closeStdin()
request.registerProducer(self, 1)
trans = True
# Commented out because it's totally broken. --jknight 11/29/04
# XXX - fixed? jmg 2/17/06
range = request.getHeader('range')
tsize = size
if range is not None:
# This is a request for partial data...
bytesrange = range.split('=')
assert bytesrange[0] == 'bytes', \
"Syntactically invalid http range header!"
start, end = bytesrange[1].split('-', 1)
if start:
start = int(start)
self.fp.seek(start)
if end and int(end) < size:
end = int(end)
else:
end = size - 1
else:
lastbytes = int(end)
if size < lastbytes:
lastbytes = size
start = size - lastbytes
self.fp.seek(start)
fsize = lastbytes
end = size - 1
start = start // 188 * 188
self.fp.seek(start)
size = (end + 1) // 188 * 188
fsize = end - int(start) + 1
# start is the byte offset to begin, and end is the
# byte offset to end.. fsize is size to send, tsize
# is the real size of the file, and size is the byte
# position to stop sending.
if fsize <= 0:
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE
)
fsize = tsize
trans = False
else:
request.setResponseCode(http.PARTIAL_CONTENT)
request.setHeader('content-range',"bytes %s-%s/%s " % (
str(start), str(end), str(tsize)))
request.setHeader('content-length', str(fsize))
if request.method == 'HEAD' or trans is False:
request.method = 'HEAD'
return ''
self.size = tsize
self.written = 0
request.registerProducer(self, 0)
return server.NOT_DONE_YET
@@ -163,9 +259,6 @@ class MPEGTSResource(resource.Resource):
def render(self, request):
request.setHeader('content-type', 'video/mpeg')
if request.method == 'HEAD':
return ''
# return data
return DynamTSTransfer(*self.args).render(request)
@@ -207,6 +300,7 @@ class MultiMPEGTS(FSObject, StorageFolder):
doupdate = False
origchildren, toindex = _gennameindexes(self.tvct['channels'])
#log.msg('MultiMPEGTS doUpdate: tvct: %s' % self.tvct)
children = sets.Set(origchildren)
for i in self.pathObjmap.keys():
if i not in children:
@@ -243,6 +337,7 @@ def detectmpegts(path, fobj):
# fast enough.
return MPEGTS, { 'path': path, 'tvct': tvct['channels'][0] }
elif len(tvct['channels']) > 1:
#log.msg('MultiMPEGTS: path: %s' % path)
return MultiMPEGTS, { 'path': path }
return None, None