A Python UPnP Media Server
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

375 lines
9.6 KiB

  1. #!/usr/bin/env python
  2. '''Shoutcast Radio Feed'''
  3. import ConfigParser
  4. import StringIO
  5. import os.path
  6. import random
  7. import traceback
  8. from py_shoutcast import *
  9. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  10. from FSStorage import registerklassfun
  11. from twisted.protocols import shoutcast
  12. from twisted.python import log, threadable
  13. from twisted.internet import defer, protocol, reactor
  14. from twisted.web import error, http, resource, server
  15. from twisted.web.client import getPage, _parse
# Name of the section that ShoutProxy reads the 'Version',
# 'numberofentries' and 'File<N>' options from when parsing a PLS playlist.
PLSsection = 'playlist'
  17. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  18. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  19. return False
  20. return True
  21. class GenreFeedAsync(feeds.GenreFeed):
  22. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  23. def __init__(self, *args, **kwargs):
  24. self.havegenre = False
  25. self.fetchinggenre = None
  26. feeds.GenreFeed.__init__(self, *args, **kwargs)
  27. def gotGenre(self, page):
  28. self.genre = page
  29. self.havegenre = True
  30. # Wake everyone up
  31. self.fetchinggenre.callback(None)
  32. def errGenre(self, failure):
  33. raise NotImplementedError, failure
  34. def fetch_genres(self):
  35. if self.havegenre:
  36. return self.genre
  37. if not self.fetchinggenre:
  38. # Need to start fetching
  39. getPage(self.genre_url.encode('ascii')) \
  40. .addCallbacks(self.gotGenre, self.errGenre)
  41. self.fetchinggenre = defer.Deferred()
  42. # Always raise this if we are waiting.
  43. raise self.fetchinggenre
  44. synchronized = ['fetch_genres', 'gotGenre', ]
  45. threadable.synchronize(GenreFeedAsync)
  46. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  47. def __init__(self, *args, **kwargs):
  48. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  49. self.shout_url = \
  50. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  51. self.genre
  52. self.havestations = False
  53. self.fetchingstations = None
  54. def gotStations(self, page):
  55. self.stations = page
  56. self.havestations = True
  57. # Wake everyone up
  58. self.fetchingstations.callback(None)
  59. def errStations(self, failure):
  60. raise NotImplementedError, failure
  61. def fetch_stations(self):
  62. if self.havestations:
  63. return self.stations
  64. if not self.fetchingstations:
  65. # Need to start fetching
  66. getPage(self.shout_url.encode('ascii')) \
  67. .addCallbacks(self.gotStations, self.errStations)
  68. self.fetchingstations = defer.Deferred()
  69. # Always raise this if we are waiting.
  70. raise self.fetchingstations
  71. synchronized = ['fetch_stations', 'gotStations', ]
  72. threadable.synchronize(ShoutcastFeedAsync)
  73. class ShoutTransfer(shoutcast.ShoutcastClient):
  74. def __init__(self, request, passback):
  75. shoutcast.ShoutcastClient.__init__(self)
  76. self.request = request
  77. self.passback = passback
  78. log.msg('ShoutTransfer:', `request`, `passback`)
  79. request.registerProducer(self, 1)
  80. def connectionLost(self, reason):
  81. log.msg('connectionLost:', `self.request`, `self.passback`)
  82. shoutcast.ShoutcastClient.connectionLost(self, reason)
  83. if self.request:
  84. self.request.unregisterProducer(self)
  85. if self.passback:
  86. self.passback(self.request)
  87. self.passback = None
  88. self.request = None
  89. def stopProducing(self):
  90. log.msg('stopProducing:', `self.request`, `self.passback`)
  91. shoutcast.ShoutcastClient.stopProducing(self)
  92. self.request = None
  93. self.passback = None
  94. def gotMP3Data(self, data):
  95. self.request.write(data)
  96. def gotMetaData(self, data):
  97. log.msg("meta:", `data`)
  98. pass
  99. # Remotely relay producer interface.
  100. def view_resumeProducing(self, issuer):
  101. self.resumeProducing()
  102. def view_pauseProducing(self, issuer):
  103. self.pauseProducing()
  104. def view_stopProducing(self, issuer):
  105. self.stopProducing()
  106. synchronized = ['resumeProducing', 'stopProducing']
  107. threadable.synchronize(ShoutTransfer)
  108. class ShoutProxy(resource.Resource):
  109. # We should probably expire the PLS after a while.
  110. # setResponseCode(self, code, message=None)
  111. # setHeader(self, k, v)
  112. # write(self, data)
  113. # finish(self)
  114. isLeaf = True
  115. def __init__(self, url, mt):
  116. resource.Resource.__init__(self)
  117. self.shoutpls = url
  118. self.mt = mt
  119. self.urls = None
  120. self.fetchingurls = False
  121. def dump_exc(self):
  122. exc = StringIO()
  123. traceback.print_exc(file=exc)
  124. exc.seek(0)
  125. self.request.setHeader('content-type', 'text/html')
  126. self.request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  127. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  128. '<pre>%s</pre>' % exc.read()).render(self.request))
  129. self.request.finish()
  130. self.request = None
  131. def startNextConnection(self, request):
  132. url = self.urls[self.urlpos]
  133. self.urlpos = (self.urlpos + 1) % len(self.urls)
  134. log.msg('starting transfer for:', `url`)
  135. scheme, host, port, path = _parse(url)
  136. protocol.ClientCreator(reactor, ShoutTransfer, request,
  137. self.startNextConnection).connectTCP(host, port)
  138. def gotPLS(self, page):
  139. try:
  140. pls = ConfigParser.SafeConfigParser()
  141. pls.readfp(StringIO.StringIO(page))
  142. assert pls.getint(PLSsection, 'Version') == 2
  143. assert pls.has_option(PLSsection, 'numberofentries')
  144. cnt = pls.getint(PLSsection, 'numberofentries')
  145. self.urls = []
  146. for i in range(cnt):
  147. i += 1 # stupid one based arrays
  148. self.urls.append(pls.get(PLSsection,
  149. 'File%d' % i))
  150. self.urlpos = random.randrange(len(self.urls))
  151. log.msg('urls:', `self.urls`, 'urlpos:', self.urlpos)
  152. except:
  153. self.dump_exc()
  154. return
  155. log.msg('waking up reqests')
  156. self.afterurls.callback('foo')
  157. self.afterurls = None
  158. def errPLS(self, failure):
  159. log.msg('errPLS:', `failure`)
  160. self.request.write(failure.render(self.request))
  161. self.request.finish()
  162. self.request = None
  163. def processRequest(self, ign, request):
  164. log.msg('processRequest:', `request`)
  165. self.startNextConnection(request)
  166. def render(self, request):
  167. request.setHeader('content-type', self.mt)
  168. if request.method == 'HEAD':
  169. return ''
  170. # need to start the state machine
  171. # a) fetch the playlist
  172. # b) choose a random starting point
  173. # c) connect to the server
  174. # d) select next server and goto c
  175. # return data
  176. if self.urls is None:
  177. if not self.fetchingurls:
  178. # Get the PLS
  179. self.fetchingurls = True
  180. log.msg('starting page fetch:', `self.shoutpls`)
  181. # Not really sure if ascii is the correct one,
  182. # shouldn't getPage do proper escaping for me?
  183. getPage(self.shoutpls.encode('ascii')) \
  184. .addCallbacks(self.gotPLS, self.errPLS)
  185. self.afterurls = defer.Deferred()
  186. # Always add the callback if we don't have urls
  187. self.afterurls.addCallback(self.processRequest, request)
  188. else:
  189. self.startNextConnection(request)
  190. # and make sure the connection doesn't get closed
  191. return server.NOT_DONE_YET
  192. synchronized = [ 'render', 'gotPLS', 'startNextConnection' ]
  193. threadable.synchronize(ShoutProxy)
  194. class ShoutStation(AudioItem):
  195. def __init__(self, *args, **kwargs):
  196. self.station = kwargs['station']
  197. del kwargs['station']
  198. kwargs['content'] = ShoutProxy(self.station['PLS_URL'],
  199. self.station['MimeType'].encode('ascii'))
  200. AudioItem.__init__(self, *args, **kwargs)
  201. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  202. self.res = Resource(self.url, 'http-get:*:%s:*' % \
  203. self.station['MimeType'].encode('ascii'))
  204. self.bitrate = self.station['Bitrate'] * 128 # 1024k / 8bit
  205. class ShoutGenre(MusicGenre):
  206. def __init__(self, *args, **kwargs):
  207. log.msg('shoutcast genre created')
  208. self.genre = kwargs['genre']
  209. del kwargs['genre']
  210. self.feeds = ShoutcastFeedAsync(self.genre)
  211. self.sl = None
  212. self.pathObjmap = {}
  213. MusicGenre.__init__(self, *args, **kwargs)
  214. def genStations(self, stations):
  215. ret = {}
  216. dupcnt = {}
  217. for i in stations:
  218. name = i['Name']
  219. if name in ret:
  220. # got a dup
  221. if name not in dupcnt:
  222. dupcnt[name] = 2
  223. ret['%s - %d' % (name, dupcnt[name])] = i
  224. dupcnt[name] += 1
  225. else:
  226. ret[name] = i
  227. return ret
  228. def checkUpdate(self):
  229. self.doUpdate()
  230. return self
  231. def doUpdate(self):
  232. #traceback.print_stack(file=log.logfile)
  233. stations = self.feeds.parse_stations()
  234. if stations == self.sl:
  235. return
  236. nl = self.genStations(stations)
  237. doupdate = False
  238. for i in self.pathObjmap.keys():
  239. if i not in nl:
  240. # delete
  241. doupdate = True
  242. self.cd.delItem(self.pathObjmap[i])
  243. del self.pathObjmap[i]
  244. for name, i in nl.iteritems():
  245. if name in self.pathObjmap:
  246. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  247. continue
  248. # Didn't match, readd
  249. self.cd.delItem(self.pathObjmap[name])
  250. del self.pathObjmap[name]
  251. doupdate = True
  252. self.pathObjmap[name] = self.cd.addItem(self.id,
  253. ShoutStation, name, station = i)
  254. self.sl = stations
  255. # sort our children
  256. self.sort(lambda x, y: cmp(x.title, y.title))
  257. if doupdate:
  258. Container.doUpdate(self)
  259. class ShoutCast(Container):
  260. def __init__(self, *args, **kwargs):
  261. log.msg('shoutcast created')
  262. Container.__init__(self, *args, **kwargs)
  263. self.genres = GenreFeedAsync()
  264. self.genre_list = None
  265. self.pathObjmap = {}
  266. def checkUpdate(self):
  267. self.doUpdate()
  268. return self
  269. def doUpdate(self):
  270. #traceback.print_stack(file=log.logfile)
  271. nl = self.genres.parse_genres()[:30]
  272. if nl == self.genre_list:
  273. return
  274. doupdate = False
  275. for i in self.pathObjmap.keys():
  276. if i not in nl:
  277. # delete
  278. doupdate = True
  279. self.cd.delItem(self.pathObjmap[i])
  280. del self.pathObjmap[i]
  281. for i in nl:
  282. if i in self.pathObjmap:
  283. continue
  284. doupdate = True
  285. self.pathObjmap[i] = self.cd.addItem(self.id,
  286. ShoutGenre, i, genre = i)
  287. self.genre_list = nl
  288. # sort our children
  289. self.sort(lambda x, y: cmp(x.title, y.title))
  290. if doupdate:
  291. Container.doUpdate(self)
  292. def detectshoutcastfile(path, fobj):
  293. path = os.path.basename(path)
  294. if path == 'SHOUTcast Radio':
  295. log.msg('matched shoutcast')
  296. return ShoutCast, { }
  297. return None, None
# Module-level side effect: pass the detector to registerklassfun, which is
# imported from FSStorage at the top of this file.
registerklassfun(detectshoutcastfile)