A Python UPnP Media Server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

410 lines
11 KiB

  1. #!/usr/bin/env python
  2. # Copyright 2006 John-Mark Gurney <jmg@funkthat.com>
  3. '''Shoutcast Radio Feed'''
  4. __version__ = '$Change$'
  5. # $Id$
  6. # The handling of defers and state in this module is not very good. It
  7. # needs some work to ensure that error cases are properly handled. What
  8. # do we do if we get no URLs for a PLS? Do we delete ourselves just to be
  9. # re-added (when we get PLS refreshing working)? Do we set a content-length
  10. # to zero till we get one?
  11. import ConfigParser
  12. import StringIO
  13. import os.path
  14. import random
  15. import urlparse
  16. import traceback
  17. from py_shoutcast import *
  18. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  19. from FSStorage import registerklassfun
  20. from twisted.protocols import shoutcast
  21. from twisted.python import log, threadable
  22. from twisted.internet import defer, protocol, reactor
  23. from twisted.web import error, http, resource, server
  24. from twisted.web.client import getPage, _parse
  25. PLSsection = 'playlist'  # ConfigParser section that ShoutProxy.gotPLS reads Version, numberofentries and FileN from
  26. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  27. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  28. return False
  29. return True
  30. def stationwbitratecmp(x, y):
  31. x, y = map(lambda a: a.title.split('-', 1)[1], (x, y))
  32. return cmp(x, y)
  33. class GenreFeedAsync(feeds.GenreFeed):
  34. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  35. def __init__(self, *args, **kwargs):
  36. self.havegenre = False
  37. self.fetchinggenre = None
  38. feeds.GenreFeed.__init__(self, *args, **kwargs)
  39. def gotGenre(self, page):
  40. self.genre = page
  41. self.havegenre = True
  42. # Wake everyone up
  43. self.fetchinggenre.callback(1)
  44. def errGenre(self, failure):
  45. raise NotImplementedError, failure
  46. def fetch_genres(self):
  47. if self.havegenre:
  48. return self.genre
  49. if not self.fetchinggenre:
  50. # Need to start fetching
  51. getPage(self.genre_url.encode('ascii')) \
  52. .addCallbacks(self.gotGenre, self.errGenre)
  53. self.fetchinggenre = defer.Deferred()
  54. # Always raise this if we are waiting.
  55. raise self.fetchinggenre
  56. synchronized = ['fetch_genres', 'gotGenre', ]
  57. threadable.synchronize(GenreFeedAsync)
  58. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  59. def __init__(self, *args, **kwargs):
  60. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  61. self.shout_url = \
  62. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  63. self.genre
  64. self.havestations = False
  65. self.fetchingstations = None
  66. def gotStations(self, page):
  67. self.stations = page
  68. self.havestations = True
  69. # Wake everyone up
  70. self.fetchingstations.callback(1)
  71. def errStations(self, failure):
  72. raise NotImplementedError, failure
  73. def fetch_stations(self):
  74. if self.havestations:
  75. return self.stations
  76. if not self.fetchingstations:
  77. # Need to start fetching
  78. getPage(self.shout_url.encode('ascii')) \
  79. .addCallbacks(self.gotStations, self.errStations)
  80. self.fetchingstations = defer.Deferred()
  81. # Always raise this if we are waiting.
  82. raise self.fetchingstations
  83. synchronized = ['fetch_stations', 'gotStations', ]
  84. threadable.synchronize(ShoutcastFeedAsync)
  85. class ShoutTransfer(shoutcast.ShoutcastClient):
  86. userAgent = 'because you block user-agents'
  87. def __init__(self, request, passback):
  88. shoutcast.ShoutcastClient.__init__(self)
  89. self.request = request
  90. self.passback = passback
  91. request.registerProducer(self, 1)
  92. def connectionLost(self, reason):
  93. #traceback.print_stack()
  94. log.msg('connectionLost:', `self.request`, `self.passback`)
  95. shoutcast.ShoutcastClient.connectionLost(self, reason)
  96. if self.request:
  97. self.request.unregisterProducer()
  98. if self.passback:
  99. self.passback(self.request)
  100. self.passback = None
  101. self.request = None
  102. def handleResponse(self, response):
  103. #Drop the data, the parts get the important data, if we got
  104. #here, the connection closed and we are going to die anyways.
  105. pass
  106. def stopProducing(self):
  107. if self.transport is not None:
  108. shoutcast.ShoutcastClient.stopProducing(self)
  109. self.request = None
  110. self.passback = None
  111. def gotMP3Data(self, data):
  112. if self.request is not None:
  113. self.request.write(data)
  114. def gotMetaData(self, data):
  115. log.msg("meta:", `data`)
  116. pass
  117. # Remotely relay producer interface.
  118. def view_resumeProducing(self, issuer):
  119. self.resumeProducing()
  120. def view_pauseProducing(self, issuer):
  121. self.pauseProducing()
  122. def view_stopProducing(self, issuer):
  123. self.stopProducing()
  124. synchronized = ['resumeProducing', 'stopProducing']
  125. threadable.synchronize(ShoutTransfer)
  126. class ShoutProxy(resource.Resource):
  127. # We should probably expire the PLS after a while.
  128. # setResponseCode(self, code, message=None)
  129. # setHeader(self, k, v)
  130. # write(self, data)
  131. # finish(self)
  132. isLeaf = True
  133. def __init__(self, url, mt):
  134. resource.Resource.__init__(self)
  135. self.shoutpls = url
  136. self.mt = mt
  137. self.urls = None
  138. self.fetchingurls = False
  139. def dump_exc(self):
  140. exc = StringIO.StringIO()
  141. traceback.print_exc(file=exc)
  142. exc.seek(0)
  143. self.request.setHeader('content-type', 'text/html')
  144. self.request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  145. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  146. '<pre>%s</pre>' % exc.read()).render(self.request))
  147. self.request.finish()
  148. self.request = None
  149. def startNextConnection(self, request):
  150. url = self.urls[self.urlpos]
  151. self.urlpos = (self.urlpos + 1) % len(self.urls)
  152. scheme, host, port, path = _parse(url)
  153. #print `url`
  154. protocol.ClientCreator(reactor, ShoutTransfer, request,
  155. self.startNextConnection).connectTCP(host, port)
  156. def triggerdefered(self, fun):
  157. map(fun, self.afterurls)
  158. self.afterurls = None
  159. def gotPLS(self, page):
  160. self.fetchingurls = False
  161. try:
  162. pls = ConfigParser.SafeConfigParser()
  163. pls.readfp(StringIO.StringIO(page))
  164. assert pls.getint(PLSsection, 'Version') == 2
  165. assert pls.has_option(PLSsection, 'numberofentries')
  166. cnt = pls.getint(PLSsection, 'numberofentries')
  167. self.urls = []
  168. for i in range(cnt):
  169. i += 1 # stupid one based arrays
  170. self.urls.append(pls.get(PLSsection,
  171. 'File%d' % i))
  172. #log.msg('pls urls:', self.urls)
  173. self.urlpos = random.randrange(len(self.urls))
  174. except:
  175. self.dump_exc()
  176. self.urls = None
  177. self.triggerdefered(lambda x: x.errback(1))
  178. return
  179. self.triggerdefered(lambda x: x.callback(1))
  180. def errPLS(self, failure):
  181. self.fetchingurls = False
  182. self.triggerdefered(lambda x: x.errback(1))
  183. def processRequest(self, ign, request):
  184. self.startNextConnection(request)
  185. def errRequest(self, failure, request):
  186. request.write(failure.render(self.request))
  187. request.finish()
  188. def render(self, request):
  189. request.setHeader('content-type', self.mt)
  190. # XXX - PS3 doesn't support streaming, this makes it think
  191. # that is has data, but it needs to d/l the entire thing.
  192. #request.setHeader('content-length', 1*1024*1024)
  193. if request.method == 'HEAD':
  194. return ''
  195. # need to start the state machine
  196. # a) fetch the playlist
  197. # b) choose a random starting point
  198. # c) connect to the server
  199. # d) select next server and goto c
  200. # return data
  201. if self.urls is None:
  202. if not self.fetchingurls:
  203. # Get the PLS
  204. self.fetchingurls = True
  205. # Not really sure if ascii is the correct one,
  206. # shouldn't getPage do proper escaping for me?
  207. getPage(self.shoutpls.encode('ascii')) \
  208. .addCallbacks(self.gotPLS, self.errPLS)
  209. self.afterurls = [ defer.Deferred() ]
  210. else:
  211. self.afterurls.append(defer.Deferred())
  212. # Always add the callback if we don't have urls
  213. self.afterurls[-1].addCallbacks(self.processRequest,
  214. errback=self.errRequest, callbackArgs=(request, ),
  215. errbackArgs=(request, ))
  216. else:
  217. self.startNextConnection(request)
  218. # and make sure the connection doesn't get closed
  219. return server.NOT_DONE_YET
  220. synchronized = [ 'gotPLS', 'render', 'startNextConnection',
  221. 'triggerdefered', ]
  222. threadable.synchronize(ShoutProxy)
  223. class ShoutStation(AudioItem):
  224. def __init__(self, *args, **kwargs):
  225. self.station = kwargs['station']
  226. del kwargs['station']
  227. kwargs['content'] = ShoutProxy(self.station['PLS_URL'],
  228. self.station['MimeType'].encode('ascii'))
  229. AudioItem.__init__(self, *args, **kwargs)
  230. self.url = urlparse.urljoin(self.cd.urlbase, self.id)
  231. self.res = Resource(self.url, 'http-get:*:%s:*' % \
  232. self.station['MimeType'].encode('ascii'))
  233. #self.res = Resource(urlparse.urljoin(self.url, 'pcm'),
  234. # 'http-get:*:%s:*' % 'audio/x-wav')
  235. self.bitrate = self.station['Bitrate'] * 128 # 1024k / 8bit
  236. class ShoutGenre(MusicGenre):
  237. def __init__(self, *args, **kwargs):
  238. self.genre = kwargs['genre']
  239. del kwargs['genre']
  240. self.feeds = ShoutcastFeedAsync(self.genre)
  241. self.sl = None
  242. self.pathObjmap = {}
  243. MusicGenre.__init__(self, *args, **kwargs)
  244. def genStations(self, stations):
  245. ret = {}
  246. dupcnt = {}
  247. for i in stations:
  248. name = i['Name']
  249. if name in ret:
  250. # got a dup
  251. if name not in dupcnt:
  252. dupcnt[name] = 2
  253. ret['%s - %d' % (name, dupcnt[name])] = i
  254. dupcnt[name] += 1
  255. else:
  256. ret[name] = i
  257. return ret
  258. def checkUpdate(self):
  259. self.doUpdate()
  260. return self
  261. def doUpdate(self):
  262. #traceback.print_stack(file=log.logfile)
  263. stations = self.feeds.parse_stations()
  264. if stations == self.sl:
  265. return
  266. nl = self.genStations(stations)
  267. doupdate = False
  268. for i in self.pathObjmap.keys():
  269. if i not in nl:
  270. # delete
  271. doupdate = True
  272. self.cd.delItem(self.pathObjmap[i])
  273. del self.pathObjmap[i]
  274. for name, i in nl.iteritems():
  275. if name in self.pathObjmap:
  276. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  277. continue
  278. # Didn't match, readd
  279. self.cd.delItem(self.pathObjmap[name])
  280. del self.pathObjmap[name]
  281. doupdate = True
  282. self.pathObjmap[name] = self.cd.addItem(self.id,
  283. ShoutStation, '%sk-%s' % (i['Bitrate'], name),
  284. station = i)
  285. self.sl = stations
  286. # sort our children
  287. self.sort(lambda *a: stationwbitratecmp(*a))
  288. if doupdate:
  289. Container.doUpdate(self)
  290. class ShoutCast(Container):
  291. def __init__(self, *args, **kwargs):
  292. Container.__init__(self, *args, **kwargs)
  293. self.genres = GenreFeedAsync()
  294. self.genre_list = None
  295. self.pathObjmap = {}
  296. def checkUpdate(self):
  297. self.doUpdate()
  298. return self
  299. def doUpdate(self):
  300. #traceback.print_stack(file=log.logfile)
  301. nl = self.genres.parse_genres()
  302. if nl == self.genre_list:
  303. return
  304. doupdate = False
  305. for i in self.pathObjmap.keys():
  306. if i not in nl:
  307. # delete
  308. doupdate = True
  309. self.cd.delItem(self.pathObjmap[i])
  310. del self.pathObjmap[i]
  311. for i in nl:
  312. if i in self.pathObjmap:
  313. continue
  314. doupdate = True
  315. self.pathObjmap[i] = self.cd.addItem(self.id,
  316. ShoutGenre, i, genre = i)
  317. self.genre_list = nl
  318. # sort our children
  319. self.sort(lambda x, y: cmp(x.title, y.title))
  320. if doupdate:
  321. Container.doUpdate(self)
  322. def detectshoutcastfile(path, fobj):
  323. path = os.path.basename(path)
  324. if path == 'SHOUTcast Radio':
  325. return ShoutCast, { }
  326. return None, None
  327. registerklassfun(detectshoutcastfile)  # hand the detector above to registerklassfun, imported from FSStorage