A Python UPnP Media Server
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

75 lines
1.8 KiB

  1. #!/usr/bin/env python
  2. # Copyright 2006 John-Mark Gurney <jmg@funkthat.com>
  3. __version__ = '$Change$'
  4. # $Id$
  5. from twisted.internet import reactor
  6. import twisted.internet.error
  7. # Make sure to update the global line when adding a new function
  8. __all__ = [ 'appendnamespace', 'insertnamespace', 'insertringbuf' ]
  9. DEFAULTRINGSIZE = 50
  10. appendnamespace = lambda k, v: []
  11. insertnamespace = lambda k, v: None
  12. insertringbuf = lambda k, l = DEFAULTRINGSIZE: None
  13. # This code from: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/68429
  14. class RingBuffer:
  15. def __init__(self,size_max):
  16. self.max = size_max
  17. self.data = []
  18. def append(self,x):
  19. """append an element at the end of the buffer"""
  20. self.data.append(x)
  21. if len(self.data) == self.max:
  22. self.cur=0
  23. self.__class__ = RingBufferFull
  24. def get(self):
  25. """ return a list of elements from the oldest to the newest"""
  26. return self.data
  27. class RingBufferFull:
  28. def __init__(self,n):
  29. raise "you should use RingBuffer"
  30. def append(self,x):
  31. self.data[self.cur]=x
  32. self.cur=(self.cur+1) % self.max
  33. def get(self):
  34. return self.data[self.cur:]+self.data[:self.cur]
  35. def doDebugging(opt):
  36. if not opt:
  37. return
  38. global insertnamespace, appendnamespace, insertringbuf
  39. def insertnamespace(k, v):
  40. assert isinstance(k, str)
  41. sf.namespace[k] = v
  42. def appendnamespace(k, v):
  43. try:
  44. sf.namespace[k].append(v)
  45. except KeyError:
  46. sf.namespace[k] = [ v ]
  47. def insertringbuf(k, l = DEFAULTRINGSIZE):
  48. insertnamespace(k, RingBuffer(l))
  49. from twisted.manhole import telnet
  50. class Debug(telnet.Shell):
  51. def welcomeMessage(self):
  52. data = [ '', 'PyMedS Debugging Console', '', '' ]
  53. return '\r\n'.join(data)
  54. sf = telnet.ShellFactory()
  55. sf.protocol = Debug
  56. try:
  57. reactor.listenTCP(56283, sf)
  58. except twisted.internet.error.CannotListenError:
  59. print('WARNING: cannot bind to debugger port.')