# Licensed under the MIT license # http://opensource.org/licenses/mit-license.php # Copyright 2005, Tim Potter # Copyright 2006-2009 John-Mark Gurney __version__ = '$Change: 1665 $' # $Id: //depot/python/pymeds/main/DIDLLite.py#32 $ import functools import itertools import unittest import et for i in [ 'Element', 'SubElement', 'tostring', ]: locals()[i] = getattr(et.ET, i) class Resource(object): """An object representing a resource.""" validattrs = { 'protocolinfo': 'protocolInfo', 'importuri': 'importUri', 'size': 'size', 'duration': 'duration', 'protection': 'protection', 'bitrate': 'bitrate', 'bitspersample': 'bitsPerSample', 'samplefrequency': 'sampleFrequence', 'nraudiochannels': 'nrAudioChannels', 'resolution': 'resolution', 'colordepth': 'colorDepth', 'tspec': 'tspec', 'alloweduse': 'allowedUse', 'validitystart': 'validityStart', 'validityend': 'validityEnd', 'remainingtime': 'remainingTime', 'usageinfo': 'usageInfo', 'rightsinfouri': 'rightsInfoURI', 'contentinfouri': 'contentInfoURI', 'recordquality': 'recordQuality', } def __init__(self, data, protocolInfo): object.__init__(self) # Use thses so setattr can be more simple object.__setattr__(self, 'data', data) object.__setattr__(self, 'attrs', {}) self.protocolInfo = protocolInfo def __getattr__(self, key): try: return self.attrs[key.lower()] except KeyError: raise AttributeError(key) def __setattr__(self, key, value): key = key.lower() assert key in self.validattrs self.attrs[key] = value def toElement(self): root = Element('res') root.text = self.data for i in self.attrs: attr = self.validattrs[i] value = self.attrs[i] funname = 'format_%s' % attr if hasattr(self, funname): value = getattr(self, funname)(value) else: value = str(value) assert isinstance(value, str), \ 'value is not a string: %s' % repr(value) root.attrib[attr] = value return root @staticmethod def format_duration(s): if isinstance(s, str): return s # assume it is a number s = abs(s) secs = int(s) frac = s - secs minutes, secs = divmod(secs, 60) hours, minutes = divmod(minutes, 60) if frac: frac = ('%.2f' % frac)[1:] else: frac = '' return '%d:%02d:%02d%s' % (hours, minutes, secs, frac) class ResourceList(list): '''Special class to not overwrite mimetypes that already exist.''' def __init__(self, *args, **kwargs): self._mt = {} list.__init__(self, *args, **kwargs) def append(self, k): assert isinstance(k, Resource) mt = k.protocolInfo.split(':')[2] if mt in self._mt: return list.append(self, k) self._mt[mt] = k class Object(object): """The root class of the entire content directory class heirachy.""" klass = 'object' _optionattrs = { 'creator': 'dc', 'writeStatus': 'upnp', 'artist': 'upnp', 'actor': 'upnp', 'author': 'upnp', 'producer': 'upnp', 'director': 'upnp', 'publisher': 'dc', 'contributor': 'dc', 'genre': 'upnp', 'album': 'upnp', 'playlist': 'upnp', 'albumArtURI': 'upnp', 'artistDiscographyURI': 'upnp', 'lyricsURI': 'upnp', 'relation': 'dc', 'storageMedium': 'upnp', 'description': 'dc', 'longDescription': 'upnp', 'icon': 'upnp', 'region': 'upnp', 'rights': 'dc', 'date': 'dc', 'language': 'dc', 'playbackCount': 'upnp', 'lastPlaybackTime': 'upnp', 'lastPlaybackPosition': 'upnp', 'recordedStartDateTime': 'upnp', 'recordedDuration': 'upnp', 'recordedDayOfWeek': 'upnp', 'srsRecordScheduleID': 'upnp', 'srsRecordTaskID': 'upnp', 'recordable': 'upnp', 'programTitle': 'upnp', 'seriesTitle': 'upnp', 'programID': 'upnp', 'seriesID': 'upnp', 'channelID': 'upnp', 'episodeCount': 'upnp', 'episodeNumber': 'upnp', 'programCode': 'upnp', 'rating': 'upnp', 'channelGroupName': 'upnp', 'callSign': 'upnp', 'networkAffiliation': 'upnp', 'serviceProvider': 'upnp', 'price': 'upnp', 'payPerView': 'upnp', 'epgProviderName': 'upnp', 'dateTimeRange': 'upnp', 'radioCallSign': 'upnp', 'radioStationID': 'upnp', 'radioBand': 'upnp', 'channelNr': 'upnp', 'channelName': 'upnp', 'scheduledStartTime': 'upnp', 'scheduledEndTime': 'upnp', 'signalStrength': 'upnp', 'signalLocked': 'upnp', 'tuned': 'upnp', 'neverPlayable': 'upnp', 'bookmarkID': 'upnp', 'bookmarkedObjectID': 'upnp', 'deviceUDN': 'upnp', 'stateVariableCollection': 'upnp', 'DVDRegionCode': 'upnp', 'originalTrackNumber': 'upnp', 'toc': 'upnp', 'userAnnoation': 'upnp', } res = None content = property(lambda x: x._content) needupdate = None # do we update before sending? (for res) def __init__(self, cd, id, parentID, title, restricted=False, creator=None, **kwargs): self.cd = cd self.id = id self.parentID = parentID self.title = title self.creator = creator if restricted: self.restricted = '1' else: self.restricted = '0' if 'content' in kwargs: self._content = kwargs.pop('content') for i in kwargs: if i not in self._optionattrs: raise TypeError('invalid keyword arg: %s' % repr(i)) setattr(self, i, kwargs[i]) def __cmp__(self, other): if not isinstance(other, self.__class__): return cmp(self.__class__.__name__, other.__class__.__name__) return cmp(self.id, other.id) def __repr__(self): cls = self.__class__ return '<%s.%s: id: %s, parent: %s, title: %s>' % \ (cls.__module__, cls.__name__, self.id, self.parentID, self.title) def checkUpdate(self): # It's tempting to call doUpdate here, but each object has to # decide that. pass def toElement(self): root = Element(self.elementName) root.attrib['id'] = self.id root.attrib['parentID'] = self.parentID SubElement(root, 'dc:title').text = self.title SubElement(root, 'upnp:class').text = self.klass root.attrib['restricted'] = self.restricted for i in (x for x in self.__dict__ if x in self._optionattrs): obj = getattr(self, i) if obj is None: continue SubElement(root, '%s:%s' % (self._optionattrs[i], i)).text = str(getattr(self, i)) if self.res is not None: try: resiter = iter(self.res) except TypeError as x: resiter = [ self.res ] for res in resiter: root.append(res.toElement()) return root def toString(self): return tostring(self.toElement()) class Item(Object): """A class used to represent atomic (non-container) content objects.""" klass = Object.klass + '.item' elementName = 'item' refID = None needupdate = True def doUpdate(self, child=False): # do NOT update parent container per 2.2.9 for # ContainerUpdateID changes # must be Container.didUpdate, otherwise we could update the # parent when we really just want to update the ID Container.didUpdate(self.cd[self.parentID]) def toElement(self): root = Object.toElement(self) if self.refID is not None: SubElement(root, 'refID').text = self.refID return root class ImageItem(Item): klass = Item.klass + '.imageItem' class Photo(ImageItem): klass = ImageItem.klass + '.photo' class AudioItem(Item): """A piece of content that when rendered generates some audio.""" klass = Item.klass + '.audioItem' class MusicTrack(AudioItem): """A discrete piece of audio that should be interpreted as music.""" klass = AudioItem.klass + '.musicTrack' class AudioBroadcast(AudioItem): klass = AudioItem.klass + '.audioBroadcast' class AudioBook(AudioItem): klass = AudioItem.klass + '.audioBook' class VideoItem(Item): klass = Item.klass + '.videoItem' class Movie(VideoItem): klass = VideoItem.klass + '.movie' class VideoBroadcast(VideoItem): klass = VideoItem.klass + '.videoBroadcast' class MusicVideoClip(VideoItem): klass = VideoItem.klass + '.musicVideoClip' class PlaylistItem(Item): klass = Item.klass + '.playlistItem' class TextItem(Item): klass = Item.klass + '.textItem' class Container(Object, list): """An object that can contain other objects.""" klass = Object.klass + '.container' elementName = 'container' childCount = property(lambda x: len(x)) searchable = None updateID = 0 needcontupdate = False _optionattrs = Object._optionattrs.copy() _optionattrs.update({ 'searchClass': 'upnp', 'createClass': 'upnp', 'storageTotal': 'upnp', 'storageUsed': 'upnp', 'storageFree': 'upnp', 'storageMaxPartition': 'upnp', }) def __init__(self, cd, id, parentID, title, **kwargs): Object.__init__(self, cd, id, parentID, title, **kwargs) list.__init__(self) self.doingUpdate = False self.needcontupdate = False self.oldchildren = {} def genCurrent(self): '''This function returns a tuple/list/generator that returns tuples of (id, name). name must match what is returned by genChildren. If name is used for the title directly, no override is necessary.''' return ((x.id, x.title) for x in self) def genChildren(self): '''This function returns a list or dict of names of all the current children.''' raise NotImplementedError def createObject(self, i, arg=None): '''This function returns the (class, name, *args, **kwargs) that will be passed to the addItem method of the ContentDirectory. arg will be passed the value of the dict keyed by i if genChildren is a dict.''' raise NotImplementedError def sort(self, fun=None): if fun is not None: return list.sort(self, key=functools.cmp_to_key(fun)) else: return list.sort(self, key=lambda x: x.title) def doUpdate(self): if self.doingUpdate: return self.doingUpdate = True self.needcontupdate = False # Get the current children children = self.genChildren() if isinstance(children, dict): oldchildren = self.oldchildren self.oldchildren = children isdict = True else: children = set(children) isdict = False # Delete the old object that no longer exists. # Make a mapping of current names to ids. names = {} print('i:', repr(self), repr(self.genCurrent), repr(self.__class__)) for id, i in tuple(self.genCurrent()): if i not in children: didupdate = True # delete print('del:', repr(id), repr(i)) self.cd.delItem(id) self.needcontupdate = True else: names[i] = id # Make sure that the existing objects don't need to be # updated. # Create any new objects that don't currently exist. for i in children: if i in names: if isdict: print('oc:', repr(oldchildren[i]), repr(children[i])) if oldchildren[i] == children[i]: continue # Delete the old and recreate self.cd.delItem(names[i]) self.needcontupdate = True else: # XXX - some sort of comparision? continue # new object if isdict: args = (children[i], ) else: args = () try: #print 'i:', `i`, `isdict`, `args`, `self` pass except UnicodeEncodeError: print('i decode error') klass, name, args, kwargs = self.createObject(i, *args) if klass is not None: self.cd.addItem(self.id, klass, name, *args, **kwargs) self.needcontupdate = True # sort our children self.sort() self.doingUpdate = False if self.needcontupdate: self.didUpdate() def didUpdate(self): if self.doingUpdate: self.needcontupdate = True return if self.id == '0': self.updateID = (self.updateID + 1) else: self.updateID = (self.updateID + 1) % (1 << 32) Container.didUpdate(self.cd['0']) def _addSet(self, e, items): if items is not None: if not isinstance(items, (list, tuple)): items = [ items ] for i in items: el = SubElement(root, e) el.text = i # XXX - how to specify? el.attrib['includeDerived'] = '1' def toElement(self): root = Object.toElement(self) # only include if we have children, it's possible we don't # have our children yet, and childCount is optional. if self.childCount: root.attrib['childCount'] = str(self.childCount) if self.searchable is not None: root.attrib['searchable'] = str(self.searchable) return root def __repr__(self): cls = self.__class__ return '<%s.%s: id: %s, parent: %s, title: %s, cnt: %d>' % \ (cls.__module__, cls.__name__, self.id, self.parentID, self.title, len(self)) class TestContainerObj(Container): def genChildren(self): return self._genchildren def createObject(self, name): return Object, name, (), {} class MockContainer(object): def __init__(self): self.itemiter = itertools.count(1) def addItem(self, *args, **kwargs): return next(self.itemiter) def __getitem__(self, id): return Container(None, '0', None, None) class TestContainer(unittest.TestCase): def xtest_container(self): cont = MockContainer() c = TestContainerObj(cont, None, None, None) self.assertEqual(len(tuple(c.genCurrent())), 0) c._genchildren = [ 'objb', 'obja' ] c.doUpdate() self.assertEqual(tuple(c.genCurrent()), ((1, 'obja'), (2, 'objb'))) class Person(Container): klass = Container.klass + '.person' class MusicArtist(Person): klass = Person.klass + '.musicArtist' class PlaylistContainer(Container): klass = Container.klass + '.playlistContainer' class Album(Container): klass = Container.klass + '.album' class MusicAlbum(Album): klass = Album.klass + '.musicAlbum' class PhotoAlbum(Album): klass = Album.klass + '.photoAlbum' class Genre(Container): klass = Container.klass + '.genre' class MusicGenre(Genre): klass = Genre.klass + '.musicGenre' class MovieGenre(Genre): klass = Genre.klass + '.movieGenre' class StorageSystem(Container): klass = Container.klass + '.storageSystem' storageTotal = -1 storageUsed = -1 storageFree = -1 storageMaxParition = -1 storageMedium = 'UNKNOWN' class StorageVolume(Container): klass = Container.klass + '.storageVolume' storageTotal = -1 storageUsed = -1 storageFree = -1 storageMedium = 'UNKNOWN' class StorageFolder(Container): klass = Container.klass + '.storageFolder' storageUsed = -1 class DIDLElement(Element): def __init__(self): super().__init__('DIDL-Lite', {}) self.attrib['xmlns'] = 'urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/' self.attrib['xmlns:dc'] = 'http://purl.org/dc/elements/1.1/' self.attrib['xmlns:upnp'] = 'urn:schemas-upnp-org:metadata-1-0/upnp' def addItem(self, item): self.append(item.toElement()) def numItems(self): return len(self) def toString(self): return tostring(self) if __name__ == '__main__': root = DIDLElement() root.addItem(Container(None, '0\Movie\\', '0\\', 'Movie')) root.addItem(Container(None, '0\Music\\', '0\\', 'Music')) root.addItem(Container(None, '0\Photo\\', '0\\', 'Photo')) root.addItem(Container(None, '0\OnlineMedia\\', '0\\', 'OnlineMedia')) print(tostring(root))