@@ -42,12 +42,12 @@ import twisted.internet.serialport
__all__ = [ 'DenonAVR' ]
class DenonAVR(object, basic.LineReceiver):
class DenonAVR(basic.LineReceiver):
'''A Twisted Protocol Handler for Denon Receivers. This is not yet
complete, but has basic functionally, and more will be added as
needed.'''
delimiter = '\r' # line delimiter is the CR
delimiter = b '\r' # line delimiter is the CR
timeOut = 1
def __init__(self, serdev):
@@ -94,6 +94,20 @@ class DenonAVR(object,basic.LineReceiver):
i(attr)
def _magic(cmd, attrname, settrans, args, doc):
'''Special wrapper for simplifying commands.
cmd is a bytes, the prefix to send to the Amp.
attrname is the attribute that the value is stored as
internally
settrans is a function that will be called to conver the
data set by the user to a usable format to look up.
args is a dict used as a mapping from the output of settrans
to the data to be added to the cmd and sent to the Amp.
The key is what the user/consumer of the attribute sets.
The value should be a bytes.
doc is the doc string to use for the attribute
'''
def getter(self):
return getattr(self, attrname)
@@ -113,13 +127,13 @@ class DenonAVR(object,basic.LineReceiver):
return self._ms
power = _magic('PW', '_power', bool, { True: 'ON', False: 'STANDBY' }, 'Power status, True if on')
input = _magic('SI', '_input', str, { x:x for x in ('PHONO', 'TUNER', 'CD', 'V.AUX', 'DVD', 'TV', 'SAT/CBL', 'DVR', ) }, 'Audio Input Source')
source = _magic('SD', '_source', str, { x:x for x in ('AUTO', 'HDMI', 'DIGITAL', 'ANALOG', ) }, 'Source type, can be one of AUTO, HDMI, DIGITAL, or ANALOG')
diginput = _magic('DC', '_diginput', str, { x:x for x in ('AUTO', 'PCM', 'DTS', ) }, 'Digital input mode, can be one of AUTO, PCM, or DTS')
mute = _magic('MU', '_mute', bool, { True: 'ON', False: 'OFF' }, 'Mute speakers, True speakers are muted (no sound)')
zm = _magic('ZM', '_zm', bool, { True: 'ON', False: 'OFF' }, 'Main Zone On, True if on')
z2mute = _magic('Z2MU', '_z2mute', bool, { True: 'ON', False: 'OFF' }, 'Mute Zone 2 speakers, True speakers are muted (no sound)')
power = _magic(b 'PW', '_power', bool, { True: b 'ON', False: b 'STANDBY' }, 'Power status, True if on')
input = _magic(b 'SI', '_input', str, { x:x.encode('ASCII') for x in ('PHONO', 'TUNER', 'CD', 'V.AUX', 'DVD', 'TV', 'SAT/CBL', 'DVR', ) }, 'Audio Input Source')
source = _magic(b 'SD', '_source', str, { x:x.encode('ASCII') for x in ('AUTO', 'HDMI', 'DIGITAL', 'ANALOG', ) }, 'Source type, can be one of AUTO, HDMI, DIGITAL, or ANALOG')
diginput = _magic(b 'DC', '_diginput', str, { x:x.encode('ASCII') for x in ('AUTO', 'PCM', 'DTS', ) }, 'Digital input mode, can be one of AUTO, PCM, or DTS')
mute = _magic(b 'MU', '_mute', bool, { True: b 'ON', False: b 'OFF' }, 'Mute speakers, True speakers are muted (no sound)')
zm = _magic(b 'ZM', '_zm', bool, { True: b 'ON', False: b 'OFF' }, 'Main Zone On, True if on')
z2mute = _magic(b 'Z2MU', '_z2mute', bool, { True: b 'ON', False: b 'OFF' }, 'Mute Zone 2 speakers, True speakers are muted (no sound)')
@staticmethod
def _makevolarg(arg):
@@ -137,9 +151,9 @@ class DenonAVR(object,basic.LineReceiver):
# Scale to 10x
arg *= 5
if arg % 10 != 0:
return '%03d' % arg
return b '%03d' % arg
else:
return '%02d' % (arg / 10)
return b '%02d' % (arg / 10)
@staticmethod
def _parsevolarg(arg):
@@ -176,13 +190,13 @@ class DenonAVR(object,basic.LineReceiver):
self._volmax))
arg = self._makevolarg(arg)
self._sendcmd('MV', arg)
self._sendcmd(b 'MV', arg)
def vol_up(self):
self._sendcmd('MV', 'UP')
self._sendcmd(b 'MV', b 'UP')
def vol_down(self):
self._sendcmd('MV', 'DOWN')
self._sendcmd(b 'MV', b 'DOWN')
@property
def volmax(self):
@@ -191,37 +205,37 @@ class DenonAVR(object,basic.LineReceiver):
return self._volmax
def proc_PW(self, arg):
if arg == 'STANDBY':
if arg == b 'STANDBY':
self._power = False
elif arg == 'ON':
elif arg == b 'ON':
self._power = True
else:
raise RuntimeError('unknown PW arg: %s' % `arg` )
raise RuntimeError('unknown PW arg: %s' % repr(arg) )
self._notify('power')
def proc_MU(self, arg):
if arg == 'ON':
if arg == b 'ON':
self._mute = True
elif arg == 'OFF':
elif arg == b 'OFF':
self._mute = False
else:
raise RuntimeError('unknown MU arg: %s' % `arg` )
raise RuntimeError('unknown MU arg: %s' % repr(arg) )
self._notify('mute')
def proc_ZM(self, arg):
if arg == 'ON':
if arg == b 'ON':
self._zm = True
elif arg == 'OFF':
elif arg == b 'OFF':
self._zm = False
else:
raise RuntimeError('unknown ZM arg: %s' % `arg` )
raise RuntimeError('unknown ZM arg: %s' % repr(arg) )
self._notify('zm')
def proc_MV(self, arg):
if arg[:4] == 'MAX ':
if arg[:4] == b 'MAX ':
self._volmax = self._parsevolarg(arg[4:])
self._notify('volmax')
else:
@@ -232,7 +246,7 @@ class DenonAVR(object,basic.LineReceiver):
self._ms = arg
def proc_SI(self, arg):
self._input = arg
self._input = arg.decode('ASCII')
self._notify('input')
def proc_SD(self, arg):
@@ -240,28 +254,29 @@ class DenonAVR(object,basic.LineReceiver):
self._notify('source')
def proc_DC(self, arg):
self._diginput = arg
self._diginput = arg.decode('ASCII')
def proc_CV(self, arg):
pass
def proc_PS(self, arg):
if arg == 'FRONT A':
if arg == b 'FRONT A':
self._speakera = True
self._speakerb = False
else:
raise RuntimeError('unknown PS arg: %s' % `arg` )
raise RuntimeError('unknown PS arg: %s' % repr(arg) )
def proc_Z2(self, arg):
if arg == 'MUOFF':
if arg == b 'MUOFF':
self._z2mute = False
else:
raise RuntimeError('unknown Z2 arg: %s' % `arg` )
raise RuntimeError('unknown Z2 arg: %s' % repr(arg) )
def _sendcmd(self, cmd, args):
cmd = '%s%s' % (cmd, args)
#print('cmd:', repr(cmd), 'args:', repr(args))
cmd = b'%s%s' % (cmd, args)
#print 'sendcmd:', `cmd`
#print('sendcmd:', repr(cmd))
self.sendLine(cmd)
@@ -269,9 +284,9 @@ class DenonAVR(object,basic.LineReceiver):
'''Process a line from the AVR. This is internal and will
be called by LineReceiver.'''
#print 'lR:', `event`
#print('lR:', repr(event))
if len(event) >= 2:
fun = getattr(self, 'proc_%s' % event[:2])
fun = getattr(self, 'proc_%s' % event[:2].decode('ASCII') )
fun(event[2:])
for d in self._cmdswaiting.pop(event[:2], []):
@@ -304,21 +319,21 @@ class DenonAVR(object,basic.LineReceiver):
When the deferred fires, then all the internal state has
been updated and can be examined.'''
d = self._waitfor('PW')
d = self._waitfor(b 'PW')
self._sendcmd('PW', '?')
self._sendcmd(b 'PW', b '?')
d = yield d
d = self._waitfor('MVMAX')
d = self._waitfor(b 'MVMAX')
self._sendcmd('MV', '?')
self._sendcmd(b 'MV', b '?')
d = yield d
d = self._waitfor('SI')
d = self._waitfor(b 'SI')
self._sendcmd('SI', '?')
self._sendcmd(b 'SI', b '?')
d = yield d
@@ -345,7 +360,7 @@ class TestDenon(unittest.TestCase):
self.assertTrue(avr.power)
print 'foostart'
print('foostart')
time.sleep(1)
@@ -381,28 +396,28 @@ class TestStaticMethods(unittest.TestCase):
self.assertRaises(ValueError, DenonAVR._makevolarg, 3874)
self.assertRaises(ValueError, DenonAVR._makevolarg, 100)
self.assertEqual(DenonAVR._makevolarg(0), '99')
self.assertEqual(DenonAVR._makevolarg(0.1), '99')
self.assertEqual(DenonAVR._makevolarg(0.4), '99')
self.assertEqual(DenonAVR._makevolarg(0.5), '995')
self.assertEqual(DenonAVR._makevolarg(0.6), '995')
self.assertEqual(DenonAVR._makevolarg(0.9), '995')
self.assertEqual(DenonAVR._makevolarg(1), '00')
self.assertEqual(DenonAVR._makevolarg(1.5), '005')
self.assertEqual(DenonAVR._makevolarg(7.5), '065')
self.assertEqual(DenonAVR._makevolarg(99), '98')
self.assertEqual(DenonAVR._makevolarg(0), b '99')
self.assertEqual(DenonAVR._makevolarg(0.1), b '99')
self.assertEqual(DenonAVR._makevolarg(0.4), b '99')
self.assertEqual(DenonAVR._makevolarg(0.5), b '995')
self.assertEqual(DenonAVR._makevolarg(0.6), b '995')
self.assertEqual(DenonAVR._makevolarg(0.9), b '995')
self.assertEqual(DenonAVR._makevolarg(1), b '00')
self.assertEqual(DenonAVR._makevolarg(1.5), b '005')
self.assertEqual(DenonAVR._makevolarg(7.5), b '065')
self.assertEqual(DenonAVR._makevolarg(99), b '98')
def test_parsevolarg(self):
self.assertEqual(DenonAVR._parsevolarg('99'), 0)
self.assertEqual(DenonAVR._parsevolarg('995'), 0.5)
self.assertEqual(DenonAVR._parsevolarg('00'), 1)
self.assertEqual(DenonAVR._parsevolarg('005'), 1.5)
self.assertEqual(DenonAVR._parsevolarg('075'), 8.5)
self.assertEqual(DenonAVR._parsevolarg('085'), 9.5)
self.assertEqual(DenonAVR._parsevolarg('80'), 81)
self.assertEqual(DenonAVR._parsevolarg('98'), 99)
self.assertEqual(DenonAVR._parsevolarg(b '99'), 0)
self.assertEqual(DenonAVR._parsevolarg(b '995'), 0.5)
self.assertEqual(DenonAVR._parsevolarg(b '00'), 1)
self.assertEqual(DenonAVR._parsevolarg(b '005'), 1.5)
self.assertEqual(DenonAVR._parsevolarg(b '075'), 8.5)
self.assertEqual(DenonAVR._parsevolarg(b '085'), 9.5)
self.assertEqual(DenonAVR._parsevolarg(b '80'), 81)
self.assertEqual(DenonAVR._parsevolarg(b '98'), 99)
self.assertRaises(ValueError, DenonAVR._parsevolarg, '-1')
self.assertRaises(ValueError, DenonAVR._parsevolarg, b '-1')
class TestMethods(unittest.TestCase):
@mock.patch('twisted.internet.serialport.SerialPort')
@@ -422,23 +437,23 @@ class TestMethods(unittest.TestCase):
dfr = avr.update()
# get the first stage
self.assertEqual(self.tr.value(), 'PW?\r')
self.assertEqual(self.tr.value(), b 'PW?\r')
avr.dataReceived('PWSTANDBY\r')
avr.dataReceived('MV51\rMVMAX 80\r')
avr.dataReceived('SIPHONO\r')
avr.dataReceived(b 'PWSTANDBY\r')
avr.dataReceived(b 'MV51\rMVMAX 80\r')
avr.dataReceived(b 'SIPHONO\r')
d = yield dfr
# get the second stage
self.assertEqual(self.tr.value(), 'PW?\rMV?\rSI?\r')
self.assertEqual(self.tr.value(), b 'PW?\rMV?\rSI?\r')
self.assertEqual(avr.power, False)
self.assertIsNone(d)
d = yield dfr
self.assertEqual(self.tr.value(), 'PW?\rMV?\rSI?\r')
self.assertEqual(self.tr.value(), b 'PW?\rMV?\rSI?\r')
self.assertEqual(avr.input, 'PHONO')
self.assertIsNone(d)
@@ -447,15 +462,15 @@ class TestMethods(unittest.TestCase):
d = avr.update()
self.assertEqual(self.tr.value(), 'PW?\r')
self.assertEqual(self.tr.value(), b 'PW?\r')
avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
avr.dataReceived('SIDVD\r')
avr.dataReceived(b 'PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
avr.dataReceived(b 'MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
avr.dataReceived(b 'SIDVD\r')
d = yield d
self.assertEqual(self.tr.value(), 'PW?\rMV?\rSI?\r')
self.assertEqual(self.tr.value(), b 'PW?\rMV?\rSI?\r')
self.assertEqual(avr.power, True)
self.assertIsNone(d)
@@ -464,8 +479,8 @@ class TestMethods(unittest.TestCase):
def test_realsequences(self):
avr = self.avr
avr.dataReceived('PSFRONT A\rSITUNER\rMSSTEREO\rSDANALOG\rDCAUTO\rCVFL 50\r')
avr.dataReceived('PSFRONT A\rSIPHONO\rMSSTEREO\rSDANALOG\rDCAUTO\r')
avr.dataReceived(b 'PSFRONT A\rSITUNER\rMSSTEREO\rSDANALOG\rDCAUTO\rCVFL 50\r')
avr.dataReceived(b 'PSFRONT A\rSIPHONO\rMSSTEREO\rSDANALOG\rDCAUTO\r')
@inlineCallbacks
def test_waitfor(self):
@@ -473,21 +488,21 @@ class TestMethods(unittest.TestCase):
avr.proc_AB = lambda arg: None
d = avr._waitfor('AB123')
d = avr._waitfor(b 'AB123')
# make sure that matching, but different response doesn't trigger
avr.dataReceived('ABABC\r')
avr.dataReceived(b 'ABABC\r')
self.assertFalse(d.called)
# make sure that it triggers
avr.dataReceived('AB123\r')
avr.dataReceived(b 'AB123\r')
self.assertTrue(d.called)
d = yield d
# and we get correct response
self.assertEqual(d, 'AB123')
self.assertEqual(d, b 'AB123')
def test_register(self):
avr = self.avr
@@ -495,43 +510,43 @@ class TestMethods(unittest.TestCase):
efun = mock.MagicMock()
avr.register(efun)
avr.proc_MV('41')
avr.proc_MV(b '41')
efun.assert_called_once_with('vol')
efun.reset_mock()
avr.proc_MV('MAX 80')
avr.proc_MV(b 'MAX 80')
efun.assert_called_once_with('volmax')
efun.reset_mock()
avr.proc_PW('ON')
avr.proc_PW(b 'ON')
efun.assert_called_once_with('power')
efun.reset_mock()
avr.proc_MU('ON')
avr.proc_MU(b 'ON')
efun.assert_called_once_with('mute')
efun.reset_mock()
avr.proc_ZM('ON')
avr.proc_ZM(b 'ON')
efun.assert_called_once_with('zm')
efun.reset_mock()
avr.proc_SI('TUNER')
avr.proc_SI(b 'TUNER')
efun.assert_called_once_with('input')
efun.reset_mock()
avr.proc_SD('ANALOG')
avr.proc_SD(b 'ANALOG')
efun.assert_called_once_with('source')
efun.reset_mock()
avr.unregister(efun)
avr.proc_PW('ON')
avr.proc_PW(b 'ON')
self.assertEqual(efun.call_count, 0)
@@ -541,11 +556,11 @@ class TestMethods(unittest.TestCase):
d = avr.update()
self.assertEqual(self.tr.value(), 'PW?\r')
self.assertEqual(self.tr.value(), b 'PW?\r')
avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
avr.dataReceived('SIPHOTO\r')
avr.dataReceived(b 'PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
avr.dataReceived(b 'MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
avr.dataReceived(b 'SIPHOTO\r')
d = yield d
@@ -553,27 +568,27 @@ class TestMethods(unittest.TestCase):
avr.vol = 20
self.assertEqual(self.tr.value(), 'MV19\r')
self.assertEqual(self.tr.value(), b 'MV19\r')
self.tr.clear()
avr.vol = 20.5
self.assertEqual(self.tr.value(), 'MV195\r')
self.assertEqual(self.tr.value(), b 'MV195\r')
def test_proc_events(self):
avr = self.avr
avr.dataReceived('PWON\r')
avr.dataReceived(b 'PWON\r')
self.assertEqual(avr.power, True)
avr.dataReceived('MUON\r' + 'PWON\r')
avr.dataReceived(b 'MUON\r' + b 'PWON\r')
self.assertEqual(avr.mute, True)
self.assertEqual(avr.power, True)
avr.dataReceived('PWSTANDBY\r')
avr.dataReceived(b 'PWSTANDBY\r')
self.assertEqual(avr.power, False)
@@ -581,24 +596,24 @@ class TestMethods(unittest.TestCase):
def test_proc_PW(self, sendline):
avr = self.avr
avr.proc_PW('STANDBY')
avr.proc_PW(b 'STANDBY')
self.assertEqual(avr.power, False)
avr.proc_PW('ON')
avr.proc_PW(b 'ON')
self.assertEqual(avr.power, True)
self.assertRaises(RuntimeError, avr.proc_PW, 'foobar')
avr.power = False
sendline.assert_any_call('PWSTANDBY')
sendline.assert_any_call(b 'PWSTANDBY')
def test_proc_MU(self):
avr = self.avr
avr.proc_MU('ON')
avr.proc_MU(b 'ON')
self.assertEqual(avr.mute, True)
avr.proc_MU('OFF')
avr.proc_MU(b 'OFF')
self.assertEqual(avr.mute, False)
self.assertRaises(RuntimeError, avr.proc_MU, 'foobar')
@@ -608,22 +623,22 @@ class TestMethods(unittest.TestCase):
avr = self.avr
avr.mute = True
sendline.assert_any_call('MUON')
sendline.assert_any_call(b 'MUON')
# Verify the transition doesn't happen
self.assertFalse(avr.mute)
# till we get notification
avr.proc_MU('ON')
avr.proc_MU(b 'ON')
self.assertTrue(avr.mute)
avr.mute = False
sendline.assert_any_call('MUOFF')
sendline.assert_any_call(b 'MUOFF')
def test_proc_PS(self):
avr = self.avr
avr.proc_PS('FRONT A')
avr.proc_PS(b 'FRONT A')
self.assertEqual(avr._speakera, True)
self.assertEqual(avr._speakerb, False)
@@ -632,7 +647,7 @@ class TestMethods(unittest.TestCase):
def test_proc_Z2(self):
avr = self.avr
avr.proc_Z2('MUOFF')
avr.proc_Z2(b 'MUOFF')
self.assertEqual(avr.z2mute, False)
self.assertRaises(RuntimeError, avr.proc_Z2, 'foobar')
@@ -640,16 +655,16 @@ class TestMethods(unittest.TestCase):
def test_proc_MS(self):
avr = self.avr
avr.proc_MS('STEREO')
self.assertEqual(avr.ms, 'STEREO')
avr.proc_MS(b 'STEREO')
self.assertEqual(avr.ms, b 'STEREO')
def test_proc_ZM(self):
avr = self.avr
avr.proc_ZM('ON')
avr.proc_ZM(b 'ON')
self.assertEqual(avr._zm, True)
avr.proc_ZM('OFF')
avr.proc_ZM(b 'OFF')
self.assertEqual(avr._zm, False)
self.assertRaises(RuntimeError, avr.proc_ZM, 'foobar')
@@ -659,28 +674,28 @@ class TestMethods(unittest.TestCase):
avr = self.avr
avr.zm = True
sendline.assert_any_call('ZMON')
sendline.assert_any_call(b 'ZMON')
# Verify the transition doesn't happen
self.assertFalse(avr.zm)
# till we get notification
avr.proc_ZM('ON')
avr.proc_ZM(b 'ON')
self.assertTrue(avr.zm)
avr.zm = False
sendline.assert_any_call('ZMOFF')
sendline.assert_any_call(b 'ZMOFF')
def test_proc_MV(self):
avr = self.avr
avr.proc_MV('MAX 80')
avr.proc_MV(b 'MAX 80')
self.assertEqual(avr.volmax, 81)
avr.proc_MV('085')
avr.proc_MV(b '085')
self.assertEqual(avr.vol, 9.5)
avr.proc_MV('99')
avr.proc_MV(b '99')
self.assertEqual(avr.vol, 0)
avr.vol = 0
@@ -689,10 +704,10 @@ class TestMethods(unittest.TestCase):
def test_proc_SI(self):
avr = self.avr
avr.proc_SI('PHONO')
avr.proc_SI(b 'PHONO')
self.assertEqual(avr.input, 'PHONO')
avr.proc_SI('TUNER')
avr.proc_SI(b 'TUNER')
self.assertEqual(avr.input, 'TUNER')
@mock.patch('yadenon.DenonAVR.sendLine')
@@ -700,17 +715,17 @@ class TestMethods(unittest.TestCase):
avr = self.avr
avr.input = 'PHONO'
sendline.assert_any_call('SIPHONO')
sendline.assert_any_call(b 'SIPHONO')
# Verify the transition doesn't happen
self.assertIsNone(avr.input)
# till we get notification
avr.proc_SI('PHONO')
avr.proc_SI(b 'PHONO')
self.assertEqual(avr.input, 'PHONO')
avr.input = 'TUNER'
sendline.assert_any_call('SITUNER')
sendline.assert_any_call(b 'SITUNER')
avr.input = 'CD'
avr.input = 'V.AUX'
@@ -728,17 +743,17 @@ class TestMethods(unittest.TestCase):
avr = self.avr
avr.source = 'AUTO'
sendline.assert_any_call('SDAUTO')
sendline.assert_any_call(b 'SDAUTO')
# Verify the transition doesn't happen
self.assertIsNone(avr.source)
# till we get notification
avr.proc_SD('AUTO')
self.assertEqual(avr.source, 'AUTO')
avr.proc_SD(b 'AUTO')
self.assertEqual(avr.source, b 'AUTO')
avr.source = 'HDMI'
sendline.assert_any_call('SDHDMI')
sendline.assert_any_call(b 'SDHDMI')
avr.source = 'DIGITAL'
avr.source = 'ANALOG'
@@ -752,17 +767,17 @@ class TestMethods(unittest.TestCase):
avr = self.avr
avr.diginput = 'AUTO'
sendline.assert_any_call('DCAUTO')
sendline.assert_any_call(b 'DCAUTO')
# Verify the transition doesn't happen
self.assertIsNone(avr.diginput)
# till we get notification
avr.proc_DC('AUTO')
avr.proc_DC(b 'AUTO')
self.assertEqual(avr.diginput, 'AUTO')
avr.diginput = 'PCM'
sendline.assert_any_call('DCPCM')
sendline.assert_any_call(b 'DCPCM')
avr.diginput = 'DTS'
@@ -775,7 +790,7 @@ class TestMethods(unittest.TestCase):
avr = self.avr
avr.vol_up()
sendline.assert_any_call('MVUP')
sendline.assert_any_call(b 'MVUP')
avr.vol_down()
sendline.assert_any_call('MVDOWN')
sendline.assert_any_call(b 'MVDOWN')