|
|
@@ -31,20 +31,27 @@ __license__ = '2-clause BSD license' |
|
|
|
# of the authors and should not be interpreted as representing official policies, |
|
|
|
# either expressed or implied, of the Project. |
|
|
|
|
|
|
|
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue |
|
|
|
from twisted.protocols import basic |
|
|
|
from twisted.test import proto_helpers |
|
|
|
from twisted.trial import unittest |
|
|
|
import mock |
|
|
|
import serial |
|
|
|
import unittest |
|
|
|
import threading |
|
|
|
import time |
|
|
|
import twisted.internet.serialport |
|
|
|
|
|
|
|
__all__ = [ 'DenonAVR' ] |
|
|
|
|
|
|
|
class DenonAVR(object): |
|
|
|
class DenonAVR(object,basic.LineReceiver): |
|
|
|
delimiter = '\r' |
|
|
|
timeOut = 1 |
|
|
|
|
|
|
|
def __init__(self, serdev): |
|
|
|
'''Specify the serial device connected to the Denon AVR.''' |
|
|
|
|
|
|
|
self._ser = serial.serial_for_url(serdev, baudrate=9600, |
|
|
|
timeout=.5) |
|
|
|
self._ser = twisted.internet.serialport.SerialPort(self, serdev, None, baudrate=9600) |
|
|
|
self._cmdswaiting = {} |
|
|
|
|
|
|
|
self._power = None |
|
|
|
self._vol = None |
|
|
|
self._volmax = None |
|
|
@@ -54,28 +61,26 @@ class DenonAVR(object): |
|
|
|
self._zm = None |
|
|
|
self._ms = None |
|
|
|
|
|
|
|
def _magic(cmd, attrname, settrans, args, doc): |
|
|
|
def getter(self): |
|
|
|
return getattr(self, attrname) |
|
|
|
|
|
|
|
def setter(self, arg): |
|
|
|
arg = settrans(arg) |
|
|
|
if arg != getattr(self, attrname): |
|
|
|
self._sendcmd(cmd, args[arg]) |
|
|
|
|
|
|
|
return property(getter, setter, doc=doc) |
|
|
|
|
|
|
|
@property |
|
|
|
def ms(self): |
|
|
|
'Surround mode' |
|
|
|
|
|
|
|
return self._ms |
|
|
|
|
|
|
|
@property |
|
|
|
def power(self): |
|
|
|
'Power status, True if on' |
|
|
|
|
|
|
|
return self._power |
|
|
|
|
|
|
|
@power.setter |
|
|
|
def power(self, arg): |
|
|
|
arg = bool(arg) |
|
|
|
|
|
|
|
if arg != self._power: |
|
|
|
args = { True: 'ON', False: 'STANDBY' } |
|
|
|
self._sendcmd('PW', args[arg]) |
|
|
|
self.process_events(till='PW') |
|
|
|
time.sleep(1) |
|
|
|
self.update() |
|
|
|
power = _magic('PW', '_power', bool, { True: 'ON', False: 'STANDBY' }, 'Power status, True if on') |
|
|
|
mute = _magic('MU', '_mute', bool, { True: 'ON', False: 'OFF' }, 'Mute speakers, True speakers are muted (no sound)') |
|
|
|
z2mute = _magic('Z2MU', '_z2mute', bool, { True: 'ON', False: 'OFF' }, 'Mute Zone 2 speakers, True speakers are muted (no sound)') |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _makevolarg(arg): |
|
|
@@ -173,11 +178,11 @@ class DenonAVR(object): |
|
|
|
raise RuntimeError('unknown Z2 arg: %s' % `arg`) |
|
|
|
|
|
|
|
def _sendcmd(self, cmd, args): |
|
|
|
cmd = '%s%s\r' % (cmd, args) |
|
|
|
cmd = '%s%s' % (cmd, args) |
|
|
|
|
|
|
|
print 'scmd:', `cmd` |
|
|
|
self._ser.write(cmd) |
|
|
|
self._ser.flush() |
|
|
|
#print 'sendcmd:', `cmd` |
|
|
|
|
|
|
|
self.sendLine(cmd) |
|
|
|
|
|
|
|
def _readcmd(self, timo=None): |
|
|
|
'''If timo == 0, and the first read returns the empty string, |
|
|
@@ -209,36 +214,68 @@ class DenonAVR(object): |
|
|
|
|
|
|
|
return cmd |
|
|
|
|
|
|
|
def process_events(self, till=None): |
|
|
|
'''Process events until the till command is received, otherwise |
|
|
|
process a single event.''' |
|
|
|
def lineReceived(self, event): |
|
|
|
'''Process a line from the AVR.''' |
|
|
|
|
|
|
|
#print 'lR:', `event` |
|
|
|
if len(event) >= 2: |
|
|
|
fun = getattr(self, 'proc_%s' % event[:2]) |
|
|
|
fun(event[2:]) |
|
|
|
|
|
|
|
for d in self._cmdswaiting.pop(event[:2], []): |
|
|
|
d.callback(event) |
|
|
|
|
|
|
|
def _waitfor(self, resp): |
|
|
|
d = Deferred() |
|
|
|
|
|
|
|
cmd = resp[:2] |
|
|
|
self._cmdswaiting.setdefault(cmd, []).append(d) |
|
|
|
|
|
|
|
assert till is None or len(till) == 2 |
|
|
|
while True: |
|
|
|
event = self._readcmd() |
|
|
|
# XXX - not sure how to test this code to ensure that |
|
|
|
# d isn't triggered till resp is received |
|
|
|
# if resp is changed to cmd, test_update still passes, |
|
|
|
# though it shouldn't. |
|
|
|
# Probably need to |
|
|
|
if len(resp) > 2: |
|
|
|
@inlineCallbacks |
|
|
|
def extraresp(d=d): |
|
|
|
while True: |
|
|
|
r = yield d |
|
|
|
if r.startswith(resp): |
|
|
|
returnValue(r) |
|
|
|
return |
|
|
|
|
|
|
|
if len(event) >= 2: |
|
|
|
fun = getattr(self, 'proc_%s' % event[:2]) |
|
|
|
fun(event[2:]) |
|
|
|
d = self._waitfor(cmd) |
|
|
|
|
|
|
|
if till is None or event[:2] == till: |
|
|
|
return event |
|
|
|
d = extraresp() |
|
|
|
|
|
|
|
return d |
|
|
|
|
|
|
|
@inlineCallbacks |
|
|
|
def update(self): |
|
|
|
'''Update the status of the AVR. This ensures that the |
|
|
|
state of the object matches the amp.''' |
|
|
|
|
|
|
|
d = self._waitfor('PW') |
|
|
|
|
|
|
|
self._sendcmd('PW', '?') |
|
|
|
|
|
|
|
d = yield d |
|
|
|
|
|
|
|
d = self._waitfor('MVMAX') |
|
|
|
|
|
|
|
self._sendcmd('MV', '?') |
|
|
|
self.process_events(till='MV') # first vol |
|
|
|
self.process_events(till='MV') # second max vol |
|
|
|
|
|
|
|
d = yield d |
|
|
|
|
|
|
|
class TestDenon(unittest.TestCase): |
|
|
|
TEST_DEV = '/dev/tty.usbserial-FTC8DHBJ' |
|
|
|
|
|
|
|
# comment out to make it easy to restore skip |
|
|
|
@unittest.skip('perf') |
|
|
|
#@unittest.TestCase.skipTest('perf') |
|
|
|
def test_comms(self): |
|
|
|
self.skipTest('perf') |
|
|
|
|
|
|
|
avr = DenonAVR(self.TEST_DEV) |
|
|
|
self.assertIsNone(avr.power) |
|
|
|
|
|
|
@@ -303,30 +340,93 @@ class TestStaticMethods(unittest.TestCase): |
|
|
|
self.assertRaises(ValueError, DenonAVR._parsevolarg, '-1') |
|
|
|
|
|
|
|
class TestMethods(unittest.TestCase): |
|
|
|
@mock.patch('serial.serial_for_url') |
|
|
|
@mock.patch('twisted.internet.serialport.SerialPort') |
|
|
|
def setUp(self, sfu): |
|
|
|
self.avr = DenonAVR('null') |
|
|
|
self.tr = proto_helpers.StringTransport() |
|
|
|
self.avr.makeConnection(self.tr) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def getTimeout(): |
|
|
|
return .1 |
|
|
|
|
|
|
|
@inlineCallbacks |
|
|
|
def test_update(self): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
d = avr.update() |
|
|
|
|
|
|
|
self.assertEqual(self.tr.value(), 'PW?\r') |
|
|
|
|
|
|
|
avr.dataReceived('PWSTANDBY\r') |
|
|
|
|
|
|
|
avr.dataReceived('MV51\rMVMAX 80\r') |
|
|
|
|
|
|
|
d = yield d |
|
|
|
|
|
|
|
self.assertEqual(self.tr.value(), 'PW?\rMV?\r') |
|
|
|
|
|
|
|
self.assertEqual(avr.power, False) |
|
|
|
self.assertIsNone(d) |
|
|
|
|
|
|
|
self.tr.clear() |
|
|
|
|
|
|
|
d = avr.update() |
|
|
|
|
|
|
|
self.assertEqual(self.tr.value(), 'PW?\r') |
|
|
|
|
|
|
|
avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r') |
|
|
|
|
|
|
|
avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r') |
|
|
|
|
|
|
|
d = yield d |
|
|
|
|
|
|
|
self.assertEqual(self.tr.value(), 'PW?\rMV?\r') |
|
|
|
|
|
|
|
self.assertEqual(avr.power, True) |
|
|
|
self.assertIsNone(d) |
|
|
|
|
|
|
|
@inlineCallbacks |
|
|
|
def test_waitfor(self): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
avr.proc_AB = lambda arg: None |
|
|
|
|
|
|
|
d = avr._waitfor('AB123') |
|
|
|
|
|
|
|
# make sure that matching, but different response doesn't trigger |
|
|
|
avr.dataReceived('ABABC\r') |
|
|
|
self.assertFalse(d.called) |
|
|
|
|
|
|
|
# make sure that it triggers |
|
|
|
avr.dataReceived('AB123\r') |
|
|
|
|
|
|
|
self.assertTrue(d.called) |
|
|
|
|
|
|
|
d = yield d |
|
|
|
|
|
|
|
# and we get correct response |
|
|
|
self.assertEqual(d, 'AB123') |
|
|
|
|
|
|
|
def test_proc_events(self): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
avr._ser.read.side_effect = 'PWON\r' |
|
|
|
avr.process_events() |
|
|
|
self.avr.dataReceived('PWON\r') |
|
|
|
|
|
|
|
self.assertTrue(avr._ser.read.called) |
|
|
|
self.assertEqual(avr.power, True) |
|
|
|
|
|
|
|
avr._ser.read.reset() |
|
|
|
self.avr.dataReceived('MUON\r' + 'PWON\r') |
|
|
|
|
|
|
|
avr._ser.read.side_effect = 'MUON\r' + 'PWON\r' |
|
|
|
avr.process_events(till='PW') |
|
|
|
self.assertEqual(avr.mute, True) |
|
|
|
self.assertEqual(avr.power, True) |
|
|
|
|
|
|
|
avr._ser.read.assert_has_calls([ mock.call(), mock.call() ]) |
|
|
|
self.avr.dataReceived('PWSTANDBY\r') |
|
|
|
|
|
|
|
@mock.patch('yadenon.DenonAVR._sendcmd') |
|
|
|
@mock.patch('yadenon.DenonAVR.process_events') |
|
|
|
self.assertEqual(avr.power, False) |
|
|
|
|
|
|
|
@mock.patch('yadenon.DenonAVR.sendLine') |
|
|
|
@mock.patch('time.sleep') |
|
|
|
@mock.patch('yadenon.DenonAVR.update') |
|
|
|
def test_proc_PW(self, mupdate, msleep, mpevents, msendcmd): |
|
|
|
def test_proc_PW(self, msleep, sendline): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
avr.proc_PW('STANDBY') |
|
|
@@ -338,16 +438,16 @@ class TestMethods(unittest.TestCase): |
|
|
|
self.assertRaises(RuntimeError, avr.proc_PW, 'foobar') |
|
|
|
|
|
|
|
avr.power = False |
|
|
|
msendcmd.assert_any_call('PW', 'STANDBY') |
|
|
|
sendline.assert_any_call('PWSTANDBY') |
|
|
|
|
|
|
|
def test_proc_MU(self): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
avr.proc_MU('ON') |
|
|
|
self.assertEqual(avr._mute, True) |
|
|
|
self.assertEqual(avr.mute, True) |
|
|
|
|
|
|
|
avr.proc_MU('OFF') |
|
|
|
self.assertEqual(avr._mute, False) |
|
|
|
self.assertEqual(avr.mute, False) |
|
|
|
|
|
|
|
self.assertRaises(RuntimeError, avr.proc_MU, 'foobar') |
|
|
|
|
|
|
@@ -364,7 +464,7 @@ class TestMethods(unittest.TestCase): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
avr.proc_Z2('MUOFF') |
|
|
|
self.assertEqual(avr._z2mute, False) |
|
|
|
self.assertEqual(avr.z2mute, False) |
|
|
|
|
|
|
|
self.assertRaises(RuntimeError, avr.proc_Z2, 'foobar') |
|
|
|
|
|
|
@@ -385,8 +485,7 @@ class TestMethods(unittest.TestCase): |
|
|
|
|
|
|
|
self.assertRaises(RuntimeError, avr.proc_ZM, 'foobar') |
|
|
|
|
|
|
|
@mock.patch('yadenon.DenonAVR.process_events') |
|
|
|
def test_proc_MV(self, pe): |
|
|
|
def test_proc_MV(self): |
|
|
|
avr = self.avr |
|
|
|
|
|
|
|
avr.proc_MV('MAX 80') |
|
|
@@ -397,9 +496,6 @@ class TestMethods(unittest.TestCase): |
|
|
|
|
|
|
|
avr.vol = 0 |
|
|
|
|
|
|
|
# we don't call this as we don't get a response |
|
|
|
pe.assert_not_called() |
|
|
|
|
|
|
|
self.assertRaises(ValueError, setattr, avr, 'vol', 82) |
|
|
|
|
|
|
|
def test_readcmd(self): |
|
|
|