A Python UPnP Media Server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

383 lines
9.1 KiB

#!/usr/bin/env python
# Copyright 2006-2008 John-Mark Gurney <jmg@funkthat.com>

'''MPEG-TS Handling'''

__version__ = '$Change$'
# $Id$

# Neither of these two settings is referenced in the visible part of
# this module.
tsselpypath = 'mpegts/tssel.py'
default_audio_lang = 'eng'

import array
import itertools
import os
import sets
import struct
import sys

# mpegts and tssel are imported as top-level modules from the 'mpegts'
# directory, so that directory has to be on sys.path before the two
# imports that follow.
mpegtspath = 'mpegts'
if mpegtspath not in sys.path:
    sys.path.append(mpegtspath)

import mpegts
import tssel

from DIDLLite import StorageFolder, VideoItem, Resource
from FSStorage import FSObject, registerklassfun

from twisted.python import log, threadable
from twisted.spread import pb
from twisted.internet import abstract, process, protocol, reactor
from twisted.web import error, http, resource, server
  25. class _LimitedFile(file):
  26. def __init__(self, *args, **kwargs):
  27. self.__size = kwargs['size']
  28. del kwargs['size']
  29. file.__init__(self, *args, **kwargs)
  30. def remain(self):
  31. pos = self.tell()
  32. if pos > self.__size:
  33. return 0
  34. return self.__size - pos
  35. def read(self, size=-1):
  36. if size < 0:
  37. return file.read(self, self.remain())
  38. return file.read(self, min(size, self.remain()))
  39. def _gennameindexes(chan):
  40. ret = []
  41. d = {}
  42. for i in chan:
  43. t = '%s %s.%s' % (i['name'], i['major'], i['minor'])
  44. ret.append(t)
  45. d[t] = i
  46. return ret, d
  47. class MPEGTSTransfer(pb.Viewable):
  48. def __init__(self, iterable, request):
  49. self.iter = iter(iterable)
  50. self.request = request
  51. request.registerProducer(self, 0)
  52. def resumeProducing(self):
  53. if not self.request:
  54. return
  55. # get data and write to request.
  56. try:
  57. data = self.iter.next()
  58. if data:
  59. # this .write will spin the reactor, calling
  60. # .doWrite and then .resumeProducing again, so
  61. # be prepared for a re-entrant call
  62. self.request.write(data)
  63. except StopIteration:
  64. if self.request:
  65. self.request.unregisterProducer()
  66. self.request.finish()
  67. self.request = None
  68. def pauseProducing(self):
  69. pass
  70. def stopProducing(self):
  71. # close zipfile
  72. self.request = None
  73. # Remotely relay producer interface.
  74. def view_resumeProducing(self, issuer):
  75. self.resumeProducing()
  76. def view_pauseProducing(self, issuer):
  77. self.pauseProducing()
  78. def view_stopProducing(self, issuer):
  79. self.stopProducing()
  80. synchronized = ['resumeProducing', 'stopProducing']
  81. threadable.synchronize(MPEGTSTransfer)
  82. class DynamTSTransfer(pb.Viewable):
  83. def __init__(self, path, pmt, *pids):
  84. self.path = path
  85. #log.msg("DynamTSTransfer: pmt: %s, pids: %s" % (pmt, pids))
  86. self.pmt = pmt
  87. self.pids = pids
  88. self.didpat = False
  89. def resumeProducing(self):
  90. if not self.request:
  91. return
  92. repcnt = 0
  93. data = self.fp.read(min(abstract.FileDescriptor.bufferSize,
  94. self.size - self.written) // 188 * 188)
  95. dataarray = array.array('B', data)
  96. for i in xrange(0, len(data), 188):
  97. if data[i] != 'G':
  98. print 'bad sync'
  99. continue
  100. frst = dataarray[i + 1]
  101. pid = (frst & 0x1f) << 8 | dataarray[i + 2]
  102. if not frst & 0x40:
  103. continue
  104. elif not self.didpat and pid == 0:
  105. startpmt = i + 4
  106. if ((dataarray[i + 3] >> 4) & 0x3) == 0x3:
  107. # Adaptation
  108. startpmt += dataarray[startpmt] + 1
  109. startpmt += dataarray[startpmt] + 1
  110. assert data[startpmt] =='\x00', (startpmt,
  111. data[i:startpmt + 4])
  112. arraysize = ((dataarray[startpmt + 1] &
  113. 0xf) << 8) | dataarray[startpmt + 2]
  114. startpmt += 3
  115. arraysize -= 4 # CRC
  116. # Remaining fields before array
  117. startpmt += 5
  118. arraysize -= 5
  119. for startpmt in xrange(startpmt,
  120. min(i + 188 - 3, startpmt + arraysize), 4):
  121. prognum, ppid = struct.unpack('>2H',
  122. data[startpmt:startpmt + 4])
  123. ppid = ppid & 0x1fff
  124. if ppid == self.pmt:
  125. break
  126. else:
  127. raise KeyError, 'unable to find pmt(%d) in pkt: %s' % (pmt, `data[i:i + 188]`)
  128. self.pats = itertools.cycle(tssel.genpats(
  129. self.pmt, prognum))
  130. self.didpat = True
  131. if pid == 0 and self.didpat:
  132. assert data[i + 4] =='\x00' and \
  133. data[i + 5] == '\x00', 'error: %s' % `data[i:i + 10]`
  134. repcnt += 1
  135. pn = self.pats.next()
  136. data = data[:i] + pn + data[i +
  137. 188:]
  138. if repcnt > 1:
  139. print 'repcnt:', repcnt, 'len(data):', len(data)
  140. if data:
  141. self.written += len(data)
  142. self.request.write(data)
  143. if self.request and self.fp.tell() == self.size:
  144. self.request.unregisterProducer()
  145. self.request.finish()
  146. self.request = None
  147. def pauseProducing(self):
  148. pass
  149. def stopProducing(self):
  150. self.fp.close()
  151. self.request = None
  152. def render(self, request):
  153. path = self.path
  154. pmt = self.pmt
  155. pids = self.pids
  156. self.request = request
  157. fsize = size = os.path.getsize(path)
  158. request.setHeader('accept-ranges','bytes')
  159. request.setHeader('content-type', 'video/mpeg')
  160. try:
  161. self.fp = open(path)
  162. except IOError, e:
  163. import errno
  164. if e[0] == errno.EACCESS:
  165. return error.ForbiddenResource().render(request)
  166. else:
  167. raise
  168. if request.setLastModified(os.path.getmtime(path)) is http.CACHED:
  169. return ''
  170. trans = True
  171. # Commented out because it's totally broken. --jknight 11/29/04
  172. # XXX - fixed? jmg 2/17/06
  173. range = request.getHeader('range')
  174. tsize = size
  175. if range is not None:
  176. # This is a request for partial data...
  177. bytesrange = range.split('=')
  178. assert bytesrange[0] == 'bytes', \
  179. "Syntactically invalid http range header!"
  180. start, end = bytesrange[1].split('-', 1)
  181. if start:
  182. start = int(start)
  183. self.fp.seek(start)
  184. if end and int(end) < size:
  185. end = int(end)
  186. else:
  187. end = size - 1
  188. else:
  189. lastbytes = int(end)
  190. if size < lastbytes:
  191. lastbytes = size
  192. start = size - lastbytes
  193. self.fp.seek(start)
  194. fsize = lastbytes
  195. end = size - 1
  196. start = start // 188 * 188
  197. self.fp.seek(start)
  198. size = (end + 1) // 188 * 188
  199. fsize = end - int(start) + 1
  200. # start is the byte offset to begin, and end is the
  201. # byte offset to end.. fsize is size to send, tsize
  202. # is the real size of the file, and size is the byte
  203. # position to stop sending.
  204. if fsize <= 0:
  205. request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE
  206. )
  207. fsize = tsize
  208. trans = False
  209. else:
  210. request.setResponseCode(http.PARTIAL_CONTENT)
  211. request.setHeader('content-range',"bytes %s-%s/%s " % (
  212. str(start), str(end), str(tsize)))
  213. request.setHeader('content-length', str(fsize))
  214. if request.method == 'HEAD' or trans is False:
  215. request.method = 'HEAD'
  216. return ''
  217. self.size = tsize
  218. self.written = 0
  219. request.registerProducer(self, 0)
  220. return server.NOT_DONE_YET
  221. class MPEGTSResource(resource.Resource):
  222. isLeaf = True
  223. def __init__(self, *args):
  224. resource.Resource.__init__(self)
  225. self.args = args
  226. def render(self, request):
  227. request.setHeader('content-type', 'video/mpeg')
  228. # return data
  229. return DynamTSTransfer(*self.args).render(request)
  230. class MPEGTS(FSObject, VideoItem):
  231. def __init__(self, *args, **kwargs):
  232. self.path = path = kwargs['path']
  233. del kwargs['path']
  234. self.tvct = tvct = kwargs['tvct']
  235. del kwargs['tvct']
  236. #log.msg('tvct w/ keys:', tvct, tvct.keys())
  237. kwargs['content'] = MPEGTSResource(path, tvct['PMTpid'],
  238. *sum(mpegts.getaudiovideopids(tvct['PMT']), []))
  239. VideoItem.__init__(self, *args, **kwargs)
  240. FSObject.__init__(self, path)
  241. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  242. self.res = Resource(self.url, 'http-get:*:video/mpeg:*')
  243. def doUpdate(self):
  244. pass
  245. class MultiMPEGTS(FSObject, StorageFolder):
  246. def __init__(self, *args, **kwargs):
  247. path = kwargs['path']
  248. del kwargs['path']
  249. StorageFolder.__init__(self, *args, **kwargs)
  250. FSObject.__init__(self, path)
  251. # mapping from path to objectID
  252. self.pathObjmap = {}
  253. def doUpdate(self):
  254. f = mpegts.TSPStream(_LimitedFile(self.FSpath,
  255. size= 2*1024*1024))
  256. self.tvct = mpegts.GetTVCT(f)
  257. doupdate = False
  258. origchildren, toindex = _gennameindexes(self.tvct['channels'])
  259. #log.msg('MultiMPEGTS doUpdate: tvct: %s' % self.tvct)
  260. children = sets.Set(origchildren)
  261. for i in self.pathObjmap.keys():
  262. if i not in children:
  263. doupdate = True
  264. # delete
  265. self.cd.delItem(self.pathObjmap[i])
  266. del self.pathObjmap[i]
  267. for i in origchildren:
  268. if i in self.pathObjmap:
  269. continue
  270. # new object
  271. if toindex[i]['prog_num'] == 0:
  272. log.msg('bogus:', toindex[i])
  273. continue
  274. #log.msg('real tvct:', toindex[i], toindex.keys(),
  275. # self.tvct)
  276. self.pathObjmap[i] = self.cd.addItem(self.id, MPEGTS,
  277. i, path = self.FSpath, tvct = toindex[i])
  278. doupdate = True
  279. if doupdate:
  280. StorageFolder.doUpdate(self)
  281. def findtsstream(fp, pktsz=188):
  282. d = fp.read(200*pktsz)
  283. i = 5
  284. pos = 0
  285. while i and pos < len(d) and pos != -1:
  286. if d[pos] == 'G':
  287. i -= 1
  288. pos += pktsz
  289. else:
  290. i = 5
  291. pos = d.find('G', pos + 1)
  292. if i or pos == -1:
  293. return False
  294. return True
  295. def detectmpegts(path, fobj):
  296. if not findtsstream(fobj):
  297. return None, None
  298. f = mpegts.TSPStream(_LimitedFile(path, size= 2*1024*1024))
  299. tvct = mpegts.GetTVCT(f)
  300. if len(tvct['channels']) == 1:
  301. #return None, None
  302. # We might reenable this once we have pid filtering working
  303. # fast enough.
  304. return MPEGTS, { 'path': path, 'tvct': tvct['channels'][0] }
  305. elif len(tvct['channels']) > 1:
  306. #log.msg('MultiMPEGTS: path: %s' % path)
  307. return MultiMPEGTS, { 'path': path }
  308. return None, None
  309. registerklassfun(detectmpegts)