diff --git a/pymeds.py b/pymeds.py index ed2116e..aac607d 100755 --- a/pymeds.py +++ b/pymeds.py @@ -28,7 +28,6 @@ def tryloadmodule(mod): # These should be sorted by how much work they do, the least work the earlier. # mpegtsmod can be really expensive. modules = [ - 'shoutcast', 'audio', 'Clip', 'pyvr', diff --git a/shoutcast.py b/shoutcast.py deleted file mode 100644 index 8064388..0000000 --- a/shoutcast.py +++ /dev/null @@ -1,441 +0,0 @@ -#!/usr/bin/env python -# Copyright 2006 John-Mark Gurney -'''Shoutcast Radio Feed''' - -__version__ = '$Change$' -# $Id$ - -# The handling of defers and state in this module is not very good. It -# needs some work to ensure that error cases are properly handled. What -# do we do if we get no URLs for a PLS? Do we delete ourselves just to be -# readded (when we get PLS refeshing working)? Do we set a content-length -# to zero till we get one? - -import ConfigParser -import cStringIO as StringIO -import os.path -import random -import time -import traceback -import xml.dom.minidom - -from py_shoutcast import * - -from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource -from FSStorage import registerklassfun - -from twisted.protocols import shoutcast -from twisted.python import log, threadable, failure -from twisted.internet import defer, protocol, reactor -from twisted.web import error, http, resource, server -from twisted.web.client import getPage, _parse - -PLSsection = 'playlist' - -def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )): - if filter(lambda k, x = a, y = b: x[k] != y[k], keys): - return False - return True - -def stationwbitratecmp(x, y): - x, y = map(lambda a: a.title.split('-', 1)[1], (x, y)) - return cmp(x, y) - -class GenreFeedAsync(feeds.GenreFeed): - genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml' - - def __init__(self, *args, **kwargs): - self.havegenre = False - self.fetchinggenre = None - feeds.GenreFeed.__init__(self, *args, **kwargs) - - def gotGenre(self, page): - self.genre = page - self.havegenre = True - - # Wake everyone up - self.fetchinggenre.callback(1) - - def errGenre(self, failure): - raise NotImplementedError, failure - - def fetch_genres(self): - if self.havegenre: - return self.genre - if not self.fetchinggenre: - # Need to start fetching - getPage(self.genre_url.encode('ascii')) \ - .addCallbacks(self.gotGenre, self.errGenre) - self.fetchinggenre = defer.Deferred() - # Always raise this if we are waiting. - raise self.fetchinggenre - - synchronized = ['fetch_genres', 'gotGenre', ] - -threadable.synchronize(GenreFeedAsync) - -class ShoutcastFeedAsync(feeds.ShoutcastFeed): - def __init__(self, *args, **kwargs): - feeds.ShoutcastFeed.__init__(self, *args, **kwargs) - - self.shout_url = \ - 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \ - self.genre - - self.havestations = False - self.fetchingstations = None - - def gotStations(self, page): - self.stations = page - self.havestations = True - - # Wake everyone up - self.fetchingstations.callback(1) - - def errStations(self, failure): - raise NotImplementedError, failure - - def fetch_stations(self): - if self.havestations: - return self.stations - if not self.fetchingstations: - # Need to start fetching - getPage(self.shout_url.encode('ascii')) \ - .addCallbacks(self.gotStations, self.errStations) - self.fetchingstations = defer.Deferred() - # Always raise this if we are waiting. - raise self.fetchingstations - - synchronized = ['fetch_stations', 'gotStations', ] - -threadable.synchronize(ShoutcastFeedAsync) - -class ShoutTransfer(shoutcast.ShoutcastClient): - userAgent = 'because you block user-agents' - def __init__(self, request, passback): - shoutcast.ShoutcastClient.__init__(self) - self.request = request - self.passback = passback - request.registerProducer(self, 1) - - def connectionLost(self, reason): - #traceback.print_stack() - log.msg('connectionLost:', `self.request`, `self.passback`) - shoutcast.ShoutcastClient.connectionLost(self, reason) - if self.request: - self.request.unregisterProducer() - if self.passback: - self.passback(self.request) - - self.passback = None - self.request = None - - def handleResponse(self, response): - #Drop the data, the parts get the important data, if we got - #here, the connection closed and we are going to die anyways. - pass - - def stopProducing(self): - if self.transport is not None: - shoutcast.ShoutcastClient.stopProducing(self) - self.request = None - self.passback = None - - def gotMP3Data(self, data): - if data[:6] == '': - print 'gd:', `data` - if self.request is not None: - self.request.write(data) - - def gotMetaData(self, data): - log.msg("meta:", `data`) - pass - - # Remotely relay producer interface. - - def view_resumeProducing(self, issuer): - self.resumeProducing() - - def view_pauseProducing(self, issuer): - self.pauseProducing() - - def view_stopProducing(self, issuer): - self.stopProducing() - - synchronized = ['resumeProducing', 'stopProducing'] - -threadable.synchronize(ShoutTransfer) - -class ShoutProxy(resource.Resource): - # We should probably expire the PLS after a while. - - # setResponseCode(self, code, message=None) - # setHeader(self, k, v) - # write(self, data) - # finish(self) - - isLeaf = True - - def __init__(self, url, mt): - resource.Resource.__init__(self) - self.shoutpls = url - self.mt = mt - self.urls = None - self.fetchingurls = False - - def dump_exc(self, failure, request): - exc = StringIO.StringIO() - failure.printBriefTraceback(file=exc) - failure.printTraceback() - exc.seek(0) - request.setHeader('content-type', 'text/html') - request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR, - http.RESPONSES[http.INTERNAL_SERVER_ERROR], - '
%s
' % exc.read()).render(request)) - request.finish() - - def startNextConnection(self, request): - url = self.urls[self.urlpos] - self.urlpos = (self.urlpos + 1) % len(self.urls) - scheme, host, port, path = _parse(url) - #print `url` - protocol.ClientCreator(reactor, ShoutTransfer, request, - self.startNextConnection).connectTCP(host, port) - - def triggerdefered(self, fun): - map(fun, self.afterurls) - self.afterurls = None - - def gotURL(self, page): - self.fetchingurls = False - try: - urls = self.parsePLS(page) - except ConfigParser.MissingSectionHeaderError: - urls = self.parseWAX(page) - - #log.msg('pls urls:', self.urls) - self.urls = urls - self.urlpos = random.randrange(len(self.urls)) - - self.triggerdefered(lambda x: x.callback(True)) - - def parseWAX(self, page): - print 'trying WAX' - dom = xml.dom.minidom.parseString(page) - rootel = dom.documentElement - if rootel.nodeName != 'asx': - raise ValueError('Only asx allowed, got %s' % - `rootel.nodeName`) - - urls = [] - for i in rootel.getElementsByTagName('entry'): - urls.extend(str(x.getAttribute('href')) for x in - i.getElementsByTagName('ref')) - - print 'returning:', `urls` - return urls - - def parsePLS(self, page): - pls = ConfigParser.SafeConfigParser() - pls.readfp(StringIO.StringIO(page)) - # KCSM 91.1 doesn't provide a version - #assert pls.getint(PLSsection, 'Version') == 2 - assert pls.has_option(PLSsection, 'numberofentries') - cnt = pls.getint(PLSsection, 'numberofentries') - urls = [] - for i in range(cnt): - i += 1 # stupid one based arrays - urls.append(pls.get(PLSsection, - 'File%d' % i)) - return urls - - def errURL(self, failure): - self.fetchingurls = False - # XXX - retry? - self.triggerdefered(lambda x: x.errback(failure)) - - def processRequest(self, ign, request): - self.startNextConnection(request) - - def errRequest(self, failure, request): - self.dump_exc(failure, request) - - def render(self, request): - request.setHeader('content-type', self.mt) - # XXX - PS3 doesn't support streaming, this makes it think - # that is has data, but it needs to d/l the entire thing. - #request.setHeader('content-length', 1*1024*1024) - - if request.method == 'HEAD': - return '' - - # need to start the state machine - # a) fetch the playlist - # b) choose a random starting point - # c) connect to the server - # d) select next server and goto c - # return data - if self.urls is None: - if not self.fetchingurls: - # Get the page - self.fetchingurls = True - # Not really sure if ascii is the correct one, - # shouldn't getPage do proper escaping for me? - self.afterurls = [ defer.Deferred() ] - d = getPage(self.shoutpls.encode('ascii')) - d.addCallback(self.gotURL) - d.addErrback(self.errURL) - else: - self.afterurls.append(defer.Deferred()) - # Always add the callback if we don't have urls - self.afterurls[-1].addCallbacks(self.processRequest, - errback=self.errRequest, callbackArgs=(request, ), - errbackArgs=(request, )) - else: - self.startNextConnection(request) - - # and make sure the connection doesn't get closed - return server.NOT_DONE_YET - - synchronized = [ 'gotURL', 'render', 'startNextConnection', - 'triggerdefered', ] -threadable.synchronize(ShoutProxy) - -class ShoutURL(AudioItem): - def __init__(self, *args, **kwargs): - url = kwargs.pop('url') - mimetype = kwargs.pop('mimetype', 'audio/mpeg') - bitrate = kwargs.pop('bitrate', None) - - kwargs['content'] = ShoutProxy(url, mimetype) - AudioItem.__init__(self, *args, **kwargs) - self.url = '%s/%s' % (self.cd.urlbase, self.id) - self.res = Resource(self.url, 'http-get:*:%s:*' % mimetype) - #self.res = Resource(self.url + '/pcm', 'http-get:*:%s:*' % \ - # 'audio/x-wav') - if bitrate is not None: - self.bitrate = bitrate - -class ShoutFile(ShoutURL): - def __init__(self, *args, **kwargs): - file = kwargs.pop('file') - kwargs['url'] = open(file).read().strip() - - ShoutURL.__init__(self, *args, **kwargs) - -class ShoutStation(ShoutURL): - def __init__(self, *args, **kwargs): - self.station = kwargs.pop('station') - - kwargs['url'] = self.station['PLS_URL'] - kwargs['mimetype'] = self.station['MimeType'].encode('ascii') - kwargs['bitrate'] = self.station['Bitrate'] * 128 # 1024k / 8bit - - ShoutURL.__init__(self, *args, **kwargs) - -class ShoutGenre(MusicGenre): - def __init__(self, *args, **kwargs): - self.genre = kwargs['genre'] - del kwargs['genre'] - #self.feeds = ShoutcastFeedAsync(self.genre) - self.feeds = feeds.ShoutcastFeed(self.genre) - self.sl = None - - MusicGenre.__init__(self, *args, **kwargs) - - def genStations(self, stations): - ret = {} - dupcnt = {} - - for i in stations: - name = i['Name'] - if name in ret: - # got a dup - if name not in dupcnt: - dupcnt[name] = 2 - - ret['%s - %d' % (name, dupcnt[name])] = i - dupcnt[name] += 1 - else: - ret[name] = i - - return ret - - def checkUpdate(self): - # Sometimes the shoutcast server returns a 503 (busy) which - # isn't valid XML, try again. - while True: - try: - stations = self.feeds.parse_stations() - break - except: - traceback.print_exc() - - time.sleep(1) - - if stations == self.sl: - return - - self.sl = stations - - self.doUpdate() - - def genChildren(self): - return self.genStations(self.sl) - - def genCurrent(self): - return ((x.id, x.station) for x in self) - - def createObject(self, name, arg): - return ShoutStation, '%sk-%s' % (arg['Bitrate'], name), (), \ - { 'station': arg } - - def sort(self): - super(ShoutGenre, self).sort(lambda *a: stationwbitratecmp(*a)) - -class ShoutCast(Container): - def __init__(self, *args, **kwargs): - Container.__init__(self, *args, **kwargs) - - #self.genres = GenreFeedAsync() - self.genres = feeds.GenreFeed() - self.genre_list = None - - def checkUpdate(self): - while True: - try: - nl = self.genres.parse_genres() - if nl == self.genre_list: - return - break - except Exception, x: - print 'genre:', `self.genres.genre` - print 'genres:', `self.genres.genres` - log.msg('parse_genres exception:', `x`) - time.sleep(1) - - self.genre_list = nl - - super(ShoutCast, self).doUpdate() - - def genChildren(self): - return self.genre_list - - def genCurrent(self): - return ((x.id, x.genre) for x in self) - - def createObject(self, i): - return ShoutGenre, i, (), { 'genre': i } - -def detectshoutcastfile(origpath, fobj): - path = os.path.basename(origpath) - if path == 'SHOUTcast Radio': - return ShoutCast, { } - - ext = os.path.splitext(path)[1] - if ext == '.scst': - return ShoutFile, { 'file': origpath } - - return None, None - -registerklassfun(detectshoutcastfile)