diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..24701a7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +mock +twisted +pyserial diff --git a/yadenon.py b/yadenon.py index 0b6cc2a..0edd468 100644 --- a/yadenon.py +++ b/yadenon.py @@ -31,20 +31,27 @@ __license__ = '2-clause BSD license' # of the authors and should not be interpreted as representing official policies, # either expressed or implied, of the Project. +from twisted.internet.defer import inlineCallbacks, Deferred, returnValue +from twisted.protocols import basic +from twisted.test import proto_helpers +from twisted.trial import unittest import mock -import serial -import unittest import threading import time +import twisted.internet.serialport __all__ = [ 'DenonAVR' ] -class DenonAVR(object): +class DenonAVR(object,basic.LineReceiver): + delimiter = '\r' + timeOut = 1 + def __init__(self, serdev): '''Specify the serial device connected to the Denon AVR.''' - self._ser = serial.serial_for_url(serdev, baudrate=9600, - timeout=.5) + self._ser = twisted.internet.serialport.SerialPort(self, serdev, None, baudrate=9600) + self._cmdswaiting = {} + self._power = None self._vol = None self._volmax = None @@ -54,28 +61,26 @@ class DenonAVR(object): self._zm = None self._ms = None + def _magic(cmd, attrname, settrans, args, doc): + def getter(self): + return getattr(self, attrname) + + def setter(self, arg): + arg = settrans(arg) + if arg != getattr(self, attrname): + self._sendcmd(cmd, args[arg]) + + return property(getter, setter, doc=doc) + @property def ms(self): 'Surround mode' return self._ms - @property - def power(self): - 'Power status, True if on' - - return self._power - - @power.setter - def power(self, arg): - arg = bool(arg) - - if arg != self._power: - args = { True: 'ON', False: 'STANDBY' } - self._sendcmd('PW', args[arg]) - self.process_events(till='PW') - time.sleep(1) - self.update() + power = _magic('PW', '_power', bool, { True: 'ON', False: 'STANDBY' }, 'Power status, True if on') + mute = _magic('MU', '_mute', bool, { True: 'ON', False: 'OFF' }, 'Mute speakers, True speakers are muted (no sound)') + z2mute = _magic('Z2MU', '_z2mute', bool, { True: 'ON', False: 'OFF' }, 'Mute Zone 2 speakers, True speakers are muted (no sound)') @staticmethod def _makevolarg(arg): @@ -173,11 +178,11 @@ class DenonAVR(object): raise RuntimeError('unknown Z2 arg: %s' % `arg`) def _sendcmd(self, cmd, args): - cmd = '%s%s\r' % (cmd, args) + cmd = '%s%s' % (cmd, args) - print 'scmd:', `cmd` - self._ser.write(cmd) - self._ser.flush() + #print 'sendcmd:', `cmd` + + self.sendLine(cmd) def _readcmd(self, timo=None): '''If timo == 0, and the first read returns the empty string, @@ -209,36 +214,68 @@ class DenonAVR(object): return cmd - def process_events(self, till=None): - '''Process events until the till command is received, otherwise - process a single event.''' + def lineReceived(self, event): + '''Process a line from the AVR.''' + + #print 'lR:', `event` + if len(event) >= 2: + fun = getattr(self, 'proc_%s' % event[:2]) + fun(event[2:]) + + for d in self._cmdswaiting.pop(event[:2], []): + d.callback(event) + + def _waitfor(self, resp): + d = Deferred() + + cmd = resp[:2] + self._cmdswaiting.setdefault(cmd, []).append(d) - assert till is None or len(till) == 2 - while True: - event = self._readcmd() + # XXX - not sure how to test this code to ensure that + # d isn't triggered till resp is received + # if resp is changed to cmd, test_update still passes, + # though it shouldn't. + # Probably need to + if len(resp) > 2: + @inlineCallbacks + def extraresp(d=d): + while True: + r = yield d + if r.startswith(resp): + returnValue(r) + return - if len(event) >= 2: - fun = getattr(self, 'proc_%s' % event[:2]) - fun(event[2:]) + d = self._waitfor(cmd) - if till is None or event[:2] == till: - return event + d = extraresp() + return d + + @inlineCallbacks def update(self): '''Update the status of the AVR. This ensures that the state of the object matches the amp.''' + d = self._waitfor('PW') + self._sendcmd('PW', '?') + + d = yield d + + d = self._waitfor('MVMAX') + self._sendcmd('MV', '?') - self.process_events(till='MV') # first vol - self.process_events(till='MV') # second max vol + + d = yield d class TestDenon(unittest.TestCase): TEST_DEV = '/dev/tty.usbserial-FTC8DHBJ' # comment out to make it easy to restore skip - @unittest.skip('perf') + #@unittest.TestCase.skipTest('perf') def test_comms(self): + self.skipTest('perf') + avr = DenonAVR(self.TEST_DEV) self.assertIsNone(avr.power) @@ -303,30 +340,93 @@ class TestStaticMethods(unittest.TestCase): self.assertRaises(ValueError, DenonAVR._parsevolarg, '-1') class TestMethods(unittest.TestCase): - @mock.patch('serial.serial_for_url') + @mock.patch('twisted.internet.serialport.SerialPort') def setUp(self, sfu): self.avr = DenonAVR('null') + self.tr = proto_helpers.StringTransport() + self.avr.makeConnection(self.tr) + + @staticmethod + def getTimeout(): + return .1 + + @inlineCallbacks + def test_update(self): + avr = self.avr + + d = avr.update() + + self.assertEqual(self.tr.value(), 'PW?\r') + + avr.dataReceived('PWSTANDBY\r') + + avr.dataReceived('MV51\rMVMAX 80\r') + + d = yield d + + self.assertEqual(self.tr.value(), 'PW?\rMV?\r') + + self.assertEqual(avr.power, False) + self.assertIsNone(d) + + self.tr.clear() + + d = avr.update() + + self.assertEqual(self.tr.value(), 'PW?\r') + + avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r') + + avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r') + + d = yield d + + self.assertEqual(self.tr.value(), 'PW?\rMV?\r') + + self.assertEqual(avr.power, True) + self.assertIsNone(d) + + @inlineCallbacks + def test_waitfor(self): + avr = self.avr + + avr.proc_AB = lambda arg: None + + d = avr._waitfor('AB123') + + # make sure that matching, but different response doesn't trigger + avr.dataReceived('ABABC\r') + self.assertFalse(d.called) + + # make sure that it triggers + avr.dataReceived('AB123\r') + + self.assertTrue(d.called) + + d = yield d + + # and we get correct response + self.assertEqual(d, 'AB123') def test_proc_events(self): avr = self.avr - avr._ser.read.side_effect = 'PWON\r' - avr.process_events() + self.avr.dataReceived('PWON\r') - self.assertTrue(avr._ser.read.called) + self.assertEqual(avr.power, True) - avr._ser.read.reset() + self.avr.dataReceived('MUON\r' + 'PWON\r') - avr._ser.read.side_effect = 'MUON\r' + 'PWON\r' - avr.process_events(till='PW') + self.assertEqual(avr.mute, True) + self.assertEqual(avr.power, True) - avr._ser.read.assert_has_calls([ mock.call(), mock.call() ]) + self.avr.dataReceived('PWSTANDBY\r') - @mock.patch('yadenon.DenonAVR._sendcmd') - @mock.patch('yadenon.DenonAVR.process_events') + self.assertEqual(avr.power, False) + + @mock.patch('yadenon.DenonAVR.sendLine') @mock.patch('time.sleep') - @mock.patch('yadenon.DenonAVR.update') - def test_proc_PW(self, mupdate, msleep, mpevents, msendcmd): + def test_proc_PW(self, msleep, sendline): avr = self.avr avr.proc_PW('STANDBY') @@ -338,16 +438,16 @@ class TestMethods(unittest.TestCase): self.assertRaises(RuntimeError, avr.proc_PW, 'foobar') avr.power = False - msendcmd.assert_any_call('PW', 'STANDBY') + sendline.assert_any_call('PWSTANDBY') def test_proc_MU(self): avr = self.avr avr.proc_MU('ON') - self.assertEqual(avr._mute, True) + self.assertEqual(avr.mute, True) avr.proc_MU('OFF') - self.assertEqual(avr._mute, False) + self.assertEqual(avr.mute, False) self.assertRaises(RuntimeError, avr.proc_MU, 'foobar') @@ -364,7 +464,7 @@ class TestMethods(unittest.TestCase): avr = self.avr avr.proc_Z2('MUOFF') - self.assertEqual(avr._z2mute, False) + self.assertEqual(avr.z2mute, False) self.assertRaises(RuntimeError, avr.proc_Z2, 'foobar') @@ -385,8 +485,7 @@ class TestMethods(unittest.TestCase): self.assertRaises(RuntimeError, avr.proc_ZM, 'foobar') - @mock.patch('yadenon.DenonAVR.process_events') - def test_proc_MV(self, pe): + def test_proc_MV(self): avr = self.avr avr.proc_MV('MAX 80') @@ -397,9 +496,6 @@ class TestMethods(unittest.TestCase): avr.vol = 0 - # we don't call this as we don't get a response - pe.assert_not_called() - self.assertRaises(ValueError, setattr, avr, 'vol', 82) def test_readcmd(self):