#!/usr/bin/env python
# Copyright 2006 John-Mark Gurney
'''Shoutcast Radio Feed'''

__version__ = '$Change$'
# $Id$

# The handling of defers and state in this module is not very good.  It
# needs some work to ensure that error cases are properly handled.  What
# do we do if we get no URLs for a PLS?  Do we delete ourselves just to be
# readded (when we get PLS refreshing working)?  Do we set a content-length
# to zero till we get one?

import ConfigParser
import cStringIO as StringIO
import os.path
import random
import time
import traceback

from py_shoutcast import *

from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
from FSStorage import registerklassfun

from twisted.protocols import shoutcast
from twisted.python import log, threadable, failure
from twisted.internet import defer, protocol, reactor
from twisted.web import error, http, resource, server
from twisted.web.client import getPage, _parse

# Name of the section that holds the entries of a PLS playlist.
PLSsection = 'playlist'


def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
    '''Return True when stations a and b agree on every key in keys.

    a and b are indexed by each key (a[k], b[k]); keys defaults to the
    four station fields listed in the signature.
    '''

    for k in keys:
        if a[k] != b[k]:
            return False

    return True


def stationwbitratecmp(x, y):
    '''cmp-style comparison of two items by title, ignoring the prefix.

    Each title is split once on '-' and only the part after it is
    compared, so both titles must contain a '-' (ShoutGenre.createObject
    builds them as '<bitrate>k-<name>').
    '''

    xname = x.title.split('-', 1)[1]
    yname = y.title.split('-', 1)[1]

    return cmp(xname, yname)


class GenreFeedAsync(feeds.GenreFeed):
    '''GenreFeed whose fetch_genres gets the genre page via getPage.

    Until the page has arrived, fetch_genres raises a Deferred that is
    fired once the page is available.
    '''

    genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'

    def __init__(self, *args, **kwargs):
        self.havegenre = False
        # Deferred handed to callers while the fetch is outstanding.
        self.fetchinggenre = None
        feeds.GenreFeed.__init__(self, *args, **kwargs)

    def gotGenre(self, page):
        '''getPage callback: store the page and fire the waiting Deferred.'''

        self.genre = page
        self.havegenre = True

        # Wake everyone up
        self.fetchinggenre.callback(1)

    def errGenre(self, failure):
        '''getPage errback: error handling is not implemented.'''

        # NOTE(review): nothing fires self.fetchinggenre on this path,
        # so anyone waiting on it is never woken up.
        raise NotImplementedError(failure)

    def fetch_genres(self):
        '''Return the genre page, or raise a Deferred while it is fetched.'''

        if self.havegenre:
            return self.genre

        if not self.fetchinggenre:
            # Need to start fetching.  The Deferred is only created once
            # getPage has been started, so a getPage call that raises
            # leaves us ready to retry on the next call.
            getPage(self.genre_url.encode('ascii')) \
                .addCallbacks(self.gotGenre, self.errGenre)
            self.fetchinggenre = defer.Deferred()

        # Always raise this if we are waiting.
        raise self.fetchinggenre

    synchronized = ['fetch_genres', 'gotGenre', ]

threadable.synchronize(GenreFeedAsync)


class ShoutcastFeedAsync(feeds.ShoutcastFeed):
    '''ShoutcastFeed whose fetch_stations gets the list via getPage.

    Until the page has arrived, fetch_stations raises a Deferred that is
    fired once the page is available.
    '''

    def __init__(self, *args, **kwargs):
        feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
        self.shout_url = \
            'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
            self.genre

        self.havestations = False
        # Deferred handed to callers while the fetch is outstanding.
        self.fetchingstations = None

    def gotStations(self, page):
        '''getPage callback: store the page and fire the waiting Deferred.'''

        self.stations = page
        self.havestations = True

        # Wake everyone up
        self.fetchingstations.callback(1)

    def errStations(self, failure):
        '''getPage errback: error handling is not implemented.'''

        # NOTE(review): nothing fires self.fetchingstations on this
        # path, so anyone waiting on it is never woken up.
        raise NotImplementedError(failure)

    def fetch_stations(self):
        '''Return the station page, or raise a Deferred while it is fetched.'''

        if self.havestations:
            return self.stations

        if not self.fetchingstations:
            # Need to start fetching (see GenreFeedAsync.fetch_genres for
            # why the Deferred is created after getPage is started).
            getPage(self.shout_url.encode('ascii')) \
                .addCallbacks(self.gotStations, self.errStations)
            self.fetchingstations = defer.Deferred()

        # Always raise this if we are waiting.
        raise self.fetchingstations

    synchronized = ['fetch_stations', 'gotStations', ]

threadable.synchronize(ShoutcastFeedAsync)


class ShoutTransfer(shoutcast.ShoutcastClient):
    '''Shoutcast client that writes the MP3 data it receives to a request.

    request is the web request being served; passback is called with the
    request when the connection to the shoutcast server is lost.
    '''

    userAgent = 'because you block user-agents'

    def __init__(self, request, passback):
        shoutcast.ShoutcastClient.__init__(self)
        self.request = request
        self.passback = passback
        request.registerProducer(self, 1)

    def connectionLost(self, reason):
        '''Unregister from the request and hand it to passback, if any.'''

        #traceback.print_stack()
        log.msg('connectionLost:', repr(self.request), repr(self.passback))
        shoutcast.ShoutcastClient.connectionLost(self, reason)
        if self.request:
            self.request.unregisterProducer()
        if self.passback:
            self.passback(self.request)

        self.passback = None
        self.request = None

    def handleResponse(self, response):
        # Drop the data, the parts get the important data, if we got
        # here, the connection closed and we are going to die anyways.
        pass

    def stopProducing(self):
        '''Stop the transfer and forget the request and passback.

        Clearing both means connectionLost will neither touch the
        request nor call passback afterwards.
        '''

        if self.transport is not None:
            shoutcast.ShoutcastClient.stopProducing(self)
        self.request = None
        self.passback = None

    def gotMP3Data(self, data):
        '''Write stream data to the request, if we still have one.'''

        if self.request is not None:
            self.request.write(data)

    def gotMetaData(self, data):
        '''Log the stream metadata; it is not passed on.'''

        log.msg("meta:", repr(data))

    # Remotely relay producer interface.

    def view_resumeProducing(self, issuer):
        self.resumeProducing()

    def view_pauseProducing(self, issuer):
        self.pauseProducing()

    def view_stopProducing(self, issuer):
        self.stopProducing()

    synchronized = ['resumeProducing', 'stopProducing']

threadable.synchronize(ShoutTransfer)


class ShoutProxy(resource.Resource):
    '''Web resource that serves a shoutcast stream described by a PLS.

    The PLS at url is fetched on the first non-HEAD request; requests
    that arrive before it has been parsed wait on Deferreds kept in
    self.afterurls.  Stream URLs are used round-robin from a random
    starting position.
    '''

    # We should probably expire the PLS after a while.

    # setResponseCode(self, code, message=None)
    # setHeader(self, k, v)
    # write(self, data)
    # finish(self)

    isLeaf = True

    def __init__(self, url, mt):
        '''url is the location of the PLS, mt the content-type to send.'''

        resource.Resource.__init__(self)
        self.shoutpls = url
        self.mt = mt
        # Stream URLs from the PLS; None until it has been parsed.
        self.urls = None
        self.fetchingurls = False

    def dump_exc(self, failure, request):
        '''Log failure and finish request with an error page showing it.'''

        exc = StringIO.StringIO()
        failure.printBriefTraceback(file=exc)
        failure.printTraceback()
        exc.seek(0)
        request.setHeader('content-type', 'text/html')
        # NOTE(review): the traceback text is placed in the page
        # without HTML escaping.
        request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
            http.RESPONSES[http.INTERNAL_SERVER_ERROR],
            '<pre>%s</pre>' % exc.read()).render(request))
        request.finish()

    def startNextConnection(self, request):
        '''Connect a ShoutTransfer for request to the next stream URL.

        This method is also the ShoutTransfer passback, so losing a
        connection moves on to the following URL.
        '''

        url = self.urls[self.urlpos]
        self.urlpos = (self.urlpos + 1) % len(self.urls)
        # NOTE(review): only host and port are used; scheme and path
        # from the URL are parsed but ignored.
        scheme, host, port, path = _parse(url)
        #print repr(url)
        # NOTE(review): the Deferred returned by connectTCP is dropped,
        # so a failed connect is not reported to the request.
        protocol.ClientCreator(reactor, ShoutTransfer, request,
            self.startNextConnection).connectTCP(host, port)

    def triggerdefered(self, fun):
        '''Call fun on each waiting Deferred and clear the waiting list.'''

        waiting, self.afterurls = self.afterurls, None
        for d in waiting:
            fun(d)

    def gotPLS(self, page):
        '''Parse the fetched PLS and release the waiting requests.

        Raises ValueError if the playlist has no numberofentries option
        or lists no entries.  self.urls and self.urlpos are only set
        once parsing has succeeded, so a bad PLS leaves self.urls as
        None and the next request fetches it again.
        '''

        self.fetchingurls = False
        pls = ConfigParser.SafeConfigParser()
        pls.readfp(StringIO.StringIO(page))
        # KCSM 91.1 doesn't provide a version
        #assert pls.getint(PLSsection, 'Version') == 2
        if not pls.has_option(PLSsection, 'numberofentries'):
            raise ValueError('PLS is missing numberofentries')
        cnt = pls.getint(PLSsection, 'numberofentries')

        # PLS entries are numbered starting at one.
        urls = [ pls.get(PLSsection, 'File%d' % i)
            for i in range(1, cnt + 1) ]
        if not urls:
            raise ValueError('PLS contains no entries')
        #log.msg('pls urls:', urls)

        self.urlpos = random.randrange(len(urls))
        self.urls = urls

        self.triggerdefered(lambda x: x.callback(True))

    def errPLS(self, failure):
        '''Fetching or parsing the PLS failed: fail the waiting requests.'''

        self.fetchingurls = False
        # XXX - retry?
        self.triggerdefered(lambda x: x.errback(failure))

    def processRequest(self, ign, request):
        '''Waiting-Deferred callback: start streaming to request.'''

        self.startNextConnection(request)

    def errRequest(self, failure, request):
        '''Waiting-Deferred errback: report failure on request.'''

        self.dump_exc(failure, request)

    def render(self, request):
        '''Start streaming to request, fetching the PLS first if needed.

        Returns '' for HEAD requests and server.NOT_DONE_YET otherwise.
        '''

        request.setHeader('content-type', self.mt)
        # XXX - PS3 doesn't support streaming, this makes it think
        # that is has data, but it needs to d/l the entire thing.
        #request.setHeader('content-length', 1*1024*1024)

        if request.method == 'HEAD':
            return ''

        # need to start the state machine
        # a) fetch the playlist
        # b) choose a random starting point
        # c) connect to the server
        # d) select next server and goto c
        # return data
        if self.urls is None:
            if not self.fetchingurls:
                # Get the PLS
                self.fetchingurls = True
                # Not really sure if ascii is the correct one,
                # shouldn't getPage do proper escaping for me?
                self.afterurls = [ defer.Deferred() ]
                d = getPage(self.shoutpls.encode('ascii'))
                d.addCallback(self.gotPLS)
                d.addErrback(self.errPLS)
            else:
                self.afterurls.append(defer.Deferred())

            # Always add the callback if we don't have urls.  The
            # errback is chained after the callback so that a failure
            # while starting the connection is reported on the request
            # as well.
            waiter = self.afterurls[-1]
            waiter.addCallback(self.processRequest, request)
            waiter.addErrback(self.errRequest, request)
        else:
            self.startNextConnection(request)

        # and make sure the connection doesn't get closed
        return server.NOT_DONE_YET

    synchronized = [ 'gotPLS', 'render', 'startNextConnection',
        'triggerdefered', ]

threadable.synchronize(ShoutProxy)


class ShoutURL(AudioItem):
    '''AudioItem whose content is a ShoutProxy for a PLS URL.

    Keyword arguments consumed here: url (required), mimetype (defaults
    to 'audio/mpeg') and bitrate (optional).  Everything else is passed
    on to AudioItem.
    '''

    def __init__(self, *args, **kwargs):
        url = kwargs.pop('url')
        mimetype = kwargs.pop('mimetype', 'audio/mpeg')
        bitrate = kwargs.pop('bitrate', None)

        kwargs['content'] = ShoutProxy(url, mimetype)
        AudioItem.__init__(self, *args, **kwargs)
        self.url = '%s/%s' % (self.cd.urlbase, self.id)
        self.res = Resource(self.url, 'http-get:*:%s:*' % mimetype)
        #self.res = Resource(self.url + '/pcm', 'http-get:*:%s:*' % \
        #    'audio/x-wav')
        if bitrate is not None:
            self.bitrate = bitrate


class ShoutFile(ShoutURL):
    '''ShoutURL whose URL is the stripped contents of a local file.

    The file name is given by the required keyword argument file.
    '''

    def __init__(self, *args, **kwargs):
        fname = kwargs.pop('file')
        fp = open(fname)
        try:
            kwargs['url'] = fp.read().strip()
        finally:
            # Don't leave the descriptor open until garbage collection.
            fp.close()

        ShoutURL.__init__(self, *args, **kwargs)


class ShoutStation(ShoutURL):
    '''ShoutURL built from a station entry (keyword argument station).

    The entry supplies 'PLS_URL', 'MimeType' and 'Bitrate'.
    '''

    def __init__(self, *args, **kwargs):
        self.station = kwargs.pop('station')

        kwargs['url'] = self.station['PLS_URL']
        kwargs['mimetype'] = self.station['MimeType'].encode('ascii')
        kwargs['bitrate'] = self.station['Bitrate'] * 128 # 1024k / 8bit

        ShoutURL.__init__(self, *args, **kwargs)


class ShoutGenre(MusicGenre):
    '''Container holding the stations of one genre.

    The genre name is given by the required keyword argument genre.
    '''

    def __init__(self, *args, **kwargs):
        self.genre = kwargs.pop('genre')
        #self.feeds = ShoutcastFeedAsync(self.genre)
        self.feeds = feeds.ShoutcastFeed(self.genre)
        # Station list from the last successful update.
        self.sl = None

        MusicGenre.__init__(self, *args, **kwargs)

    def genStations(self, stations):
        '''Return a dict mapping a unique name to each station.

        The first station with a given 'Name' keeps that name; later
        ones with the same name get ' - 2', ' - 3', ... appended.
        '''

        ret = {}
        dupcnt = {}

        for i in stations:
            name = i['Name']
            if name not in ret:
                ret[name] = i
                continue

            # got a dup
            n = dupcnt.get(name, 2)
            ret['%s - %d' % (name, n)] = i
            dupcnt[name] = n + 1

        return ret

    def checkUpdate(self):
        '''Refetch the station list and update if it has changed.'''

        # Sometimes the shoutcast server returns a 503 (busy) which
        # isn't valid XML, try again.
        while True:
            try:
                stations = self.feeds.parse_stations()
                break
            except Exception:
                # Only retry on real errors; KeyboardInterrupt and
                # SystemExit must be able to end the loop.
                traceback.print_exc()
                time.sleep(1)

        if stations == self.sl:
            return

        self.sl = stations

        self.doUpdate()

    def genChildren(self):
        return self.genStations(self.sl)

    def genCurrent(self):
        return ((x.id, x.station) for x in self)

    def createObject(self, name, arg):
        return ShoutStation, '%sk-%s' % (arg['Bitrate'], name), (), \
            { 'station': arg }

    def sort(self):
        '''Sort the stations by name, ignoring the bitrate prefix.'''

        super(ShoutGenre, self).sort(lambda *a: stationwbitratecmp(*a))


class ShoutCast(Container):
    '''Top level container holding one ShoutGenre per genre.'''

    def __init__(self, *args, **kwargs):
        Container.__init__(self, *args, **kwargs)

        #self.genres = GenreFeedAsync()
        self.genres = feeds.GenreFeed()
        # Genre list from the last successful update.
        self.genre_list = None

    def checkUpdate(self):
        '''Refetch the genre list and update if it has changed.

        Parsing is retried once a second until it succeeds.
        '''

        while True:
            try:
                nl = self.genres.parse_genres()
                if nl == self.genre_list:
                    return
                break
            except Exception as x:
                log.msg('parse_genres exception:', repr(x))
                time.sleep(1)

        self.genre_list = nl

        super(ShoutCast, self).doUpdate()

    def genChildren(self):
        return self.genre_list

    def genCurrent(self):
        return ((x.id, x.genre) for x in self)

    def createObject(self, i):
        return ShoutGenre, i, (), { 'genre': i }


def detectshoutcastfile(origpath, fobj):
    '''Map a path to the class (and keyword arguments) that handles it.

    A file named 'SHOUTcast Radio' maps to ShoutCast, a file with the
    extension '.scst' to ShoutFile; anything else yields (None, None).
    fobj is not used.
    '''

    path = os.path.basename(origpath)
    if path == 'SHOUTcast Radio':
        return ShoutCast, { }

    ext = os.path.splitext(path)[1]
    if ext == '.scst':
        return ShoutFile, { 'file': origpath }

    return None, None

registerklassfun(detectshoutcastfile)