A Python UPnP Media Server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
9.5 KiB

  1. #!/usr/bin/env python
  2. # Copyright 2006 John-Mark Gurney <gurney_j@resnet.uoregon.edu>
  3. '''Shoutcast Radio Feed'''
  4. __version__ = '$Change$'
  5. # $Id$
  6. import ConfigParser
  7. import StringIO
  8. import os.path
  9. import random
  10. import traceback
  11. from py_shoutcast import *
  12. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  13. from FSStorage import registerklassfun
  14. from twisted.protocols import shoutcast
  15. from twisted.python import log, threadable
  16. from twisted.internet import defer, protocol, reactor
  17. from twisted.web import error, http, resource, server
  18. from twisted.web.client import getPage, _parse
  19. PLSsection = 'playlist'
  20. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  21. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  22. return False
  23. return True
  24. def stationwbitratecmp(x, y):
  25. x, y = map(lambda a: a.title.split('-', 1)[1], (x, y))
  26. return cmp(x, y)
  27. class GenreFeedAsync(feeds.GenreFeed):
  28. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  29. def __init__(self, *args, **kwargs):
  30. self.havegenre = False
  31. self.fetchinggenre = None
  32. feeds.GenreFeed.__init__(self, *args, **kwargs)
  33. def gotGenre(self, page):
  34. self.genre = page
  35. self.havegenre = True
  36. # Wake everyone up
  37. self.fetchinggenre.callback(1)
  38. def errGenre(self, failure):
  39. raise NotImplementedError, failure
  40. def fetch_genres(self):
  41. if self.havegenre:
  42. return self.genre
  43. if not self.fetchinggenre:
  44. # Need to start fetching
  45. getPage(self.genre_url.encode('ascii')) \
  46. .addCallbacks(self.gotGenre, self.errGenre)
  47. self.fetchinggenre = defer.Deferred()
  48. # Always raise this if we are waiting.
  49. raise self.fetchinggenre
  50. synchronized = ['fetch_genres', 'gotGenre', ]
  51. threadable.synchronize(GenreFeedAsync)
  52. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  53. def __init__(self, *args, **kwargs):
  54. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  55. self.shout_url = \
  56. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  57. self.genre
  58. self.havestations = False
  59. self.fetchingstations = None
  60. def gotStations(self, page):
  61. self.stations = page
  62. self.havestations = True
  63. # Wake everyone up
  64. self.fetchingstations.callback(1)
  65. def errStations(self, failure):
  66. raise NotImplementedError, failure
  67. def fetch_stations(self):
  68. if self.havestations:
  69. return self.stations
  70. if not self.fetchingstations:
  71. # Need to start fetching
  72. getPage(self.shout_url.encode('ascii')) \
  73. .addCallbacks(self.gotStations, self.errStations)
  74. self.fetchingstations = defer.Deferred()
  75. # Always raise this if we are waiting.
  76. raise self.fetchingstations
  77. synchronized = ['fetch_stations', 'gotStations', ]
  78. threadable.synchronize(ShoutcastFeedAsync)
  79. class ShoutTransfer(shoutcast.ShoutcastClient):
  80. def __init__(self, request, passback):
  81. shoutcast.ShoutcastClient.__init__(self)
  82. self.request = request
  83. self.passback = passback
  84. request.registerProducer(self, 1)
  85. def connectionLost(self, reason):
  86. log.msg('connectionLost:', `self.request`, `self.passback`)
  87. shoutcast.ShoutcastClient.connectionLost(self, reason)
  88. if self.request:
  89. self.request.unregisterProducer()
  90. if self.passback:
  91. self.passback(self.request)
  92. self.passback = None
  93. self.request = None
  94. def handleResponse(self, response):
  95. #Drop the data, the parts get the important data, if we got
  96. #here, the connection closed and we are going to die anyways.
  97. pass
  98. def stopProducing(self):
  99. shoutcast.ShoutcastClient.stopProducing(self)
  100. self.request = None
  101. self.passback = None
  102. def gotMP3Data(self, data):
  103. self.request.write(data)
  104. def gotMetaData(self, data):
  105. log.msg("meta:", `data`)
  106. pass
  107. # Remotely relay producer interface.
  108. def view_resumeProducing(self, issuer):
  109. self.resumeProducing()
  110. def view_pauseProducing(self, issuer):
  111. self.pauseProducing()
  112. def view_stopProducing(self, issuer):
  113. self.stopProducing()
  114. synchronized = ['resumeProducing', 'stopProducing']
  115. threadable.synchronize(ShoutTransfer)
  116. class ShoutProxy(resource.Resource):
  117. # We should probably expire the PLS after a while.
  118. # setResponseCode(self, code, message=None)
  119. # setHeader(self, k, v)
  120. # write(self, data)
  121. # finish(self)
  122. isLeaf = True
  123. def __init__(self, url, mt):
  124. resource.Resource.__init__(self)
  125. self.shoutpls = url
  126. self.mt = mt
  127. self.urls = None
  128. self.fetchingurls = False
  129. def dump_exc(self):
  130. exc = StringIO()
  131. traceback.print_exc(file=exc)
  132. exc.seek(0)
  133. self.request.setHeader('content-type', 'text/html')
  134. self.request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  135. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  136. '<pre>%s</pre>' % exc.read()).render(self.request))
  137. self.request.finish()
  138. self.request = None
  139. def startNextConnection(self, request):
  140. url = self.urls[self.urlpos]
  141. self.urlpos = (self.urlpos + 1) % len(self.urls)
  142. scheme, host, port, path = _parse(url)
  143. protocol.ClientCreator(reactor, ShoutTransfer, request,
  144. self.startNextConnection).connectTCP(host, port)
  145. def gotPLS(self, page):
  146. try:
  147. pls = ConfigParser.SafeConfigParser()
  148. pls.readfp(StringIO.StringIO(page))
  149. assert pls.getint(PLSsection, 'Version') == 2
  150. assert pls.has_option(PLSsection, 'numberofentries')
  151. cnt = pls.getint(PLSsection, 'numberofentries')
  152. self.urls = []
  153. for i in range(cnt):
  154. i += 1 # stupid one based arrays
  155. self.urls.append(pls.get(PLSsection,
  156. 'File%d' % i))
  157. self.urlpos = random.randrange(len(self.urls))
  158. except:
  159. self.dump_exc()
  160. return
  161. self.afterurls.callback(1)
  162. self.afterurls = None
  163. def errPLS(self, failure):
  164. self.request.write(failure.render(self.request))
  165. self.request.finish()
  166. self.request = None
  167. def processRequest(self, ign, request):
  168. self.startNextConnection(request)
  169. def render(self, request):
  170. request.setHeader('content-type', self.mt)
  171. if request.method == 'HEAD':
  172. return ''
  173. # need to start the state machine
  174. # a) fetch the playlist
  175. # b) choose a random starting point
  176. # c) connect to the server
  177. # d) select next server and goto c
  178. # return data
  179. if self.urls is None:
  180. if not self.fetchingurls:
  181. # Get the PLS
  182. self.fetchingurls = True
  183. # Not really sure if ascii is the correct one,
  184. # shouldn't getPage do proper escaping for me?
  185. getPage(self.shoutpls.encode('ascii')) \
  186. .addCallbacks(self.gotPLS, self.errPLS)
  187. self.afterurls = defer.Deferred()
  188. # Always add the callback if we don't have urls
  189. self.afterurls.addCallback(self.processRequest, request)
  190. else:
  191. self.startNextConnection(request)
  192. # and make sure the connection doesn't get closed
  193. return server.NOT_DONE_YET
  194. synchronized = [ 'render', 'gotPLS', 'startNextConnection' ]
  195. threadable.synchronize(ShoutProxy)
  196. class ShoutStation(AudioItem):
  197. def __init__(self, *args, **kwargs):
  198. self.station = kwargs['station']
  199. del kwargs['station']
  200. kwargs['content'] = ShoutProxy(self.station['PLS_URL'],
  201. self.station['MimeType'].encode('ascii'))
  202. AudioItem.__init__(self, *args, **kwargs)
  203. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  204. self.res = Resource(self.url, 'http-get:*:%s:*' % \
  205. self.station['MimeType'].encode('ascii'))
  206. self.bitrate = self.station['Bitrate'] * 128 # 1024k / 8bit
  207. class ShoutGenre(MusicGenre):
  208. def __init__(self, *args, **kwargs):
  209. self.genre = kwargs['genre']
  210. del kwargs['genre']
  211. self.feeds = ShoutcastFeedAsync(self.genre)
  212. self.sl = None
  213. self.pathObjmap = {}
  214. MusicGenre.__init__(self, *args, **kwargs)
  215. def genStations(self, stations):
  216. ret = {}
  217. dupcnt = {}
  218. for i in stations:
  219. name = i['Name']
  220. if name in ret:
  221. # got a dup
  222. if name not in dupcnt:
  223. dupcnt[name] = 2
  224. ret['%s - %d' % (name, dupcnt[name])] = i
  225. dupcnt[name] += 1
  226. else:
  227. ret[name] = i
  228. return ret
  229. def checkUpdate(self):
  230. self.doUpdate()
  231. return self
  232. def doUpdate(self):
  233. #traceback.print_stack(file=log.logfile)
  234. stations = self.feeds.parse_stations()
  235. if stations == self.sl:
  236. return
  237. nl = self.genStations(stations)
  238. doupdate = False
  239. for i in self.pathObjmap.keys():
  240. if i not in nl:
  241. # delete
  242. doupdate = True
  243. self.cd.delItem(self.pathObjmap[i])
  244. del self.pathObjmap[i]
  245. for name, i in nl.iteritems():
  246. if name in self.pathObjmap:
  247. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  248. continue
  249. # Didn't match, readd
  250. self.cd.delItem(self.pathObjmap[name])
  251. del self.pathObjmap[name]
  252. doupdate = True
  253. self.pathObjmap[name] = self.cd.addItem(self.id,
  254. ShoutStation, '%sk-%s' % (i['Bitrate'], name),
  255. station = i)
  256. self.sl = stations
  257. # sort our children
  258. self.sort(lambda *a: stationwbitratecmp(*a))
  259. if doupdate:
  260. Container.doUpdate(self)
  261. class ShoutCast(Container):
  262. def __init__(self, *args, **kwargs):
  263. Container.__init__(self, *args, **kwargs)
  264. self.genres = GenreFeedAsync()
  265. self.genre_list = None
  266. self.pathObjmap = {}
  267. def checkUpdate(self):
  268. self.doUpdate()
  269. return self
  270. def doUpdate(self):
  271. #traceback.print_stack(file=log.logfile)
  272. nl = self.genres.parse_genres()
  273. if nl == self.genre_list:
  274. return
  275. doupdate = False
  276. for i in self.pathObjmap.keys():
  277. if i not in nl:
  278. # delete
  279. doupdate = True
  280. self.cd.delItem(self.pathObjmap[i])
  281. del self.pathObjmap[i]
  282. for i in nl:
  283. if i in self.pathObjmap:
  284. continue
  285. doupdate = True
  286. self.pathObjmap[i] = self.cd.addItem(self.id,
  287. ShoutGenre, i, genre = i)
  288. self.genre_list = nl
  289. # sort our children
  290. self.sort(lambda x, y: cmp(x.title, y.title))
  291. if doupdate:
  292. Container.doUpdate(self)
  293. def detectshoutcastfile(path, fobj):
  294. path = os.path.basename(path)
  295. if path == 'SHOUTcast Radio':
  296. return ShoutCast, { }
  297. return None, None
  298. registerklassfun(detectshoutcastfile)