Yet Another Denon Python Module
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

460 lines
10 KiB

  1. #!/usr/bin/env python
# Module metadata: author, copyright and license identification strings.
__author__ = 'John-Mark Gurney'
__copyright__ = 'Copyright 2017 John-Mark Gurney. All rights reserved.'
__license__ = '2-clause BSD license'
  5. # Copyright 2017, John-Mark Gurney
  6. # All rights reserved.
  7. #
  8. # Redistribution and use in source and binary forms, with or without
  9. # modification, are permitted provided that the following conditions are met:
  10. #
  11. # 1. Redistributions of source code must retain the above copyright notice, this
  12. # list of conditions and the following disclaimer.
  13. # 2. Redistributions in binary form must reproduce the above copyright notice,
  14. # this list of conditions and the following disclaimer in the documentation
  15. # and/or other materials provided with the distribution.
  16. #
  17. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  18. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  19. # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  20. # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  21. # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  22. # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  23. # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  24. # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  26. # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27. #
  28. # The views and conclusions contained in the software and documentation are those
  29. # of the authors and should not be interpreted as representing official policies,
  30. # either expressed or implied, of the Project.
  31. from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
  32. from twisted.protocols import basic
  33. from twisted.test import proto_helpers
  34. from twisted.trial import unittest
  35. import mock
  36. import time
  37. import twisted.internet.serialport
# Public API: DenonAVR is the only name exported by a star-import.
__all__ = [ 'DenonAVR' ]
  39. class DenonAVR(object,basic.LineReceiver):
  40. delimiter = '\r'
  41. timeOut = 1
  42. def __init__(self, serdev):
  43. '''Specify the serial device connected to the Denon AVR.'''
  44. self._ser = twisted.internet.serialport.SerialPort(self, serdev, None, baudrate=9600)
  45. self._cmdswaiting = {}
  46. self._power = None
  47. self._vol = None
  48. self._volmax = None
  49. self._speakera = None
  50. self._speakerb = None
  51. self._z2mute = None
  52. self._zm = None
  53. self._ms = None
  54. def _magic(cmd, attrname, settrans, args, doc):
  55. def getter(self):
  56. return getattr(self, attrname)
  57. def setter(self, arg):
  58. arg = settrans(arg)
  59. if arg != getattr(self, attrname):
  60. self._sendcmd(cmd, args[arg])
  61. return property(getter, setter, doc=doc)
  62. @property
  63. def ms(self):
  64. 'Surround mode'
  65. return self._ms
  66. power = _magic('PW', '_power', bool, { True: 'ON', False: 'STANDBY' }, 'Power status, True if on')
  67. mute = _magic('MU', '_mute', bool, { True: 'ON', False: 'OFF' }, 'Mute speakers, True speakers are muted (no sound)')
  68. z2mute = _magic('Z2MU', '_z2mute', bool, { True: 'ON', False: 'OFF' }, 'Mute Zone 2 speakers, True speakers are muted (no sound)')
  69. @staticmethod
  70. def _makevolarg(arg):
  71. arg = int(arg)
  72. if arg < 0 or arg > 99:
  73. raise ValueError('Volume out of range.')
  74. arg -= 1
  75. arg %= 100
  76. return '%02d' % arg
  77. @staticmethod
  78. def _parsevolarg(arg):
  79. arg = int(arg)
  80. if arg < 0 or arg > 99:
  81. raise ValueError('Volume out of range.')
  82. arg += 1
  83. arg %= 100
  84. return arg
  85. @property
  86. def vol(self):
  87. 'Volumn, range 0 through 99'
  88. return self._vol
  89. @vol.setter
  90. def vol(self, arg):
  91. if arg == self._vol:
  92. return
  93. if self._volmax is not None and arg > self._volmax:
  94. raise ValueError('volume %d, exceeds max: %d' % (arg,
  95. self._volmax))
  96. arg = self._makevolarg(arg)
  97. self._sendcmd('MV', arg)
  98. self.process_events(till='MV')
  99. self.process_events(till='MV')
  100. @property
  101. def volmax(self):
  102. 'Maximum volume supported.'
  103. return self._volmax
  104. def proc_PW(self, arg):
  105. if arg == 'STANDBY':
  106. self._power = False
  107. elif arg == 'ON':
  108. self._power = True
  109. else:
  110. raise RuntimeError('unknown PW arg: %s' % `arg`)
  111. def proc_MU(self, arg):
  112. if arg == 'ON':
  113. self._mute = True
  114. elif arg == 'OFF':
  115. self._mute = False
  116. else:
  117. raise RuntimeError('unknown MU arg: %s' % `arg`)
  118. def proc_ZM(self, arg):
  119. if arg == 'ON':
  120. self._zm = True
  121. elif arg == 'OFF':
  122. self._zm = False
  123. else:
  124. raise RuntimeError('unknown ZM arg: %s' % `arg`)
  125. def proc_MV(self, arg):
  126. if arg[:4] == 'MAX ':
  127. self._volmax = self._parsevolarg(arg[4:])
  128. else:
  129. self._vol = self._parsevolarg(arg)
  130. def proc_MS(self, arg):
  131. self._ms = arg
  132. def proc_PS(self, arg):
  133. if arg == 'FRONT A':
  134. self._speakera = True
  135. self._speakerb = False
  136. else:
  137. raise RuntimeError('unknown PS arg: %s' % `arg`)
  138. def proc_Z2(self, arg):
  139. if arg == 'MUOFF':
  140. self._z2mute = False
  141. else:
  142. raise RuntimeError('unknown Z2 arg: %s' % `arg`)
  143. def _sendcmd(self, cmd, args):
  144. cmd = '%s%s' % (cmd, args)
  145. #print 'sendcmd:', `cmd`
  146. self.sendLine(cmd)
  147. def lineReceived(self, event):
  148. '''Process a line from the AVR.'''
  149. #print 'lR:', `event`
  150. if len(event) >= 2:
  151. fun = getattr(self, 'proc_%s' % event[:2])
  152. fun(event[2:])
  153. for d in self._cmdswaiting.pop(event[:2], []):
  154. d.callback(event)
  155. def _waitfor(self, resp):
  156. d = Deferred()
  157. cmd = resp[:2]
  158. self._cmdswaiting.setdefault(cmd, []).append(d)
  159. if len(resp) > 2:
  160. @inlineCallbacks
  161. def extraresp(d=d):
  162. while True:
  163. r = yield d
  164. if r.startswith(resp):
  165. returnValue(r)
  166. d = self._waitfor(cmd)
  167. d = extraresp()
  168. return d
  169. @inlineCallbacks
  170. def update(self):
  171. '''Update the status of the AVR. This ensures that the
  172. state of the object matches the amp.'''
  173. d = self._waitfor('PW')
  174. self._sendcmd('PW', '?')
  175. d = yield d
  176. d = self._waitfor('MVMAX')
  177. self._sendcmd('MV', '?')
  178. d = yield d
  179. class TestDenon(unittest.TestCase):
  180. TEST_DEV = '/dev/tty.usbserial-FTC8DHBJ'
  181. def test_comms(self):
  182. # comment out to make it easy to restore skip
  183. self.skipTest('perf')
  184. avr = DenonAVR(self.TEST_DEV)
  185. self.assertIsNone(avr.power)
  186. avr.update()
  187. self.assertIsNotNone(avr.power)
  188. self.assertIsNotNone(avr.vol)
  189. avr.power = False
  190. time.sleep(1)
  191. avr.power = True
  192. self.assertTrue(avr.power)
  193. print 'foostart'
  194. time.sleep(1)
  195. avr.update()
  196. time.sleep(1)
  197. avr.vol = 0
  198. self.assertEqual(avr.vol, 0)
  199. time.sleep(1)
  200. avr.vol = 5
  201. avr.update()
  202. self.assertEqual(avr.vol, 5)
  203. avr.vol = 50
  204. avr.update()
  205. self.assertEqual(avr.vol, 50)
  206. avr.power = False
  207. self.assertFalse(avr.power)
  208. self.assertIsNotNone(avr.volmax)
  209. class TestStaticMethods(unittest.TestCase):
  210. def test_makevolarg(self):
  211. self.assertRaises(ValueError, DenonAVR._makevolarg, -1)
  212. self.assertRaises(ValueError, DenonAVR._makevolarg, 3874)
  213. self.assertRaises(ValueError, DenonAVR._makevolarg, 100)
  214. self.assertEqual(DenonAVR._makevolarg(0), '99')
  215. self.assertEqual(DenonAVR._makevolarg(1), '00')
  216. self.assertEqual(DenonAVR._makevolarg(99), '98')
  217. def test_parsevolarg(self):
  218. self.assertEqual(DenonAVR._parsevolarg('99'), 0)
  219. self.assertEqual(DenonAVR._parsevolarg('00'), 1)
  220. self.assertEqual(DenonAVR._parsevolarg('98'), 99)
  221. self.assertRaises(ValueError, DenonAVR._parsevolarg, '-1')
  222. class TestMethods(unittest.TestCase):
  223. @mock.patch('twisted.internet.serialport.SerialPort')
  224. def setUp(self, sfu):
  225. self.avr = DenonAVR('null')
  226. self.tr = proto_helpers.StringTransport()
  227. self.avr.makeConnection(self.tr)
  228. @staticmethod
  229. def getTimeout():
  230. return .1
  231. @inlineCallbacks
  232. def test_update(self):
  233. avr = self.avr
  234. d = avr.update()
  235. self.assertEqual(self.tr.value(), 'PW?\r')
  236. avr.dataReceived('PWSTANDBY\r')
  237. avr.dataReceived('MV51\rMVMAX 80\r')
  238. d = yield d
  239. self.assertEqual(self.tr.value(), 'PW?\rMV?\r')
  240. self.assertEqual(avr.power, False)
  241. self.assertIsNone(d)
  242. self.tr.clear()
  243. d = avr.update()
  244. self.assertEqual(self.tr.value(), 'PW?\r')
  245. avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
  246. avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
  247. d = yield d
  248. self.assertEqual(self.tr.value(), 'PW?\rMV?\r')
  249. self.assertEqual(avr.power, True)
  250. self.assertIsNone(d)
  251. @inlineCallbacks
  252. def test_waitfor(self):
  253. avr = self.avr
  254. avr.proc_AB = lambda arg: None
  255. d = avr._waitfor('AB123')
  256. # make sure that matching, but different response doesn't trigger
  257. avr.dataReceived('ABABC\r')
  258. self.assertFalse(d.called)
  259. # make sure that it triggers
  260. avr.dataReceived('AB123\r')
  261. self.assertTrue(d.called)
  262. d = yield d
  263. # and we get correct response
  264. self.assertEqual(d, 'AB123')
  265. def test_proc_events(self):
  266. avr = self.avr
  267. self.avr.dataReceived('PWON\r')
  268. self.assertEqual(avr.power, True)
  269. self.avr.dataReceived('MUON\r' + 'PWON\r')
  270. self.assertEqual(avr.mute, True)
  271. self.assertEqual(avr.power, True)
  272. self.avr.dataReceived('PWSTANDBY\r')
  273. self.assertEqual(avr.power, False)
  274. @mock.patch('yadenon.DenonAVR.sendLine')
  275. def test_proc_PW(self, sendline):
  276. avr = self.avr
  277. avr.proc_PW('STANDBY')
  278. self.assertEqual(avr.power, False)
  279. avr.proc_PW('ON')
  280. self.assertEqual(avr.power, True)
  281. self.assertRaises(RuntimeError, avr.proc_PW, 'foobar')
  282. avr.power = False
  283. sendline.assert_any_call('PWSTANDBY')
  284. def test_proc_MU(self):
  285. avr = self.avr
  286. avr.proc_MU('ON')
  287. self.assertEqual(avr.mute, True)
  288. avr.proc_MU('OFF')
  289. self.assertEqual(avr.mute, False)
  290. self.assertRaises(RuntimeError, avr.proc_MU, 'foobar')
  291. def test_proc_PS(self):
  292. avr = self.avr
  293. avr.proc_PS('FRONT A')
  294. self.assertEqual(avr._speakera, True)
  295. self.assertEqual(avr._speakerb, False)
  296. self.assertRaises(RuntimeError, avr.proc_PS, 'foobar')
  297. def test_proc_Z2(self):
  298. avr = self.avr
  299. avr.proc_Z2('MUOFF')
  300. self.assertEqual(avr.z2mute, False)
  301. self.assertRaises(RuntimeError, avr.proc_Z2, 'foobar')
  302. def test_proc_MS(self):
  303. avr = self.avr
  304. avr.proc_MS('STEREO')
  305. self.assertEqual(avr.ms, 'STEREO')
  306. def test_proc_ZM(self):
  307. avr = self.avr
  308. avr.proc_ZM('ON')
  309. self.assertEqual(avr._zm, True)
  310. avr.proc_ZM('OFF')
  311. self.assertEqual(avr._zm, False)
  312. self.assertRaises(RuntimeError, avr.proc_ZM, 'foobar')
  313. def test_proc_MV(self):
  314. avr = self.avr
  315. avr.proc_MV('MAX 80')
  316. self.assertEqual(avr.volmax, 81)
  317. avr.proc_MV('99')
  318. self.assertEqual(avr.vol, 0)
  319. avr.vol = 0
  320. self.assertRaises(ValueError, setattr, avr, 'vol', 82)