A Python UPnP Media Server
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

375 lines
9.4 KiB

  1. #!/usr/bin/env python
  2. '''Shoutcast Radio Feed'''
  3. import ConfigParser
  4. import StringIO
  5. import os.path
  6. import random
  7. import traceback
  8. from py_shoutcast import *
  9. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  10. from FSStorage import registerklassfun
  11. from twisted.protocols import shoutcast
  12. from twisted.python import log, threadable
  13. from twisted.internet import defer, protocol, reactor
  14. from twisted.web import error, http, resource, server
  15. from twisted.web.client import getPage, _parse
# Name of the section that ShoutProxy.gotPLS reads the Version,
# numberofentries and FileN options from when parsing a fetched PLS playlist.
PLSsection = 'playlist'
  17. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  18. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  19. return False
  20. return True
  21. class GenreFeedAsync(feeds.GenreFeed):
  22. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  23. def __init__(self, *args, **kwargs):
  24. self.havegenre = False
  25. self.fetchinggenre = None
  26. feeds.GenreFeed.__init__(self, *args, **kwargs)
  27. def gotGenre(self, page):
  28. self.genre = page
  29. self.havegenre = True
  30. # Wake everyone up
  31. self.fetchinggenre.callback(None)
  32. def errGenre(self, failure):
  33. raise NotImplementedError, failure
  34. def fetch_genres(self):
  35. if self.havegenre:
  36. return self.genre
  37. if not self.fetchinggenre:
  38. # Need to start fetching
  39. getPage(self.genre_url.encode('ascii')) \
  40. .addCallbacks(self.gotGenre, self.errGenre)
  41. self.fetchinggenre = defer.Deferred()
  42. # Always raise this if we are waiting.
  43. raise self.fetchinggenre
  44. synchronized = ['fetch_genres', 'gotGenre', ]
  45. threadable.synchronize(GenreFeedAsync)
  46. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  47. def __init__(self, *args, **kwargs):
  48. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  49. self.shout_url = \
  50. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  51. self.genre
  52. self.havestations = False
  53. self.fetchingstations = None
  54. def gotStations(self, page):
  55. self.stations = page
  56. self.havestations = True
  57. # Wake everyone up
  58. self.fetchingstations.callback(None)
  59. def errStations(self, failure):
  60. raise NotImplementedError, failure
  61. def fetch_stations(self):
  62. if self.havestations:
  63. return self.stations
  64. if not self.fetchingstations:
  65. # Need to start fetching
  66. getPage(self.shout_url.encode('ascii')) \
  67. .addCallbacks(self.gotStations, self.errStations)
  68. self.fetchingstations = defer.Deferred()
  69. # Always raise this if we are waiting.
  70. raise self.fetchingstations
  71. synchronized = ['fetch_stations', 'gotStations', ]
  72. threadable.synchronize(ShoutcastFeedAsync)
  73. class ShoutTransfer(shoutcast.ShoutcastClient):
  74. def __init__(self, request, passback):
  75. shoutcast.ShoutcastClient.__init__(self)
  76. self.request = request
  77. self.passback = passback
  78. request.registerProducer(self, 1)
  79. def connectionLost(self, reason):
  80. log.msg('connectionLost:', `self.request`, `self.passback`)
  81. shoutcast.ShoutcastClient.connectionLost(self, reason)
  82. if self.request:
  83. self.request.unregisterProducer()
  84. if self.passback:
  85. self.passback(self.request)
  86. self.passback = None
  87. self.request = None
  88. def handleResponse(self, response):
  89. #Drop the data, the parts get the important data, if we got
  90. #here, the connection closed and we are going to die anyways.
  91. pass
  92. def stopProducing(self):
  93. shoutcast.ShoutcastClient.stopProducing(self)
  94. self.request = None
  95. self.passback = None
  96. def gotMP3Data(self, data):
  97. self.request.write(data)
  98. def gotMetaData(self, data):
  99. log.msg("meta:", `data`)
  100. pass
  101. # Remotely relay producer interface.
  102. def view_resumeProducing(self, issuer):
  103. self.resumeProducing()
  104. def view_pauseProducing(self, issuer):
  105. self.pauseProducing()
  106. def view_stopProducing(self, issuer):
  107. self.stopProducing()
  108. synchronized = ['resumeProducing', 'stopProducing']
  109. threadable.synchronize(ShoutTransfer)
  110. class ShoutProxy(resource.Resource):
  111. # We should probably expire the PLS after a while.
  112. # setResponseCode(self, code, message=None)
  113. # setHeader(self, k, v)
  114. # write(self, data)
  115. # finish(self)
  116. isLeaf = True
  117. def __init__(self, url, mt):
  118. resource.Resource.__init__(self)
  119. self.shoutpls = url
  120. self.mt = mt
  121. self.urls = None
  122. self.fetchingurls = False
  123. def dump_exc(self):
  124. exc = StringIO()
  125. traceback.print_exc(file=exc)
  126. exc.seek(0)
  127. self.request.setHeader('content-type', 'text/html')
  128. self.request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  129. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  130. '<pre>%s</pre>' % exc.read()).render(self.request))
  131. self.request.finish()
  132. self.request = None
  133. def startNextConnection(self, request):
  134. url = self.urls[self.urlpos]
  135. self.urlpos = (self.urlpos + 1) % len(self.urls)
  136. scheme, host, port, path = _parse(url)
  137. protocol.ClientCreator(reactor, ShoutTransfer, request,
  138. self.startNextConnection).connectTCP(host, port)
  139. def gotPLS(self, page):
  140. try:
  141. pls = ConfigParser.SafeConfigParser()
  142. pls.readfp(StringIO.StringIO(page))
  143. assert pls.getint(PLSsection, 'Version') == 2
  144. assert pls.has_option(PLSsection, 'numberofentries')
  145. cnt = pls.getint(PLSsection, 'numberofentries')
  146. self.urls = []
  147. for i in range(cnt):
  148. i += 1 # stupid one based arrays
  149. self.urls.append(pls.get(PLSsection,
  150. 'File%d' % i))
  151. self.urlpos = random.randrange(len(self.urls))
  152. except:
  153. self.dump_exc()
  154. return
  155. self.afterurls.callback(None)
  156. self.afterurls = None
  157. def errPLS(self, failure):
  158. self.request.write(failure.render(self.request))
  159. self.request.finish()
  160. self.request = None
  161. def processRequest(self, ign, request):
  162. self.startNextConnection(request)
  163. def render(self, request):
  164. request.setHeader('content-type', self.mt)
  165. if request.method == 'HEAD':
  166. return ''
  167. # need to start the state machine
  168. # a) fetch the playlist
  169. # b) choose a random starting point
  170. # c) connect to the server
  171. # d) select next server and goto c
  172. # return data
  173. if self.urls is None:
  174. if not self.fetchingurls:
  175. # Get the PLS
  176. self.fetchingurls = True
  177. # Not really sure if ascii is the correct one,
  178. # shouldn't getPage do proper escaping for me?
  179. getPage(self.shoutpls.encode('ascii')) \
  180. .addCallbacks(self.gotPLS, self.errPLS)
  181. self.afterurls = defer.Deferred()
  182. # Always add the callback if we don't have urls
  183. self.afterurls.addCallback(self.processRequest, request)
  184. else:
  185. self.startNextConnection(request)
  186. # and make sure the connection doesn't get closed
  187. return server.NOT_DONE_YET
  188. synchronized = [ 'render', 'gotPLS', 'startNextConnection' ]
  189. threadable.synchronize(ShoutProxy)
  190. class ShoutStation(AudioItem):
  191. def __init__(self, *args, **kwargs):
  192. self.station = kwargs['station']
  193. del kwargs['station']
  194. kwargs['content'] = ShoutProxy(self.station['PLS_URL'],
  195. self.station['MimeType'].encode('ascii'))
  196. AudioItem.__init__(self, *args, **kwargs)
  197. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  198. self.res = Resource(self.url, 'http-get:*:%s:*' % \
  199. self.station['MimeType'].encode('ascii'))
  200. self.bitrate = self.station['Bitrate'] * 128 # 1024k / 8bit
  201. def stationwbitratecmp(x, y):
  202. x, y = map(lambda x: x.split('-', 1)[1], (x, y))
  203. return cmp(x, y)
  204. class ShoutGenre(MusicGenre):
  205. def __init__(self, *args, **kwargs):
  206. self.genre = kwargs['genre']
  207. del kwargs['genre']
  208. self.feeds = ShoutcastFeedAsync(self.genre)
  209. self.sl = None
  210. self.pathObjmap = {}
  211. MusicGenre.__init__(self, *args, **kwargs)
  212. def genStations(self, stations):
  213. ret = {}
  214. dupcnt = {}
  215. for i in stations:
  216. name = i['Name']
  217. if name in ret:
  218. # got a dup
  219. if name not in dupcnt:
  220. dupcnt[name] = 2
  221. ret['%s - %d' % (name, dupcnt[name])] = i
  222. dupcnt[name] += 1
  223. else:
  224. ret[name] = i
  225. return ret
  226. def checkUpdate(self):
  227. self.doUpdate()
  228. return self
  229. def doUpdate(self):
  230. #traceback.print_stack(file=log.logfile)
  231. stations = self.feeds.parse_stations()
  232. if stations == self.sl:
  233. return
  234. nl = self.genStations(stations)
  235. doupdate = False
  236. for i in self.pathObjmap.keys():
  237. if i not in nl:
  238. # delete
  239. doupdate = True
  240. self.cd.delItem(self.pathObjmap[i])
  241. del self.pathObjmap[i]
  242. for name, i in nl.iteritems():
  243. if name in self.pathObjmap:
  244. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  245. continue
  246. # Didn't match, readd
  247. self.cd.delItem(self.pathObjmap[name])
  248. del self.pathObjmap[name]
  249. doupdate = True
  250. self.pathObjmap[name] = self.cd.addItem(self.id,
  251. ShoutStation, '%sk-%s' % (i['Bitrate'], name),
  252. station = i)
  253. self.sl = stations
  254. # sort our children
  255. self.sort(lambda x, y: cmp(x.title, y.title))
  256. if doupdate:
  257. Container.doUpdate(self)
  258. class ShoutCast(Container):
  259. def __init__(self, *args, **kwargs):
  260. Container.__init__(self, *args, **kwargs)
  261. self.genres = GenreFeedAsync()
  262. self.genre_list = None
  263. self.pathObjmap = {}
  264. def checkUpdate(self):
  265. self.doUpdate()
  266. return self
  267. def doUpdate(self):
  268. #traceback.print_stack(file=log.logfile)
  269. nl = self.genres.parse_genres()[:30]
  270. if nl == self.genre_list:
  271. return
  272. doupdate = False
  273. for i in self.pathObjmap.keys():
  274. if i not in nl:
  275. # delete
  276. doupdate = True
  277. self.cd.delItem(self.pathObjmap[i])
  278. del self.pathObjmap[i]
  279. for i in nl:
  280. if i in self.pathObjmap:
  281. continue
  282. doupdate = True
  283. self.pathObjmap[i] = self.cd.addItem(self.id,
  284. ShoutGenre, i, genre = i)
  285. self.genre_list = nl
  286. # sort our children
  287. self.sort(lambda x, y: cmp(x.title, y.title))
  288. if doupdate:
  289. Container.doUpdate(self)
  290. def detectshoutcastfile(path, fobj):
  291. path = os.path.basename(path)
  292. if path == 'SHOUTcast Radio':
  293. return ShoutCast, { }
  294. return None, None
# Hand the detector to FSStorage's registerklassfun at import time;
# presumably FSStorage then consults it when classifying paths -- confirm
# against FSStorage.
registerklassfun(detectshoutcastfile)