A Python UPnP Media Server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

375 lines
9.4 KiB

  1. #!/usr/bin/env python
  2. '''Shoutcast Radio Feed'''
  3. import ConfigParser
  4. import StringIO
  5. import os.path
  6. import random
  7. import traceback
  8. from py_shoutcast import *
  9. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  10. from FSStorage import registerklassfun
  11. from twisted.protocols import shoutcast
  12. from twisted.python import log, threadable
  13. from twisted.internet import defer, protocol, reactor
  14. from twisted.web import error, http, resource, server
  15. from twisted.web.client import getPage, _parse
  16. PLSsection = 'playlist'
  17. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  18. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  19. return False
  20. return True
  21. def stationwbitratecmp(x, y):
  22. x, y = map(lambda a: a.title.split('-', 1)[1], (x, y))
  23. return cmp(x, y)
  24. class GenreFeedAsync(feeds.GenreFeed):
  25. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  26. def __init__(self, *args, **kwargs):
  27. self.havegenre = False
  28. self.fetchinggenre = None
  29. feeds.GenreFeed.__init__(self, *args, **kwargs)
  30. def gotGenre(self, page):
  31. self.genre = page
  32. self.havegenre = True
  33. # Wake everyone up
  34. self.fetchinggenre.callback(1)
  35. def errGenre(self, failure):
  36. raise NotImplementedError, failure
  37. def fetch_genres(self):
  38. if self.havegenre:
  39. return self.genre
  40. if not self.fetchinggenre:
  41. # Need to start fetching
  42. getPage(self.genre_url.encode('ascii')) \
  43. .addCallbacks(self.gotGenre, self.errGenre)
  44. self.fetchinggenre = defer.Deferred()
  45. # Always raise this if we are waiting.
  46. raise self.fetchinggenre
  47. synchronized = ['fetch_genres', 'gotGenre', ]
  48. threadable.synchronize(GenreFeedAsync)
  49. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  50. def __init__(self, *args, **kwargs):
  51. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  52. self.shout_url = \
  53. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  54. self.genre
  55. self.havestations = False
  56. self.fetchingstations = None
  57. def gotStations(self, page):
  58. self.stations = page
  59. self.havestations = True
  60. # Wake everyone up
  61. self.fetchingstations.callback(1)
  62. def errStations(self, failure):
  63. raise NotImplementedError, failure
  64. def fetch_stations(self):
  65. if self.havestations:
  66. return self.stations
  67. if not self.fetchingstations:
  68. # Need to start fetching
  69. getPage(self.shout_url.encode('ascii')) \
  70. .addCallbacks(self.gotStations, self.errStations)
  71. self.fetchingstations = defer.Deferred()
  72. # Always raise this if we are waiting.
  73. raise self.fetchingstations
  74. synchronized = ['fetch_stations', 'gotStations', ]
  75. threadable.synchronize(ShoutcastFeedAsync)
  76. class ShoutTransfer(shoutcast.ShoutcastClient):
  77. def __init__(self, request, passback):
  78. shoutcast.ShoutcastClient.__init__(self)
  79. self.request = request
  80. self.passback = passback
  81. request.registerProducer(self, 1)
  82. def connectionLost(self, reason):
  83. log.msg('connectionLost:', `self.request`, `self.passback`)
  84. shoutcast.ShoutcastClient.connectionLost(self, reason)
  85. if self.request:
  86. self.request.unregisterProducer()
  87. if self.passback:
  88. self.passback(self.request)
  89. self.passback = None
  90. self.request = None
  91. def handleResponse(self, response):
  92. #Drop the data, the parts get the important data, if we got
  93. #here, the connection closed and we are going to die anyways.
  94. pass
  95. def stopProducing(self):
  96. shoutcast.ShoutcastClient.stopProducing(self)
  97. self.request = None
  98. self.passback = None
  99. def gotMP3Data(self, data):
  100. self.request.write(data)
  101. def gotMetaData(self, data):
  102. log.msg("meta:", `data`)
  103. pass
  104. # Remotely relay producer interface.
  105. def view_resumeProducing(self, issuer):
  106. self.resumeProducing()
  107. def view_pauseProducing(self, issuer):
  108. self.pauseProducing()
  109. def view_stopProducing(self, issuer):
  110. self.stopProducing()
  111. synchronized = ['resumeProducing', 'stopProducing']
  112. threadable.synchronize(ShoutTransfer)
  113. class ShoutProxy(resource.Resource):
  114. # We should probably expire the PLS after a while.
  115. # setResponseCode(self, code, message=None)
  116. # setHeader(self, k, v)
  117. # write(self, data)
  118. # finish(self)
  119. isLeaf = True
  120. def __init__(self, url, mt):
  121. resource.Resource.__init__(self)
  122. self.shoutpls = url
  123. self.mt = mt
  124. self.urls = None
  125. self.fetchingurls = False
  126. def dump_exc(self):
  127. exc = StringIO()
  128. traceback.print_exc(file=exc)
  129. exc.seek(0)
  130. self.request.setHeader('content-type', 'text/html')
  131. self.request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  132. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  133. '<pre>%s</pre>' % exc.read()).render(self.request))
  134. self.request.finish()
  135. self.request = None
  136. def startNextConnection(self, request):
  137. url = self.urls[self.urlpos]
  138. self.urlpos = (self.urlpos + 1) % len(self.urls)
  139. scheme, host, port, path = _parse(url)
  140. protocol.ClientCreator(reactor, ShoutTransfer, request,
  141. self.startNextConnection).connectTCP(host, port)
  142. def gotPLS(self, page):
  143. try:
  144. pls = ConfigParser.SafeConfigParser()
  145. pls.readfp(StringIO.StringIO(page))
  146. assert pls.getint(PLSsection, 'Version') == 2
  147. assert pls.has_option(PLSsection, 'numberofentries')
  148. cnt = pls.getint(PLSsection, 'numberofentries')
  149. self.urls = []
  150. for i in range(cnt):
  151. i += 1 # stupid one based arrays
  152. self.urls.append(pls.get(PLSsection,
  153. 'File%d' % i))
  154. self.urlpos = random.randrange(len(self.urls))
  155. except:
  156. self.dump_exc()
  157. return
  158. self.afterurls.callback(1)
  159. self.afterurls = None
  160. def errPLS(self, failure):
  161. self.request.write(failure.render(self.request))
  162. self.request.finish()
  163. self.request = None
  164. def processRequest(self, ign, request):
  165. self.startNextConnection(request)
  166. def render(self, request):
  167. request.setHeader('content-type', self.mt)
  168. if request.method == 'HEAD':
  169. return ''
  170. # need to start the state machine
  171. # a) fetch the playlist
  172. # b) choose a random starting point
  173. # c) connect to the server
  174. # d) select next server and goto c
  175. # return data
  176. if self.urls is None:
  177. if not self.fetchingurls:
  178. # Get the PLS
  179. self.fetchingurls = True
  180. # Not really sure if ascii is the correct one,
  181. # shouldn't getPage do proper escaping for me?
  182. getPage(self.shoutpls.encode('ascii')) \
  183. .addCallbacks(self.gotPLS, self.errPLS)
  184. self.afterurls = defer.Deferred()
  185. # Always add the callback if we don't have urls
  186. self.afterurls.addCallback(self.processRequest, request)
  187. else:
  188. self.startNextConnection(request)
  189. # and make sure the connection doesn't get closed
  190. return server.NOT_DONE_YET
  191. synchronized = [ 'render', 'gotPLS', 'startNextConnection' ]
  192. threadable.synchronize(ShoutProxy)
  193. class ShoutStation(AudioItem):
  194. def __init__(self, *args, **kwargs):
  195. self.station = kwargs['station']
  196. del kwargs['station']
  197. kwargs['content'] = ShoutProxy(self.station['PLS_URL'],
  198. self.station['MimeType'].encode('ascii'))
  199. AudioItem.__init__(self, *args, **kwargs)
  200. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  201. self.res = Resource(self.url, 'http-get:*:%s:*' % \
  202. self.station['MimeType'].encode('ascii'))
  203. self.bitrate = self.station['Bitrate'] * 128 # 1024k / 8bit
  204. class ShoutGenre(MusicGenre):
  205. def __init__(self, *args, **kwargs):
  206. self.genre = kwargs['genre']
  207. del kwargs['genre']
  208. self.feeds = ShoutcastFeedAsync(self.genre)
  209. self.sl = None
  210. self.pathObjmap = {}
  211. MusicGenre.__init__(self, *args, **kwargs)
  212. def genStations(self, stations):
  213. ret = {}
  214. dupcnt = {}
  215. for i in stations:
  216. name = i['Name']
  217. if name in ret:
  218. # got a dup
  219. if name not in dupcnt:
  220. dupcnt[name] = 2
  221. ret['%s - %d' % (name, dupcnt[name])] = i
  222. dupcnt[name] += 1
  223. else:
  224. ret[name] = i
  225. return ret
  226. def checkUpdate(self):
  227. self.doUpdate()
  228. return self
  229. def doUpdate(self):
  230. #traceback.print_stack(file=log.logfile)
  231. stations = self.feeds.parse_stations()
  232. if stations == self.sl:
  233. return
  234. nl = self.genStations(stations)
  235. doupdate = False
  236. for i in self.pathObjmap.keys():
  237. if i not in nl:
  238. # delete
  239. doupdate = True
  240. self.cd.delItem(self.pathObjmap[i])
  241. del self.pathObjmap[i]
  242. for name, i in nl.iteritems():
  243. if name in self.pathObjmap:
  244. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  245. continue
  246. # Didn't match, readd
  247. self.cd.delItem(self.pathObjmap[name])
  248. del self.pathObjmap[name]
  249. doupdate = True
  250. self.pathObjmap[name] = self.cd.addItem(self.id,
  251. ShoutStation, '%sk-%s' % (i['Bitrate'], name),
  252. station = i)
  253. self.sl = stations
  254. # sort our children
  255. self.sort(lambda *a: stationwbitratecmp(*a))
  256. if doupdate:
  257. Container.doUpdate(self)
  258. class ShoutCast(Container):
  259. def __init__(self, *args, **kwargs):
  260. Container.__init__(self, *args, **kwargs)
  261. self.genres = GenreFeedAsync()
  262. self.genre_list = None
  263. self.pathObjmap = {}
  264. def checkUpdate(self):
  265. self.doUpdate()
  266. return self
  267. def doUpdate(self):
  268. #traceback.print_stack(file=log.logfile)
  269. nl = self.genres.parse_genres()
  270. if nl == self.genre_list:
  271. return
  272. doupdate = False
  273. for i in self.pathObjmap.keys():
  274. if i not in nl:
  275. # delete
  276. doupdate = True
  277. self.cd.delItem(self.pathObjmap[i])
  278. del self.pathObjmap[i]
  279. for i in nl:
  280. if i in self.pathObjmap:
  281. continue
  282. doupdate = True
  283. self.pathObjmap[i] = self.cd.addItem(self.id,
  284. ShoutGenre, i, genre = i)
  285. self.genre_list = nl
  286. # sort our children
  287. self.sort(lambda x, y: cmp(x.title, y.title))
  288. if doupdate:
  289. Container.doUpdate(self)
  290. def detectshoutcastfile(path, fobj):
  291. path = os.path.basename(path)
  292. if path == 'SHOUTcast Radio':
  293. return ShoutCast, { }
  294. return None, None
  295. registerklassfun(detectshoutcastfile)