A Python UPnP Media Server
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

384 lines
9.1 KiB

  1. #!/usr/bin/env python
  2. # Copyright 2006-2008 John-Mark Gurney <jmg@funkthat.com>
  3. '''MPEG-TS Handling'''
  4. __version__ = '$Change$'
  5. # $Id$
  6. tsselpypath = 'mpegts/tssel.py'
  7. default_audio_lang = 'eng'
  8. import array
  9. import itertools
  10. import os
  11. import sets
  12. import struct
  13. import urlparse
  14. import sys
  15. mpegtspath = 'mpegts'
  16. if mpegtspath not in sys.path:
  17. sys.path.append(mpegtspath)
  18. import mpegts
  19. import tssel
  20. from DIDLLite import StorageFolder, VideoItem, Resource
  21. from FSStorage import FSObject, registerklassfun
  22. from twisted.python import log, threadable
  23. from twisted.spread import pb
  24. from twisted.internet import abstract, process, protocol, reactor
  25. from twisted.web import error, http, resource, server
  26. class _LimitedFile(file):
  27. def __init__(self, *args, **kwargs):
  28. self.__size = kwargs['size']
  29. del kwargs['size']
  30. file.__init__(self, *args, **kwargs)
  31. def remain(self):
  32. pos = self.tell()
  33. if pos > self.__size:
  34. return 0
  35. return self.__size - pos
  36. def read(self, size=-1):
  37. if size < 0:
  38. return file.read(self, self.remain())
  39. return file.read(self, min(size, self.remain()))
  40. def _gennameindexes(chan):
  41. ret = []
  42. d = {}
  43. for i in chan:
  44. t = '%s %s.%s' % (i['name'], i['major'], i['minor'])
  45. ret.append(t)
  46. d[t] = i
  47. return ret, d
  48. class MPEGTSTransfer(pb.Viewable):
  49. def __init__(self, iterable, request):
  50. self.iter = iter(iterable)
  51. self.request = request
  52. request.registerProducer(self, 0)
  53. def resumeProducing(self):
  54. if not self.request:
  55. return
  56. # get data and write to request.
  57. try:
  58. data = self.iter.next()
  59. if data:
  60. # this .write will spin the reactor, calling
  61. # .doWrite and then .resumeProducing again, so
  62. # be prepared for a re-entrant call
  63. self.request.write(data)
  64. except StopIteration:
  65. if self.request:
  66. self.request.unregisterProducer()
  67. self.request.finish()
  68. self.request = None
  69. def pauseProducing(self):
  70. pass
  71. def stopProducing(self):
  72. # close zipfile
  73. self.request = None
  74. # Remotely relay producer interface.
  75. def view_resumeProducing(self, issuer):
  76. self.resumeProducing()
  77. def view_pauseProducing(self, issuer):
  78. self.pauseProducing()
  79. def view_stopProducing(self, issuer):
  80. self.stopProducing()
  81. synchronized = ['resumeProducing', 'stopProducing']
  82. threadable.synchronize(MPEGTSTransfer)
  83. class DynamTSTransfer(pb.Viewable):
  84. def __init__(self, path, pmt, *pids):
  85. self.path = path
  86. #log.msg("DynamTSTransfer: pmt: %s, pids: %s" % (pmt, pids))
  87. self.pmt = pmt
  88. self.pids = pids
  89. self.didpat = False
  90. def resumeProducing(self):
  91. if not self.request:
  92. return
  93. repcnt = 0
  94. data = self.fp.read(min(abstract.FileDescriptor.bufferSize,
  95. self.size - self.written) // 188 * 188)
  96. dataarray = array.array('B', data)
  97. for i in xrange(0, len(data), 188):
  98. if data[i] != 'G':
  99. print 'bad sync'
  100. continue
  101. frst = dataarray[i + 1]
  102. pid = (frst & 0x1f) << 8 | dataarray[i + 2]
  103. if not frst & 0x40:
  104. continue
  105. elif not self.didpat and pid == 0:
  106. startpmt = i + 4
  107. if ((dataarray[i + 3] >> 4) & 0x3) == 0x3:
  108. # Adaptation
  109. startpmt += dataarray[startpmt] + 1
  110. startpmt += dataarray[startpmt] + 1
  111. assert data[startpmt] =='\x00', (startpmt,
  112. data[i:startpmt + 4])
  113. arraysize = ((dataarray[startpmt + 1] &
  114. 0xf) << 8) | dataarray[startpmt + 2]
  115. startpmt += 3
  116. arraysize -= 4 # CRC
  117. # Remaining fields before array
  118. startpmt += 5
  119. arraysize -= 5
  120. for startpmt in xrange(startpmt,
  121. min(i + 188 - 3, startpmt + arraysize), 4):
  122. prognum, ppid = struct.unpack('>2H',
  123. data[startpmt:startpmt + 4])
  124. ppid = ppid & 0x1fff
  125. if ppid == self.pmt:
  126. break
  127. else:
  128. raise KeyError, 'unable to find pmt(%d) in pkt: %s' % (pmt, `data[i:i + 188]`)
  129. self.pats = itertools.cycle(tssel.genpats(
  130. self.pmt, prognum))
  131. self.didpat = True
  132. if pid == 0 and self.didpat:
  133. assert data[i + 4] =='\x00' and \
  134. data[i + 5] == '\x00', 'error: %s' % `data[i:i + 10]`
  135. repcnt += 1
  136. pn = self.pats.next()
  137. data = data[:i] + pn + data[i +
  138. 188:]
  139. if repcnt > 1:
  140. print 'repcnt:', repcnt, 'len(data):', len(data)
  141. if data:
  142. self.written += len(data)
  143. self.request.write(data)
  144. if self.request and self.fp.tell() == self.size:
  145. self.request.unregisterProducer()
  146. self.request.finish()
  147. self.request = None
  148. def pauseProducing(self):
  149. pass
  150. def stopProducing(self):
  151. self.fp.close()
  152. self.request = None
  153. def render(self, request):
  154. path = self.path
  155. pmt = self.pmt
  156. pids = self.pids
  157. self.request = request
  158. fsize = size = os.path.getsize(path)
  159. request.setHeader('accept-ranges','bytes')
  160. request.setHeader('content-type', 'video/mpeg')
  161. try:
  162. self.fp = open(path)
  163. except IOError, e:
  164. import errno
  165. if e[0] == errno.EACCESS:
  166. return error.ForbiddenResource().render(request)
  167. else:
  168. raise
  169. if request.setLastModified(os.path.getmtime(path)) is http.CACHED:
  170. return ''
  171. trans = True
  172. # Commented out because it's totally broken. --jknight 11/29/04
  173. # XXX - fixed? jmg 2/17/06
  174. range = request.getHeader('range')
  175. tsize = size
  176. if range is not None:
  177. # This is a request for partial data...
  178. bytesrange = range.split('=')
  179. assert bytesrange[0] == 'bytes', \
  180. "Syntactically invalid http range header!"
  181. start, end = bytesrange[1].split('-', 1)
  182. if start:
  183. start = int(start)
  184. self.fp.seek(start)
  185. if end and int(end) < size:
  186. end = int(end)
  187. else:
  188. end = size - 1
  189. else:
  190. lastbytes = int(end)
  191. if size < lastbytes:
  192. lastbytes = size
  193. start = size - lastbytes
  194. self.fp.seek(start)
  195. fsize = lastbytes
  196. end = size - 1
  197. start = start // 188 * 188
  198. self.fp.seek(start)
  199. size = (end + 1) // 188 * 188
  200. fsize = end - int(start) + 1
  201. # start is the byte offset to begin, and end is the
  202. # byte offset to end.. fsize is size to send, tsize
  203. # is the real size of the file, and size is the byte
  204. # position to stop sending.
  205. if fsize <= 0:
  206. request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE
  207. )
  208. fsize = tsize
  209. trans = False
  210. else:
  211. request.setResponseCode(http.PARTIAL_CONTENT)
  212. request.setHeader('content-range',"bytes %s-%s/%s " % (
  213. str(start), str(end), str(tsize)))
  214. request.setHeader('content-length', str(fsize))
  215. if request.method == 'HEAD' or trans is False:
  216. request.method = 'HEAD'
  217. return ''
  218. self.size = tsize
  219. self.written = 0
  220. request.registerProducer(self, 0)
  221. return server.NOT_DONE_YET
  222. class MPEGTSResource(resource.Resource):
  223. isLeaf = True
  224. def __init__(self, *args):
  225. resource.Resource.__init__(self)
  226. self.args = args
  227. def render(self, request):
  228. request.setHeader('content-type', 'video/mpeg')
  229. # return data
  230. return DynamTSTransfer(*self.args).render(request)
  231. class MPEGTS(FSObject, VideoItem):
  232. def __init__(self, *args, **kwargs):
  233. self.path = path = kwargs['path']
  234. del kwargs['path']
  235. self.tvct = tvct = kwargs['tvct']
  236. del kwargs['tvct']
  237. #log.msg('tvct w/ keys:', tvct, tvct.keys())
  238. kwargs['content'] = MPEGTSResource(path, tvct['PMTpid'],
  239. *sum(mpegts.getaudiovideopids(tvct['PMT']), []))
  240. VideoItem.__init__(self, *args, **kwargs)
  241. FSObject.__init__(self, path)
  242. self.url = urlparse.urljoin(self.cd.urlbase, self.id)
  243. self.res = Resource(self.url, 'http-get:*:video/mpeg:*')
  244. def doUpdate(self):
  245. pass
  246. class MultiMPEGTS(FSObject, StorageFolder):
  247. def __init__(self, *args, **kwargs):
  248. path = kwargs['path']
  249. del kwargs['path']
  250. StorageFolder.__init__(self, *args, **kwargs)
  251. FSObject.__init__(self, path)
  252. # mapping from path to objectID
  253. self.pathObjmap = {}
  254. def doUpdate(self):
  255. f = mpegts.TSPStream(_LimitedFile(self.FSpath,
  256. size= 2*1024*1024))
  257. self.tvct = mpegts.GetTVCT(f)
  258. doupdate = False
  259. origchildren, toindex = _gennameindexes(self.tvct['channels'])
  260. #log.msg('MultiMPEGTS doUpdate: tvct: %s' % self.tvct)
  261. children = sets.Set(origchildren)
  262. for i in self.pathObjmap.keys():
  263. if i not in children:
  264. doupdate = True
  265. # delete
  266. self.cd.delItem(self.pathObjmap[i])
  267. del self.pathObjmap[i]
  268. for i in origchildren:
  269. if i in self.pathObjmap:
  270. continue
  271. # new object
  272. if toindex[i]['prog_num'] == 0:
  273. log.msg('bogus:', toindex[i])
  274. continue
  275. #log.msg('real tvct:', toindex[i], toindex.keys(),
  276. # self.tvct)
  277. self.pathObjmap[i] = self.cd.addItem(self.id, MPEGTS,
  278. i, path = self.FSpath, tvct = toindex[i])
  279. doupdate = True
  280. if doupdate:
  281. StorageFolder.doUpdate(self)
  282. def findtsstream(fp, pktsz=188):
  283. d = fp.read(200*pktsz)
  284. i = 5
  285. pos = 0
  286. while i and pos < len(d) and pos != -1:
  287. if d[pos] == 'G':
  288. i -= 1
  289. pos += pktsz
  290. else:
  291. i = 5
  292. pos = d.find('G', pos + 1)
  293. if i or pos == -1:
  294. return False
  295. return True
  296. def detectmpegts(path, fobj):
  297. if not findtsstream(fobj):
  298. return None, None
  299. f = mpegts.TSPStream(_LimitedFile(path, size= 2*1024*1024))
  300. tvct = mpegts.GetTVCT(f)
  301. if len(tvct['channels']) == 1:
  302. #return None, None
  303. # We might reenable this once we have pid filtering working
  304. # fast enough.
  305. return MPEGTS, { 'path': path, 'tvct': tvct['channels'][0] }
  306. elif len(tvct['channels']) > 1:
  307. #log.msg('MultiMPEGTS: path: %s' % path)
  308. return MultiMPEGTS, { 'path': path }
  309. return None, None
  310. registerklassfun(detectmpegts)