A Python UPnP Media Server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

428 lines
11 KiB

  1. #!/usr/bin/env python
  2. # Copyright 2006 John-Mark Gurney <jmg@funkthat.com>
  3. '''Shoutcast Radio Feed'''
  4. __version__ = '$Change$'
  5. # $Id$
  6. # The handling of defers and state in this module is not very good. It
  7. # needs some work to ensure that error cases are properly handled. What
  8. # do we do if we get no URLs for a PLS? Do we delete ourselves just to be
  9. # readded (when we get PLS refeshing working)? Do we set a content-length
  10. # to zero till we get one?
  11. import ConfigParser
  12. import cStringIO as StringIO
  13. import os.path
  14. import random
  15. import traceback
  16. from py_shoutcast import *
  17. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  18. from FSStorage import registerklassfun
  19. from twisted.protocols import shoutcast
  20. from twisted.python import log, threadable, failure
  21. from twisted.internet import defer, protocol, reactor
  22. from twisted.web import error, http, resource, server
  23. from twisted.web.client import getPage, _parse
  24. PLSsection = 'playlist'
  25. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  26. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  27. return False
  28. return True
  29. def stationwbitratecmp(x, y):
  30. x, y = map(lambda a: a.title.split('-', 1)[1], (x, y))
  31. return cmp(x, y)
  32. class GenreFeedAsync(feeds.GenreFeed):
  33. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  34. def __init__(self, *args, **kwargs):
  35. self.havegenre = False
  36. self.fetchinggenre = None
  37. feeds.GenreFeed.__init__(self, *args, **kwargs)
  38. def gotGenre(self, page):
  39. self.genre = page
  40. self.havegenre = True
  41. # Wake everyone up
  42. self.fetchinggenre.callback(1)
  43. def errGenre(self, failure):
  44. raise NotImplementedError, failure
  45. def fetch_genres(self):
  46. if self.havegenre:
  47. return self.genre
  48. if not self.fetchinggenre:
  49. # Need to start fetching
  50. getPage(self.genre_url.encode('ascii')) \
  51. .addCallbacks(self.gotGenre, self.errGenre)
  52. self.fetchinggenre = defer.Deferred()
  53. # Always raise this if we are waiting.
  54. raise self.fetchinggenre
  55. synchronized = ['fetch_genres', 'gotGenre', ]
  56. threadable.synchronize(GenreFeedAsync)
  57. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  58. def __init__(self, *args, **kwargs):
  59. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  60. self.shout_url = \
  61. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  62. self.genre
  63. self.havestations = False
  64. self.fetchingstations = None
  65. def gotStations(self, page):
  66. self.stations = page
  67. self.havestations = True
  68. # Wake everyone up
  69. self.fetchingstations.callback(1)
  70. def errStations(self, failure):
  71. raise NotImplementedError, failure
  72. def fetch_stations(self):
  73. if self.havestations:
  74. return self.stations
  75. if not self.fetchingstations:
  76. # Need to start fetching
  77. getPage(self.shout_url.encode('ascii')) \
  78. .addCallbacks(self.gotStations, self.errStations)
  79. self.fetchingstations = defer.Deferred()
  80. # Always raise this if we are waiting.
  81. raise self.fetchingstations
  82. synchronized = ['fetch_stations', 'gotStations', ]
  83. threadable.synchronize(ShoutcastFeedAsync)
  84. class ShoutTransfer(shoutcast.ShoutcastClient):
  85. userAgent = 'because you block user-agents'
  86. def __init__(self, request, passback):
  87. shoutcast.ShoutcastClient.__init__(self)
  88. self.request = request
  89. self.passback = passback
  90. request.registerProducer(self, 1)
  91. def connectionLost(self, reason):
  92. #traceback.print_stack()
  93. log.msg('connectionLost:', `self.request`, `self.passback`)
  94. shoutcast.ShoutcastClient.connectionLost(self, reason)
  95. if self.request:
  96. self.request.unregisterProducer()
  97. if self.passback:
  98. self.passback(self.request)
  99. self.passback = None
  100. self.request = None
  101. def handleResponse(self, response):
  102. #Drop the data, the parts get the important data, if we got
  103. #here, the connection closed and we are going to die anyways.
  104. pass
  105. def stopProducing(self):
  106. if self.transport is not None:
  107. shoutcast.ShoutcastClient.stopProducing(self)
  108. self.request = None
  109. self.passback = None
  110. def gotMP3Data(self, data):
  111. if self.request is not None:
  112. self.request.write(data)
  113. def gotMetaData(self, data):
  114. log.msg("meta:", `data`)
  115. pass
  116. # Remotely relay producer interface.
  117. def view_resumeProducing(self, issuer):
  118. self.resumeProducing()
  119. def view_pauseProducing(self, issuer):
  120. self.pauseProducing()
  121. def view_stopProducing(self, issuer):
  122. self.stopProducing()
  123. synchronized = ['resumeProducing', 'stopProducing']
  124. threadable.synchronize(ShoutTransfer)
  125. class ShoutProxy(resource.Resource):
  126. # We should probably expire the PLS after a while.
  127. # setResponseCode(self, code, message=None)
  128. # setHeader(self, k, v)
  129. # write(self, data)
  130. # finish(self)
  131. isLeaf = True
  132. def __init__(self, url, mt):
  133. resource.Resource.__init__(self)
  134. self.shoutpls = url
  135. self.mt = mt
  136. self.urls = None
  137. self.fetchingurls = False
  138. def dump_exc(self, failure, request):
  139. exc = StringIO.StringIO()
  140. failure.printBriefTraceback(file=exc)
  141. failure.printTraceback()
  142. exc.seek(0)
  143. request.setHeader('content-type', 'text/html')
  144. request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  145. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  146. '<pre>%s</pre>' % exc.read()).render(request))
  147. request.finish()
  148. def startNextConnection(self, request):
  149. url = self.urls[self.urlpos]
  150. self.urlpos = (self.urlpos + 1) % len(self.urls)
  151. scheme, host, port, path = _parse(url)
  152. #print `url`
  153. protocol.ClientCreator(reactor, ShoutTransfer, request,
  154. self.startNextConnection).connectTCP(host, port)
  155. def triggerdefered(self, fun):
  156. map(fun, self.afterurls)
  157. self.afterurls = None
  158. def gotPLS(self, page):
  159. self.fetchingurls = False
  160. pls = ConfigParser.SafeConfigParser()
  161. pls.readfp(StringIO.StringIO(page))
  162. # KCSM 91.1 doesn't provide a version
  163. #assert pls.getint(PLSsection, 'Version') == 2
  164. assert pls.has_option(PLSsection, 'numberofentries')
  165. cnt = pls.getint(PLSsection, 'numberofentries')
  166. self.urls = []
  167. for i in range(cnt):
  168. i += 1 # stupid one based arrays
  169. self.urls.append(pls.get(PLSsection,
  170. 'File%d' % i))
  171. #log.msg('pls urls:', self.urls)
  172. self.urlpos = random.randrange(len(self.urls))
  173. self.triggerdefered(lambda x: x.callback(True))
  174. def errPLS(self, failure):
  175. self.fetchingurls = False
  176. # XXX - retry?
  177. self.triggerdefered(lambda x: x.errback(failure))
  178. def processRequest(self, ign, request):
  179. self.startNextConnection(request)
  180. def errRequest(self, failure, request):
  181. self.dump_exc(failure, request)
  182. def render(self, request):
  183. request.setHeader('content-type', self.mt)
  184. # XXX - PS3 doesn't support streaming, this makes it think
  185. # that is has data, but it needs to d/l the entire thing.
  186. #request.setHeader('content-length', 1*1024*1024)
  187. if request.method == 'HEAD':
  188. return ''
  189. # need to start the state machine
  190. # a) fetch the playlist
  191. # b) choose a random starting point
  192. # c) connect to the server
  193. # d) select next server and goto c
  194. # return data
  195. if self.urls is None:
  196. if not self.fetchingurls:
  197. # Get the PLS
  198. self.fetchingurls = True
  199. # Not really sure if ascii is the correct one,
  200. # shouldn't getPage do proper escaping for me?
  201. self.afterurls = [ defer.Deferred() ]
  202. d = getPage(self.shoutpls.encode('ascii'))
  203. d.addCallback(self.gotPLS)
  204. d.addErrback(self.errPLS)
  205. else:
  206. self.afterurls.append(defer.Deferred())
  207. # Always add the callback if we don't have urls
  208. self.afterurls[-1].addCallbacks(self.processRequest,
  209. errback=self.errRequest, callbackArgs=(request, ),
  210. errbackArgs=(request, ))
  211. else:
  212. self.startNextConnection(request)
  213. # and make sure the connection doesn't get closed
  214. return server.NOT_DONE_YET
  215. synchronized = [ 'gotPLS', 'render', 'startNextConnection',
  216. 'triggerdefered', ]
  217. threadable.synchronize(ShoutProxy)
  218. class ShoutURL(AudioItem):
  219. def __init__(self, *args, **kwargs):
  220. url = kwargs.pop('url')
  221. mimetype = kwargs.pop('mimetype', 'audio/mpeg')
  222. bitrate = kwargs.pop('bitrate', None)
  223. kwargs['content'] = ShoutProxy(url, mimetype)
  224. AudioItem.__init__(self, *args, **kwargs)
  225. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  226. self.res = Resource(self.url, 'http-get:*:%s:*' % mimetype)
  227. #self.res = Resource(self.url + '/pcm', 'http-get:*:%s:*' % \
  228. # 'audio/x-wav')
  229. if bitrate is not None:
  230. self.bitrate = bitrate
  231. class ShoutFile(ShoutURL):
  232. def __init__(self, *args, **kwargs):
  233. file = kwargs.pop('file')
  234. kwargs['url'] = open(file).read().strip()
  235. ShoutURL.__init__(self, *args, **kwargs)
  236. class ShoutStation(ShoutURL):
  237. def __init__(self, *args, **kwargs):
  238. self.station = kwargs.pop('station')
  239. kwargs['url'] = self.station['PLS_URL']
  240. kwargs['mimetype'] = self.station['MimeType'].encode('ascii')
  241. kwargs['bitrate'] = self.station['Bitrate'] * 128 # 1024k / 8bit
  242. ShoutURL.__init__(self, *args, **kwargs)
  243. class ShoutGenre(MusicGenre):
  244. def __init__(self, *args, **kwargs):
  245. self.genre = kwargs['genre']
  246. del kwargs['genre']
  247. #self.feeds = ShoutcastFeedAsync(self.genre)
  248. self.feeds = feeds.ShoutcastFeed(self.genre)
  249. self.sl = None
  250. self.pathObjmap = {}
  251. MusicGenre.__init__(self, *args, **kwargs)
  252. def genStations(self, stations):
  253. ret = {}
  254. dupcnt = {}
  255. for i in stations:
  256. name = i['Name']
  257. if name in ret:
  258. # got a dup
  259. if name not in dupcnt:
  260. dupcnt[name] = 2
  261. ret['%s - %d' % (name, dupcnt[name])] = i
  262. dupcnt[name] += 1
  263. else:
  264. ret[name] = i
  265. return ret
  266. def checkUpdate(self):
  267. self.doUpdate()
  268. def doUpdate(self):
  269. #traceback.print_stack(file=log.logfile)
  270. stations = self.feeds.parse_stations()
  271. if stations == self.sl:
  272. return
  273. nl = self.genStations(stations)
  274. doupdate = False
  275. for i in self.pathObjmap.keys():
  276. if i not in nl:
  277. # delete
  278. doupdate = True
  279. self.cd.delItem(self.pathObjmap[i])
  280. del self.pathObjmap[i]
  281. for name, i in nl.iteritems():
  282. if name in self.pathObjmap:
  283. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  284. continue
  285. # Didn't match, readd
  286. self.cd.delItem(self.pathObjmap[name])
  287. del self.pathObjmap[name]
  288. doupdate = True
  289. self.pathObjmap[name] = self.cd.addItem(self.id,
  290. ShoutStation, '%sk-%s' % (i['Bitrate'], name),
  291. station = i)
  292. self.sl = stations
  293. # sort our children
  294. self.sort(lambda *a: stationwbitratecmp(*a))
  295. if doupdate:
  296. Container.doUpdate(self)
  297. class ShoutCast(Container):
  298. def __init__(self, *args, **kwargs):
  299. Container.__init__(self, *args, **kwargs)
  300. #self.genres = GenreFeedAsync()
  301. self.genres = feeds.GenreFeed()
  302. self.genre_list = None
  303. self.pathObjmap = {}
  304. def checkUpdate(self):
  305. self.doUpdate()
  306. def doUpdate(self):
  307. #traceback.print_stack(file=log.logfile)
  308. nl = self.genres.parse_genres()
  309. if nl == self.genre_list:
  310. return
  311. doupdate = False
  312. for i in self.pathObjmap.keys():
  313. if i not in nl:
  314. # delete
  315. doupdate = True
  316. self.cd.delItem(self.pathObjmap[i])
  317. del self.pathObjmap[i]
  318. for i in nl:
  319. if i in self.pathObjmap:
  320. continue
  321. doupdate = True
  322. self.pathObjmap[i] = self.cd.addItem(self.id,
  323. ShoutGenre, i, genre=i)
  324. self.genre_list = nl
  325. # sort our children
  326. self.sort(lambda x, y: cmp(x.title, y.title))
  327. if doupdate:
  328. Container.doUpdate(self)
  329. def detectshoutcastfile(origpath, fobj):
  330. path = os.path.basename(origpath)
  331. if path == 'SHOUTcast Radio':
  332. return ShoutCast, { }
  333. ext = os.path.splitext(path)[1]
  334. if ext == '.scst':
  335. return ShoutFile, { 'file': origpath }
  336. return None, None
  337. registerklassfun(detectshoutcastfile)