A Python UPnP Media Server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

384 lines
9.8 KiB

  1. #!/usr/bin/env python
  2. '''Shoutcast Radio Feed'''
  3. import ConfigParser
  4. import StringIO
  5. import os.path
  6. import random
  7. import traceback
  8. from py_shoutcast import *
  9. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  10. from FSStorage import registerklassfun
  11. from twisted.protocols import shoutcast
  12. from twisted.python import log, threadable
  13. from twisted.internet import defer, protocol, reactor
  14. from twisted.web import error, http, resource, server
  15. from twisted.web.client import getPage, _parse
  16. PLSsection = 'playlist'
  17. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  18. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  19. return False
  20. return True
  21. class GenreFeedAsync(feeds.GenreFeed):
  22. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  23. def __init__(self, *args, **kwargs):
  24. self.havegenre = False
  25. self.fetchinggenre = None
  26. feeds.GenreFeed.__init__(self, *args, **kwargs)
  27. def gotGenre(self, page):
  28. self.genre = page
  29. self.havegenre = True
  30. # Wake everyone up
  31. self.fetchinggenre.callback(None)
  32. def errGenre(self, failure):
  33. raise NotImplementedError, failure
  34. def fetch_genres(self):
  35. if self.havegenre:
  36. return self.genre
  37. if not self.fetchinggenre:
  38. # Need to start fetching
  39. getPage(self.genre_url.encode('ascii')) \
  40. .addCallbacks(self.gotGenre, self.errGenre)
  41. self.fetchinggenre = defer.Deferred()
  42. # Always raise this if we are waiting.
  43. raise self.fetchinggenre
  44. synchronized = ['fetch_genres', 'gotGenre', ]
  45. threadable.synchronize(GenreFeedAsync)
  46. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  47. def __init__(self, *args, **kwargs):
  48. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  49. self.shout_url = \
  50. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  51. self.genre
  52. self.havestations = False
  53. self.fetchingstations = None
  54. def gotStations(self, page):
  55. self.stations = page
  56. self.havestations = True
  57. # Wake everyone up
  58. self.fetchingstations.callback(None)
  59. def errStations(self, failure):
  60. raise NotImplementedError, failure
  61. def fetch_stations(self):
  62. if self.havestations:
  63. return self.stations
  64. if not self.fetchingstations:
  65. # Need to start fetching
  66. getPage(self.shout_url.encode('ascii')) \
  67. .addCallbacks(self.gotStations, self.errStations)
  68. self.fetchingstations = defer.Deferred()
  69. # Always raise this if we are waiting.
  70. raise self.fetchingstations
  71. synchronized = ['fetch_stations', 'gotStations', ]
  72. threadable.synchronize(ShoutcastFeedAsync)
  73. class ShoutTransfer(shoutcast.ShoutcastClient):
  74. def __init__(self, request, passback):
  75. shoutcast.ShoutcastClient.__init__(self)
  76. self.request = request
  77. self.passback = passback
  78. log.msg('ShoutTransfer:', `request`, `passback`)
  79. request.registerProducer(self, 1)
  80. def connectionLost(self, reason):
  81. log.msg('connectionLost:', `self.request`, `self.passback`)
  82. shoutcast.ShoutcastClient.connectionLost(self, reason)
  83. if self.request:
  84. self.request.unregisterProducer()
  85. if self.passback:
  86. self.passback(self.request)
  87. self.passback = None
  88. self.request = None
  89. def handleResponse(self, response):
  90. #Drop the data, the parts get the important data, if we got
  91. #here, the connection closed and we are going to die anyways.
  92. pass
  93. def stopProducing(self):
  94. log.msg('stopProducing:', `self.request`, `self.passback`)
  95. shoutcast.ShoutcastClient.stopProducing(self)
  96. self.request = None
  97. self.passback = None
  98. def gotMP3Data(self, data):
  99. self.request.write(data)
  100. def gotMetaData(self, data):
  101. log.msg("meta:", `data`)
  102. pass
  103. # Remotely relay producer interface.
  104. def view_resumeProducing(self, issuer):
  105. self.resumeProducing()
  106. def view_pauseProducing(self, issuer):
  107. self.pauseProducing()
  108. def view_stopProducing(self, issuer):
  109. self.stopProducing()
  110. synchronized = ['resumeProducing', 'stopProducing']
  111. threadable.synchronize(ShoutTransfer)
  112. class ShoutProxy(resource.Resource):
  113. # We should probably expire the PLS after a while.
  114. # setResponseCode(self, code, message=None)
  115. # setHeader(self, k, v)
  116. # write(self, data)
  117. # finish(self)
  118. isLeaf = True
  119. def __init__(self, url, mt):
  120. resource.Resource.__init__(self)
  121. self.shoutpls = url
  122. self.mt = mt
  123. self.urls = None
  124. self.fetchingurls = False
  125. def dump_exc(self):
  126. exc = StringIO()
  127. traceback.print_exc(file=exc)
  128. exc.seek(0)
  129. self.request.setHeader('content-type', 'text/html')
  130. self.request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  131. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  132. '<pre>%s</pre>' % exc.read()).render(self.request))
  133. self.request.finish()
  134. self.request = None
  135. def startNextConnection(self, request):
  136. url = self.urls[self.urlpos]
  137. self.urlpos = (self.urlpos + 1) % len(self.urls)
  138. log.msg('starting transfer for:', `url`)
  139. scheme, host, port, path = _parse(url)
  140. protocol.ClientCreator(reactor, ShoutTransfer, request,
  141. self.startNextConnection).connectTCP(host, port)
  142. def gotPLS(self, page):
  143. try:
  144. pls = ConfigParser.SafeConfigParser()
  145. pls.readfp(StringIO.StringIO(page))
  146. assert pls.getint(PLSsection, 'Version') == 2
  147. assert pls.has_option(PLSsection, 'numberofentries')
  148. cnt = pls.getint(PLSsection, 'numberofentries')
  149. self.urls = []
  150. for i in range(cnt):
  151. i += 1 # stupid one based arrays
  152. self.urls.append(pls.get(PLSsection,
  153. 'File%d' % i))
  154. self.urlpos = random.randrange(len(self.urls))
  155. log.msg('urls:', `self.urls`, 'urlpos:', self.urlpos)
  156. except:
  157. self.dump_exc()
  158. return
  159. log.msg('waking up reqests')
  160. self.afterurls.callback('foo')
  161. self.afterurls = None
  162. def errPLS(self, failure):
  163. log.msg('errPLS:', `failure`)
  164. self.request.write(failure.render(self.request))
  165. self.request.finish()
  166. self.request = None
  167. def processRequest(self, ign, request):
  168. log.msg('processRequest:', `request`)
  169. self.startNextConnection(request)
  170. def render(self, request):
  171. request.setHeader('content-type', self.mt)
  172. if request.method == 'HEAD':
  173. return ''
  174. # need to start the state machine
  175. # a) fetch the playlist
  176. # b) choose a random starting point
  177. # c) connect to the server
  178. # d) select next server and goto c
  179. # return data
  180. if self.urls is None:
  181. if not self.fetchingurls:
  182. # Get the PLS
  183. self.fetchingurls = True
  184. log.msg('starting page fetch:', `self.shoutpls`)
  185. # Not really sure if ascii is the correct one,
  186. # shouldn't getPage do proper escaping for me?
  187. getPage(self.shoutpls.encode('ascii')) \
  188. .addCallbacks(self.gotPLS, self.errPLS)
  189. self.afterurls = defer.Deferred()
  190. # Always add the callback if we don't have urls
  191. self.afterurls.addCallback(self.processRequest, request)
  192. else:
  193. self.startNextConnection(request)
  194. # and make sure the connection doesn't get closed
  195. return server.NOT_DONE_YET
  196. synchronized = [ 'render', 'gotPLS', 'startNextConnection' ]
  197. threadable.synchronize(ShoutProxy)
  198. class ShoutStation(AudioItem):
  199. def __init__(self, *args, **kwargs):
  200. self.station = kwargs['station']
  201. del kwargs['station']
  202. kwargs['content'] = ShoutProxy(self.station['PLS_URL'],
  203. self.station['MimeType'].encode('ascii'))
  204. AudioItem.__init__(self, *args, **kwargs)
  205. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  206. self.res = Resource(self.url, 'http-get:*:%s:*' % \
  207. self.station['MimeType'].encode('ascii'))
  208. self.bitrate = self.station['Bitrate'] * 128 # 1024k / 8bit
  209. def stationwbitratecmp(x, y):
  210. x, y = map(lambda x: x.split('-', 1)[1], (x, y))
  211. return cmp(x, y)
  212. class ShoutGenre(MusicGenre):
  213. def __init__(self, *args, **kwargs):
  214. self.genre = kwargs['genre']
  215. del kwargs['genre']
  216. self.feeds = ShoutcastFeedAsync(self.genre)
  217. self.sl = None
  218. self.pathObjmap = {}
  219. MusicGenre.__init__(self, *args, **kwargs)
  220. def genStations(self, stations):
  221. ret = {}
  222. dupcnt = {}
  223. for i in stations:
  224. name = i['Name']
  225. if name in ret:
  226. # got a dup
  227. if name not in dupcnt:
  228. dupcnt[name] = 2
  229. ret['%s - %d' % (name, dupcnt[name])] = i
  230. dupcnt[name] += 1
  231. else:
  232. ret[name] = i
  233. return ret
  234. def checkUpdate(self):
  235. self.doUpdate()
  236. return self
  237. def doUpdate(self):
  238. #traceback.print_stack(file=log.logfile)
  239. stations = self.feeds.parse_stations()
  240. if stations == self.sl:
  241. return
  242. nl = self.genStations(stations)
  243. doupdate = False
  244. for i in self.pathObjmap.keys():
  245. if i not in nl:
  246. # delete
  247. doupdate = True
  248. self.cd.delItem(self.pathObjmap[i])
  249. del self.pathObjmap[i]
  250. for name, i in nl.iteritems():
  251. if name in self.pathObjmap:
  252. if cmpStation(i, self.cd[self.pathObjmap[name]].station):
  253. continue
  254. # Didn't match, readd
  255. self.cd.delItem(self.pathObjmap[name])
  256. del self.pathObjmap[name]
  257. doupdate = True
  258. self.pathObjmap[name] = self.cd.addItem(self.id,
  259. ShoutStation, '%sk-%s' % (i['Bitrate'], name),
  260. station = i)
  261. self.sl = stations
  262. # sort our children
  263. self.sort(lambda x, y: cmp(x.title, y.title))
  264. if doupdate:
  265. Container.doUpdate(self)
  266. class ShoutCast(Container):
  267. def __init__(self, *args, **kwargs):
  268. log.msg('shoutcast created')
  269. Container.__init__(self, *args, **kwargs)
  270. self.genres = GenreFeedAsync()
  271. self.genre_list = None
  272. self.pathObjmap = {}
  273. def checkUpdate(self):
  274. self.doUpdate()
  275. return self
  276. def doUpdate(self):
  277. #traceback.print_stack(file=log.logfile)
  278. nl = self.genres.parse_genres()[:30]
  279. if nl == self.genre_list:
  280. return
  281. doupdate = False
  282. for i in self.pathObjmap.keys():
  283. if i not in nl:
  284. # delete
  285. doupdate = True
  286. self.cd.delItem(self.pathObjmap[i])
  287. del self.pathObjmap[i]
  288. for i in nl:
  289. if i in self.pathObjmap:
  290. continue
  291. doupdate = True
  292. self.pathObjmap[i] = self.cd.addItem(self.id,
  293. ShoutGenre, i, genre = i)
  294. self.genre_list = nl
  295. # sort our children
  296. self.sort(lambda x, y: cmp(x.title, y.title))
  297. if doupdate:
  298. Container.doUpdate(self)
  299. def detectshoutcastfile(path, fobj):
  300. path = os.path.basename(path)
  301. if path == 'SHOUTcast Radio':
  302. return ShoutCast, { }
  303. return None, None
  304. registerklassfun(detectshoutcastfile)